QueueSenderAdapter.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.
018  *
019  *
020  */
021 
022 package org.apache.qpid.client;
023 
024 import javax.jms.Destination;
025 import javax.jms.InvalidDestinationException;
026 import javax.jms.JMSException;
027 import javax.jms.Message;
028 import javax.jms.Queue;
029 import javax.jms.QueueSender;
030 
031 public class QueueSenderAdapter implements QueueSender
032 {
033 
034     private BasicMessageProducer _delegate;
035     private Queue _queue;
036     private boolean closed = false;
037 
038     public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue)
039     {
040         _delegate = msgProducer;
041         _queue = queue;
042     }
043 
044     public Queue getQueue() throws JMSException
045     {
046         checkPreConditions();
047 
048         return _queue;
049     }
050 
051     public void send(Message msgthrows JMSException
052     {
053         checkPreConditions();
054         _delegate.send(msg);
055     }
056 
057     public void send(Queue queue, Message msgthrows JMSException
058     {
059         checkPreConditions(queue);
060         _delegate.send(queue, msg);
061     }
062 
063     public void publish(Message msg, int deliveryMode, int priority, long timeToLivethrows JMSException
064     {
065         checkPreConditions();
066         _delegate.send(msg, deliveryMode, priority, timeToLive);
067     }
068 
069     public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLivethrows JMSException
070     {
071         checkPreConditions(queue);
072         _delegate.send(queue, msg, deliveryMode, priority, timeToLive);
073     }
074 
075     public void close() throws JMSException
076     {
077         _delegate.close();
078         closed = true;
079     }
080 
081     public int getDeliveryMode() throws JMSException
082     {
083         checkPreConditions();
084 
085         return _delegate.getDeliveryMode();
086     }
087 
088     public Destination getDestination() throws JMSException
089     {
090         checkPreConditions();
091 
092         return _delegate.getDestination();
093     }
094 
095     public boolean getDisableMessageID() throws JMSException
096     {
097         checkPreConditions();
098 
099         return _delegate.getDisableMessageID();
100     }
101 
102     public boolean getDisableMessageTimestamp() throws JMSException
103     {
104         checkPreConditions();
105 
106         return _delegate.getDisableMessageTimestamp();
107     }
108 
109     public int getPriority() throws JMSException
110     {
111         checkPreConditions();
112 
113         return _delegate.getPriority();
114     }
115 
116     public long getTimeToLive() throws JMSException
117     {
118         checkPreConditions();
119 
120         return _delegate.getTimeToLive();
121     }
122 
123     public void send(Destination dest, Message msgthrows JMSException
124     {
125         checkPreConditions((Queuedest);
126         _delegate.send(dest, msg);
127     }
128 
129     public void send(Message msg, int deliveryMode, int priority, long timeToLivethrows JMSException
130     {
131         checkPreConditions();
132         _delegate.send(msg, deliveryMode, priority, timeToLive);
133     }
134 
135     public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLivethrows JMSException
136     {
137         checkPreConditions((Queuedest);
138         _delegate.send(dest, msg, deliveryMode, priority, timeToLive);
139     }
140 
141     public void setDeliveryMode(int deliveryModethrows JMSException
142     {
143         checkPreConditions();
144         _delegate.setDeliveryMode(deliveryMode);
145     }
146 
147     public void setDisableMessageID(boolean disableMessageIDthrows JMSException
148     {
149         checkPreConditions();
150         _delegate.setDisableMessageID(disableMessageID);
151     }
152 
153     public void setDisableMessageTimestamp(boolean disableMessageTimestampthrows JMSException
154     {
155         checkPreConditions();
156         _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
157     }
158 
159     public void setPriority(int prioritythrows JMSException
160     {
161         checkPreConditions();
162         _delegate.setPriority(priority);
163     }
164 
165     public void setTimeToLive(long timeToLivethrows JMSException
166     {
167         checkPreConditions();
168         _delegate.setTimeToLive(timeToLive);
169     }
170 
171     private void checkPreConditions() throws JMSException
172     {
173         checkPreConditions(_queue);
174     }
175 
176     private void checkPreConditions(Queue queuethrows JMSException
177     {
178         if (closed)
179         {
180             throw new javax.jms.IllegalStateException("Publisher is closed");
181         }
182 
183         AMQSession session = ((BasicMessageProducer_delegate).getSession();
184 
185         if ((session == null|| session.isClosed())
186         {
187             throw new javax.jms.IllegalStateException("Invalid Session");
188         }
189 
190         if (queue == null)
191         {
192             throw new UnsupportedOperationException("Queue is null.");
193         }
194 
195         if (!(queue instanceof AMQDestination))
196         {
197             throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
198         }
199 
200         AMQDestination destination = (AMQDestinationqueue;
201         if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish())
202         {
203 
204             if (_delegate.getSession().isStrictAMQP())
205             {
206                 _delegate._logger.warn("AMQP does not support destination validation before publish, ");
207                 destination.setCheckedForQueueBinding(true);
208             }
209             else
210             {
211                 if (_delegate.isBound(destination))
212                 {
213                     destination.setCheckedForQueueBinding(true);
214                 }
215                 else
216                 {
217                     throw new InvalidDestinationException("Queue: " + queue
218                         " is not a valid destination (no bindings on server");
219                 }
220             }
221         }
222     }
223 
224     private boolean checkQueueBeforePublish()
225     {
226         return "true".equalsIgnoreCase(System.getProperty("org.apache.qpid.client.verifyQueueBindingBeforePublish""true"));
227     }
228 }