001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client;
022
023 import java.io.UnsupportedEncodingException;
024 import java.util.UUID;
025
026 import javax.jms.BytesMessage;
027 import javax.jms.DeliveryMode;
028 import javax.jms.Destination;
029 import javax.jms.InvalidDestinationException;
030 import javax.jms.JMSException;
031 import javax.jms.MapMessage;
032 import javax.jms.Message;
033 import javax.jms.ObjectMessage;
034 import javax.jms.StreamMessage;
035 import javax.jms.TextMessage;
036
037 import org.apache.qpid.AMQException;
038 import org.apache.qpid.client.message.AbstractJMSMessage;
039 import org.apache.qpid.client.message.MessageConverter;
040 import org.apache.qpid.client.protocol.AMQProtocolHandler;
041 import org.apache.qpid.framing.ContentBody;
042 import org.apache.qpid.util.UUIDGen;
043 import org.apache.qpid.util.UUIDs;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
048 {
049 protected final Logger _logger = LoggerFactory.getLogger(getClass());
050
051 private AMQConnection _connection;
052
053 /**
054 * If true, messages will not get a timestamp.
055 */
056 protected boolean _disableTimestamps;
057
058 /**
059 * Priority of messages created by this producer.
060 */
061 private int _messagePriority;
062
063 /**
064 * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
065 */
066 private long _timeToLive;
067
068 /**
069 * Delivery mode used for this producer.
070 */
071 private int _deliveryMode = DeliveryMode.PERSISTENT;
072
073 /**
074 * The Destination used for this consumer, if specified upon creation.
075 */
076 protected AMQDestination _destination;
077
078 /**
079 * Default encoding used for messages produced by this producer.
080 */
081 private String _encoding;
082
083 /**
084 * Default encoding used for message produced by this producer.
085 */
086 private String _mimeType;
087
088 protected AMQProtocolHandler _protocolHandler;
089
090 /**
091 * True if this producer was created from a transacted session
092 */
093 private boolean _transacted;
094
095 protected int _channelId;
096
097 /**
098 * This is an id generated by the session and is used to tie individual producers to the session. This means we
099 * can deregister a producer with the session when the producer is clsoed. We need to be able to tie producers
100 * to the session so that when an error is propagated to the session it can close the producer (meaning that
101 * a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
102 */
103 private long _producerId;
104
105 /**
106 * The session used to create this producer
107 */
108 protected AMQSession _session;
109
110 private final boolean _immediate;
111
112 private final boolean _mandatory;
113
114 private final boolean _waitUntilSent;
115
116 private boolean _disableMessageId;
117
118 private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
119
120 protected String _userID; // ref user id used in the connection.
121
122 private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
123
124 protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
125 AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
126 boolean waitUntilSent)
127 {
128 _connection = connection;
129 _destination = destination;
130 _transacted = transacted;
131 _protocolHandler = protocolHandler;
132 _channelId = channelId;
133 _session = session;
134 _producerId = producerId;
135 if (destination != null && !(destination instanceof AMQUndefinedDestination))
136 {
137 declareDestination(destination);
138 }
139
140 _immediate = immediate;
141 _mandatory = mandatory;
142 _waitUntilSent = waitUntilSent;
143 _userID = connection.getUsername();
144 }
145
146 void resubscribe() throws AMQException
147 {
148 if (_destination != null && !(_destination instanceof AMQUndefinedDestination))
149 {
150 declareDestination(_destination);
151 }
152 }
153
154 abstract void declareDestination(AMQDestination destination);
155
156 public void setDisableMessageID(boolean b) throws JMSException
157 {
158 checkPreConditions();
159 checkNotClosed();
160 _disableMessageId = b;
161 }
162
163 public boolean getDisableMessageID() throws JMSException
164 {
165 checkNotClosed();
166
167 return _disableMessageId;
168 }
169
170 public void setDisableMessageTimestamp(boolean b) throws JMSException
171 {
172 checkPreConditions();
173 _disableTimestamps = b;
174 }
175
176 public boolean getDisableMessageTimestamp() throws JMSException
177 {
178 checkNotClosed();
179
180 return _disableTimestamps;
181 }
182
183 public void setDeliveryMode(int i) throws JMSException
184 {
185 checkPreConditions();
186 if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
187 {
188 throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
189 + " is illegal");
190 }
191
192 _deliveryMode = i;
193 }
194
195 public int getDeliveryMode() throws JMSException
196 {
197 checkNotClosed();
198
199 return _deliveryMode;
200 }
201
202 public void setPriority(int i) throws JMSException
203 {
204 checkPreConditions();
205 if ((i < 0) || (i > 9))
206 {
207 throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
208 }
209
210 _messagePriority = i;
211 }
212
213 public int getPriority() throws JMSException
214 {
215 checkNotClosed();
216
217 return _messagePriority;
218 }
219
220 public void setTimeToLive(long l) throws JMSException
221 {
222 checkPreConditions();
223 if (l < 0)
224 {
225 throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
226 }
227
228 _timeToLive = l;
229 }
230
231 public long getTimeToLive() throws JMSException
232 {
233 checkNotClosed();
234
235 return _timeToLive;
236 }
237
238 public Destination getDestination() throws JMSException
239 {
240 checkNotClosed();
241
242 return _destination;
243 }
244
245 public void close() throws JMSException
246 {
247 _closed.set(true);
248 _session.deregisterProducer(_producerId);
249 }
250
251 public void send(Message message) throws JMSException
252 {
253 checkPreConditions();
254 checkInitialDestination();
255
256 synchronized (_connection.getFailoverMutex())
257 {
258 sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
259 }
260 }
261
262 public void send(Message message, int deliveryMode) throws JMSException
263 {
264 checkPreConditions();
265 checkInitialDestination();
266
267 synchronized (_connection.getFailoverMutex())
268 {
269 sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
270 }
271 }
272
273 public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
274 {
275 checkPreConditions();
276 checkInitialDestination();
277 synchronized (_connection.getFailoverMutex())
278 {
279 sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
280 }
281 }
282
283 public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
284 {
285 checkPreConditions();
286 checkInitialDestination();
287 synchronized (_connection.getFailoverMutex())
288 {
289 sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
290 }
291 }
292
293 public void send(Destination destination, Message message) throws JMSException
294 {
295 checkPreConditions();
296 checkDestination(destination);
297 synchronized (_connection.getFailoverMutex())
298 {
299 validateDestination(destination);
300 sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
301 _immediate);
302 }
303 }
304
305 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
306 throws JMSException
307 {
308 checkPreConditions();
309 checkDestination(destination);
310 synchronized (_connection.getFailoverMutex())
311 {
312 validateDestination(destination);
313 sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
314 }
315 }
316
317 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
318 boolean mandatory) throws JMSException
319 {
320 checkPreConditions();
321 checkDestination(destination);
322 synchronized (_connection.getFailoverMutex())
323 {
324 validateDestination(destination);
325 sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
326 }
327 }
328
329 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
330 boolean mandatory, boolean immediate) throws JMSException
331 {
332 checkPreConditions();
333 checkDestination(destination);
334 synchronized (_connection.getFailoverMutex())
335 {
336 validateDestination(destination);
337 sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
338 }
339 }
340
341 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
342 boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
343 {
344 checkPreConditions();
345 checkDestination(destination);
346 synchronized (_connection.getFailoverMutex())
347 {
348 validateDestination(destination);
349 sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
350 waitUntilSent);
351 }
352 }
353
354 private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
355 {
356 if (message instanceof AbstractJMSMessage)
357 {
358 return (AbstractJMSMessage) message;
359 }
360 else
361 {
362 AbstractJMSMessage newMessage;
363
364 if (message instanceof BytesMessage)
365 {
366 newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage();
367 }
368 else if (message instanceof MapMessage)
369 {
370 newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage();
371 }
372 else if (message instanceof ObjectMessage)
373 {
374 newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage();
375 }
376 else if (message instanceof TextMessage)
377 {
378 newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage();
379 }
380 else if (message instanceof StreamMessage)
381 {
382 newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage();
383 }
384 else
385 {
386 newMessage = new MessageConverter(_session, message).getConvertedMessage();
387 }
388
389 if (newMessage != null)
390 {
391 return newMessage;
392 }
393 else
394 {
395 throw new JMSException("Unable to send message, due to class conversion error: "
396 + message.getClass().getName());
397 }
398 }
399 }
400
401 private void validateDestination(Destination destination) throws JMSException
402 {
403 if (!(destination instanceof AMQDestination))
404 {
405 throw new JMSException("Unsupported destination class: "
406 + ((destination != null) ? destination.getClass() : null));
407 }
408
409 AMQDestination amqDestination = (AMQDestination) destination;
410 if(!amqDestination.isExchangeExistsChecked())
411 {
412 declareDestination(amqDestination);
413 amqDestination.setExchangeExistsChecked(true);
414 }
415 }
416
417 protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
418 boolean mandatory, boolean immediate) throws JMSException
419 {
420 sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
421 }
422
423 /**
424 * The caller of this method must hold the failover mutex.
425 *
426 * @param destination
427 * @param origMessage
428 * @param deliveryMode
429 * @param priority
430 * @param timeToLive
431 * @param mandatory
432 * @param immediate
433 *
434 * @throws JMSException
435 */
436 protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
437 boolean mandatory, boolean immediate, boolean wait) throws JMSException
438 {
439 checkTemporaryDestination(destination);
440 origMessage.setJMSDestination(destination);
441
442 AbstractJMSMessage message = convertToNativeMessage(origMessage);
443
444 if (_transacted)
445 {
446 if (_session.hasFailedOver() && _session.isDirty())
447 {
448 throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
449 new AMQSessionDirtyException("Failover has occurred and session is dirty " +
450 "so unable to send."));
451 }
452 }
453
454 UUID messageId = null;
455 if (_disableMessageId)
456 {
457 message.setJMSMessageID((UUID)null);
458 }
459 else
460 {
461 messageId = _messageIdGenerator.generate();
462 message.setJMSMessageID(messageId);
463 }
464
465 sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
466
467 if (message != origMessage)
468 {
469 _logger.debug("Updating original message");
470 origMessage.setJMSPriority(message.getJMSPriority());
471 origMessage.setJMSTimestamp(message.getJMSTimestamp());
472 _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
473 origMessage.setJMSExpiration(message.getJMSExpiration());
474 origMessage.setJMSMessageID(message.getJMSMessageID());
475 }
476
477 if (_transacted)
478 {
479 _session.markDirty();
480 }
481 }
482
483 abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
484 UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
485 boolean immediate, boolean wait) throws JMSException;
486
487 private void checkTemporaryDestination(AMQDestination destination) throws JMSException
488 {
489 if (destination instanceof TemporaryDestination)
490 {
491 _logger.debug("destination is temporary destination");
492 TemporaryDestination tempDest = (TemporaryDestination) destination;
493 if (tempDest.getSession().isClosed())
494 {
495 _logger.debug("session is closed");
496 throw new JMSException("Session for temporary destination has been closed");
497 }
498
499 if (tempDest.isDeleted())
500 {
501 _logger.debug("destination is deleted");
502 throw new JMSException("Cannot send to a deleted temporary destination");
503 }
504 }
505 }
506
507 public void setMimeType(String mimeType) throws JMSException
508 {
509 checkNotClosed();
510 _mimeType = mimeType;
511 }
512
513 public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException
514 {
515 checkNotClosed();
516 _encoding = encoding;
517 }
518
519 private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
520 {
521 checkNotClosed();
522
523 if ((_session == null) || _session.isClosed())
524 {
525 throw new javax.jms.IllegalStateException("Invalid Session");
526 }
527 }
528
529 private void checkInitialDestination()
530 {
531 if (_destination == null)
532 {
533 throw new UnsupportedOperationException("Destination is null");
534 }
535 }
536
537 private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
538 {
539 if ((_destination != null) && (suppliedDestination != null))
540 {
541 throw new UnsupportedOperationException(
542 "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
543 }
544
545 if (suppliedDestination == null)
546 {
547 throw new InvalidDestinationException("Supplied Destination was invalid");
548 }
549
550 }
551
552 public AMQSession getSession()
553 {
554 return _session;
555 }
556
557 public boolean isBound(AMQDestination destination) throws JMSException
558 {
559 return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
560 }
561 }
|