AMQSession.java
0001 /*
0002  *
0003  * Licensed to the Apache Software Foundation (ASF) under one
0004  * or more contributor license agreements.  See the NOTICE file
0005  * distributed with this work for additional information
0006  * regarding copyright ownership.  The ASF licenses this file
0007  * to you under the Apache License, Version 2.0 (the
0008  * "License"); you may not use this file except in compliance
0009  * with the License.  You may obtain a copy of the License at
0010  *
0011  *   http://www.apache.org/licenses/LICENSE-2.0
0012  *
0013  * Unless required by applicable law or agreed to in writing,
0014  * software distributed under the License is distributed on an
0015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016  * KIND, either express or implied.  See the License for the
0017  * specific language governing permissions and limitations
0018  * under the License.
0019  *
0020  */
0021 package org.apache.qpid.client;
0022 
0023 import java.io.Serializable;
0024 import java.net.URISyntaxException;
0025 import java.text.MessageFormat;
0026 import java.util.ArrayList;
0027 import java.util.Collection;
0028 import java.util.Iterator;
0029 import java.util.Map;
0030 import java.util.concurrent.ConcurrentHashMap;
0031 import java.util.concurrent.ConcurrentLinkedQueue;
0032 import java.util.concurrent.CopyOnWriteArrayList;
0033 import java.util.concurrent.CountDownLatch;
0034 import java.util.concurrent.atomic.AtomicBoolean;
0035 import java.util.concurrent.atomic.AtomicInteger;
0036 import java.util.concurrent.atomic.AtomicLong;
0037 
0038 import javax.jms.BytesMessage;
0039 import javax.jms.Destination;
0040 import javax.jms.IllegalStateException;
0041 import javax.jms.InvalidDestinationException;
0042 import javax.jms.InvalidSelectorException;
0043 import javax.jms.JMSException;
0044 import javax.jms.MapMessage;
0045 import javax.jms.MessageConsumer;
0046 import javax.jms.MessageListener;
0047 import javax.jms.MessageProducer;
0048 import javax.jms.ObjectMessage;
0049 import javax.jms.Queue;
0050 import javax.jms.QueueBrowser;
0051 import javax.jms.QueueReceiver;
0052 import javax.jms.QueueSender;
0053 import javax.jms.QueueSession;
0054 import javax.jms.StreamMessage;
0055 import javax.jms.TemporaryQueue;
0056 import javax.jms.TemporaryTopic;
0057 import javax.jms.TextMessage;
0058 import javax.jms.Topic;
0059 import javax.jms.TopicPublisher;
0060 import javax.jms.TopicSession;
0061 import javax.jms.TopicSubscriber;
0062 
0063 import org.apache.qpid.AMQDisconnectedException;
0064 import org.apache.qpid.AMQException;
0065 import org.apache.qpid.AMQInvalidArgumentException;
0066 import org.apache.qpid.AMQInvalidRoutingKeyException;
0067 import org.apache.qpid.client.failover.FailoverException;
0068 import org.apache.qpid.client.failover.FailoverNoopSupport;
0069 import org.apache.qpid.client.failover.FailoverProtectedOperation;
0070 import org.apache.qpid.client.failover.FailoverRetrySupport;
0071 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
0072 import org.apache.qpid.client.message.AbstractJMSMessage;
0073 import org.apache.qpid.client.message.CloseConsumerMessage;
0074 import org.apache.qpid.client.message.JMSBytesMessage;
0075 import org.apache.qpid.client.message.JMSMapMessage;
0076 import org.apache.qpid.client.message.JMSObjectMessage;
0077 import org.apache.qpid.client.message.JMSStreamMessage;
0078 import org.apache.qpid.client.message.JMSTextMessage;
0079 import org.apache.qpid.client.message.MessageFactoryRegistry;
0080 import org.apache.qpid.client.message.UnprocessedMessage;
0081 import org.apache.qpid.client.protocol.AMQProtocolHandler;
0082 import org.apache.qpid.client.state.AMQState;
0083 import org.apache.qpid.client.state.AMQStateManager;
0084 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
0085 import org.apache.qpid.framing.AMQShortString;
0086 import org.apache.qpid.framing.FieldTable;
0087 import org.apache.qpid.framing.FieldTableFactory;
0088 import org.apache.qpid.framing.MethodRegistry;
0089 import org.apache.qpid.jms.Session;
0090 import org.apache.qpid.thread.Threading;
0091 import org.apache.qpid.url.AMQBindingURL;
0092 import org.slf4j.Logger;
0093 import org.slf4j.LoggerFactory;
0094 
0095 /**
0096  <p/><table id="crc"><caption>CRC Card</caption>
0097  <tr><th> Responsibilities <th> Collaborations
0098  <tr><td>
0099  </table>
0100  *
0101  * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For
0102  * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
0103  * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
0104  * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
0105  * has been reestablished. All fail-over protected operations should be placed in private methods, with
0106  * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
0107  * fail-over process sets a nowait flag and uses an async method call instead.
0108  * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
0109  * after looking at worse bottlenecks first.
0110  */
0111 public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
0112 {
0113 
0114 
0115     public static final class IdToConsumerMap<C extends BasicMessageConsumer>
0116     {
0117         private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
0118         private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
0119 
0120         public C get(int id)
0121         {
0122             if ((id & 0xFFFFFFF0== 0)
0123             {
0124                 return (C_fastAccessConsumers[id];
0125             }
0126             else
0127             {
0128                 return _slowAccessConsumers.get(id);
0129             }
0130         }
0131 
0132         public C put(int id, C consumer)
0133         {
0134             C oldVal;
0135             if ((id & 0xFFFFFFF0== 0)
0136             {
0137                 oldVal = (C_fastAccessConsumers[id];
0138                 _fastAccessConsumers[id= consumer;
0139             }
0140             else
0141             {
0142                 oldVal = _slowAccessConsumers.put(id, consumer);
0143             }
0144 
0145             return consumer;
0146 
0147         }
0148 
0149         public C remove(int id)
0150         {
0151             C consumer;
0152             if ((id & 0xFFFFFFF0== 0)
0153             {
0154                 consumer = (C_fastAccessConsumers[id];
0155                 _fastAccessConsumers[idnull;
0156             }
0157             else
0158             {
0159                 consumer = _slowAccessConsumers.remove(id);
0160             }
0161 
0162             return consumer;
0163 
0164         }
0165 
0166         public Collection<C> values()
0167         {
0168             ArrayList<C> values = new ArrayList<C>();
0169 
0170             for (int i = 0; i < 16; i++)
0171             {
0172                 if (_fastAccessConsumers[i!= null)
0173                 {
0174                     values.add((C_fastAccessConsumers[i]);
0175                 }
0176             }
0177             values.addAll(_slowAccessConsumers.values());
0178 
0179             return values;
0180         }
0181 
0182         public void clear()
0183         {
0184             _slowAccessConsumers.clear();
0185             for (int i = 0; i < 16; i++)
0186             {
0187                 _fastAccessConsumers[inull;
0188             }
0189         }
0190     }
0191 
0192     /** Used for debugging. */
0193     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
0194 
0195     /**
0196      * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
0197      * not need to be attached to a queue.
0198      */
0199     protected static final boolean DEFAULT_IMMEDIATE = false;
0200 
0201     /**
0202      * The default value for mandatory flag used by producers created by this session is true. That is, server will not
0203      * silently drop messages where no queue is connected to the exchange for the message.
0204      */
0205     protected static final boolean DEFAULT_MANDATORY = true;
0206 
0207     /** System property to enable strict AMQP compliance. */
0208     public static final String STRICT_AMQP = "STRICT_AMQP";
0209 
0210     /** Strict AMQP default setting. */
0211     public static final String STRICT_AMQP_DEFAULT = "false";
0212 
0213     /** System property to enable failure if strict AMQP compliance is violated. */
0214     public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
0215 
0216     /** Strickt AMQP failure default. */
0217     public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
0218 
0219     /** System property to enable immediate message prefetching. */
0220     public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
0221 
0222     /** Immediate message prefetch default. */
0223     public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
0224 
0225     /** The connection to which this session belongs. */
0226     protected AMQConnection _connection;
0227 
0228     /** Used to indicate whether or not this is a transactional session. */
0229     protected boolean _transacted;
0230 
0231     /** Holds the sessions acknowledgement mode. */
0232     protected final int _acknowledgeMode;
0233 
0234     /** Holds this session unique identifier, used to distinguish it from other sessions. */
0235     protected int _channelId;
0236 
0237     private int _ticket;
0238 
0239     /** Holds the high mark for prefetched message, at which the session is suspended. */
0240     private int _defaultPrefetchHighMark;
0241 
0242     /** Holds the low mark for prefetched messages, below which the session is resumed. */
0243     private int _defaultPrefetchLowMark;
0244 
0245     /** Holds the message listener, if any, which is attached to this session. */
0246     private MessageListener _messageListener = null;
0247 
0248     /** Used to indicate that this session has been started at least once. */
0249     private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
0250 
0251     /**
0252      * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly.  Note this only
0253      * keeps a record of subscriptions which have been created in the current instance. It does not remember
0254      * subscriptions between executions of the client.
0255      */
0256     protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
0257             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
0258 
0259     /**
0260      * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
0261      * up in the {@link #_subscriptions} map.
0262      */
0263     protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
0264             new ConcurrentHashMap<C, String>();
0265 
0266     /**
0267      * Used to hold incoming messages.
0268      *
0269      * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
0270      */
0271     protected final FlowControllingBlockingQueue _queue;
0272 
0273     /** Holds the highest received delivery tag. */
0274     private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
0275     private final AtomicLong _rollbackMark = new AtomicLong(-1);
0276 
0277     /** All the not yet acknowledged message tags */
0278     protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
0279 
0280     /** All the delivered message tags */
0281     protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
0282 
0283     /** Holds the dispatcher thread for this session. */
0284     protected Dispatcher _dispatcher;
0285     
0286     protected Thread _dispatcherThread;
0287 
0288     /** Holds the message factory factory for this session. */
0289     protected MessageFactoryRegistry _messageFactoryRegistry;
0290 
0291     /** Holds all of the producers created by this session, keyed by their unique identifiers. */
0292     private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
0293 
0294     /**
0295      * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
0296      * methods.
0297      */
0298     private int _nextTag = 1;
0299 
0300     /**
0301      * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
0302      * consumer.
0303      */
0304     protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
0305 
0306     //Map<AMQShortString, BasicMessageConsumer> _consumers =
0307     //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
0308 
0309     /**
0310      * Contains a list of consumers which have been removed but which might still have
0311      * messages to acknowledge, eg in client ack or transacted modes
0312      */
0313     private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
0314 
0315     /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
0316     private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
0317             new ConcurrentHashMap<Destination, AtomicInteger>();
0318 
0319     /**
0320      * Used as a source of unique identifiers for producers within the session.
0321      *
0322      <p/> Access to this id does not require to be synchronized since according to the JMS specification only one
0323      * thread of control is allowed to create producers for any given session instance.
0324      */
0325     private long _nextProducerId;
0326 
0327     /**
0328      * Set when recover is called. This is to handle the case where recover() is called by application code during
0329      * onMessage() processing to enure that an auto ack is not sent.
0330      */
0331     private boolean _inRecovery;
0332 
0333     /** Used to indicates that the connection to which this session belongs, has been stopped. */
0334     private boolean _connectionStopped;
0335 
0336     /** Used to indicate that this session has a message listener attached to it. */
0337     private boolean _hasMessageListeners;
0338 
0339     /** Used to indicate that this session has been suspended. */
0340     private boolean _suspended;
0341 
0342     /**
0343      * Used to protect the suspension of this session, so that critical code can be executed during suspension,
0344      * without the session being resumed by other threads.
0345      */
0346     private final Object _suspensionLock = new Object();
0347 
0348     /**
0349      * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel.
0350      *
0351      * @todo This is accessed only within a synchronized method, so does not need to be atomic.
0352      */
0353     protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
0354 
0355     /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
0356     protected final boolean _immediatePrefetch;
0357 
0358     /** Indicates that warnings should be generated on violations of the strict AMQP. */
0359     protected final boolean _strictAMQP;
0360 
0361     /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
0362     protected final boolean _strictAMQPFATAL;
0363     private final Object _messageDeliveryLock = new Object();
0364 
0365     /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
0366     private boolean _dirty;
0367     /** Has failover occured on this session with outstanding actions to commit? */
0368     private boolean _failedOverDirty;
0369 
0370     private static final class FlowControlIndicator
0371     {
0372         private volatile boolean _flowControl = true;
0373 
0374         public synchronized void setFlowControl(boolean flowControl)
0375         {
0376             _flowControl = flowControl;
0377             notify();
0378         }
0379 
0380         public boolean getFlowControl()
0381         {
0382             return _flowControl;
0383         }
0384     }
0385 
0386     /** Flow control */
0387     private FlowControlIndicator _flowControl = new FlowControlIndicator();
0388 
0389     /**
0390      * Creates a new session on a connection.
0391      *
0392      @param con                     The connection on which to create the session.
0393      @param channelId               The unique identifier for the session.
0394      @param transacted              Indicates whether or not the session is transactional.
0395      @param acknowledgeMode         The acknoledgement mode for the session.
0396      @param messageFactoryRegistry  The message factory factory for the session.
0397      @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
0398      @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
0399      */
0400     protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
0401                MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
0402     {
0403 
0404         _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
0405         _strictAMQPFATAL =
0406                 Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
0407         _immediatePrefetch =
0408                 _strictAMQP
0409                 || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
0410 
0411         _connection = con;
0412         _transacted = transacted;
0413         if (transacted)
0414         {
0415             _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
0416         }
0417         else
0418         {
0419             _acknowledgeMode = acknowledgeMode;
0420         }
0421 
0422         _channelId = channelId;
0423         _messageFactoryRegistry = messageFactoryRegistry;
0424         _defaultPrefetchHighMark = defaultPrefetchHighMark;
0425         _defaultPrefetchLowMark = defaultPrefetchLowMark;
0426 
0427         if (_acknowledgeMode == NO_ACKNOWLEDGE)
0428         {
0429             _queue =
0430                     new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
0431                                                      new FlowControllingBlockingQueue.ThresholdListener()
0432                                                      {
0433                                                          private final AtomicBoolean _suspendState = new AtomicBoolean();
0434 
0435                                                          public void aboveThreshold(int currentValue)
0436                                                          {
0437                                                              _logger.debug(
0438                                                                      "Above threshold(" + _defaultPrefetchHighMark
0439                                                                      ") so suspending channel. Current value is " + currentValue);
0440                                                              _suspendState.set(true);
0441                                                              new Thread(new SuspenderRunner(_suspendState)).start();
0442 
0443                                                          }
0444 
0445                                                          public void underThreshold(int currentValue)
0446                                                          {
0447                                                              _logger.debug(
0448                                                                      "Below threshold(" + _defaultPrefetchLowMark
0449                                                                      ") so unsuspending channel. Current value is " + currentValue);
0450                                                              _suspendState.set(false);
0451                                                              new Thread(new SuspenderRunner(_suspendState)).start();
0452 
0453                                                          }
0454                                                      });
0455         }
0456         else
0457         {
0458             _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
0459         }
0460     }
0461 
0462     /**
0463      * Creates a new session on a connection with the default message factory factory.
0464      *
0465      @param con                 The connection on which to create the session.
0466      @param channelId           The unique identifier for the session.
0467      @param transacted          Indicates whether or not the session is transactional.
0468      @param acknowledgeMode     The acknoledgement mode for the session.
0469      @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
0470      @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
0471      */
0472     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
0473                int defaultPrefetchLow)
0474     {
0475         this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
0476              defaultPrefetchLow);
0477     }
0478 
0479     // ===== JMS Session methods.
0480 
0481     /**
0482      * Closes the session with no timeout.
0483      *
0484      @throws JMSException If the JMS provider fails to close the session due to some internal error.
0485      */
0486     public void close() throws JMSException
0487     {
0488         close(-1);
0489     }
0490 
0491     public void checkNotClosed() throws JMSException
0492     {
0493         try
0494         {
0495             super.checkNotClosed();
0496         }
0497         catch (IllegalStateException ise)
0498         {
0499             // if the Connection has closed then we should throw any exception that has occured that we were not waiting for
0500             AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
0501 
0502             if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED&& manager.getLastException() != null)
0503             {
0504                 ise.setLinkedException(manager.getLastException());
0505             }
0506 
0507             throw ise;
0508         }
0509     }
0510 
0511     public BytesMessage createBytesMessage() throws JMSException
0512     {
0513         checkNotClosed();
0514         return new JMSBytesMessage(getMessageDelegateFactory());
0515     }
0516 
0517     /**
0518      * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
0519      *
0520      @throws IllegalStateException If the session is closed.
0521      */
0522     public void acknowledge() throws IllegalStateException
0523     {
0524         if (isClosed())
0525         {
0526             throw new IllegalStateException("Session is already closed");
0527         }
0528         else if (hasFailedOver())
0529         {
0530             throw new IllegalStateException("has failed over");
0531         }
0532 
0533         while (true)
0534         {
0535             Long tag = _unacknowledgedMessageTags.poll();
0536             if (tag == null)
0537             {
0538                 break;
0539             }
0540             acknowledgeMessage(tag, false);
0541         }
0542     }
0543 
0544     /**
0545      * Acknowledge one or many messages.
0546      *
0547      @param deliveryTag The tag of the last message to be acknowledged.
0548      @param multiple    <tt>true</tt> to acknowledge all messages up to and including the one specified by the
0549      *                    delivery tag, <tt>false</tt> to just acknowledge that message.
0550      *
0551      * @todo Be aware of possible changes to parameter order as versions change.
0552      */
0553     public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
0554 
0555     public MethodRegistry getMethodRegistry()
0556     {
0557         MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
0558         return methodRegistry;
0559     }
0560 
0561     /**
0562      * Binds the named queue, with the specified routing key, to the named exchange.
0563      *
0564      <p/>Note that this operation automatically retries in the event of fail-over.
0565      *
0566      @param queueName    The name of the queue to bind.
0567      @param routingKey   The routing key to bind the queue with.
0568      @param arguments    Additional arguments.
0569      @param exchangeName The exchange to bind the queue on.
0570      *
0571      @throws AMQException If the queue cannot be bound for any reason.
0572      * @todo Be aware of possible changes to parameter order as versions change.
0573      * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
0574      */
0575     public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
0576                           final AMQShortString exchangeName, final AMQDestination destinationthrows AMQException
0577     {
0578         /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
0579         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
0580         {
0581             public Object execute() throws AMQException, FailoverException
0582             {
0583                 sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
0584                 return null;
0585             }
0586         }, _connection).execute();
0587     }
0588 
0589     public void addBindingKey(C consumer, AMQDestination amqd, String routingKeythrows AMQException
0590     {
0591         if (consumer.getQueuename() != null)
0592         {
0593             bindQueue(consumer.getQueuename()new AMQShortString(routingKey)new FieldTable(), amqd.getExchangeName(), amqd);
0594         }
0595     }
0596 
0597     public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
0598                                        final AMQShortString exchangeName, AMQDestination destinationthrows AMQException, FailoverException;
0599 
0600     /**
0601      * Closes the session.
0602      *
0603      <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
0604      * the channel. This is because the channel is marked as closed before the request to close it is made, so the
0605      * fail-over should not re-open it.
0606      *
0607      @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
0608      *
0609      @throws JMSException If the JMS provider fails to close the session due to some internal error.
0610      * @todo Be aware of possible changes to parameter order as versions change.
0611      * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
0612      * re-opened. May need to examine this more carefully.
0613      * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
0614      * because the failover process sends the failover event before acquiring the mutex itself.
0615      */
0616     public void close(long timeoutthrows JMSException
0617     {
0618         if (_logger.isInfoEnabled())
0619         {
0620             StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0621             _logger.info("Closing session: " this)// + ":"
0622             // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
0623         }
0624 
0625         // Ensure we only try and close an open session.
0626         if (!_closed.getAndSet(true))
0627         {
0628             synchronized (getFailoverMutex())
0629             {
0630                 // We must close down all producers and consumers in an orderly fashion. This is the only method
0631                 // that can be called from a different thread of control from the one controlling the session.
0632                 synchronized (_messageDeliveryLock)
0633                 {
0634                     // we pass null since this is not an error case
0635                     closeProducersAndConsumers(null);
0636 
0637                     try
0638                     {
0639                         sendClose(timeout);
0640                     }
0641                     catch (AMQException e)
0642                     {
0643                         JMSException jmse = new JMSException("Error closing session: " + e);
0644                         jmse.setLinkedException(e);
0645                         throw jmse;
0646                     }
0647                     // This is ignored because the channel is already marked as closed so the fail-over process will
0648                     // not re-open it.
0649                     catch (FailoverException e)
0650                     {
0651                         _logger.debug(
0652                                 "Got FailoverException during channel close, ignored as channel already marked as closed.");
0653                     }
0654                     finally
0655                     {
0656                         _connection.deregisterSession(_channelId);
0657                     }
0658                 }
0659             }
0660         }
0661     }
0662 
0663     public abstract void sendClose(long timeoutthrows AMQException, FailoverException;
0664 
0665     /**
0666      * Called when the server initiates the closure of the session unilaterally.
0667      *
0668      @param e the exception that caused this session to be closed. Null causes the
0669      */
0670     public void closed(Throwable ethrows JMSException
0671     {
0672         // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
0673         // calls through connection.closeAllSessions which is also called by the public connection.close()
0674         // with a null cause
0675         // When we are closing the Session due to a protocol session error we simply create a new AMQException
0676         // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
0677         // We need to determin here if the connection should be
0678 
0679         if (instanceof AMQDisconnectedException)
0680         {
0681             if (_dispatcher != null)
0682             {
0683                 // Failover failed and ain't coming back. Knife the dispatcher.
0684                 _dispatcherThread.interrupt();
0685             }
0686         }
0687 
0688         if (!_closed.getAndSet(true))
0689         {
0690             synchronized (getFailoverMutex())
0691             {
0692                 synchronized (_messageDeliveryLock)
0693                 {
0694                     // An AMQException has an error code and message already and will be passed in when closure occurs as a
0695                     // result of a channel close request
0696                     AMQException amqe;
0697                     if (instanceof AMQException)
0698                     {
0699                         amqe = (AMQExceptione;
0700                     }
0701                     else
0702                     {
0703                         amqe = new AMQException("Closing session forcibly", e);
0704                     }
0705 
0706                     _connection.deregisterSession(_channelId);
0707                     closeProducersAndConsumers(amqe);
0708                 }
0709             }
0710         }
0711     }
0712 
0713     /**
0714      * Commits all messages done in this transaction and releases any locks currently held.
0715      *
0716      <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the
0717      * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
0718      * The client will be unable to determine whether or not the commit actually happened on the broker in this case.
0719      *
0720      @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
0721      *                      not mean that the commit is known to have failed, merely that it is not known whether it
0722      *                      failed or not.
0723      * @todo Be aware of possible changes to parameter order as versions change.
0724      */
0725     public void commit() throws JMSException
0726     {
0727         checkTransacted();
0728 
0729         try
0730         {
0731 
0732             // TGM FIXME: what about failover?
0733             // Acknowledge all delivered messages
0734             while (true)
0735             {
0736                 Long tag = _deliveredMessageTags.poll();
0737                 if (tag == null)
0738                 {
0739                     break;
0740                 }
0741 
0742                 acknowledgeMessage(tag, false);
0743             }
0744             // Commits outstanding messages and acknowledgments
0745             sendCommit();
0746             markClean();
0747         }
0748         catch (AMQException e)
0749         {
0750             throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
0751         }
0752         catch (FailoverException e)
0753         {
0754             throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
0755         }
0756     }
0757 
0758     public abstract void sendCommit() throws AMQException, FailoverException;
0759 
0760 
0761     public void confirmConsumerCancelled(int consumerTag)
0762     {
0763 
0764         // Remove the consumer from the map
0765         C consumer = _consumers.get(consumerTag);
0766         if (consumer != null)
0767         {
0768             if (!consumer.isNoConsume())  // Normal Consumer
0769             {
0770                 // Clean the Maps up first
0771                 // Flush any pending messages for this consumerTag
0772                 if (_dispatcher != null)
0773                 {
0774                     _logger.info("Dispatcher is not null");
0775                 }
0776                 else
0777                 {
0778                     _logger.info("Dispatcher is null so created stopped dispatcher");
0779                     startDispatcherIfNecessary(true);
0780                 }
0781 
0782                 _dispatcher.rejectPending(consumer);
0783             }
0784             else // Queue Browser
0785             {
0786                 // Just close the consumer
0787                 // fixme  the CancelOK is being processed before the arriving messages..
0788                 // The dispatcher is still to process them so the server sent in order but the client
0789                 // has yet to receive before the close comes in.
0790 
0791                 // consumer.markClosed();
0792 
0793                 if (consumer.isAutoClose())
0794                 {
0795                     // There is a small window where the message is between the two queues in the dispatcher.
0796                     if (consumer.isClosed())
0797                     {
0798                         if (_logger.isInfoEnabled())
0799                         {
0800                             _logger.info("Closing consumer:" + consumer.debugIdentity());
0801                         }
0802 
0803                         deregisterConsumer(consumer);
0804                     }
0805                     else
0806                     {
0807                         _queue.add(new CloseConsumerMessage(consumer));
0808                     }
0809                 }
0810             }
0811         }
0812     }
0813 
0814     public QueueBrowser createBrowser(Queue queuethrows JMSException
0815     {
0816         if (isStrictAMQP())
0817         {
0818             throw new UnsupportedOperationException();
0819         }
0820 
0821         return createBrowser(queue, null);
0822     }
0823 
0824     public QueueBrowser createBrowser(Queue queue, String messageSelectorthrows JMSException
0825     {
0826         if (isStrictAMQP())
0827         {
0828             throw new UnsupportedOperationException();
0829         }
0830 
0831         checkNotClosed();
0832         checkValidQueue(queue);
0833 
0834         return new AMQQueueBrowser(this, (AMQQueuequeue, messageSelector);
0835     }
0836 
0837     public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
0838             throws JMSException
0839     {
0840         checkValidDestination(destination);
0841 
0842         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
0843                                   messageSelector, null, true, true);
0844     }
0845 
0846     public MessageConsumer createConsumer(Destination destinationthrows JMSException
0847     {
0848         checkValidDestination(destination);
0849 
0850         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
0851                                   false, false);
0852     }
0853 
0854     public C createExclusiveConsumer(Destination destinationthrows JMSException
0855     {
0856         checkValidDestination(destination);
0857 
0858         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
0859                                   false, false);
0860     }
0861 
0862     public MessageConsumer createConsumer(Destination destination, String messageSelectorthrows JMSException
0863     {
0864         checkValidDestination(destination);
0865 
0866         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic),
0867                                   messageSelector, null, false, false);
0868     }
0869 
0870     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
0871             throws JMSException
0872     {
0873         checkValidDestination(destination);
0874 
0875         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
0876                                   messageSelector, null, false, false);
0877     }
0878 
0879     public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
0880             throws JMSException
0881     {
0882         checkValidDestination(destination);
0883 
0884         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
0885                                   messageSelector, null, false, false);
0886     }
0887 
0888     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
0889                                           String selectorthrows JMSException
0890     {
0891         checkValidDestination(destination);
0892 
0893         return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false);
0894     }
0895 
0896     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
0897                                           boolean exclusive, String selectorthrows JMSException
0898     {
0899         checkValidDestination(destination);
0900 
0901         return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
0902     }
0903 
0904     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
0905                                           String selector, FieldTable rawSelectorthrows JMSException
0906     {
0907         checkValidDestination(destination);
0908 
0909         return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false);
0910     }
0911 
0912     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
0913                                           boolean exclusive, String selector, FieldTable rawSelectorthrows JMSException
0914     {
0915         checkValidDestination(destination);
0916 
0917         return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
0918                                   false);
0919     }
0920 
0921     public abstract TopicSubscriber createDurableSubscriber(Topic topic, String namethrows JMSException;
0922 
0923     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
0924             throws JMSException
0925     {
0926         checkNotClosed();
0927         checkValidTopic(topic);
0928         if (_subscriptions.containsKey(name))
0929         {
0930             _subscriptions.get(name).close();
0931         }
0932         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopictopic, name, _connection);
0933         C consumer = (CcreateConsumer(dest, messageSelector, noLocal);
0934         TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
0935         _subscriptions.put(name, subscriber);
0936         _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
0937 
0938         return subscriber;
0939     }
0940 
0941     public MapMessage createMapMessage() throws JMSException
0942     {
0943         checkNotClosed();
0944         return new JMSMapMessage(getMessageDelegateFactory());
0945     }
0946 
0947     public javax.jms.Message createMessage() throws JMSException
0948     {
0949         return createBytesMessage();
0950     }
0951 
0952     public ObjectMessage createObjectMessage() throws JMSException
0953     {
0954         checkNotClosed();
0955         return (ObjectMessagenew JMSObjectMessage(getMessageDelegateFactory());
0956     }
0957 
0958     public ObjectMessage createObjectMessage(Serializable objectthrows JMSException
0959     {
0960         ObjectMessage msg = createObjectMessage();
0961         msg.setObject(object);
0962 
0963         return msg;
0964     }
0965 
0966     public P createProducer(Destination destinationthrows JMSException
0967     {
0968         return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
0969     }
0970 
0971     public P createProducer(Destination destination, boolean immediatethrows JMSException
0972     {
0973         return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
0974     }
0975 
0976     public P createProducer(Destination destination, boolean mandatory, boolean immediate)
0977             throws JMSException
0978     {
0979         return createProducerImpl(destination, mandatory, immediate);
0980     }
0981 
0982     public P createProducer(Destination destination, boolean mandatory, boolean immediate,
0983                                                boolean waitUntilSentthrows JMSException
0984     {
0985         return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
0986     }
0987 
0988     public TopicPublisher createPublisher(Topic topicthrows JMSException
0989     {
0990         checkNotClosed();
0991 
0992         return new TopicPublisherAdapter((PcreateProducer(topic, false, false), topic);
0993     }
0994 
0995     public Queue createQueue(String queueNamethrows JMSException
0996     {
0997         checkNotClosed();
0998         if (queueName.indexOf('/'== -1)
0999         {
1000             return new AMQQueue(getDefaultQueueExchangeName()new AMQShortString(queueName));
1001         }
1002         else
1003         {
1004             try
1005             {
1006                 return new AMQQueue(new AMQBindingURL(queueName));
1007             }
1008             catch (URISyntaxException urlse)
1009             {
1010                 JMSException jmse = new JMSException(urlse.getReason());
1011                 jmse.setLinkedException(urlse);
1012 
1013                 throw jmse;
1014             }
1015         }
1016     }
1017 
1018     /**
1019      * Declares the named queue.
1020      *
1021      <p/>Note that this operation automatically retries in the event of fail-over.
1022      *
1023      @param name       The name of the queue to declare.
1024      @param autoDelete
1025      @param durable    Flag to indicate that the queue is durable.
1026      @param exclusive  Flag to indicate that the queue is exclusive to this client.
1027      *
1028      @throws AMQException If the queue cannot be declared for any reason.
1029      * @todo Be aware of possible changes to parameter order as versions change.
1030      */
1031     public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
1032                             final boolean exclusivethrows AMQException
1033     {
1034         createQueue(name, autoDelete, durable, exclusive, null);
1035     }
1036 
1037     /**
1038      * Declares the named queue.
1039      *
1040      <p/>Note that this operation automatically retries in the event of fail-over.
1041      *
1042      @param name       The name of the queue to declare.
1043      @param autoDelete
1044      @param durable    Flag to indicate that the queue is durable.
1045      @param exclusive  Flag to indicate that the queue is exclusive to this client.
1046      @param arguments  Arguments used to set special properties of the queue
1047      *
1048      @throws AMQException If the queue cannot be declared for any reason.
1049      * @todo Be aware of possible changes to parameter order as versions change.
1050      */
1051     public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
1052                             final boolean exclusive, final Map<String, Object> argumentsthrows AMQException
1053     {
1054         new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
1055         {
1056             public Object execute() throws AMQException, FailoverException
1057             {
1058                 sendCreateQueue(name, autoDelete, durable, exclusive, arguments);
1059                 return null;
1060             }
1061         }, _connection).execute();
1062     }
1063 
1064     public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
1065                                          final boolean exclusive, final Map<String, Object> argumentsthrows AMQException, FailoverException;
1066 
1067     /**
1068      * Creates a QueueReceiver
1069      *
1070      @param destination
1071      *
1072      @return QueueReceiver - a wrapper around our MessageConsumer
1073      *
1074      @throws JMSException
1075      */
1076     public QueueReceiver createQueueReceiver(Destination destinationthrows JMSException
1077     {
1078         checkValidDestination(destination);
1079         AMQQueue dest = (AMQQueuedestination;
1080         C consumer = (CcreateConsumer(destination);
1081 
1082         return new QueueReceiverAdaptor(dest, consumer);
1083     }
1084 
1085     /**
1086      * Creates a QueueReceiver using a message selector
1087      *
1088      @param destination
1089      @param messageSelector
1090      *
1091      @return QueueReceiver - a wrapper around our MessageConsumer
1092      *
1093      @throws JMSException
1094      */
1095     public QueueReceiver createQueueReceiver(Destination destination, String messageSelectorthrows JMSException
1096     {
1097         checkValidDestination(destination);
1098         AMQQueue dest = (AMQQueuedestination;
1099         C consumer = (CcreateConsumer(destination, messageSelector);
1100 
1101         return new QueueReceiverAdaptor(dest, consumer);
1102     }
1103 
1104     /**
1105      * Creates a QueueReceiver wrapping a MessageConsumer
1106      *
1107      @param queue
1108      *
1109      @return QueueReceiver
1110      *
1111      @throws JMSException
1112      */
1113     public QueueReceiver createReceiver(Queue queuethrows JMSException
1114     {
1115         checkNotClosed();
1116         AMQQueue dest = (AMQQueuequeue;
1117         C consumer = (CcreateConsumer(dest);
1118 
1119         return new QueueReceiverAdaptor(dest, consumer);
1120     }
1121 
1122     /**
1123      * Creates a QueueReceiver wrapping a MessageConsumer using a message selector
1124      *
1125      @param queue
1126      @param messageSelector
1127      *
1128      @return QueueReceiver
1129      *
1130      @throws JMSException
1131      */
1132     public QueueReceiver createReceiver(Queue queue, String messageSelectorthrows JMSException
1133     {
1134         checkNotClosed();
1135         AMQQueue dest = (AMQQueuequeue;
1136         C consumer = (CcreateConsumer(dest, messageSelector);
1137 
1138         return new QueueReceiverAdaptor(dest, consumer);
1139     }
1140 
1141     public QueueSender createSender(Queue queuethrows JMSException
1142     {
1143         checkNotClosed();
1144 
1145         // return (QueueSender) createProducer(queue);
1146         return new QueueSenderAdapter(createProducer(queue), queue);
1147     }
1148 
1149     public StreamMessage createStreamMessage() throws JMSException
1150     {
1151         // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
1152         // calls through connection.closeAllSessions which is also called by the public connection.close()
1153         // with a null cause
1154         // When we are closing the Session due to a protocol session error we simply create a new AMQException
1155         // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
1156         // We need to determin here if the connection should be
1157 
1158         synchronized (getFailoverMutex())
1159         {
1160             checkNotClosed();
1161 
1162             return new JMSStreamMessage(getMessageDelegateFactory());
1163         }
1164     }
1165 
1166     /**
1167      * Creates a non-durable subscriber
1168      *
1169      @param topic
1170      *
1171      @return TopicSubscriber - a wrapper round our MessageConsumer
1172      *
1173      @throws JMSException
1174      */
1175     public TopicSubscriber createSubscriber(Topic topicthrows JMSException
1176     {
1177         checkNotClosed();
1178         AMQTopic dest = checkValidTopic(topic);
1179 
1180         // AMQTopic dest = new AMQTopic(topic.getTopicName());
1181         return new TopicSubscriberAdaptor(dest, (CcreateExclusiveConsumer(dest));
1182     }
1183 
1184     /**
1185      * Creates a non-durable subscriber with a message selector
1186      *
1187      @param topic
1188      @param messageSelector
1189      @param noLocal
1190      *
1191      @return TopicSubscriber - a wrapper round our MessageConsumer
1192      *
1193      @throws JMSException
1194      */
1195     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocalthrows JMSException
1196     {
1197         checkNotClosed();
1198         AMQTopic dest = checkValidTopic(topic);
1199 
1200         // AMQTopic dest = new AMQTopic(topic.getTopicName());
1201         return new TopicSubscriberAdaptor(dest, (CcreateExclusiveConsumer(dest, messageSelector, noLocal));
1202     }
1203 
1204     public TemporaryQueue createTemporaryQueue() throws JMSException
1205     {
1206         checkNotClosed();
1207         try
1208         {
1209             AMQTemporaryQueue result = new AMQTemporaryQueue(this);
1210 
1211             // this is done so that we can produce to a temporary queue before we create a consumer
1212             result.setQueueName(result.getRoutingKey());
1213             createQueue(result.getAMQQueueName(), result.isAutoDelete()
1214                         result.isDurable(), result.isExclusive());
1215             bindQueue(result.getAMQQueueName(), result.getRoutingKey()
1216                     new FieldTable(), result.getExchangeName(), result);
1217             return result;
1218         }
1219         catch (Exception e)
1220         {
1221            JMSException ex = new JMSException("Cannot create temporary queue");
1222            ex.setLinkedException(e);
1223            e.printStackTrace();
1224            throw ex;
1225         }
1226     }
1227 
1228     public TemporaryTopic createTemporaryTopic() throws JMSException
1229     {
1230         checkNotClosed();
1231 
1232         return new AMQTemporaryTopic(this);
1233     }
1234 
1235     public TextMessage createTextMessage() throws JMSException
1236     {
1237         synchronized (getFailoverMutex())
1238         {
1239             checkNotClosed();
1240 
1241             return new JMSTextMessage(getMessageDelegateFactory());
1242         }
1243     }
1244 
1245     protected Object getFailoverMutex()
1246     {
1247         return _connection.getFailoverMutex();
1248     }
1249 
1250     public TextMessage createTextMessage(String textthrows JMSException
1251     {
1252 
1253         TextMessage msg = createTextMessage();
1254         msg.setText(text);
1255 
1256         return msg;
1257     }
1258 
1259     public Topic createTopic(String topicNamethrows JMSException
1260     {
1261         checkNotClosed();
1262 
1263         if (topicName.indexOf('/'== -1)
1264         {
1265             return new AMQTopic(getDefaultTopicExchangeName()new AMQShortString(topicName));
1266         }
1267         else
1268         {
1269             try
1270             {
1271                 return new AMQTopic(new AMQBindingURL(topicName));
1272             }
1273             catch (URISyntaxException urlse)
1274             {
1275                 JMSException jmse = new JMSException(urlse.getReason());
1276                 jmse.setLinkedException(urlse);
1277 
1278                 throw jmse;
1279             }
1280         }
1281     }
1282 
1283     public void declareExchange(AMQShortString name, AMQShortString type, boolean nowaitthrows AMQException
1284     {
1285         declareExchange(name, type, getProtocolHandler(), nowait);
1286     }
1287 
1288     public int getAcknowledgeMode() throws JMSException
1289     {
1290         checkNotClosed();
1291 
1292         return _acknowledgeMode;
1293     }
1294 
1295     public AMQConnection getAMQConnection()
1296     {
1297         return _connection;
1298     }
1299 
1300     public int getChannelId()
1301     {
1302         return _channelId;
1303     }
1304 
1305     public int getDefaultPrefetch()
1306     {
1307         return _defaultPrefetchHighMark;
1308     }
1309 
1310     public int getDefaultPrefetchHigh()
1311     {
1312         return _defaultPrefetchHighMark;
1313     }
1314 
1315     public int getDefaultPrefetchLow()
1316     {
1317         return _defaultPrefetchLowMark;
1318     }
1319 
1320     public AMQShortString getDefaultQueueExchangeName()
1321     {
1322         return _connection.getDefaultQueueExchangeName();
1323     }
1324 
1325     public AMQShortString getDefaultTopicExchangeName()
1326     {
1327         return _connection.getDefaultTopicExchangeName();
1328     }
1329 
1330     public MessageListener getMessageListener() throws JMSException
1331     {
1332         // checkNotClosed();
1333         return _messageListener;
1334     }
1335 
1336     public AMQShortString getTemporaryQueueExchangeName()
1337     {
1338         return _connection.getTemporaryQueueExchangeName();
1339     }
1340 
1341     public AMQShortString getTemporaryTopicExchangeName()
1342     {
1343         return _connection.getTemporaryTopicExchangeName();
1344     }
1345 
1346     public int getTicket()
1347     {
1348         return _ticket;
1349     }
1350 
1351     public boolean getTransacted() throws JMSException
1352     {
1353         checkNotClosed();
1354 
1355         return _transacted;
1356     }
1357 
1358     public boolean hasConsumer(Destination destination)
1359     {
1360         AtomicInteger counter = _destinationConsumerCount.get(destination);
1361 
1362         return (counter != null&& (counter.get() != 0);
1363     }
1364 
1365     public boolean isStrictAMQP()
1366     {
1367         return _strictAMQP;
1368     }
1369 
1370     public boolean isSuspended()
1371     {
1372         return _suspended;
1373     }
1374 
1375     protected void addUnacknowledgedMessage(long id)
1376     {
1377         _unacknowledgedMessageTags.add(id);
1378     }
1379 
1380     protected void addDeliveredMessage(long id)
1381     {
1382         _deliveredMessageTags.add(id);
1383     }
1384 
1385     /**
1386      * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
1387      * the queue read by the dispatcher.
1388      *
1389      @param message the message that has been received
1390      */
1391     public void messageReceived(UnprocessedMessage message)
1392     {
1393         if (_logger.isDebugEnabled())
1394         {
1395             _logger.debug("Message[" + message.toString() "] received in session");
1396         }
1397         _highestDeliveryTag.set(message.getDeliveryTag());
1398         _queue.add(message);
1399     }
1400 
1401     public void declareAndBind(AMQDestination amqd)
1402             throws
1403             AMQException
1404     {
1405         AMQProtocolHandler protocolHandler = getProtocolHandler();
1406         declareExchange(amqd, protocolHandler, false);
1407         AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
1408         bindQueue(queueName, amqd.getRoutingKey()new FieldTable(), amqd.getExchangeName(), amqd);
1409     }
1410 
1411     /**
1412      * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
1413      *
1414      <p/>All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges
1415      * all messages that have been delivered to the client.
1416      *
1417      <p/>Restarting a session causes it to take the following actions:
1418      *
1419      <ul>
1420      <li>Stop message delivery.</li>
1421      <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
1422      <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
1423      * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
1424      </ul>
1425      *
1426      <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
1427      * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible
1428      * for the client to determine whether the broker is going to recover the session or not.
1429      *
1430      @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
1431      *                      Not that this does not necessarily mean that the recovery has failed, but simply that it is
1432      *                      not possible to tell if it has or not.
1433      * @todo Be aware of possible changes to parameter order as versions change.
1434      */
1435     public void recover() throws JMSException
1436     {
1437         // Ensure that the session is open.
1438         checkNotClosed();
1439 
1440         // Ensure that the session is not transacted.
1441         checkNotTransacted();
1442 
1443         // this is set only here, and the before the consumer's onMessage is called it is set to false
1444         _inRecovery = true;
1445         try
1446         {
1447 
1448             boolean isSuspended = isSuspended();
1449 
1450             if (!isSuspended)
1451             {
1452                 suspendChannel(true);
1453             }
1454 
1455             if (_dispatcher != null)
1456             {
1457                 _dispatcher.rollback();
1458             }
1459 
1460             sendRecover();
1461 
1462             if (!isSuspended)
1463             {
1464                 suspendChannel(false);
1465             }
1466         }
1467         catch (AMQException e)
1468         {
1469             throw new JMSAMQException("Recover failed: " + e.getMessage(), e);
1470         }
1471         catch (FailoverException e)
1472         {
1473             throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
1474         }
1475     }
1476 
1477     protected abstract void sendRecover() throws AMQException, FailoverException;
1478 
1479     public void rejectMessage(UnprocessedMessage message, boolean requeue)
1480     {
1481 
1482         if (_logger.isDebugEnabled())
1483         {
1484             _logger.debug("Rejecting Unacked message:" + message.getDeliveryTag());
1485         }
1486 
1487         rejectMessage(message.getDeliveryTag(), requeue);
1488     }
1489 
1490     public void rejectMessage(AbstractJMSMessage message, boolean requeue)
1491     {
1492         if (_logger.isDebugEnabled())
1493         {
1494             _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
1495         }
1496 
1497         rejectMessage(message.getDeliveryTag(), requeue);
1498 
1499     }
1500 
1501     public abstract void rejectMessage(long deliveryTag, boolean requeue);
1502 
1503     /**
1504      * Commits all messages done in this transaction and releases any locks currently held.
1505      *
1506      <p/>If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the
1507      * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
1508      * The client will be unable to determine whether or not the rollback actually happened on the broker in this case.
1509      *
1510      @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
1511      *                      not mean that the rollback is known to have failed, merely that it is not known whether it
1512      *                      failed or not.
1513      * @todo Be aware of possible changes to parameter order as versions change.
1514      */
1515     public void rollback() throws JMSException
1516     {
1517         synchronized (_suspensionLock)
1518         {
1519             checkTransacted();
1520 
1521             try
1522             {
1523                 boolean isSuspended = isSuspended();
1524 
1525                 if (!isSuspended)
1526                 {
1527                     suspendChannel(true);
1528                 }
1529 
1530                 releaseForRollback();
1531 
1532                 sendRollback();
1533 
1534                 markClean();
1535 
1536                 if (!isSuspended)
1537                 {
1538                     suspendChannel(false);
1539                 }
1540             }
1541             catch (AMQException e)
1542             {
1543                 throw new JMSAMQException("Failed to rollback: " + e, e);
1544             }
1545             catch (FailoverException e)
1546             {
1547                 throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
1548             }
1549         }
1550     }
1551 
1552     public abstract void releaseForRollback();
1553 
1554     public abstract void sendRollback() throws AMQException, FailoverException;
1555 
1556     public void run()
1557     {
1558         throw new java.lang.UnsupportedOperationException();
1559     }
1560 
1561     public void setMessageListener(MessageListener listenerthrows JMSException
1562     {
1563         // checkNotClosed();
1564         //
1565         // if (_dispatcher != null && !_dispatcher.connectionStopped())
1566         // {
1567         // throw new javax.njms.IllegalStateException("Attempt to set listener while session is started.");
1568         // }
1569         //
1570         // // We are stopped
1571         // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
1572         // {
1573         // BasicMessageConsumer consumer = i.next();
1574         //
1575         // if (consumer.isReceiving())
1576         // {
1577         // throw new javax.njms.IllegalStateException("Another thread is already receiving synchronously.");
1578         // }
1579         // }
1580         //
1581         // _messageListener = listener;
1582         //
1583         // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
1584         // {
1585         // i.next().setMessageListener(_messageListener);
1586         // }
1587 
1588     }
1589 
1590     /*public void setTicket(int ticket)
1591     {
1592         _ticket = ticket;
1593     }*/
1594 
1595     public void unsubscribe(String namethrows JMSException
1596     {
1597         checkNotClosed();
1598         TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
1599         if (subscriber != null)
1600         {
1601             // send a queue.delete for the subscription
1602             deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
1603             _subscriptions.remove(name);
1604             _reverseSubscriptionMap.remove(subscriber);
1605         }
1606         else
1607         {
1608             if (_strictAMQP)
1609             {
1610                 if (_strictAMQPFATAL)
1611                 {
1612                     throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
1613                 }
1614                 else
1615                 {
1616                     _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
1617                                  " Requesting queue deletion regardless.");
1618                 }
1619 
1620                 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
1621             }
1622             else // Queue Browser
1623             {
1624 
1625                 if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
1626                 {
1627                     deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
1628                 }
1629                 else
1630                 {
1631                     throw new InvalidDestinationException("Unknown subscription exchange:" + name);
1632                 }
1633             }
1634         }
1635     }
1636 
1637     protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
1638                                                  final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
1639                                                  final boolean noConsume, final boolean autoClosethrows JMSException
1640     {
1641         checkTemporaryDestination(destination);
1642 
1643         final String messageSelector;
1644 
1645         if (_strictAMQP && !((selector == null|| selector.equals("")))
1646         {
1647             if (_strictAMQPFATAL)
1648             {
1649                 throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
1650             }
1651             else
1652             {
1653                 messageSelector = null;
1654             }
1655         }
1656         else
1657         {
1658             messageSelector = selector;
1659         }
1660 
1661         return new FailoverRetrySupport<C, JMSException>(
1662                 new FailoverProtectedOperation<C, JMSException>()
1663                 {
1664                     public C execute() throws JMSException, FailoverException
1665                     {
1666                         checkNotClosed();
1667 
1668                         AMQDestination amqd = (AMQDestinationdestination;
1669 
1670                         final AMQProtocolHandler protocolHandler = getProtocolHandler();
1671                         // TODO: Define selectors in AMQP
1672                         // TODO: construct the rawSelector from the selector string if rawSelector == null
1673                         final FieldTable ft = FieldTableFactory.newFieldTable();
1674                         // if (rawSelector != null)
1675                         // ft.put("headers", rawSelector.getDataAsBytes());
1676                         // rawSelector is used by HeadersExchange and is not a JMS Selector
1677                         if (rawSelector != null
1678                         {
1679                             ft.addAll(rawSelector);
1680                         }
1681                         
1682                         if (messageSelector != null)
1683                         {
1684                             ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
1685                         }
1686 
1687                         C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
1688                                                                               noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
1689 
1690                         if (_messageListener != null)
1691                         {
1692                             consumer.setMessageListener(_messageListener);
1693                         }
1694 
1695                         try
1696                         {
1697                             registerConsumer(consumer, false);
1698                         }
1699                         catch (AMQInvalidArgumentException ise)
1700                         {
1701                             JMSException ex = new InvalidSelectorException(ise.getMessage());
1702                             ex.setLinkedException(ise);
1703                             throw ex;
1704                         }
1705                         catch (AMQInvalidRoutingKeyException e)
1706                         {
1707                             JMSException ide =
1708                                     new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
1709                             ide.setLinkedException(e);
1710                             throw ide;
1711                         }
1712                         catch (AMQException e)
1713                         {
1714                             JMSException ex = new JMSException("Error registering consumer: " + e);
1715 
1716                             ex.setLinkedException(e);
1717                             throw ex;
1718                         }
1719 
1720                         synchronized (destination)
1721                         {
1722                             _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
1723                             _destinationConsumerCount.get(destination).incrementAndGet();
1724                         }
1725 
1726                         return consumer;
1727                     }
1728                 }, _connection).execute();
1729     }
1730 
1731     public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
1732                                                                final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
1733                                                                final boolean noConsume, final boolean autoClosethrows JMSException;
1734 
1735     /**
1736      * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
1737      * instance.
1738      *
1739      @param consumer the consum
1740      */
1741     void deregisterConsumer(C consumer)
1742     {
1743         if (_consumers.remove(consumer.getConsumerTag()) != null)
1744         {
1745             String subscriptionName = _reverseSubscriptionMap.remove(consumer);
1746             if (subscriptionName != null)
1747             {
1748                 _subscriptions.remove(subscriptionName);
1749             }
1750 
1751             Destination dest = consumer.getDestination();
1752             synchronized (dest)
1753             {
1754                 if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
1755                 {
1756                     _destinationConsumerCount.remove(dest);
1757                 }
1758             }
1759 
1760             // Consumers that are closed in a transaction must be stored
1761             // so that messages they have received can be acknowledged on commit
1762             if (_transacted)
1763             {
1764                 _removedConsumers.add(consumer);
1765             }
1766         }
1767     }
1768 
1769     void deregisterProducer(long producerId)
1770     {
1771         _producers.remove(new Long(producerId));
1772     }
1773 
1774     boolean isInRecovery()
1775     {
1776         return _inRecovery;
1777     }
1778 
1779     boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueNamethrows JMSException
1780     {
1781         return isQueueBound(exchangeName, queueName, null);
1782     }
1783 
1784     /**
1785      * Tests whether or not the specified queue is bound to the specified exchange under a particular routing key.
1786      *
1787      <p/>Note that this operation automatically retries in the event of fail-over.
1788      *
1789      @param exchangeName The exchange name to test for binding against.
1790      @param queueName    The queue name to check if bound.
1791      @param routingKey   The routing key to check if the queue is bound under.
1792      *
1793      @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
1794      *
1795      @throws JMSException If the query fails for any reason.
1796      * @todo Be aware of possible changes to parameter order as versions change.
1797      */
1798     public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
1799             throws JMSException;
1800 
1801     public abstract boolean isQueueBound(final AMQDestination destinationthrows JMSException;
1802 
1803     /**
1804      * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
1805      * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
1806      */
1807     void markClosed()
1808     {
1809         _closed.set(true);
1810         _connection.deregisterSession(_channelId);
1811         markClosedProducersAndConsumers();
1812 
1813     }
1814 
1815     void failoverPrep()
1816     {
1817         startDispatcherIfNecessary();
1818         final CountDownLatch signal = new CountDownLatch(1);
1819         _queue.add(new Dispatchable() {
1820             public void dispatch(AMQSession ssn)
1821             {
1822                 signal.countDown();
1823             }
1824         });
1825         try
1826         {
1827             signal.await();
1828         }
1829         catch (InterruptedException e)
1830         {
1831             // pass
1832         }
1833     }
1834 
1835     /**
1836      * Resubscribes all producers and consumers. This is called when performing failover.
1837      *
1838      @throws AMQException
1839      */
1840     void resubscribe() throws AMQException
1841     {
1842         if (_dirty)
1843         {
1844             _failedOverDirty = true;
1845         }
1846 
1847         _rollbackMark.set(-1);
1848         resubscribeProducers();
1849         resubscribeConsumers();
1850     }
1851 
1852     void setHasMessageListeners()
1853     {
1854         _hasMessageListeners = true;
1855     }
1856 
1857     void setInRecovery(boolean inRecovery)
1858     {
1859         _inRecovery = inRecovery;
1860     }
1861 
1862     /**
1863      * Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
1864      *
1865      @throws AMQException If the session cannot be started for any reason.
1866      * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
1867      * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
1868      * for each subsequent call to flow.. only need to do this if we have called stop.
1869      */
1870     void start() throws AMQException
1871     {
1872         // Check if the session has perviously been started and suspended, in which case it must be unsuspended.
1873         if (_startedAtLeastOnce.getAndSet(true))
1874         {
1875             suspendChannel(false);
1876         }
1877 
1878         // If the event dispatcher is not running then start it too.
1879         if (hasMessageListeners())
1880         {
1881             startDispatcherIfNecessary();
1882         }
1883     }
1884 
1885     void startDispatcherIfNecessary()
1886     {
1887         //If we are the dispatcher then we don't need to check we are started
1888         if (Thread.currentThread() == _dispatcherThread)
1889         {
1890             return;
1891         }
1892 
1893         // If IMMEDIATE_PREFETCH is not set then we need to start fetching
1894         // This is final per session so will be multi-thread safe.
1895         if (!_immediatePrefetch)
1896         {
1897             // We do this now if this is the first call on a started connection
1898             if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
1899             {
1900                 try
1901                 {
1902                     suspendChannel(false);
1903                 }
1904                 catch (AMQException e)
1905                 {
1906                     _logger.info("Unsuspending channel threw an exception:" + e);
1907                 }
1908             }
1909         }
1910 
1911         startDispatcherIfNecessary(false);
1912     }
1913 
1914     synchronized void startDispatcherIfNecessary(boolean initiallyStopped)
1915     {
1916         if (_dispatcher == null)
1917         {
1918             _dispatcher = new Dispatcher();
1919             try
1920             {
1921                 _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);       
1922                 
1923             }
1924             catch(Exception e)
1925             {
1926                 throw new Error("Error creating Dispatcher thread",e);
1927             }            
1928             _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
1929             _dispatcherThread.setDaemon(true);
1930             _dispatcher.setConnectionStopped(initiallyStopped);
1931             _dispatcherThread.start();
1932             if (_dispatcherLogger.isInfoEnabled())
1933             {
1934                 _dispatcherLogger.info(_dispatcherThread.getName() " created");
1935             }
1936         }
1937         else
1938         {
1939             _dispatcher.setConnectionStopped(initiallyStopped);
1940         }
1941     }
1942 
1943     void stop() throws AMQException
1944     {
1945         // Stop the server delivering messages to this session.
1946         suspendChannel(true);
1947 
1948         if (_dispatcher != null)
1949         {
1950             _dispatcher.setConnectionStopped(true);
1951         }
1952     }
1953 
1954     /*
1955      * Binds the named queue, with the specified routing key, to the named exchange.
1956      *
1957      * <p/>Note that this operation automatically retries in the event of fail-over.
1958      *
1959      * @param queueName    The name of the queue to bind.
1960      * @param routingKey   The routing key to bind the queue with.
1961      * @param arguments    Additional arguments.
1962      * @param exchangeName The exchange to bind the queue on.
1963      *
1964      * @throws AMQException If the queue cannot be bound for any reason.
1965      */
1966     /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft)
1967         throws AMQException, FailoverException
1968     {
1969         AMQFrame queueBind =
1970             QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments
1971                 amqd.getExchangeName(), // exchange
1972                 false, // nowait
1973                 queueName, // queue
1974                 amqd.getRoutingKey(), // routingKey
1975                 getTicket()); // ticket
1976 
1977         protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
1978     }*/
1979 
1980     private void checkNotTransacted() throws JMSException
1981     {
1982         if (getTransacted())
1983         {
1984             throw new IllegalStateException("Session is transacted");
1985         }
1986     }
1987 
1988     private void checkTemporaryDestination(Destination destinationthrows JMSException
1989     {
1990         if ((destination instanceof TemporaryDestination))
1991         {
1992             _logger.debug("destination is temporary");
1993             final TemporaryDestination tempDest = (TemporaryDestinationdestination;
1994             if (tempDest.getSession() != this)
1995             {
1996                 _logger.debug("destination is on different session");
1997                 throw new JMSException("Cannot consume from a temporary destination created onanother session");
1998             }
1999 
2000             if (tempDest.isDeleted())
2001             {
2002                 _logger.debug("destination is deleted");
2003                 throw new JMSException("Cannot consume from a deleted destination");
2004             }
2005         }
2006     }
2007 
2008     protected void checkTransacted() throws JMSException
2009     {
2010         if (!getTransacted())
2011         {
2012             throw new IllegalStateException("Session is not transacted");
2013         }
2014     }
2015 
2016     private void checkValidDestination(Destination destinationthrows InvalidDestinationException
2017     {
2018         if (destination == null)
2019         {
2020             throw new javax.jms.InvalidDestinationException("Invalid Queue");
2021         }
2022     }
2023 
2024     private void checkValidQueue(Queue queuethrows InvalidDestinationException
2025     {
2026         if (queue == null)
2027         {
2028             throw new javax.jms.InvalidDestinationException("Invalid Queue");
2029         }
2030     }
2031 
2032     /*
2033      * I could have combined the last 3 methods, but this way it improves readability
2034      */
2035     protected AMQTopic checkValidTopic(Topic topicthrows JMSException
2036     {
2037         if (topic == null)
2038         {
2039             throw new javax.jms.InvalidDestinationException("Invalid Topic");
2040         }
2041 
2042         if ((topic instanceof TemporaryDestination&& (((TemporaryDestinationtopic).getSession() != this))
2043         {
2044             throw new javax.jms.InvalidDestinationException(
2045                     "Cannot create a subscription on a temporary topic created in another session");
2046         }
2047 
2048         if (!(topic instanceof AMQTopic))
2049         {
2050             throw new javax.jms.InvalidDestinationException(
2051                     "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
2052                     + topic.getClass().getName());
2053         }
2054 
2055         return (AMQTopictopic;
2056     }
2057 
2058     /**
2059      * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
2060      *
2061      @param error not null if this is a result of an error occurring at the connection level
2062      */
2063     private void closeConsumers(Throwable errorthrows JMSException
2064     {
2065         // we need to clone the list of consumers since the close() method updates the _consumers collection
2066         // which would result in a concurrent modification exception
2067         final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
2068 
2069         final Iterator<C> it = clonedConsumers.iterator();
2070         while (it.hasNext())
2071         {
2072             final C con = it.next();
2073             if (error != null)
2074             {
2075                 con.notifyError(error);
2076             }
2077             else
2078             {
2079                 con.close(false);
2080             }
2081         }
2082         // at this point the _consumers map will be empty
2083         if (_dispatcher != null)
2084         {
2085             _dispatcher.close();
2086             _dispatcher = null;
2087         }
2088     }
2089 
2090     /**
2091      * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
2092      * currently no way of propagating errors to message producers (this is a JMS limitation).
2093      */
2094     private void closeProducers() throws JMSException
2095     {
2096         // we need to clone the list of producers since the close() method updates the _producers collection
2097         // which would result in a concurrent modification exception
2098         final ArrayList clonedProducers = new ArrayList(_producers.values());
2099 
2100         final Iterator it = clonedProducers.iterator();
2101         while (it.hasNext())
2102         {
2103             final P prod = (Pit.next();
2104             prod.close();
2105         }
2106         // at this point the _producers map is empty
2107     }
2108 
2109     /**
2110      * Close all producers or consumers. This is called either in the error case or when closing the session normally.
2111      *
2112      @param amqe the exception, may be null to indicate no error has occurred
2113      */
2114     private void closeProducersAndConsumers(AMQException amqethrows JMSException
2115     {
2116         JMSException jmse = null;
2117         try
2118         {
2119             closeProducers();
2120         }
2121         catch (JMSException e)
2122         {
2123             _logger.error("Error closing session: " + e, e);
2124             jmse = e;
2125         }
2126 
2127         try
2128         {
2129             closeConsumers(amqe);
2130         }
2131         catch (JMSException e)
2132         {
2133             _logger.error("Error closing session: " + e, e);
2134             if (jmse == null)
2135             {
2136                 jmse = e;
2137             }
2138         }
2139 
2140         if (jmse != null)
2141         {
2142             throw jmse;
2143         }
2144     }
2145 
2146     /**
2147      * Register to consume from the queue.
2148      *
2149      @param queueName
2150      */
2151     private void consumeFromQueue(C consumer, AMQShortString queueName,
2152                                   AMQProtocolHandler protocolHandler, boolean nowait, String messageSelectorthrows AMQException, FailoverException
2153     {
2154         int tagId = _nextTag++;
2155 
2156         consumer.setConsumerTag(tagId);
2157         // we must register the consumer in the map before we actually start listening
2158         _consumers.put(tagId, consumer);
2159 
2160         try
2161         {
2162             sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
2163         }
2164         catch (AMQException e)
2165         {
2166             // clean-up the map in the event of an error
2167             _consumers.remove(tagId);
2168             throw e;
2169         }
2170     }
2171 
2172     public abstract void sendConsume(C consumer, AMQShortString queueName,
2173                                      AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tagthrows AMQException, FailoverException;
2174 
2175     private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
2176             throws JMSException
2177     {
2178         return createProducerImpl(destination, mandatory, immediate, false);
2179     }
2180 
2181     private P createProducerImpl(final Destination destination, final boolean mandatory,
2182                                                     final boolean immediate, final boolean waitUntilSentthrows JMSException
2183     {
2184         return new FailoverRetrySupport<P, JMSException>(
2185                 new FailoverProtectedOperation<P, JMSException>()
2186                 {
2187                     public P execute() throws JMSException, FailoverException
2188                     {
2189                         checkNotClosed();
2190                         long producerId = getNextProducerId();
2191                         P producer = createMessageProducer(destination, mandatory,
2192                                                                               immediate, waitUntilSent, producerId);
2193                         registerProducer(producerId, producer);
2194 
2195                         return producer;
2196                     }
2197                 }, _connection).execute();
2198     }
2199 
2200     public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
2201                                                                final boolean immediate, final boolean waitUntilSent, long producerId);
2202 
2203     private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowaitthrows AMQException
2204     {
2205         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
2206     }
2207 
2208     /**
2209      * Returns the number of messages currently queued for the given destination.
2210      *
2211      <p/>Note that this operation automatically retries in the event of fail-over.
2212      *
2213      @param amqd The destination to be checked
2214      *
2215      @return the number of queued messages.
2216      *
2217      @throws AMQException If the queue cannot be declared for any reason.
2218      */
2219     public long getQueueDepth(final AMQDestination amqd)
2220             throws AMQException
2221     {
2222         return new FailoverNoopSupport<Long, AMQException>(
2223                 new FailoverProtectedOperation<Long, AMQException>()
2224                 {
2225                     public Long execute() throws AMQException, FailoverException
2226                     {
2227                         return requestQueueDepth(amqd);
2228                     }
2229                 }, _connection).execute();
2230 
2231     }
2232 
2233     protected abstract Long requestQueueDepth(AMQDestination amqdthrows AMQException, FailoverException;
2234 
2235     /**
2236      * Declares the named exchange and type of exchange.
2237      *
2238      <p/>Note that this operation automatically retries in the event of fail-over.
2239      *
2240      @param name            The name of the exchange to declare.
2241      @param type            The type of the exchange to declare.
2242      @param protocolHandler The protocol handler to process the communication through.
2243      @param nowait
2244      *
2245      @throws AMQException If the exchange cannot be declared for any reason.
2246      * @todo Be aware of possible changes to parameter order as versions change.
2247      */
2248     private void declareExchange(final AMQShortString name, final AMQShortString type,
2249                                  final AMQProtocolHandler protocolHandler, final boolean nowaitthrows AMQException
2250     {
2251         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
2252         {
2253             public Object execute() throws AMQException, FailoverException
2254             {
2255                 sendExchangeDeclare(name, type, protocolHandler, nowait);
2256                 return null;
2257             }
2258         }, _connection).execute();
2259     }
2260 
2261     public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
2262                                              final boolean nowaitthrows AMQException, FailoverException;
2263 
2264     /**
2265      * Declares a queue for a JMS destination.
2266      *
2267      <p/>Note that for queues but not topics the name is generated in the client rather than the server. This allows
2268      * the name to be reused on failover if required. In general, the destination indicates whether it wants a name
2269      * generated or not.
2270      *
2271      <p/>Note that this operation automatically retries in the event of fail-over.
2272      *
2273      @param amqd            The destination to declare as a queue.
2274      @param protocolHandler The protocol handler to communicate through.
2275      *
2276      @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of
2277      *         the client.
2278      *
2279      @throws AMQException If the queue cannot be declared for any reason.
2280      * @todo Verify the destiation is valid or throw an exception.
2281      * @todo Be aware of possible changes to parameter order as versions change.
2282      */
2283     protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
2284                                           final boolean noLocal)
2285             throws AMQException
2286     {
2287         /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
2288         return new FailoverNoopSupport<AMQShortString, AMQException>(
2289                 new FailoverProtectedOperation<AMQShortString, AMQException>()
2290                 {
2291                     public AMQShortString execute() throws AMQException, FailoverException
2292                     {
2293                         // Generate the queue name if the destination indicates that a client generated name is to be used.
2294                         if (amqd.isNameRequired())
2295                         {
2296                             amqd.setQueueName(protocolHandler.generateQueueName());
2297                         }
2298 
2299                         sendQueueDeclare(amqd, protocolHandler);
2300 
2301                         return amqd.getAMQQueueName();
2302                     }
2303                 }, _connection).execute();
2304     }
2305 
2306     public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandlerthrows AMQException, FailoverException;
2307 
2308     /**
2309      * Undeclares the specified queue.
2310      *
2311      <p/>Note that this operation automatically retries in the event of fail-over.
2312      *
2313      @param queueName The name of the queue to delete.
2314      *
2315      @throws JMSException If the queue could not be deleted for any reason.
2316      * @todo Be aware of possible changes to parameter order as versions change.
2317      */
2318     protected void deleteQueue(final AMQShortString queueNamethrows JMSException
2319     {
2320         try
2321         {
2322             new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
2323             {
2324                 public Object execute() throws AMQException, FailoverException
2325                 {
2326                     sendQueueDelete(queueName);
2327                     return null;
2328                 }
2329             }, _connection).execute();
2330         }
2331         catch (AMQException e)
2332         {
2333             throw new JMSAMQException("The queue deletion failed: " + e.getMessage(), e);
2334         }
2335     }
2336 
2337     public abstract void sendQueueDelete(final AMQShortString queueNamethrows AMQException, FailoverException;
2338 
2339     private long getNextProducerId()
2340     {
2341         return ++_nextProducerId;
2342     }
2343 
2344     protected AMQProtocolHandler getProtocolHandler()
2345     {
2346         return _connection.getProtocolHandler();
2347     }
2348 
2349     public byte getProtocolMajorVersion()
2350     {
2351         return getProtocolHandler().getProtocolMajorVersion();
2352     }
2353 
2354     public byte getProtocolMinorVersion()
2355     {
2356         return getProtocolHandler().getProtocolMinorVersion();
2357     }
2358 
2359     protected boolean hasMessageListeners()
2360     {
2361         return _hasMessageListeners;
2362     }
2363 
2364     private void markClosedConsumers() throws JMSException
2365     {
2366         if (_dispatcher != null)
2367         {
2368             _dispatcher.close();
2369             _dispatcher = null;
2370         }
2371         // we need to clone the list of consumers since the close() method updates the _consumers collection
2372         // which would result in a concurrent modification exception
2373         final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
2374 
2375         final Iterator<C> it = clonedConsumers.iterator();
2376         while (it.hasNext())
2377         {
2378             final C con = it.next();
2379             con.markClosed();
2380         }
2381         // at this point the _consumers map will be empty
2382     }
2383 
2384     private void markClosedProducersAndConsumers()
2385     {
2386         try
2387         {
2388             // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
2389             closeProducers();
2390         }
2391         catch (JMSException e)
2392         {
2393             _logger.error("Error closing session: " + e, e);
2394         }
2395 
2396         try
2397         {
2398             markClosedConsumers();
2399         }
2400         catch (JMSException e)
2401         {
2402             _logger.error("Error closing session: " + e, e);
2403         }
2404     }
2405 
2406     /**
2407      * Callers must hold the failover mutex before calling this method.
2408      *
2409      @param consumer
2410      *
2411      @throws AMQException
2412      */
2413     private void registerConsumer(C consumer, boolean nowaitthrows AMQException // , FailoverException
2414     {
2415         AMQDestination amqd = consumer.getDestination();
2416 
2417         AMQProtocolHandler protocolHandler = getProtocolHandler();
2418 
2419         declareExchange(amqd, protocolHandler, false);
2420 
2421         AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal());
2422 
2423         // store the consumer queue name
2424         consumer.setQueuename(queueName);
2425 
2426         bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
2427 
2428         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
2429         if (!_immediatePrefetch)
2430         {
2431             // The dispatcher will be null if we have just created this session
2432             // so suspend the channel before we register our consumer so that we don't
2433             // start prefetching until a receive/mListener is set.
2434             if (_dispatcher == null)
2435             {
2436                 if (!isSuspended())
2437                 {
2438                     try
2439                     {
2440                         suspendChannel(true);
2441                         _logger.info(
2442                                 "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
2443                     }
2444                     catch (AMQException e)
2445                     {
2446                         _logger.info("Suspending channel threw an exception:" + e);
2447                     }
2448                 }
2449             }
2450         }
2451         else
2452         {
2453             _logger.info("Immediately prefetching existing messages to new consumer.");
2454         }
2455 
2456         try
2457         {
2458             consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
2459         }
2460         catch (JMSException e// thrown by getMessageSelector
2461         {
2462             throw new AMQException(null, e.getMessage(), e);
2463         }
2464         catch (FailoverException e)
2465         {
2466             throw new AMQException(null, "Fail-over exception interrupted basic consume.", e);
2467         }
2468     }
2469 
2470     private void registerProducer(long producerId, MessageProducer producer)
2471     {
2472         _producers.put(new Long(producerId), producer);
2473     }
2474 
2475     private void rejectAllMessages(boolean requeue)
2476     {
2477         rejectMessagesForConsumerTag(0, requeue, true);
2478     }
2479 
2480     /**
2481      @param consumerTag The consumerTag to prune from queue or all if null
2482      @param requeue     Should the removed messages be requeued (or discarded. Possibly to DLQ)
2483      @param rejectAllConsumers
2484      */
2485 
2486     private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
2487     {
2488         Iterator messages = _queue.iterator();
2489         if (_logger.isInfoEnabled())
2490         {
2491             _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
2492                          + requeue);
2493 
2494             if (messages.hasNext())
2495             {
2496                 _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
2497             }
2498             else
2499             {
2500                 _logger.info("No messages in _queue to reject");
2501             }
2502         }
2503         while (messages.hasNext())
2504         {
2505             UnprocessedMessage message = (UnprocessedMessagemessages.next();
2506 
2507             if (rejectAllConsumers || (message.getConsumerTag() == consumerTag))
2508             {
2509                 if (_logger.isDebugEnabled())
2510                 {
2511                     _logger.debug("Removing message(" + System.identityHashCode(message") from _queue DT:"
2512                                   + message.getDeliveryTag());
2513                 }
2514 
2515                 messages.remove();
2516 
2517                 rejectMessage(message, requeue);
2518 
2519                 if (_logger.isDebugEnabled())
2520                 {
2521                     _logger.debug("Rejected the message(" + message.toString() ") for consumer :" + consumerTag);
2522                 }
2523             }
2524         }
2525     }
2526 
2527     private void resubscribeConsumers() throws AMQException
2528     {
2529         ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
2530         _consumers.clear();
2531 
2532         for (C consumer : consumers)
2533         {
2534             consumer.failedOver();
2535             registerConsumer(consumer, true);
2536         }
2537     }
2538 
2539     private void resubscribeProducers() throws AMQException
2540     {
2541         ArrayList producers = new ArrayList(_producers.values());
2542         _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size()))// FIXME: removeKey
2543         for (Iterator it = producers.iterator(); it.hasNext();)
2544         {
2545             P producer = (Pit.next();
2546             producer.resubscribe();
2547         }
2548     }
2549 
2550     /**
2551      * Suspends or unsuspends this session.
2552      *
2553      @param suspend <tt>true</tt> indicates that the session should be suspended, <tt>false<tt> indicates that it
2554      *                should be unsuspended.
2555      *
2556      @throws AMQException If the session cannot be suspended for any reason.
2557      * @todo Be aware of possible changes to parameter order as versions change.
2558      */
2559     protected void suspendChannel(boolean suspendthrows AMQException // , FailoverException
2560     {
2561         synchronized (_suspensionLock)
2562         {
2563             try
2564             {
2565                 if (_logger.isDebugEnabled())
2566                 {
2567                     _logger.debug("Setting channel flow : " (suspend ? "suspended" "unsuspended"));
2568                 }
2569 
2570                 _suspended = suspend;
2571                 sendSuspendChannel(suspend);
2572             }
2573             catch (FailoverException e)
2574             {
2575                 throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e);
2576             }
2577         }
2578     }
2579 
2580     public abstract void sendSuspendChannel(boolean suspendthrows AMQException, FailoverException;
2581 
2582     Object getMessageDeliveryLock()
2583     {
2584         return _messageDeliveryLock;
2585     }
2586 
2587     /**
2588      * Indicates whether this session consumers pre-fetche messages
2589      *
2590      @return true if this session consumers pre-fetche messages false otherwise
2591      */
2592     public boolean prefetch()
2593     {
2594         return getAMQConnection().getMaxPrefetch() 0;
2595     }
2596 
2597     /** Signifies that the session has pending sends to commit. */
2598     public void markDirty()
2599     {
2600         _dirty = true;
2601     }
2602 
2603     /** Signifies that the session has no pending sends to commit. */
2604     public void markClean()
2605     {
2606         _dirty = false;
2607         _failedOverDirty = false;
2608     }
2609 
2610     /**
2611      * Check to see if failover has occured since the last call to markClean(commit or rollback).
2612      *
2613      @return boolean true if failover has occured.
2614      */
2615     public boolean hasFailedOver()
2616     {
2617         return _failedOverDirty;
2618     }
2619 
2620     /**
2621      * Check to see if any message have been sent in this transaction and have not been commited.
2622      *
2623      @return boolean true if a message has been sent but not commited
2624      */
2625     public boolean isDirty()
2626     {
2627         return _dirty;
2628     }
2629 
2630     public void setTicket(int ticket)
2631     {
2632         _ticket = ticket;
2633     }
2634 
2635     public void setFlowControl(final boolean active)
2636     {
2637         _flowControl.setFlowControl(active);
2638     }
2639 
2640     public void checkFlowControl() throws InterruptedException
2641     {
2642         synchronized (_flowControl)
2643         {
2644             while (!_flowControl.getFlowControl())
2645             {
2646                 _flowControl.wait();
2647             }
2648         }
2649 
2650     }
2651 
2652     public interface Dispatchable
2653     {
2654         void dispatch(AMQSession ssn);
2655     }
2656 
2657     public void dispatch(UnprocessedMessage message)
2658     {
2659         if (_dispatcher == null)
2660         {
2661             throw new java.lang.IllegalStateException("dispatcher is not started");
2662         }
2663 
2664         _dispatcher.dispatchMessage(message);
2665     }
2666 
2667     /** Used for debugging in the dispatcher. */
2668     private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
2669 
2670     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
2671     class Dispatcher implements Runnable
2672     {
2673 
2674         /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
2675         private final AtomicBoolean _closed = new AtomicBoolean(false);
2676 
2677         private final Object _lock = new Object();
2678         private String dispatcherID = "" + System.identityHashCode(this);
2679 
2680         public Dispatcher()
2681         {
2682         }
2683 
2684         public void close()
2685         {
2686             _closed.set(true);
2687             _dispatcherThread.interrupt();
2688 
2689             // fixme awaitTermination
2690 
2691         }
2692 
2693         public void rejectPending(C consumer)
2694         {
2695             synchronized (_lock)
2696             {
2697                 boolean stopped = _dispatcher.connectionStopped();
2698 
2699                 if (!stopped)
2700                 {
2701                     _dispatcher.setConnectionStopped(true);
2702                 }
2703 
2704                 // Reject messages on pre-receive queue
2705                 consumer.rollbackPendingMessages();
2706 
2707                 // Reject messages on pre-dispatch queue
2708                 rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
2709                 //Let the dispatcher deal with this when it gets to them.
2710 
2711                 // closeConsumer
2712                 consumer.markClosed();
2713 
2714                 _dispatcher.setConnectionStopped(stopped);
2715 
2716             }
2717         }
2718 
2719         public void rollback()
2720         {
2721 
2722             synchronized (_lock)
2723             {
2724                 boolean isStopped = connectionStopped();
2725 
2726                 if (!isStopped)
2727                 {
2728                     setConnectionStopped(true);
2729                 }
2730 
2731                 _rollbackMark.set(_highestDeliveryTag.get());
2732 
2733                 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
2734 
2735                 for (C consumer : _consumers.values())
2736                 {
2737                     if (!consumer.isNoConsume())
2738                     {
2739                         consumer.rollback();
2740                     }
2741                     else
2742                     {
2743                         // should perhaps clear the _SQ here.
2744                         // consumer._synchronousQueue.clear();
2745                         consumer.clearReceiveQueue();
2746                     }
2747 
2748                 }
2749 
2750                 for (int i = 0; i < _removedConsumers.size(); i++)
2751                 {
2752                     // Sends acknowledgement to server
2753                     _removedConsumers.get(i).rollback();
2754                     _removedConsumers.remove(i);
2755                 }
2756 
2757                 setConnectionStopped(isStopped);
2758             }
2759 
2760         }
2761 
2762         public void run()
2763         {
2764             if (_dispatcherLogger.isInfoEnabled())
2765             {
2766                 _dispatcherLogger.info(_dispatcherThread.getName() " started");
2767             }
2768 
2769             UnprocessedMessage message;
2770 
2771             // Allow disptacher to start stopped
2772             synchronized (_lock)
2773             {
2774                 while (!_closed.get() && connectionStopped())
2775                 {
2776                     try
2777                     {
2778                         _lock.wait();
2779                     }
2780                     catch (InterruptedException e)
2781                     {
2782                         // ignore
2783                     }
2784                 }
2785             }
2786 
2787             try
2788             {
2789                 Dispatchable disp;
2790                 while (!_closed.get() && ((disp = (Dispatchable_queue.take()) != null))
2791                 {
2792                     disp.dispatch(AMQSession.this);
2793                 }
2794             }
2795             catch (InterruptedException e)
2796             {
2797                 // ignore
2798             }
2799 
2800             if (_dispatcherLogger.isInfoEnabled())
2801             {
2802                 _dispatcherLogger.info(_dispatcherThread.getName() " thread terminating for channel " + _channelId);
2803             }
2804         }
2805 
2806         // only call while holding lock
2807         final boolean connectionStopped()
2808         {
2809             return _connectionStopped;
2810         }
2811 
2812         boolean setConnectionStopped(boolean connectionStopped)
2813         {
2814             boolean currently;
2815             synchronized (_lock)
2816             {
2817                 currently = _connectionStopped;
2818                 _connectionStopped = connectionStopped;
2819                 _lock.notify();
2820 
2821                 if (_dispatcherLogger.isDebugEnabled())
2822                 {
2823                     _dispatcherLogger.debug("Set Dispatcher Connection " (connectionStopped ? "Stopped" "Started")
2824                                             ": Currently " (currently ? "Stopped" "Started"));
2825                 }
2826             }
2827 
2828             return currently;
2829         }
2830 
2831         private void dispatchMessage(UnprocessedMessage message)
2832         {
2833             long deliveryTag = message.getDeliveryTag();
2834 
2835             synchronized (_lock)
2836             {
2837 
2838                 try
2839                 {
2840                     while (connectionStopped())
2841                     {
2842                         _lock.wait();
2843                     }
2844                 }
2845                 catch (InterruptedException e)
2846                 {
2847                     // pass
2848                 }
2849 
2850                 if (!(message instanceof CloseConsumerMessage)
2851                     && tagLE(deliveryTag, _rollbackMark.get()))
2852                 {
2853                     rejectMessage(message, true);
2854                 }
2855                 else
2856                 {
2857                     synchronized (_messageDeliveryLock)
2858                     {
2859                         notifyConsumer(message);
2860                     }
2861                 }
2862             }
2863 
2864             long current = _rollbackMark.get();
2865             if (updateRollbackMark(current, deliveryTag))
2866             {
2867                 _rollbackMark.compareAndSet(current, deliveryTag);
2868             }
2869         }
2870 
2871         private void notifyConsumer(UnprocessedMessage message)
2872         {
2873             final C consumer = _consumers.get(message.getConsumerTag());
2874 
2875             if ((consumer == null|| consumer.isClosed())
2876             {
2877                 if (_dispatcherLogger.isInfoEnabled())
2878                 {
2879                     if (consumer == null)
2880                     {
2881                         _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message("
2882                                                + System.identityHashCode(message")" "["
2883                                                + message.getDeliveryTag() "] from queue "
2884                                                + message.getConsumerTag() " )without a handler - rejecting(requeue)...");
2885                     }
2886                     else
2887                     {
2888                         if (consumer.isNoConsume())
2889                         {
2890                             _dispatcherLogger.info("Received a message("
2891                                                    + System.identityHashCode(message")" "["
2892                                                    + message.getDeliveryTag() "] from queue " " consumer("
2893                                                    + message.getConsumerTag() ") is closed and a browser so dropping...");
2894                             //DROP MESSAGE
2895                             return;
2896 
2897                         }
2898                         else
2899                         {
2900                             _dispatcherLogger.info("Received a message("
2901                                                    + System.identityHashCode(message")" "["
2902                                                    + message.getDeliveryTag() "] from queue " " consumer("
2903                                                    + message.getConsumerTag() ") is closed rejecting(requeue)...");
2904                         }
2905                     }
2906                 }
2907                 // Don't reject if we're already closing
2908                 if (!_closed.get())
2909                 {
2910                     rejectMessage(message, true);
2911                 }
2912             }
2913             else
2914             {
2915                 consumer.notifyMessage(message);
2916             }
2917         }
2918     }
2919 
2920     protected abstract boolean tagLE(long tag1, long tag2);
2921 
2922     protected abstract boolean updateRollbackMark(long current, long deliveryTag);
2923 
2924     public abstract AMQMessageDelegateFactory getMessageDelegateFactory();
2925 
2926     /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
2927         boolean read) throws AMQException
2928     {
2929         getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
2930                 getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write),
2931             new BlockingMethodFrameListener(_channelId)
2932             {
2933 
2934                 public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException
2935                 {
2936                     if (frame instanceof AccessRequestOkBody)
2937                     {
2938                         setTicket(((AccessRequestOkBody) frame).getTicket());
2939 
2940                         return true;
2941                     }
2942                     else
2943                     {
2944                         return false;
2945                     }
2946                 }
2947             });
2948     }*/
2949 
2950     private class SuspenderRunner implements Runnable
2951     {
2952         private AtomicBoolean _suspend;
2953 
2954         public SuspenderRunner(AtomicBoolean suspend)
2955         {
2956             _suspend = suspend;
2957         }
2958 
2959         public void run()
2960         {
2961             try
2962             {
2963                 synchronized (_suspensionLock)
2964                 {
2965                     suspendChannel(_suspend.get());
2966                 }
2967             }
2968             catch (AMQException e)
2969             {
2970                 _logger.warn("Unable to suspend channel");
2971             }
2972         }
2973     }
2974 }