001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 *
020 */
021
022 package org.apache.qpid.client;
023
024 import javax.jms.Destination;
025 import javax.jms.InvalidDestinationException;
026 import javax.jms.JMSException;
027 import javax.jms.Message;
028 import javax.jms.Queue;
029 import javax.jms.QueueSender;
030
031 public class QueueSenderAdapter implements QueueSender
032 {
033
034 private BasicMessageProducer _delegate;
035 private Queue _queue;
036 private boolean closed = false;
037
038 public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue)
039 {
040 _delegate = msgProducer;
041 _queue = queue;
042 }
043
044 public Queue getQueue() throws JMSException
045 {
046 checkPreConditions();
047
048 return _queue;
049 }
050
051 public void send(Message msg) throws JMSException
052 {
053 checkPreConditions();
054 _delegate.send(msg);
055 }
056
057 public void send(Queue queue, Message msg) throws JMSException
058 {
059 checkPreConditions(queue);
060 _delegate.send(queue, msg);
061 }
062
063 public void publish(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
064 {
065 checkPreConditions();
066 _delegate.send(msg, deliveryMode, priority, timeToLive);
067 }
068
069 public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
070 {
071 checkPreConditions(queue);
072 _delegate.send(queue, msg, deliveryMode, priority, timeToLive);
073 }
074
075 public void close() throws JMSException
076 {
077 _delegate.close();
078 closed = true;
079 }
080
081 public int getDeliveryMode() throws JMSException
082 {
083 checkPreConditions();
084
085 return _delegate.getDeliveryMode();
086 }
087
088 public Destination getDestination() throws JMSException
089 {
090 checkPreConditions();
091
092 return _delegate.getDestination();
093 }
094
095 public boolean getDisableMessageID() throws JMSException
096 {
097 checkPreConditions();
098
099 return _delegate.getDisableMessageID();
100 }
101
102 public boolean getDisableMessageTimestamp() throws JMSException
103 {
104 checkPreConditions();
105
106 return _delegate.getDisableMessageTimestamp();
107 }
108
109 public int getPriority() throws JMSException
110 {
111 checkPreConditions();
112
113 return _delegate.getPriority();
114 }
115
116 public long getTimeToLive() throws JMSException
117 {
118 checkPreConditions();
119
120 return _delegate.getTimeToLive();
121 }
122
123 public void send(Destination dest, Message msg) throws JMSException
124 {
125 checkPreConditions((Queue) dest);
126 _delegate.send(dest, msg);
127 }
128
129 public void send(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
130 {
131 checkPreConditions();
132 _delegate.send(msg, deliveryMode, priority, timeToLive);
133 }
134
135 public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
136 {
137 checkPreConditions((Queue) dest);
138 _delegate.send(dest, msg, deliveryMode, priority, timeToLive);
139 }
140
141 public void setDeliveryMode(int deliveryMode) throws JMSException
142 {
143 checkPreConditions();
144 _delegate.setDeliveryMode(deliveryMode);
145 }
146
147 public void setDisableMessageID(boolean disableMessageID) throws JMSException
148 {
149 checkPreConditions();
150 _delegate.setDisableMessageID(disableMessageID);
151 }
152
153 public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException
154 {
155 checkPreConditions();
156 _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
157 }
158
159 public void setPriority(int priority) throws JMSException
160 {
161 checkPreConditions();
162 _delegate.setPriority(priority);
163 }
164
165 public void setTimeToLive(long timeToLive) throws JMSException
166 {
167 checkPreConditions();
168 _delegate.setTimeToLive(timeToLive);
169 }
170
171 private void checkPreConditions() throws JMSException
172 {
173 checkPreConditions(_queue);
174 }
175
176 private void checkPreConditions(Queue queue) throws JMSException
177 {
178 if (closed)
179 {
180 throw new javax.jms.IllegalStateException("Publisher is closed");
181 }
182
183 AMQSession session = ((BasicMessageProducer) _delegate).getSession();
184
185 if ((session == null) || session.isClosed())
186 {
187 throw new javax.jms.IllegalStateException("Invalid Session");
188 }
189
190 if (queue == null)
191 {
192 throw new UnsupportedOperationException("Queue is null.");
193 }
194
195 if (!(queue instanceof AMQDestination))
196 {
197 throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
198 }
199
200 AMQDestination destination = (AMQDestination) queue;
201 if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish())
202 {
203
204 if (_delegate.getSession().isStrictAMQP())
205 {
206 _delegate._logger.warn("AMQP does not support destination validation before publish, ");
207 destination.setCheckedForQueueBinding(true);
208 }
209 else
210 {
211 if (_delegate.isBound(destination))
212 {
213 destination.setCheckedForQueueBinding(true);
214 }
215 else
216 {
217 throw new InvalidDestinationException("Queue: " + queue
218 + " is not a valid destination (no bindings on server");
219 }
220 }
221 }
222 }
223
224 private boolean checkQueueBeforePublish()
225 {
226 return "true".equalsIgnoreCase(System.getProperty("org.apache.qpid.client.verifyQueueBindingBeforePublish", "true"));
227 }
228 }
|