BasicMessageProducer.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client;
022 
023 import java.io.UnsupportedEncodingException;
024 import java.util.UUID;
025 
026 import javax.jms.BytesMessage;
027 import javax.jms.DeliveryMode;
028 import javax.jms.Destination;
029 import javax.jms.InvalidDestinationException;
030 import javax.jms.JMSException;
031 import javax.jms.MapMessage;
032 import javax.jms.Message;
033 import javax.jms.ObjectMessage;
034 import javax.jms.StreamMessage;
035 import javax.jms.TextMessage;
036 
037 import org.apache.qpid.AMQException;
038 import org.apache.qpid.client.message.AbstractJMSMessage;
039 import org.apache.qpid.client.message.MessageConverter;
040 import org.apache.qpid.client.protocol.AMQProtocolHandler;
041 import org.apache.qpid.framing.ContentBody;
042 import org.apache.qpid.util.UUIDGen;
043 import org.apache.qpid.util.UUIDs;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046 
047 public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
048 {
049     protected final Logger _logger = LoggerFactory.getLogger(getClass());
050 
051     private AMQConnection _connection;
052 
053     /**
054      * If true, messages will not get a timestamp. 
055      */
056     protected boolean _disableTimestamps;
057 
058     /**
059      * Priority of messages created by this producer.
060      */
061     private int _messagePriority;
062 
063     /**
064      * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
065      */
066     private long _timeToLive;
067 
068     /**
069      * Delivery mode used for this producer.
070      */
071     private int _deliveryMode = DeliveryMode.PERSISTENT;
072 
073     /**
074      * The Destination used for this consumer, if specified upon creation.
075      */
076     protected AMQDestination _destination;
077 
078     /**
079      * Default encoding used for messages produced by this producer.
080      */
081     private String _encoding;
082 
083     /**
084      * Default encoding used for message produced by this producer.
085      */
086     private String _mimeType;
087 
088     protected AMQProtocolHandler _protocolHandler;
089 
090     /**
091      * True if this producer was created from a transacted session
092      */
093     private boolean _transacted;
094 
095     protected int _channelId;
096 
097     /**
098      * This is an id generated by the session and is used to tie individual producers to the session. This means we
099      * can deregister a producer with the session when the producer is clsoed. We need to be able to tie producers
100      * to the session so that when an error is propagated to the session it can close the producer (meaning that
101      * a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
102      */
103     private long _producerId;
104 
105     /**
106      * The session used to create this producer 
107      */
108     protected AMQSession _session;
109 
110     private final boolean _immediate;
111 
112     private final boolean _mandatory;
113 
114     private final boolean _waitUntilSent;
115 
116     private boolean _disableMessageId;
117 
118     private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
119     
120     protected String _userID;  // ref user id used in the connection.
121 
122     private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
123 
124     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
125                                    AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
126                                    boolean waitUntilSent)
127     {
128         _connection = connection;
129         _destination = destination;
130         _transacted = transacted;
131         _protocolHandler = protocolHandler;
132         _channelId = channelId;
133         _session = session;
134         _producerId = producerId;
135         if (destination != null  && !(destination instanceof AMQUndefinedDestination))
136         {
137             declareDestination(destination);
138         }
139 
140         _immediate = immediate;
141         _mandatory = mandatory;
142         _waitUntilSent = waitUntilSent;
143         _userID = connection.getUsername();
144     }
145 
146     void resubscribe() throws AMQException
147     {
148         if (_destination != null && !(_destination instanceof AMQUndefinedDestination))
149         {
150             declareDestination(_destination);
151         }
152     }
153 
154     abstract void declareDestination(AMQDestination destination);
155 
156     public void setDisableMessageID(boolean bthrows JMSException
157     {
158         checkPreConditions();
159         checkNotClosed();
160         _disableMessageId = b;
161     }
162 
163     public boolean getDisableMessageID() throws JMSException
164     {
165         checkNotClosed();
166 
167         return _disableMessageId;
168     }
169 
170     public void setDisableMessageTimestamp(boolean bthrows JMSException
171     {
172         checkPreConditions();
173         _disableTimestamps = b;
174     }
175 
176     public boolean getDisableMessageTimestamp() throws JMSException
177     {
178         checkNotClosed();
179 
180         return _disableTimestamps;
181     }
182 
183     public void setDeliveryMode(int ithrows JMSException
184     {
185         checkPreConditions();
186         if ((i != DeliveryMode.NON_PERSISTENT&& (i != DeliveryMode.PERSISTENT))
187         {
188             throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
189                                    " is illegal");
190         }
191 
192         _deliveryMode = i;
193     }
194 
195     public int getDeliveryMode() throws JMSException
196     {
197         checkNotClosed();
198 
199         return _deliveryMode;
200     }
201 
202     public void setPriority(int ithrows JMSException
203     {
204         checkPreConditions();
205         if ((i < 0|| (i > 9))
206         {
207             throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
208         }
209 
210         _messagePriority = i;
211     }
212 
213     public int getPriority() throws JMSException
214     {
215         checkNotClosed();
216 
217         return _messagePriority;
218     }
219 
220     public void setTimeToLive(long lthrows JMSException
221     {
222         checkPreConditions();
223         if (l < 0)
224         {
225             throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
226         }
227 
228         _timeToLive = l;
229     }
230 
231     public long getTimeToLive() throws JMSException
232     {
233         checkNotClosed();
234 
235         return _timeToLive;
236     }
237 
238     public Destination getDestination() throws JMSException
239     {
240         checkNotClosed();
241 
242         return _destination;
243     }
244 
245     public void close() throws JMSException
246     {
247         _closed.set(true);
248         _session.deregisterProducer(_producerId);
249     }
250 
251     public void send(Message messagethrows JMSException
252     {
253         checkPreConditions();
254         checkInitialDestination();
255 
256         synchronized (_connection.getFailoverMutex())
257         {
258             sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
259         }
260     }
261 
262     public void send(Message message, int deliveryModethrows JMSException
263     {
264         checkPreConditions();
265         checkInitialDestination();
266 
267         synchronized (_connection.getFailoverMutex())
268         {
269             sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
270         }
271     }
272 
273     public void send(Message message, int deliveryMode, boolean immediatethrows JMSException
274     {
275         checkPreConditions();
276         checkInitialDestination();
277         synchronized (_connection.getFailoverMutex())
278         {
279             sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
280         }
281     }
282 
283     public void send(Message message, int deliveryMode, int priority, long timeToLivethrows JMSException
284     {
285         checkPreConditions();
286         checkInitialDestination();
287         synchronized (_connection.getFailoverMutex())
288         {
289             sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
290         }
291     }
292 
293     public void send(Destination destination, Message messagethrows JMSException
294     {
295         checkPreConditions();
296         checkDestination(destination);
297         synchronized (_connection.getFailoverMutex())
298         {
299             validateDestination(destination);
300             sendImpl((AMQDestinationdestination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
301                      _immediate);
302         }
303     }
304 
305     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
306         throws JMSException
307     {
308         checkPreConditions();
309         checkDestination(destination);
310         synchronized (_connection.getFailoverMutex())
311         {
312             validateDestination(destination);
313             sendImpl((AMQDestinationdestination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
314         }
315     }
316 
317     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
318                      boolean mandatorythrows JMSException
319     {
320         checkPreConditions();
321         checkDestination(destination);
322         synchronized (_connection.getFailoverMutex())
323         {
324             validateDestination(destination);
325             sendImpl((AMQDestinationdestination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
326         }
327     }
328 
329     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
330                      boolean mandatory, boolean immediatethrows JMSException
331     {
332         checkPreConditions();
333         checkDestination(destination);
334         synchronized (_connection.getFailoverMutex())
335         {
336             validateDestination(destination);
337             sendImpl((AMQDestinationdestination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
338         }
339     }
340 
341     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
342                      boolean mandatory, boolean immediate, boolean waitUntilSentthrows JMSException
343     {
344         checkPreConditions();
345         checkDestination(destination);
346         synchronized (_connection.getFailoverMutex())
347         {
348             validateDestination(destination);
349             sendImpl((AMQDestinationdestination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
350                      waitUntilSent);
351         }
352     }
353 
354     private AbstractJMSMessage convertToNativeMessage(Message messagethrows JMSException
355     {
356         if (message instanceof AbstractJMSMessage)
357         {
358             return (AbstractJMSMessagemessage;
359         }
360         else
361         {
362             AbstractJMSMessage newMessage;
363 
364             if (message instanceof BytesMessage)
365             {
366                 newMessage = new MessageConverter(_session, (BytesMessagemessage).getConvertedMessage();
367             }
368             else if (message instanceof MapMessage)
369             {
370                 newMessage = new MessageConverter(_session, (MapMessagemessage).getConvertedMessage();
371             }
372             else if (message instanceof ObjectMessage)
373             {
374                 newMessage = new MessageConverter(_session, (ObjectMessagemessage).getConvertedMessage();
375             }
376             else if (message instanceof TextMessage)
377             {
378                 newMessage = new MessageConverter(_session, (TextMessagemessage).getConvertedMessage();
379             }
380             else if (message instanceof StreamMessage)
381             {
382                 newMessage = new MessageConverter(_session, (StreamMessagemessage).getConvertedMessage();
383             }
384             else
385             {
386                 newMessage = new MessageConverter(_session, message).getConvertedMessage();
387             }
388 
389             if (newMessage != null)
390             {
391                 return newMessage;
392             }
393             else
394             {
395                 throw new JMSException("Unable to send message, due to class conversion error: "
396                                        + message.getClass().getName());
397             }
398         }
399     }
400 
401     private void validateDestination(Destination destinationthrows JMSException
402     {
403         if (!(destination instanceof AMQDestination))
404         {
405             throw new JMSException("Unsupported destination class: "
406                                    ((destination != null? destination.getClass() null));
407         }
408 
409         AMQDestination amqDestination = (AMQDestinationdestination;
410         if(!amqDestination.isExchangeExistsChecked())
411         {
412             declareDestination(amqDestination);
413             amqDestination.setExchangeExistsChecked(true);
414         }
415     }
416 
417     protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
418                             boolean mandatory, boolean immediatethrows JMSException
419     {
420         sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
421     }
422 
423     /**
424      * The caller of this method must hold the failover mutex.
425      *
426      @param destination
427      @param origMessage
428      @param deliveryMode
429      @param priority
430      @param timeToLive
431      @param mandatory
432      @param immediate
433      *
434      @throws JMSException
435      */
436     protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
437                             boolean mandatory, boolean immediate, boolean waitthrows JMSException
438     {
439         checkTemporaryDestination(destination);
440         origMessage.setJMSDestination(destination);
441 
442         AbstractJMSMessage message = convertToNativeMessage(origMessage);
443 
444         if (_transacted)
445         {
446             if (_session.hasFailedOver() && _session.isDirty())
447             {
448                 throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
449                                           new AMQSessionDirtyException("Failover has occurred and session is dirty " +
450                                                                        "so unable to send."));
451             }
452         }
453 
454         UUID messageId = null;
455         if (_disableMessageId)
456         {
457             message.setJMSMessageID((UUID)null);
458         }
459         else
460         {
461             messageId = _messageIdGenerator.generate();
462             message.setJMSMessageID(messageId);
463         }
464 
465         sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
466 
467         if (message != origMessage)
468         {
469             _logger.debug("Updating original message");
470             origMessage.setJMSPriority(message.getJMSPriority());
471             origMessage.setJMSTimestamp(message.getJMSTimestamp());
472             _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
473             origMessage.setJMSExpiration(message.getJMSExpiration());
474             origMessage.setJMSMessageID(message.getJMSMessageID());
475         }
476 
477         if (_transacted)
478         {
479             _session.markDirty();
480         }
481     }
482 
483     abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
484                               UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
485                               boolean immediate, boolean waitthrows JMSException;
486 
487     private void checkTemporaryDestination(AMQDestination destinationthrows JMSException
488     {
489         if (destination instanceof TemporaryDestination)
490         {
491             _logger.debug("destination is temporary destination");
492             TemporaryDestination tempDest = (TemporaryDestinationdestination;
493             if (tempDest.getSession().isClosed())
494             {
495                 _logger.debug("session is closed");
496                 throw new JMSException("Session for temporary destination has been closed");
497             }
498 
499             if (tempDest.isDeleted())
500             {
501                 _logger.debug("destination is deleted");
502                 throw new JMSException("Cannot send to a deleted temporary destination");
503             }
504         }
505     }
506 
507     public void setMimeType(String mimeTypethrows JMSException
508     {
509         checkNotClosed();
510         _mimeType = mimeType;
511     }
512 
513     public void setEncoding(String encodingthrows JMSException, UnsupportedEncodingException
514     {
515         checkNotClosed();
516         _encoding = encoding;
517     }
518 
519     private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
520     {
521         checkNotClosed();
522 
523         if ((_session == null|| _session.isClosed())
524         {
525             throw new javax.jms.IllegalStateException("Invalid Session");
526         }
527     }
528 
529     private void checkInitialDestination()
530     {
531         if (_destination == null)
532         {
533             throw new UnsupportedOperationException("Destination is null");
534         }
535     }
536 
537     private void checkDestination(Destination suppliedDestinationthrows InvalidDestinationException
538     {
539         if ((_destination != null&& (suppliedDestination != null))
540         {
541             throw new UnsupportedOperationException(
542                     "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
543         }
544 
545         if (suppliedDestination == null)
546         {
547             throw new InvalidDestinationException("Supplied Destination was invalid");
548         }
549 
550     }
551 
552     public AMQSession getSession()
553     {
554         return _session;
555     }
556 
557     public boolean isBound(AMQDestination destinationthrows JMSException
558     {
559         return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
560     }
561 }