AMQSession_0_8.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client;
022 
023 
024 import javax.jms.*;
025 import javax.jms.IllegalStateException;
026 
027 import org.apache.qpid.AMQException;
028 import org.apache.qpid.AMQUndeliveredException;
029 import org.apache.qpid.client.failover.FailoverException;
030 import org.apache.qpid.client.failover.FailoverProtectedOperation;
031 import org.apache.qpid.client.failover.FailoverRetrySupport;
032 import org.apache.qpid.client.message.*;
033 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
034 import org.apache.qpid.client.protocol.AMQProtocolHandler;
035 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
036 import org.apache.qpid.common.AMQPFilterTypes;
037 import org.apache.qpid.framing.*;
038 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
039 import org.apache.qpid.jms.Session;
040 import org.apache.qpid.protocol.AMQConstant;
041 import org.apache.qpid.protocol.AMQMethodEvent;
042 import org.slf4j.Logger;
043 import org.slf4j.LoggerFactory;
044 
045 import java.util.Map;
046 
047 public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
048 {
049 
050     /** Used for debugging. */
051     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
052 
053     /**
054      * Creates a new session on a connection.
055      *
056      @param con                     The connection on which to create the session.
057      @param channelId               The unique identifier for the session.
058      @param transacted              Indicates whether or not the session is transactional.
059      @param acknowledgeMode         The acknoledgement mode for the session.
060      @param messageFactoryRegistry  The message factory factory for the session.
061      @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
062      @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
063      */
064     AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
065                MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
066     {
067 
068          super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
069     }
070 
071     /**
072      * Creates a new session on a connection with the default message factory factory.
073      *
074      @param con                     The connection on which to create the session.
075      @param channelId               The unique identifier for the session.
076      @param transacted              Indicates whether or not the session is transactional.
077      @param acknowledgeMode         The acknoledgement mode for the session.
078      @param defaultPrefetchHigh     The maximum number of messages to prefetched before suspending the session.
079      @param defaultPrefetchLow      The number of prefetched messages at which to resume the session.
080      */
081     AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
082                int defaultPrefetchLow)
083     {
084         this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
085              defaultPrefetchLow);
086     }
087 
088     private ProtocolVersion getProtocolVersion()
089     {
090         return getProtocolHandler().getProtocolVersion();
091     }
092 
093     public void acknowledgeMessage(long deliveryTag, boolean multiple)
094     {
095         BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
096 
097         final AMQFrame ackFrame = body.generateFrame(_channelId);
098 
099         if (_logger.isDebugEnabled())
100         {
101             _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
102         }
103 
104         getProtocolHandler().writeFrame(ackFrame);
105         _unacknowledgedMessageTags.remove(deliveryTag);
106     }
107 
108     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
109             final AMQShortString exchangeName, final AMQDestination destthrows AMQException, FailoverException
110     {
111         getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
112                                         (getTicket(),queueName,exchangeName,routingKey,false,arguments).
113                                         generateFrame(_channelId), QueueBindOkBody.class);
114     }
115 
116     public void sendClose(long timeoutthrows AMQException, FailoverException
117     {
118         getProtocolHandler().closeSession(this);
119         getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
120                 new AMQShortString("JMS client closing channel")00).generateFrame(_channelId)
121                                        ChannelCloseOkBody.class, timeout);
122         // When control resumes at this point, a reply will have been received that
123         // indicates the broker has closed the channel successfully.
124     }
125 
126     public void sendCommit() throws AMQException, FailoverException
127     {
128         final AMQProtocolHandler handler = getProtocolHandler();
129 
130         handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
131     }
132 
133     public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> argumentsthrows AMQException,
134             FailoverException
135     {
136         FieldTable table = null;
137         if(arguments != null && !arguments.isEmpty())
138         {
139             table = new FieldTable();
140             for(Map.Entry<String, Object> entry : arguments.entrySet())
141             {
142                 table.setObject(entry.getKey(), entry.getValue());
143             }
144         }
145         QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
146         AMQFrame queueDeclare = body.generateFrame(_channelId);
147         getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
148     }
149 
150     public void sendRecover() throws AMQException, FailoverException
151     {
152         _unacknowledgedMessageTags.clear();
153 
154         if (isStrictAMQP())
155         {
156             // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
157 
158             BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
159             _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
160             _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
161         }
162         else
163         {
164             // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
165             // in 0-9 we used the cleaner addition of a new sync recover method with its own ok
166             if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
167             {
168                 BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
169                 _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
170             }
171             else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
172             {
173                 BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
174                 _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
175             }
176             else
177             {
178                 throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
179             }
180         }
181     }
182 
183     public void releaseForRollback()
184     {
185         while (true)
186         {
187             Long tag = _deliveredMessageTags.poll();
188             if (tag == null)
189             {
190                 break;
191             }
192 
193             rejectMessage(tag, true);
194         }
195 
196         if (_dispatcher != null)
197         {
198             _dispatcher.rollback();
199         }
200     }
201 
202     public void rejectMessage(long deliveryTag, boolean requeue)
203     {
204         if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE|| (_acknowledgeMode == SESSION_TRANSACTED))
205         {
206             if (_logger.isDebugEnabled())
207             {
208                 _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" this.hashCode());
209             }
210 
211             BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
212             AMQFrame frame = body.generateFrame(_channelId);
213 
214             _connection.getProtocolHandler().writeFrame(frame);
215         }
216     }
217 
218     public boolean isQueueBound(final AMQDestination destinationthrows JMSException
219     {
220         return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
221     }
222 
223 
224     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
225             throws JMSException
226     {
227         try
228         {
229             AMQMethodEvent response = new FailoverRetrySupport<AMQMethodEvent, AMQException>(
230                     new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
231                     {
232                         public AMQMethodEvent execute() throws AMQException, FailoverException
233                         {
234                             AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
235                                                     (exchangeName, routingKey, queueName).generateFrame(_channelId);
236 
237                             return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
238 
239                         }
240                     }, _connection).execute();
241 
242             // Extract and return the response code from the query.
243             ExchangeBoundOkBody responseBody = (ExchangeBoundOkBodyresponse.getMethod();
244 
245             return (responseBody.getReplyCode() == 0);
246         }
247         catch (AMQException e)
248         {
249             throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
250         }
251     }    
252 
253     @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
254                                       AMQShortString queueName,
255                                       AMQProtocolHandler protocolHandler,
256                                       boolean nowait,
257                                       String messageSelector,
258                                       int tagthrows AMQException, FailoverException
259     {
260         FieldTable arguments = FieldTableFactory.newFieldTable();
261         if ((messageSelector != null&& !messageSelector.equals(""))
262         {
263             arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
264         }
265 
266         if (consumer.isAutoClose())
267         {
268             arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
269         }
270 
271         if (consumer.isNoConsume())
272         {
273             arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
274         }
275 
276         BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
277                                                                            queueName,
278                                                                            new AMQShortString(String.valueOf(tag)),
279                                                                            consumer.isNoLocal(),
280                                                                            consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
281                                                                            consumer.isExclusive(),
282                                                                            nowait,
283                                                                            arguments);
284 
285 
286         AMQFrame jmsConsume = body.generateFrame(_channelId);
287 
288         if (nowait)
289         {
290             protocolHandler.writeFrame(jmsConsume);
291         }
292         else
293         {
294             protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
295         }
296     }
297 
298     public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
299             final boolean nowaitthrows AMQException, FailoverException
300     {
301         ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
302                                                                                  name.toString().startsWith("amq."),
303                                                                                  false,false,false,nowait,null);
304         AMQFrame exchangeDeclare = body.generateFrame(_channelId);
305 
306         protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
307     }
308 
309     public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandlerthrows AMQException, FailoverException
310     {
311         QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
312 
313         AMQFrame queueDeclare = body.generateFrame(_channelId);
314 
315         protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
316     }
317 
318     public void sendQueueDelete(final AMQShortString queueNamethrows AMQException, FailoverException
319     {
320         QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(),
321                                                                          queueName,
322                                                                          false,
323                                                                          false,
324                                                                          true);
325         AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
326 
327         getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
328     }
329 
330     public void sendSuspendChannel(boolean suspendthrows AMQException, FailoverException
331     {
332         ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend);
333         AMQFrame channelFlowFrame = body.generateFrame(_channelId);
334         _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
335     }
336 
337     public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
338             final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
339             final boolean noConsume, final boolean autoClose)  throws JMSException
340     {
341 
342         final AMQProtocolHandler protocolHandler = getProtocolHandler();
343        return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
344                                  _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
345                                  exclusive, _acknowledgeMode, noConsume, autoClose);
346     }
347 
348 
349     public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
350             final boolean immediate, final boolean waitUntilSent, long producerId)
351     {
352 
353        return new BasicMessageProducer_0_8(_connection, (AMQDestinationdestination, _transacted, _channelId,
354                                  this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
355     }
356 
357 
358     @Override public void messageReceived(UnprocessedMessage message)
359     {
360 
361         if (message instanceof ReturnMessage)
362         {
363             // Return of the bounced message.
364             returnBouncedMessage((ReturnMessagemessage);
365         }
366         else
367         {
368             super.messageReceived(message);
369         }
370     }
371 
372     private void returnBouncedMessage(final ReturnMessage msg)
373     {
374         _connection.performConnectionTask(new Runnable()
375         {
376             public void run()
377             {
378                 try
379                 {
380                     // Bounced message is processed here, away from the mina thread
381                     AbstractJMSMessage bouncedMessage =
382                             _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
383                                                                   msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
384                     AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
385                     AMQShortString reason = msg.getReplyText();
386                     _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
387 
388                     // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
389                     if (errorCode == AMQConstant.NO_CONSUMERS)
390                     {
391                         _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
392                     }
393                     else if (errorCode == AMQConstant.NO_ROUTE)
394                     {
395                         _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
396                     }
397                     else
398                     {
399                         _connection.exceptionReceived(
400                                 new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
401                     }
402 
403                 }
404                 catch (Exception e)
405                 {
406                     _logger.error(
407                             "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
408                             e);
409                 }
410             }
411         });
412     }
413 
414 
415 
416 
417     public void sendRollback() throws AMQException, FailoverException
418     {
419         TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
420         AMQFrame frame = body.generateFrame(getChannelId());
421         getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
422     }
423 
424     public  TopicSubscriber createDurableSubscriber(Topic topic, String namethrows JMSException
425     {
426 
427         checkNotClosed();
428         AMQTopic origTopic = checkValidTopic(topic);
429         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
430         TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
431         if (subscriber != null)
432         {
433             if (subscriber.getTopic().equals(topic))
434             {
435                 throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
436                                                 + name);
437             }
438             else
439             {
440                 unsubscribe(name);
441             }
442         }
443         else
444         {
445             AMQShortString topicName;
446             if (topic instanceof AMQTopic)
447             {
448                 topicName = ((AMQTopictopic).getRoutingKey();
449             }
450             else
451             {
452                 topicName = new AMQShortString(topic.getTopicName());
453             }
454 
455             if (_strictAMQP)
456             {
457                 if (_strictAMQPFATAL)
458                 {
459                     throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
460                 }
461                 else
462                 {
463                     _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
464                                  "for creation durableSubscriber. Requesting queue deletion regardless.");
465                 }
466 
467                 deleteQueue(dest.getAMQQueueName());
468             }
469             else
470             {
471                 // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
472                 // says we must trash the subscription.
473                 if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
474                     && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
475                 {
476                     deleteQueue(dest.getAMQQueueName());
477                 }
478             }
479         }
480 
481         subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumercreateConsumer(dest));
482 
483         _subscriptions.put(name, subscriber);
484         _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
485 
486         return subscriber;
487     }
488 
489 
490 
491 
492     public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetchthrows AMQException
493     {
494         new FailoverRetrySupport<Object, AMQException>(
495                 new FailoverProtectedOperation<Object, AMQException>()
496                 {
497                     public Object execute() throws AMQException, FailoverException
498                     {
499 
500                         BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
501 
502                         // todo send low water mark when protocol allows.
503                         // todo Be aware of possible changes to parameter order as versions change.
504                         getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
505                   
506                         return null;
507                     }
508                  }, _connection).execute();
509     }
510 
511     class QueueDeclareOkHandler extends SpecificMethodFrameListener
512     {
513 
514         private long _messageCount;
515         private long _consumerCount;
516 
517         public QueueDeclareOkHandler()
518         {
519             super(getChannelId(), QueueDeclareOkBody.class);
520         }
521 
522         public boolean processMethod(int channelId, AMQMethodBody frame//throws AMQException
523         {
524             boolean matches = super.processMethod(channelId, frame);
525             if (matches)
526             {
527                 QueueDeclareOkBody declareOk = (QueueDeclareOkBodyframe;
528                 _messageCount = declareOk.getMessageCount();
529                 _consumerCount = declareOk.getConsumerCount();
530             }
531             return matches;
532         }
533 
534     }
535 
536     protected Long requestQueueDepth(AMQDestination amqdthrows AMQException, FailoverException
537     {
538         AMQFrame queueDeclare =
539             getMethodRegistry().createQueueDeclareBody(getTicket(),
540                                                        amqd.getAMQQueueName(),
541                                                        true,
542                                                        amqd.isDurable(),
543                                                        amqd.isExclusive(),
544                                                        amqd.isAutoDelete(),
545                                                        false,
546                                                        null).generateFrame(_channelId);
547         QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
548         getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);        
549         return okHandler._messageCount;
550     }
551 
552     protected final boolean tagLE(long tag1, long tag2)
553     {
554         return tag1 <= tag2;
555     }
556 
557     protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
558     {
559         return false;
560     }
561 
562     public AMQMessageDelegateFactory getMessageDelegateFactory()
563     {
564         return AMQMessageDelegateFactory.FACTORY_0_8;
565     }
566 
567 }