0001 /*
0002 *
0003 * Licensed to the Apache Software Foundation (ASF) under one
0004 * or more contributor license agreements. See the NOTICE file
0005 * distributed with this work for additional information
0006 * regarding copyright ownership. The ASF licenses this file
0007 * to you under the Apache License, Version 2.0 (the
0008 * "License"); you may not use this file except in compliance
0009 * with the License. You may obtain a copy of the License at
0010 *
0011 * http://www.apache.org/licenses/LICENSE-2.0
0012 *
0013 * Unless required by applicable law or agreed to in writing,
0014 * software distributed under the License is distributed on an
0015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016 * KIND, either express or implied. See the License for the
0017 * specific language governing permissions and limitations
0018 * under the License.
0019 *
0020 */
0021 package org.apache.qpid.client;
0022
0023 import java.io.Serializable;
0024 import java.net.URISyntaxException;
0025 import java.text.MessageFormat;
0026 import java.util.ArrayList;
0027 import java.util.Collection;
0028 import java.util.Iterator;
0029 import java.util.Map;
0030 import java.util.concurrent.ConcurrentHashMap;
0031 import java.util.concurrent.ConcurrentLinkedQueue;
0032 import java.util.concurrent.CopyOnWriteArrayList;
0033 import java.util.concurrent.CountDownLatch;
0034 import java.util.concurrent.atomic.AtomicBoolean;
0035 import java.util.concurrent.atomic.AtomicInteger;
0036 import java.util.concurrent.atomic.AtomicLong;
0037
0038 import javax.jms.BytesMessage;
0039 import javax.jms.Destination;
0040 import javax.jms.IllegalStateException;
0041 import javax.jms.InvalidDestinationException;
0042 import javax.jms.InvalidSelectorException;
0043 import javax.jms.JMSException;
0044 import javax.jms.MapMessage;
0045 import javax.jms.MessageConsumer;
0046 import javax.jms.MessageListener;
0047 import javax.jms.MessageProducer;
0048 import javax.jms.ObjectMessage;
0049 import javax.jms.Queue;
0050 import javax.jms.QueueBrowser;
0051 import javax.jms.QueueReceiver;
0052 import javax.jms.QueueSender;
0053 import javax.jms.QueueSession;
0054 import javax.jms.StreamMessage;
0055 import javax.jms.TemporaryQueue;
0056 import javax.jms.TemporaryTopic;
0057 import javax.jms.TextMessage;
0058 import javax.jms.Topic;
0059 import javax.jms.TopicPublisher;
0060 import javax.jms.TopicSession;
0061 import javax.jms.TopicSubscriber;
0062
0063 import org.apache.qpid.AMQDisconnectedException;
0064 import org.apache.qpid.AMQException;
0065 import org.apache.qpid.AMQInvalidArgumentException;
0066 import org.apache.qpid.AMQInvalidRoutingKeyException;
0067 import org.apache.qpid.client.failover.FailoverException;
0068 import org.apache.qpid.client.failover.FailoverNoopSupport;
0069 import org.apache.qpid.client.failover.FailoverProtectedOperation;
0070 import org.apache.qpid.client.failover.FailoverRetrySupport;
0071 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
0072 import org.apache.qpid.client.message.AbstractJMSMessage;
0073 import org.apache.qpid.client.message.CloseConsumerMessage;
0074 import org.apache.qpid.client.message.JMSBytesMessage;
0075 import org.apache.qpid.client.message.JMSMapMessage;
0076 import org.apache.qpid.client.message.JMSObjectMessage;
0077 import org.apache.qpid.client.message.JMSStreamMessage;
0078 import org.apache.qpid.client.message.JMSTextMessage;
0079 import org.apache.qpid.client.message.MessageFactoryRegistry;
0080 import org.apache.qpid.client.message.UnprocessedMessage;
0081 import org.apache.qpid.client.protocol.AMQProtocolHandler;
0082 import org.apache.qpid.client.state.AMQState;
0083 import org.apache.qpid.client.state.AMQStateManager;
0084 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
0085 import org.apache.qpid.framing.AMQShortString;
0086 import org.apache.qpid.framing.FieldTable;
0087 import org.apache.qpid.framing.FieldTableFactory;
0088 import org.apache.qpid.framing.MethodRegistry;
0089 import org.apache.qpid.jms.Session;
0090 import org.apache.qpid.thread.Threading;
0091 import org.apache.qpid.url.AMQBindingURL;
0092 import org.slf4j.Logger;
0093 import org.slf4j.LoggerFactory;
0094
0095 /**
0096 * <p/><table id="crc"><caption>CRC Card</caption>
0097 * <tr><th> Responsibilities <th> Collaborations
0098 * <tr><td>
0099 * </table>
0100 *
 * @todo Different FailoverSupport implementations are needed on the same method call, in different situations. For
0102 * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
0103 * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
0104 * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
0105 * has been reestablished. All fail-over protected operations should be placed in private methods, with
0106 * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
0107 * fail-over process sets a nowait flag and uses an async method call instead.
0108 * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
0109 * after looking at worse bottlenecks first.
0110 */
0111 public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
0112 {
0113
0114
0115 public static final class IdToConsumerMap<C extends BasicMessageConsumer>
0116 {
0117 private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
0118 private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
0119
0120 public C get(int id)
0121 {
0122 if ((id & 0xFFFFFFF0) == 0)
0123 {
0124 return (C) _fastAccessConsumers[id];
0125 }
0126 else
0127 {
0128 return _slowAccessConsumers.get(id);
0129 }
0130 }
0131
0132 public C put(int id, C consumer)
0133 {
0134 C oldVal;
0135 if ((id & 0xFFFFFFF0) == 0)
0136 {
0137 oldVal = (C) _fastAccessConsumers[id];
0138 _fastAccessConsumers[id] = consumer;
0139 }
0140 else
0141 {
0142 oldVal = _slowAccessConsumers.put(id, consumer);
0143 }
0144
0145 return consumer;
0146
0147 }
0148
0149 public C remove(int id)
0150 {
0151 C consumer;
0152 if ((id & 0xFFFFFFF0) == 0)
0153 {
0154 consumer = (C) _fastAccessConsumers[id];
0155 _fastAccessConsumers[id] = null;
0156 }
0157 else
0158 {
0159 consumer = _slowAccessConsumers.remove(id);
0160 }
0161
0162 return consumer;
0163
0164 }
0165
0166 public Collection<C> values()
0167 {
0168 ArrayList<C> values = new ArrayList<C>();
0169
0170 for (int i = 0; i < 16; i++)
0171 {
0172 if (_fastAccessConsumers[i] != null)
0173 {
0174 values.add((C) _fastAccessConsumers[i]);
0175 }
0176 }
0177 values.addAll(_slowAccessConsumers.values());
0178
0179 return values;
0180 }
0181
0182 public void clear()
0183 {
0184 _slowAccessConsumers.clear();
0185 for (int i = 0; i < 16; i++)
0186 {
0187 _fastAccessConsumers[i] = null;
0188 }
0189 }
0190 }
0191
0192 /** Used for debugging. */
0193 private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
0194
0195 /**
0196 * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
0197 * not need to be attached to a queue.
0198 */
0199 protected static final boolean DEFAULT_IMMEDIATE = false;
0200
0201 /**
0202 * The default value for mandatory flag used by producers created by this session is true. That is, server will not
0203 * silently drop messages where no queue is connected to the exchange for the message.
0204 */
0205 protected static final boolean DEFAULT_MANDATORY = true;
0206
0207 /** System property to enable strict AMQP compliance. */
0208 public static final String STRICT_AMQP = "STRICT_AMQP";
0209
0210 /** Strict AMQP default setting. */
0211 public static final String STRICT_AMQP_DEFAULT = "false";
0212
0213 /** System property to enable failure if strict AMQP compliance is violated. */
0214 public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
0215
/** Strict AMQP failure default. */
0217 public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
0218
0219 /** System property to enable immediate message prefetching. */
0220 public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
0221
0222 /** Immediate message prefetch default. */
0223 public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
0224
0225 /** The connection to which this session belongs. */
0226 protected AMQConnection _connection;
0227
0228 /** Used to indicate whether or not this is a transactional session. */
0229 protected boolean _transacted;
0230
/** Holds the session's acknowledgement mode. */
0232 protected final int _acknowledgeMode;
0233
0234 /** Holds this session unique identifier, used to distinguish it from other sessions. */
0235 protected int _channelId;
0236
0237 private int _ticket;
0238
0239 /** Holds the high mark for prefetched message, at which the session is suspended. */
0240 private int _defaultPrefetchHighMark;
0241
0242 /** Holds the low mark for prefetched messages, below which the session is resumed. */
0243 private int _defaultPrefetchLowMark;
0244
0245 /** Holds the message listener, if any, which is attached to this session. */
0246 private MessageListener _messageListener = null;
0247
0248 /** Used to indicate that this session has been started at least once. */
0249 private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
0250
0251 /**
0252 * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only
0253 * keeps a record of subscriptions which have been created in the current instance. It does not remember
0254 * subscriptions between executions of the client.
0255 */
0256 protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
0257 new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
0258
0259 /**
0260 * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
0261 * up in the {@link #_subscriptions} map.
0262 */
0263 protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
0264 new ConcurrentHashMap<C, String>();
0265
0266 /**
0267 * Used to hold incoming messages.
0268 *
0269 * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
0270 */
0271 protected final FlowControllingBlockingQueue _queue;
0272
0273 /** Holds the highest received delivery tag. */
0274 private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
0275 private final AtomicLong _rollbackMark = new AtomicLong(-1);
0276
0277 /** All the not yet acknowledged message tags */
0278 protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
0279
0280 /** All the delivered message tags */
0281 protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
0282
0283 /** Holds the dispatcher thread for this session. */
0284 protected Dispatcher _dispatcher;
0285
0286 protected Thread _dispatcherThread;
0287
0288 /** Holds the message factory factory for this session. */
0289 protected MessageFactoryRegistry _messageFactoryRegistry;
0290
0291 /** Holds all of the producers created by this session, keyed by their unique identifiers. */
0292 private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
0293
0294 /**
0295 * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
0296 * methods.
0297 */
0298 private int _nextTag = 1;
0299
/**
 * Maps from identifying tags to message consumers, in order to dispatch incoming messages to the right
 * consumer.
 */
0304 protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
0305
0306 //Map<AMQShortString, BasicMessageConsumer> _consumers =
0307 //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
0308
0309 /**
0310 * Contains a list of consumers which have been removed but which might still have
0311 * messages to acknowledge, eg in client ack or transacted modes
0312 */
0313 private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
0314
0315 /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
0316 private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
0317 new ConcurrentHashMap<Destination, AtomicInteger>();
0318
0319 /**
0320 * Used as a source of unique identifiers for producers within the session.
0321 *
0322 * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one
0323 * thread of control is allowed to create producers for any given session instance.
0324 */
0325 private long _nextProducerId;
0326
/**
 * Set when recover is called. This is to handle the case where recover() is called by application code during
 * onMessage() processing, to ensure that an auto ack is not sent.
 */
0331 private boolean _inRecovery;
0332
/** Used to indicate that the connection to which this session belongs has been stopped. */
0334 private boolean _connectionStopped;
0335
0336 /** Used to indicate that this session has a message listener attached to it. */
0337 private boolean _hasMessageListeners;
0338
0339 /** Used to indicate that this session has been suspended. */
0340 private boolean _suspended;
0341
0342 /**
0343 * Used to protect the suspension of this session, so that critical code can be executed during suspension,
0344 * without the session being resumed by other threads.
0345 */
0346 private final Object _suspensionLock = new Object();
0347
/**
 * Used to ensure that only the first call to start the dispatcher can unsuspend the channel.
 *
 * @todo This is accessed only within a synchronized method, so does not need to be atomic.
 */
0353 protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
0354
0355 /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
0356 protected final boolean _immediatePrefetch;
0357
0358 /** Indicates that warnings should be generated on violations of the strict AMQP. */
0359 protected final boolean _strictAMQP;
0360
/** Indicates that runtime exceptions should be generated on violations of the strict AMQP. */
0362 protected final boolean _strictAMQPFATAL;
0363 private final Object _messageDeliveryLock = new Object();
0364
0365 /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
0366 private boolean _dirty;
/** Has failover occurred on this session with outstanding actions to commit? */
0368 private boolean _failedOverDirty;
0369
0370 private static final class FlowControlIndicator
0371 {
0372 private volatile boolean _flowControl = true;
0373
0374 public synchronized void setFlowControl(boolean flowControl)
0375 {
0376 _flowControl = flowControl;
0377 notify();
0378 }
0379
0380 public boolean getFlowControl()
0381 {
0382 return _flowControl;
0383 }
0384 }
0385
0386 /** Flow control */
0387 private FlowControlIndicator _flowControl = new FlowControlIndicator();
0388
/**
 * Creates a new session on a connection.
 *
 * <p/>The strict AMQP, strict AMQP fatal and immediate prefetch switches are read from the system properties
 * {@link #STRICT_AMQP}, {@link #STRICT_AMQP_FATAL} and {@link #IMMEDIATE_PREFETCH}. Strict AMQP always turns
 * immediate prefetch on.
 *
 * <p/>When the effective acknowledge mode is NO_ACKNOWLEDGE the incoming message queue is given a threshold
 * listener that starts a new thread running a SuspenderRunner each time the queue crosses a prefetch mark. In
 * every other mode the queue is created with the high mark and no listener.
 *
 * @param con                     The connection on which to create the session.
 * @param channelId               The unique identifier for the session.
 * @param transacted              Indicates whether or not the session is transactional.
 * @param acknowledgeMode         The acknowledgement mode for the session. Replaced by SESSION_TRANSACTED when
 *                                <tt>transacted</tt> is set.
 * @param messageFactoryRegistry  The message factory registry for the session.
 * @param defaultPrefetchHighMark The maximum number of messages to prefetch before suspending the session.
 * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
 */
protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                     MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark,
                     int defaultPrefetchLowMark)
{
    // Compliance switches come from system properties, falling back to the declared defaults.
    final String strictSetting = System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT);
    _strictAMQP = Boolean.parseBoolean(strictSetting);

    final String strictFatalSetting =
        System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT);
    _strictAMQPFATAL = Boolean.parseBoolean(strictFatalSetting);

    // The prefetch property is only consulted when strict AMQP has not already forced prefetching on.
    _immediatePrefetch =
        _strictAMQP
        || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));

    _connection = con;
    _transacted = transacted;
    _acknowledgeMode = transacted ? javax.jms.Session.SESSION_TRANSACTED : acknowledgeMode;

    _channelId = channelId;
    _messageFactoryRegistry = messageFactoryRegistry;
    _defaultPrefetchHighMark = defaultPrefetchHighMark;
    _defaultPrefetchLowMark = defaultPrefetchLowMark;

    if (_acknowledgeMode != NO_ACKNOWLEDGE)
    {
        _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
    }
    else
    {
        final FlowControllingBlockingQueue.ThresholdListener suspendOnThreshold =
            new FlowControllingBlockingQueue.ThresholdListener()
            {
                /** Shared with every SuspenderRunner started by this listener. */
                private final AtomicBoolean _suspendState = new AtomicBoolean();

                public void aboveThreshold(int currentValue)
                {
                    final String message = "Above threshold(" + _defaultPrefetchHighMark
                                           + ") so suspending channel. Current value is " + currentValue;
                    _logger.debug(message);

                    _suspendState.set(true);
                    final Thread suspender = new Thread(new SuspenderRunner(_suspendState));
                    suspender.start();
                }

                public void underThreshold(int currentValue)
                {
                    final String message = "Below threshold(" + _defaultPrefetchLowMark
                                           + ") so unsuspending channel. Current value is " + currentValue;
                    _logger.debug(message);

                    _suspendState.set(false);
                    final Thread resumer = new Thread(new SuspenderRunner(_suspendState));
                    resumer.start();
                }
            };

        _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
                                                  suspendOnThreshold);
    }
}
0461
/**
 * Creates a new session on a connection, using the registry returned by
 * {@link MessageFactoryRegistry#newDefaultRegistry()} as the message factory registry.
 *
 * @param con                 The connection on which to create the session.
 * @param channelId           The unique identifier for the session.
 * @param transacted          Indicates whether or not the session is transactional.
 * @param acknowledgeMode     The acknowledgement mode for the session.
 * @param defaultPrefetchHigh The maximum number of messages to prefetch before suspending the session.
 * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
 */
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
           int defaultPrefetchLow)
{
    // Everything else is handled by the full constructor.
    this(con,
         channelId,
         transacted,
         acknowledgeMode,
         MessageFactoryRegistry.newDefaultRegistry(),
         defaultPrefetchHigh,
         defaultPrefetchLow);
}
0478
0479 // ===== JMS Session methods.
0480
0481 /**
0482 * Closes the session with no timeout.
0483 *
0484 * @throws JMSException If the JMS provider fails to close the session due to some internal error.
0485 */
0486 public void close() throws JMSException
0487 {
0488 close(-1);
0489 }
0490
0491 public void checkNotClosed() throws JMSException
0492 {
0493 try
0494 {
0495 super.checkNotClosed();
0496 }
0497 catch (IllegalStateException ise)
0498 {
0499 // if the Connection has closed then we should throw any exception that has occured that we were not waiting for
0500 AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
0501
0502 if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
0503 {
0504 ise.setLinkedException(manager.getLastException());
0505 }
0506
0507 throw ise;
0508 }
0509 }
0510
0511 public BytesMessage createBytesMessage() throws JMSException
0512 {
0513 checkNotClosed();
0514 return new JMSBytesMessage(getMessageDelegateFactory());
0515 }
0516
0517 /**
0518 * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
0519 *
0520 * @throws IllegalStateException If the session is closed.
0521 */
0522 public void acknowledge() throws IllegalStateException
0523 {
0524 if (isClosed())
0525 {
0526 throw new IllegalStateException("Session is already closed");
0527 }
0528 else if (hasFailedOver())
0529 {
0530 throw new IllegalStateException("has failed over");
0531 }
0532
0533 while (true)
0534 {
0535 Long tag = _unacknowledgedMessageTags.poll();
0536 if (tag == null)
0537 {
0538 break;
0539 }
0540 acknowledgeMessage(tag, false);
0541 }
0542 }
0543
0544 /**
0545 * Acknowledge one or many messages.
0546 *
0547 * @param deliveryTag The tag of the last message to be acknowledged.
0548 * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the
0549 * delivery tag, <tt>false</tt> to just acknowledge that message.
0550 *
0551 * @todo Be aware of possible changes to parameter order as versions change.
0552 */
0553 public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
0554
0555 public MethodRegistry getMethodRegistry()
0556 {
0557 MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
0558 return methodRegistry;
0559 }
0560
0561 /**
0562 * Binds the named queue, with the specified routing key, to the named exchange.
0563 *
0564 * <p/>Note that this operation automatically retries in the event of fail-over.
0565 *
0566 * @param queueName The name of the queue to bind.
0567 * @param routingKey The routing key to bind the queue with.
0568 * @param arguments Additional arguments.
0569 * @param exchangeName The exchange to bind the queue on.
0570 *
0571 * @throws AMQException If the queue cannot be bound for any reason.
0572 * @todo Be aware of possible changes to parameter order as versions change.
0573 * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
0574 */
0575 public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
0576 final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
0577 {
0578 /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
0579 new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
0580 {
0581 public Object execute() throws AMQException, FailoverException
0582 {
0583 sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
0584 return null;
0585 }
0586 }, _connection).execute();
0587 }
0588
0589 public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException
0590 {
0591 if (consumer.getQueuename() != null)
0592 {
0593 bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd);
0594 }
0595 }
0596
0597 public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
0598 final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
0599
0600 /**
0601 * Closes the session.
0602 *
0603 * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
0604 * the channel. This is because the channel is marked as closed before the request to close it is made, so the
0605 * fail-over should not re-open it.
0606 *
0607 * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
0608 *
0609 * @throws JMSException If the JMS provider fails to close the session due to some internal error.
0610 * @todo Be aware of possible changes to parameter order as versions change.
0611 * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
0612 * re-opened. May need to examine this more carefully.
0613 * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
0614 * because the failover process sends the failover event before acquiring the mutex itself.
0615 */
0616 public void close(long timeout) throws JMSException
0617 {
0618 if (_logger.isInfoEnabled())
0619 {
0620 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0621 _logger.info("Closing session: " + this); // + ":"
0622 // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
0623 }
0624
0625 // Ensure we only try and close an open session.
0626 if (!_closed.getAndSet(true))
0627 {
0628 synchronized (getFailoverMutex())
0629 {
0630 // We must close down all producers and consumers in an orderly fashion. This is the only method
0631 // that can be called from a different thread of control from the one controlling the session.
0632 synchronized (_messageDeliveryLock)
0633 {
0634 // we pass null since this is not an error case
0635 closeProducersAndConsumers(null);
0636
0637 try
0638 {
0639 sendClose(timeout);
0640 }
0641 catch (AMQException e)
0642 {
0643 JMSException jmse = new JMSException("Error closing session: " + e);
0644 jmse.setLinkedException(e);
0645 throw jmse;
0646 }
0647 // This is ignored because the channel is already marked as closed so the fail-over process will
0648 // not re-open it.
0649 catch (FailoverException e)
0650 {
0651 _logger.debug(
0652 "Got FailoverException during channel close, ignored as channel already marked as closed.");
0653 }
0654 finally
0655 {
0656 _connection.deregisterSession(_channelId);
0657 }
0658 }
0659 }
0660 }
0661 }
0662
0663 public abstract void sendClose(long timeout) throws AMQException, FailoverException;
0664
0665 /**
0666 * Called when the server initiates the closure of the session unilaterally.
0667 *
0668 * @param e the exception that caused this session to be closed. Null causes the
0669 */
0670 public void closed(Throwable e) throws JMSException
0671 {
0672 // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
0673 // calls through connection.closeAllSessions which is also called by the public connection.close()
0674 // with a null cause
0675 // When we are closing the Session due to a protocol session error we simply create a new AMQException
0676 // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
0677 // We need to determin here if the connection should be
0678
0679 if (e instanceof AMQDisconnectedException)
0680 {
0681 if (_dispatcher != null)
0682 {
0683 // Failover failed and ain't coming back. Knife the dispatcher.
0684 _dispatcherThread.interrupt();
0685 }
0686 }
0687
0688 if (!_closed.getAndSet(true))
0689 {
0690 synchronized (getFailoverMutex())
0691 {
0692 synchronized (_messageDeliveryLock)
0693 {
0694 // An AMQException has an error code and message already and will be passed in when closure occurs as a
0695 // result of a channel close request
0696 AMQException amqe;
0697 if (e instanceof AMQException)
0698 {
0699 amqe = (AMQException) e;
0700 }
0701 else
0702 {
0703 amqe = new AMQException("Closing session forcibly", e);
0704 }
0705
0706 _connection.deregisterSession(_channelId);
0707 closeProducersAndConsumers(amqe);
0708 }
0709 }
0710 }
0711 }
0712
0713 /**
0714 * Commits all messages done in this transaction and releases any locks currently held.
0715 *
0716 * <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the
0717 * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
0718 * The client will be unable to determine whether or not the commit actually happened on the broker in this case.
0719 *
0720 * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
0721 * not mean that the commit is known to have failed, merely that it is not known whether it
0722 * failed or not.
0723 * @todo Be aware of possible changes to parameter order as versions change.
0724 */
0725 public void commit() throws JMSException
0726 {
0727 checkTransacted();
0728
0729 try
0730 {
0731
0732 // TGM FIXME: what about failover?
0733 // Acknowledge all delivered messages
0734 while (true)
0735 {
0736 Long tag = _deliveredMessageTags.poll();
0737 if (tag == null)
0738 {
0739 break;
0740 }
0741
0742 acknowledgeMessage(tag, false);
0743 }
0744 // Commits outstanding messages and acknowledgments
0745 sendCommit();
0746 markClean();
0747 }
0748 catch (AMQException e)
0749 {
0750 throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
0751 }
0752 catch (FailoverException e)
0753 {
0754 throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
0755 }
0756 }
0757
0758 public abstract void sendCommit() throws AMQException, FailoverException;
0759
0760
0761 public void confirmConsumerCancelled(int consumerTag)
0762 {
0763
0764 // Remove the consumer from the map
0765 C consumer = _consumers.get(consumerTag);
0766 if (consumer != null)
0767 {
0768 if (!consumer.isNoConsume()) // Normal Consumer
0769 {
0770 // Clean the Maps up first
0771 // Flush any pending messages for this consumerTag
0772 if (_dispatcher != null)
0773 {
0774 _logger.info("Dispatcher is not null");
0775 }
0776 else
0777 {
0778 _logger.info("Dispatcher is null so created stopped dispatcher");
0779 startDispatcherIfNecessary(true);
0780 }
0781
0782 _dispatcher.rejectPending(consumer);
0783 }
0784 else // Queue Browser
0785 {
0786 // Just close the consumer
0787 // fixme the CancelOK is being processed before the arriving messages..
0788 // The dispatcher is still to process them so the server sent in order but the client
0789 // has yet to receive before the close comes in.
0790
0791 // consumer.markClosed();
0792
0793 if (consumer.isAutoClose())
0794 {
0795 // There is a small window where the message is between the two queues in the dispatcher.
0796 if (consumer.isClosed())
0797 {
0798 if (_logger.isInfoEnabled())
0799 {
0800 _logger.info("Closing consumer:" + consumer.debugIdentity());
0801 }
0802
0803 deregisterConsumer(consumer);
0804 }
0805 else
0806 {
0807 _queue.add(new CloseConsumerMessage(consumer));
0808 }
0809 }
0810 }
0811 }
0812 }
0813
0814 public QueueBrowser createBrowser(Queue queue) throws JMSException
0815 {
0816 if (isStrictAMQP())
0817 {
0818 throw new UnsupportedOperationException();
0819 }
0820
0821 return createBrowser(queue, null);
0822 }
0823
0824 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
0825 {
0826 if (isStrictAMQP())
0827 {
0828 throw new UnsupportedOperationException();
0829 }
0830
0831 checkNotClosed();
0832 checkValidQueue(queue);
0833
0834 return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
0835 }
0836
0837 public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
0838 throws JMSException
0839 {
0840 checkValidDestination(destination);
0841
0842 return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
0843 messageSelector, null, true, true);
0844 }
0845
0846 public MessageConsumer createConsumer(Destination destination) throws JMSException
0847 {
0848 checkValidDestination(destination);
0849
0850 return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
0851 false, false);
0852 }
0853
0854 public C createExclusiveConsumer(Destination destination) throws JMSException
0855 {
0856 checkValidDestination(destination);
0857
0858 return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
0859 false, false);
0860 }
0861
0862 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
0863 {
0864 checkValidDestination(destination);
0865
0866 return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic),
0867 messageSelector, null, false, false);
0868 }
0869
0870 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
0871 throws JMSException
0872 {
0873 checkValidDestination(destination);
0874
0875 return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
0876 messageSelector, null, false, false);
0877 }
0878
0879 public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
0880 throws JMSException
0881 {
0882 checkValidDestination(destination);
0883
0884 return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
0885 messageSelector, null, false, false);
0886 }
0887
0888 public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
0889 String selector) throws JMSException
0890 {
0891 checkValidDestination(destination);
0892
0893 return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false);
0894 }
0895
0896 public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
0897 boolean exclusive, String selector) throws JMSException
0898 {
0899 checkValidDestination(destination);
0900
0901 return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
0902 }
0903
0904 public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
0905 String selector, FieldTable rawSelector) throws JMSException
0906 {
0907 checkValidDestination(destination);
0908
0909 return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false);
0910 }
0911
0912 public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
0913 boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
0914 {
0915 checkValidDestination(destination);
0916
0917 return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
0918 false);
0919 }
0920
/**
 * Creates a durable subscriber on the given topic under the given subscription name. Implemented by
 * subclasses.
 *
 * @param topic the topic to subscribe to
 * @param name  the subscription name
 * @return the durable subscriber
 * @throws JMSException if the subscriber cannot be created
 */
public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
0922
0923 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
0924 throws JMSException
0925 {
0926 checkNotClosed();
0927 checkValidTopic(topic);
0928 if (_subscriptions.containsKey(name))
0929 {
0930 _subscriptions.get(name).close();
0931 }
0932 AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
0933 C consumer = (C) createConsumer(dest, messageSelector, noLocal);
0934 TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
0935 _subscriptions.put(name, subscriber);
0936 _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
0937
0938 return subscriber;
0939 }
0940
0941 public MapMessage createMapMessage() throws JMSException
0942 {
0943 checkNotClosed();
0944 return new JMSMapMessage(getMessageDelegateFactory());
0945 }
0946
0947 public javax.jms.Message createMessage() throws JMSException
0948 {
0949 return createBytesMessage();
0950 }
0951
0952 public ObjectMessage createObjectMessage() throws JMSException
0953 {
0954 checkNotClosed();
0955 return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory());
0956 }
0957
0958 public ObjectMessage createObjectMessage(Serializable object) throws JMSException
0959 {
0960 ObjectMessage msg = createObjectMessage();
0961 msg.setObject(object);
0962
0963 return msg;
0964 }
0965
0966 public P createProducer(Destination destination) throws JMSException
0967 {
0968 return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
0969 }
0970
0971 public P createProducer(Destination destination, boolean immediate) throws JMSException
0972 {
0973 return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
0974 }
0975
0976 public P createProducer(Destination destination, boolean mandatory, boolean immediate)
0977 throws JMSException
0978 {
0979 return createProducerImpl(destination, mandatory, immediate);
0980 }
0981
0982 public P createProducer(Destination destination, boolean mandatory, boolean immediate,
0983 boolean waitUntilSent) throws JMSException
0984 {
0985 return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
0986 }
0987
0988 public TopicPublisher createPublisher(Topic topic) throws JMSException
0989 {
0990 checkNotClosed();
0991
0992 return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic);
0993 }
0994
0995 public Queue createQueue(String queueName) throws JMSException
0996 {
0997 checkNotClosed();
0998 if (queueName.indexOf('/') == -1)
0999 {
1000 return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName));
1001 }
1002 else
1003 {
1004 try
1005 {
1006 return new AMQQueue(new AMQBindingURL(queueName));
1007 }
1008 catch (URISyntaxException urlse)
1009 {
1010 JMSException jmse = new JMSException(urlse.getReason());
1011 jmse.setLinkedException(urlse);
1012
1013 throw jmse;
1014 }
1015 }
1016 }
1017
1018 /**
1019 * Declares the named queue.
1020 *
1021 * <p/>Note that this operation automatically retries in the event of fail-over.
1022 *
1023 * @param name The name of the queue to declare.
1024 * @param autoDelete
1025 * @param durable Flag to indicate that the queue is durable.
1026 * @param exclusive Flag to indicate that the queue is exclusive to this client.
1027 *
1028 * @throws AMQException If the queue cannot be declared for any reason.
1029 * @todo Be aware of possible changes to parameter order as versions change.
1030 */
1031 public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
1032 final boolean exclusive) throws AMQException
1033 {
1034 createQueue(name, autoDelete, durable, exclusive, null);
1035 }
1036
1037 /**
1038 * Declares the named queue.
1039 *
1040 * <p/>Note that this operation automatically retries in the event of fail-over.
1041 *
1042 * @param name The name of the queue to declare.
1043 * @param autoDelete
1044 * @param durable Flag to indicate that the queue is durable.
1045 * @param exclusive Flag to indicate that the queue is exclusive to this client.
1046 * @param arguments Arguments used to set special properties of the queue
1047 *
1048 * @throws AMQException If the queue cannot be declared for any reason.
1049 * @todo Be aware of possible changes to parameter order as versions change.
1050 */
1051 public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
1052 final boolean exclusive, final Map<String, Object> arguments) throws AMQException
1053 {
1054 new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
1055 {
1056 public Object execute() throws AMQException, FailoverException
1057 {
1058 sendCreateQueue(name, autoDelete, durable, exclusive, arguments);
1059 return null;
1060 }
1061 }, _connection).execute();
1062 }
1063
/**
 * Performs the actual queue declaration. Invoked by {@code createQueue} from inside a failover-protected
 * operation, so implementations may throw {@code FailoverException}.
 *
 * @param name       the name of the queue to declare
 * @param autoDelete the auto-delete flag supplied to {@code createQueue}
 * @param durable    the durable flag supplied to {@code createQueue}
 * @param exclusive  the exclusive flag supplied to {@code createQueue}
 * @param arguments  extra queue arguments; {@code null} when the four-argument {@code createQueue} was used
 * @throws AMQException      if the declaration fails
 * @throws FailoverException if fail-over interrupts the declaration
 */
public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException;
1066
1067 /**
1068 * Creates a QueueReceiver
1069 *
1070 * @param destination
1071 *
1072 * @return QueueReceiver - a wrapper around our MessageConsumer
1073 *
1074 * @throws JMSException
1075 */
1076 public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
1077 {
1078 checkValidDestination(destination);
1079 AMQQueue dest = (AMQQueue) destination;
1080 C consumer = (C) createConsumer(destination);
1081
1082 return new QueueReceiverAdaptor(dest, consumer);
1083 }
1084
1085 /**
1086 * Creates a QueueReceiver using a message selector
1087 *
1088 * @param destination
1089 * @param messageSelector
1090 *
1091 * @return QueueReceiver - a wrapper around our MessageConsumer
1092 *
1093 * @throws JMSException
1094 */
1095 public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
1096 {
1097 checkValidDestination(destination);
1098 AMQQueue dest = (AMQQueue) destination;
1099 C consumer = (C) createConsumer(destination, messageSelector);
1100
1101 return new QueueReceiverAdaptor(dest, consumer);
1102 }
1103
1104 /**
1105 * Creates a QueueReceiver wrapping a MessageConsumer
1106 *
1107 * @param queue
1108 *
1109 * @return QueueReceiver
1110 *
1111 * @throws JMSException
1112 */
1113 public QueueReceiver createReceiver(Queue queue) throws JMSException
1114 {
1115 checkNotClosed();
1116 AMQQueue dest = (AMQQueue) queue;
1117 C consumer = (C) createConsumer(dest);
1118
1119 return new QueueReceiverAdaptor(dest, consumer);
1120 }
1121
1122 /**
1123 * Creates a QueueReceiver wrapping a MessageConsumer using a message selector
1124 *
1125 * @param queue
1126 * @param messageSelector
1127 *
1128 * @return QueueReceiver
1129 *
1130 * @throws JMSException
1131 */
1132 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
1133 {
1134 checkNotClosed();
1135 AMQQueue dest = (AMQQueue) queue;
1136 C consumer = (C) createConsumer(dest, messageSelector);
1137
1138 return new QueueReceiverAdaptor(dest, consumer);
1139 }
1140
1141 public QueueSender createSender(Queue queue) throws JMSException
1142 {
1143 checkNotClosed();
1144
1145 // return (QueueSender) createProducer(queue);
1146 return new QueueSenderAdapter(createProducer(queue), queue);
1147 }
1148
1149 public StreamMessage createStreamMessage() throws JMSException
1150 {
1151 // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
1152 // calls through connection.closeAllSessions which is also called by the public connection.close()
1153 // with a null cause
1154 // When we are closing the Session due to a protocol session error we simply create a new AMQException
1155 // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
1156 // We need to determin here if the connection should be
1157
1158 synchronized (getFailoverMutex())
1159 {
1160 checkNotClosed();
1161
1162 return new JMSStreamMessage(getMessageDelegateFactory());
1163 }
1164 }
1165
1166 /**
1167 * Creates a non-durable subscriber
1168 *
1169 * @param topic
1170 *
1171 * @return TopicSubscriber - a wrapper round our MessageConsumer
1172 *
1173 * @throws JMSException
1174 */
1175 public TopicSubscriber createSubscriber(Topic topic) throws JMSException
1176 {
1177 checkNotClosed();
1178 AMQTopic dest = checkValidTopic(topic);
1179
1180 // AMQTopic dest = new AMQTopic(topic.getTopicName());
1181 return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
1182 }
1183
1184 /**
1185 * Creates a non-durable subscriber with a message selector
1186 *
1187 * @param topic
1188 * @param messageSelector
1189 * @param noLocal
1190 *
1191 * @return TopicSubscriber - a wrapper round our MessageConsumer
1192 *
1193 * @throws JMSException
1194 */
1195 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
1196 {
1197 checkNotClosed();
1198 AMQTopic dest = checkValidTopic(topic);
1199
1200 // AMQTopic dest = new AMQTopic(topic.getTopicName());
1201 return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
1202 }
1203
1204 public TemporaryQueue createTemporaryQueue() throws JMSException
1205 {
1206 checkNotClosed();
1207 try
1208 {
1209 AMQTemporaryQueue result = new AMQTemporaryQueue(this);
1210
1211 // this is done so that we can produce to a temporary queue before we create a consumer
1212 result.setQueueName(result.getRoutingKey());
1213 createQueue(result.getAMQQueueName(), result.isAutoDelete(),
1214 result.isDurable(), result.isExclusive());
1215 bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
1216 new FieldTable(), result.getExchangeName(), result);
1217 return result;
1218 }
1219 catch (Exception e)
1220 {
1221 JMSException ex = new JMSException("Cannot create temporary queue");
1222 ex.setLinkedException(e);
1223 e.printStackTrace();
1224 throw ex;
1225 }
1226 }
1227
1228 public TemporaryTopic createTemporaryTopic() throws JMSException
1229 {
1230 checkNotClosed();
1231
1232 return new AMQTemporaryTopic(this);
1233 }
1234
1235 public TextMessage createTextMessage() throws JMSException
1236 {
1237 synchronized (getFailoverMutex())
1238 {
1239 checkNotClosed();
1240
1241 return new JMSTextMessage(getMessageDelegateFactory());
1242 }
1243 }
1244
1245 protected Object getFailoverMutex()
1246 {
1247 return _connection.getFailoverMutex();
1248 }
1249
1250 public TextMessage createTextMessage(String text) throws JMSException
1251 {
1252
1253 TextMessage msg = createTextMessage();
1254 msg.setText(text);
1255
1256 return msg;
1257 }
1258
1259 public Topic createTopic(String topicName) throws JMSException
1260 {
1261 checkNotClosed();
1262
1263 if (topicName.indexOf('/') == -1)
1264 {
1265 return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
1266 }
1267 else
1268 {
1269 try
1270 {
1271 return new AMQTopic(new AMQBindingURL(topicName));
1272 }
1273 catch (URISyntaxException urlse)
1274 {
1275 JMSException jmse = new JMSException(urlse.getReason());
1276 jmse.setLinkedException(urlse);
1277
1278 throw jmse;
1279 }
1280 }
1281 }
1282
1283 public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
1284 {
1285 declareExchange(name, type, getProtocolHandler(), nowait);
1286 }
1287
1288 public int getAcknowledgeMode() throws JMSException
1289 {
1290 checkNotClosed();
1291
1292 return _acknowledgeMode;
1293 }
1294
1295 public AMQConnection getAMQConnection()
1296 {
1297 return _connection;
1298 }
1299
1300 public int getChannelId()
1301 {
1302 return _channelId;
1303 }
1304
1305 public int getDefaultPrefetch()
1306 {
1307 return _defaultPrefetchHighMark;
1308 }
1309
1310 public int getDefaultPrefetchHigh()
1311 {
1312 return _defaultPrefetchHighMark;
1313 }
1314
1315 public int getDefaultPrefetchLow()
1316 {
1317 return _defaultPrefetchLowMark;
1318 }
1319
1320 public AMQShortString getDefaultQueueExchangeName()
1321 {
1322 return _connection.getDefaultQueueExchangeName();
1323 }
1324
1325 public AMQShortString getDefaultTopicExchangeName()
1326 {
1327 return _connection.getDefaultTopicExchangeName();
1328 }
1329
1330 public MessageListener getMessageListener() throws JMSException
1331 {
1332 // checkNotClosed();
1333 return _messageListener;
1334 }
1335
1336 public AMQShortString getTemporaryQueueExchangeName()
1337 {
1338 return _connection.getTemporaryQueueExchangeName();
1339 }
1340
1341 public AMQShortString getTemporaryTopicExchangeName()
1342 {
1343 return _connection.getTemporaryTopicExchangeName();
1344 }
1345
1346 public int getTicket()
1347 {
1348 return _ticket;
1349 }
1350
1351 public boolean getTransacted() throws JMSException
1352 {
1353 checkNotClosed();
1354
1355 return _transacted;
1356 }
1357
1358 public boolean hasConsumer(Destination destination)
1359 {
1360 AtomicInteger counter = _destinationConsumerCount.get(destination);
1361
1362 return (counter != null) && (counter.get() != 0);
1363 }
1364
1365 public boolean isStrictAMQP()
1366 {
1367 return _strictAMQP;
1368 }
1369
1370 public boolean isSuspended()
1371 {
1372 return _suspended;
1373 }
1374
1375 protected void addUnacknowledgedMessage(long id)
1376 {
1377 _unacknowledgedMessageTags.add(id);
1378 }
1379
1380 protected void addDeliveredMessage(long id)
1381 {
1382 _deliveredMessageTags.add(id);
1383 }
1384
1385 /**
1386 * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
1387 * the queue read by the dispatcher.
1388 *
1389 * @param message the message that has been received
1390 */
1391 public void messageReceived(UnprocessedMessage message)
1392 {
1393 if (_logger.isDebugEnabled())
1394 {
1395 _logger.debug("Message[" + message.toString() + "] received in session");
1396 }
1397 _highestDeliveryTag.set(message.getDeliveryTag());
1398 _queue.add(message);
1399 }
1400
1401 public void declareAndBind(AMQDestination amqd)
1402 throws
1403 AMQException
1404 {
1405 AMQProtocolHandler protocolHandler = getProtocolHandler();
1406 declareExchange(amqd, protocolHandler, false);
1407 AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
1408 bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
1409 }
1410
1411 /**
1412 * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
1413 *
1414 * <p/>All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges
1415 * all messages that have been delivered to the client.
1416 *
1417 * <p/>Restarting a session causes it to take the following actions:
1418 *
1419 * <ul>
1420 * <li>Stop message delivery.</li>
1421 * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
1422 * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
1423 * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
1424 * </ul>
1425 *
1426 * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
1427 * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible
1428 * for the client to determine whether the broker is going to recover the session or not.
1429 *
1430 * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
1431 * Not that this does not necessarily mean that the recovery has failed, but simply that it is
1432 * not possible to tell if it has or not.
1433 * @todo Be aware of possible changes to parameter order as versions change.
1434 */
1435 public void recover() throws JMSException
1436 {
1437 // Ensure that the session is open.
1438 checkNotClosed();
1439
1440 // Ensure that the session is not transacted.
1441 checkNotTransacted();
1442
1443 // this is set only here, and the before the consumer's onMessage is called it is set to false
1444 _inRecovery = true;
1445 try
1446 {
1447
1448 boolean isSuspended = isSuspended();
1449
1450 if (!isSuspended)
1451 {
1452 suspendChannel(true);
1453 }
1454
1455 if (_dispatcher != null)
1456 {
1457 _dispatcher.rollback();
1458 }
1459
1460 sendRecover();
1461
1462 if (!isSuspended)
1463 {
1464 suspendChannel(false);
1465 }
1466 }
1467 catch (AMQException e)
1468 {
1469 throw new JMSAMQException("Recover failed: " + e.getMessage(), e);
1470 }
1471 catch (FailoverException e)
1472 {
1473 throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
1474 }
1475 }
1476
/**
 * Sends the recover request. Called by {@code recover()} after the dispatcher (if any) has been rolled back
 * and, when the session was not already suspended, after the channel has been suspended.
 *
 * @throws AMQException      if the request fails
 * @throws FailoverException if fail-over interrupts the request
 */
protected abstract void sendRecover() throws AMQException, FailoverException;
1478
1479 public void rejectMessage(UnprocessedMessage message, boolean requeue)
1480 {
1481
1482 if (_logger.isDebugEnabled())
1483 {
1484 _logger.debug("Rejecting Unacked message:" + message.getDeliveryTag());
1485 }
1486
1487 rejectMessage(message.getDeliveryTag(), requeue);
1488 }
1489
1490 public void rejectMessage(AbstractJMSMessage message, boolean requeue)
1491 {
1492 if (_logger.isDebugEnabled())
1493 {
1494 _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
1495 }
1496
1497 rejectMessage(message.getDeliveryTag(), requeue);
1498
1499 }
1500
/**
 * Rejects the message with the given delivery tag. The message-based {@code rejectMessage} overloads in this
 * class delegate here.
 *
 * @param deliveryTag the delivery tag of the message to reject
 * @param requeue     requeue flag supplied by the caller; its exact effect is defined by the implementation
 */
public abstract void rejectMessage(long deliveryTag, boolean requeue);
1502
1503 /**
1504 * Commits all messages done in this transaction and releases any locks currently held.
1505 *
1506 * <p/>If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the
1507 * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
1508 * The client will be unable to determine whether or not the rollback actually happened on the broker in this case.
1509 *
1510 * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
1511 * not mean that the rollback is known to have failed, merely that it is not known whether it
1512 * failed or not.
1513 * @todo Be aware of possible changes to parameter order as versions change.
1514 */
1515 public void rollback() throws JMSException
1516 {
1517 synchronized (_suspensionLock)
1518 {
1519 checkTransacted();
1520
1521 try
1522 {
1523 boolean isSuspended = isSuspended();
1524
1525 if (!isSuspended)
1526 {
1527 suspendChannel(true);
1528 }
1529
1530 releaseForRollback();
1531
1532 sendRollback();
1533
1534 markClean();
1535
1536 if (!isSuspended)
1537 {
1538 suspendChannel(false);
1539 }
1540 }
1541 catch (AMQException e)
1542 {
1543 throw new JMSAMQException("Failed to rollback: " + e, e);
1544 }
1545 catch (FailoverException e)
1546 {
1547 throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
1548 }
1549 }
1550 }
1551
/**
 * Hook invoked by {@code rollback()} after the channel has been suspended and before {@code sendRollback()}
 * is called. What is released is defined by the implementation.
 */
public abstract void releaseForRollback();
1553
/**
 * Sends the rollback request. Invoked by {@code rollback()} after {@code releaseForRollback()} and before
 * the session is marked clean.
 *
 * @throws AMQException      if the request fails
 * @throws FailoverException if fail-over interrupts the request
 */
public abstract void sendRollback() throws AMQException, FailoverException;
1555
1556 public void run()
1557 {
1558 throw new java.lang.UnsupportedOperationException();
1559 }
1560
/**
 * Currently a no-op: the whole implementation below is commented out, so the supplied listener is ignored
 * and {@code _messageListener} is not modified by this method.
 *
 * @param listener the session-level listener (unused by the current body)
 * @throws JMSException declared by the signature; nothing in the current body throws it
 */
public void setMessageListener(MessageListener listener) throws JMSException
{
    // Disabled implementation, retained for reference:
    // checkNotClosed();
    //
    // if (_dispatcher != null && !_dispatcher.connectionStopped())
    // {
    // throw new javax.njms.IllegalStateException("Attempt to set listener while session is started.");
    // }
    //
    // // We are stopped
    // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
    // {
    // BasicMessageConsumer consumer = i.next();
    //
    // if (consumer.isReceiving())
    // {
    // throw new javax.njms.IllegalStateException("Another thread is already receiving synchronously.");
    // }
    // }
    //
    // _messageListener = listener;
    //
    // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
    // {
    // i.next().setMessageListener(_messageListener);
    // }

}
1589
1590 /*public void setTicket(int ticket)
1591 {
1592 _ticket = ticket;
1593 }*/
1594
1595 public void unsubscribe(String name) throws JMSException
1596 {
1597 checkNotClosed();
1598 TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
1599 if (subscriber != null)
1600 {
1601 // send a queue.delete for the subscription
1602 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
1603 _subscriptions.remove(name);
1604 _reverseSubscriptionMap.remove(subscriber);
1605 }
1606 else
1607 {
1608 if (_strictAMQP)
1609 {
1610 if (_strictAMQPFATAL)
1611 {
1612 throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
1613 }
1614 else
1615 {
1616 _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
1617 + " Requesting queue deletion regardless.");
1618 }
1619
1620 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
1621 }
1622 else // Queue Browser
1623 {
1624
1625 if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
1626 {
1627 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
1628 }
1629 else
1630 {
1631 throw new InvalidDestinationException("Unknown subscription exchange:" + name);
1632 }
1633 }
1634 }
1635 }
1636
1637 protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
1638 final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
1639 final boolean noConsume, final boolean autoClose) throws JMSException
1640 {
1641 checkTemporaryDestination(destination);
1642
1643 final String messageSelector;
1644
1645 if (_strictAMQP && !((selector == null) || selector.equals("")))
1646 {
1647 if (_strictAMQPFATAL)
1648 {
1649 throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
1650 }
1651 else
1652 {
1653 messageSelector = null;
1654 }
1655 }
1656 else
1657 {
1658 messageSelector = selector;
1659 }
1660
1661 return new FailoverRetrySupport<C, JMSException>(
1662 new FailoverProtectedOperation<C, JMSException>()
1663 {
1664 public C execute() throws JMSException, FailoverException
1665 {
1666 checkNotClosed();
1667
1668 AMQDestination amqd = (AMQDestination) destination;
1669
1670 final AMQProtocolHandler protocolHandler = getProtocolHandler();
1671 // TODO: Define selectors in AMQP
1672 // TODO: construct the rawSelector from the selector string if rawSelector == null
1673 final FieldTable ft = FieldTableFactory.newFieldTable();
1674 // if (rawSelector != null)
1675 // ft.put("headers", rawSelector.getDataAsBytes());
1676 // rawSelector is used by HeadersExchange and is not a JMS Selector
1677 if (rawSelector != null)
1678 {
1679 ft.addAll(rawSelector);
1680 }
1681
1682 if (messageSelector != null)
1683 {
1684 ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
1685 }
1686
1687 C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
1688 noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
1689
1690 if (_messageListener != null)
1691 {
1692 consumer.setMessageListener(_messageListener);
1693 }
1694
1695 try
1696 {
1697 registerConsumer(consumer, false);
1698 }
1699 catch (AMQInvalidArgumentException ise)
1700 {
1701 JMSException ex = new InvalidSelectorException(ise.getMessage());
1702 ex.setLinkedException(ise);
1703 throw ex;
1704 }
1705 catch (AMQInvalidRoutingKeyException e)
1706 {
1707 JMSException ide =
1708 new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
1709 ide.setLinkedException(e);
1710 throw ide;
1711 }
1712 catch (AMQException e)
1713 {
1714 JMSException ex = new JMSException("Error registering consumer: " + e);
1715
1716 ex.setLinkedException(e);
1717 throw ex;
1718 }
1719
1720 synchronized (destination)
1721 {
1722 _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
1723 _destinationConsumerCount.get(destination).incrementAndGet();
1724 }
1725
1726 return consumer;
1727 }
1728 }, _connection).execute();
1729 }
1730
/**
 * Factory hook that builds the concrete consumer object. Called by {@code createConsumerImpl} from inside a
 * failover-protected operation, before the consumer is registered.
 *
 * @param destination  the destination to consume from
 * @param prefetchHigh the prefetch high mark
 * @param prefetchLow  the prefetch low mark
 * @param noLocal      the no-local flag
 * @param exclusive    whether the consumer is exclusive
 * @param selector     the JMS selector after strict-AMQP filtering, may be {@code null}
 * @param arguments    the argument table built by {@code createConsumerImpl}: the raw selector entries (if
 *                     any) plus an {@code x-filter-jms-selector} entry when a selector is present
 * @param noConsume    the no-consume flag
 * @param autoClose    the auto-close flag
 * @return the new, not yet registered consumer
 * @throws JMSException if the consumer cannot be created
 */
public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException;
1734
1735 /**
1736 * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
1737 * instance.
1738 *
1739 * @param consumer the consum
1740 */
1741 void deregisterConsumer(C consumer)
1742 {
1743 if (_consumers.remove(consumer.getConsumerTag()) != null)
1744 {
1745 String subscriptionName = _reverseSubscriptionMap.remove(consumer);
1746 if (subscriptionName != null)
1747 {
1748 _subscriptions.remove(subscriptionName);
1749 }
1750
1751 Destination dest = consumer.getDestination();
1752 synchronized (dest)
1753 {
1754 if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
1755 {
1756 _destinationConsumerCount.remove(dest);
1757 }
1758 }
1759
1760 // Consumers that are closed in a transaction must be stored
1761 // so that messages they have received can be acknowledged on commit
1762 if (_transacted)
1763 {
1764 _removedConsumers.add(consumer);
1765 }
1766 }
1767 }
1768
1769 void deregisterProducer(long producerId)
1770 {
1771 _producers.remove(new Long(producerId));
1772 }
1773
1774 boolean isInRecovery()
1775 {
1776 return _inRecovery;
1777 }
1778
1779 boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
1780 {
1781 return isQueueBound(exchangeName, queueName, null);
1782 }
1783
/**
 * Tests whether or not the specified queue is bound to the specified exchange under a particular routing key.
 *
 * <p/>Note that this operation automatically retries in the event of fail-over.
 *
 * @param exchangeName The exchange name to test for binding against.
 * @param queueName    The queue name to check if bound.
 * @param routingKey   The routing key to check if the queue is bound under. The two-argument overload in this
 *                     class passes {@code null} here.
 *
 * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
 *
 * @throws JMSException If the query fails for any reason.
 * @todo Be aware of possible changes to parameter order as versions change.
 */
public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws JMSException;
1800
/**
 * Tests whether the given destination is bound. Implemented by subclasses.
 * NOTE(review): presumably checks the destination's own queue, exchange and routing key - confirm against
 * the implementations.
 *
 * @param destination the destination to test
 * @return whether the destination is considered bound by the implementation
 * @throws JMSException if the query fails
 */
public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
1802
1803 /**
1804 * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
1805 * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
1806 */
1807 void markClosed()
1808 {
1809 _closed.set(true);
1810 _connection.deregisterSession(_channelId);
1811 markClosedProducersAndConsumers();
1812
1813 }
1814
1815 void failoverPrep()
1816 {
1817 startDispatcherIfNecessary();
1818 final CountDownLatch signal = new CountDownLatch(1);
1819 _queue.add(new Dispatchable() {
1820 public void dispatch(AMQSession ssn)
1821 {
1822 signal.countDown();
1823 }
1824 });
1825 try
1826 {
1827 signal.await();
1828 }
1829 catch (InterruptedException e)
1830 {
1831 // pass
1832 }
1833 }
1834
1835 /**
1836 * Resubscribes all producers and consumers. This is called when performing failover.
1837 *
1838 * @throws AMQException
1839 */
1840 void resubscribe() throws AMQException
1841 {
1842 if (_dirty)
1843 {
1844 _failedOverDirty = true;
1845 }
1846
1847 _rollbackMark.set(-1);
1848 resubscribeProducers();
1849 resubscribeConsumers();
1850 }
1851
1852 void setHasMessageListeners()
1853 {
1854 _hasMessageListeners = true;
1855 }
1856
1857 void setInRecovery(boolean inRecovery)
1858 {
1859 _inRecovery = inRecovery;
1860 }
1861
1862 /**
1863 * Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
1864 *
1865 * @throws AMQException If the session cannot be started for any reason.
1866 * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
1867 * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
1868 * for each subsequent call to flow.. only need to do this if we have called stop.
1869 */
1870 void start() throws AMQException
1871 {
1872 // Check if the session has perviously been started and suspended, in which case it must be unsuspended.
1873 if (_startedAtLeastOnce.getAndSet(true))
1874 {
1875 suspendChannel(false);
1876 }
1877
1878 // If the event dispatcher is not running then start it too.
1879 if (hasMessageListeners())
1880 {
1881 startDispatcherIfNecessary();
1882 }
1883 }
1884
1885 void startDispatcherIfNecessary()
1886 {
1887 //If we are the dispatcher then we don't need to check we are started
1888 if (Thread.currentThread() == _dispatcherThread)
1889 {
1890 return;
1891 }
1892
1893 // If IMMEDIATE_PREFETCH is not set then we need to start fetching
1894 // This is final per session so will be multi-thread safe.
1895 if (!_immediatePrefetch)
1896 {
1897 // We do this now if this is the first call on a started connection
1898 if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
1899 {
1900 try
1901 {
1902 suspendChannel(false);
1903 }
1904 catch (AMQException e)
1905 {
1906 _logger.info("Unsuspending channel threw an exception:" + e);
1907 }
1908 }
1909 }
1910
1911 startDispatcherIfNecessary(false);
1912 }
1913
1914 synchronized void startDispatcherIfNecessary(boolean initiallyStopped)
1915 {
1916 if (_dispatcher == null)
1917 {
1918 _dispatcher = new Dispatcher();
1919 try
1920 {
1921 _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
1922
1923 }
1924 catch(Exception e)
1925 {
1926 throw new Error("Error creating Dispatcher thread",e);
1927 }
1928 _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
1929 _dispatcherThread.setDaemon(true);
1930 _dispatcher.setConnectionStopped(initiallyStopped);
1931 _dispatcherThread.start();
1932 if (_dispatcherLogger.isInfoEnabled())
1933 {
1934 _dispatcherLogger.info(_dispatcherThread.getName() + " created");
1935 }
1936 }
1937 else
1938 {
1939 _dispatcher.setConnectionStopped(initiallyStopped);
1940 }
1941 }
1942
1943 void stop() throws AMQException
1944 {
1945 // Stop the server delivering messages to this session.
1946 suspendChannel(true);
1947
1948 if (_dispatcher != null)
1949 {
1950 _dispatcher.setConnectionStopped(true);
1951 }
1952 }
1953
1954 /*
1955 * Binds the named queue, with the specified routing key, to the named exchange.
1956 *
1957 * <p/>Note that this operation automatically retries in the event of fail-over.
1958 *
1959 * @param queueName The name of the queue to bind.
1960 * @param routingKey The routing key to bind the queue with.
1961 * @param arguments Additional arguments.
1962 * @param exchangeName The exchange to bind the queue on.
1963 *
1964 * @throws AMQException If the queue cannot be bound for any reason.
1965 */
1966 /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft)
1967 throws AMQException, FailoverException
1968 {
1969 AMQFrame queueBind =
1970 QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments
1971 amqd.getExchangeName(), // exchange
1972 false, // nowait
1973 queueName, // queue
1974 amqd.getRoutingKey(), // routingKey
1975 getTicket()); // ticket
1976
1977 protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
1978 }*/
1979
1980 private void checkNotTransacted() throws JMSException
1981 {
1982 if (getTransacted())
1983 {
1984 throw new IllegalStateException("Session is transacted");
1985 }
1986 }
1987
1988 private void checkTemporaryDestination(Destination destination) throws JMSException
1989 {
1990 if ((destination instanceof TemporaryDestination))
1991 {
1992 _logger.debug("destination is temporary");
1993 final TemporaryDestination tempDest = (TemporaryDestination) destination;
1994 if (tempDest.getSession() != this)
1995 {
1996 _logger.debug("destination is on different session");
1997 throw new JMSException("Cannot consume from a temporary destination created onanother session");
1998 }
1999
2000 if (tempDest.isDeleted())
2001 {
2002 _logger.debug("destination is deleted");
2003 throw new JMSException("Cannot consume from a deleted destination");
2004 }
2005 }
2006 }
2007
2008 protected void checkTransacted() throws JMSException
2009 {
2010 if (!getTransacted())
2011 {
2012 throw new IllegalStateException("Session is not transacted");
2013 }
2014 }
2015
2016 private void checkValidDestination(Destination destination) throws InvalidDestinationException
2017 {
2018 if (destination == null)
2019 {
2020 throw new javax.jms.InvalidDestinationException("Invalid Queue");
2021 }
2022 }
2023
2024 private void checkValidQueue(Queue queue) throws InvalidDestinationException
2025 {
2026 if (queue == null)
2027 {
2028 throw new javax.jms.InvalidDestinationException("Invalid Queue");
2029 }
2030 }
2031
2032 /*
2033 * I could have combined the last 3 methods, but this way it improves readability
2034 */
2035 protected AMQTopic checkValidTopic(Topic topic) throws JMSException
2036 {
2037 if (topic == null)
2038 {
2039 throw new javax.jms.InvalidDestinationException("Invalid Topic");
2040 }
2041
2042 if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this))
2043 {
2044 throw new javax.jms.InvalidDestinationException(
2045 "Cannot create a subscription on a temporary topic created in another session");
2046 }
2047
2048 if (!(topic instanceof AMQTopic))
2049 {
2050 throw new javax.jms.InvalidDestinationException(
2051 "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
2052 + topic.getClass().getName());
2053 }
2054
2055 return (AMQTopic) topic;
2056 }
2057
2058 /**
2059 * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
2060 *
2061 * @param error not null if this is a result of an error occurring at the connection level
2062 */
2063 private void closeConsumers(Throwable error) throws JMSException
2064 {
2065 // we need to clone the list of consumers since the close() method updates the _consumers collection
2066 // which would result in a concurrent modification exception
2067 final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
2068
2069 final Iterator<C> it = clonedConsumers.iterator();
2070 while (it.hasNext())
2071 {
2072 final C con = it.next();
2073 if (error != null)
2074 {
2075 con.notifyError(error);
2076 }
2077 else
2078 {
2079 con.close(false);
2080 }
2081 }
2082 // at this point the _consumers map will be empty
2083 if (_dispatcher != null)
2084 {
2085 _dispatcher.close();
2086 _dispatcher = null;
2087 }
2088 }
2089
2090 /**
2091 * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
2092 * currently no way of propagating errors to message producers (this is a JMS limitation).
2093 */
2094 private void closeProducers() throws JMSException
2095 {
2096 // we need to clone the list of producers since the close() method updates the _producers collection
2097 // which would result in a concurrent modification exception
2098 final ArrayList clonedProducers = new ArrayList(_producers.values());
2099
2100 final Iterator it = clonedProducers.iterator();
2101 while (it.hasNext())
2102 {
2103 final P prod = (P) it.next();
2104 prod.close();
2105 }
2106 // at this point the _producers map is empty
2107 }
2108
2109 /**
2110 * Close all producers or consumers. This is called either in the error case or when closing the session normally.
2111 *
2112 * @param amqe the exception, may be null to indicate no error has occurred
2113 */
2114 private void closeProducersAndConsumers(AMQException amqe) throws JMSException
2115 {
2116 JMSException jmse = null;
2117 try
2118 {
2119 closeProducers();
2120 }
2121 catch (JMSException e)
2122 {
2123 _logger.error("Error closing session: " + e, e);
2124 jmse = e;
2125 }
2126
2127 try
2128 {
2129 closeConsumers(amqe);
2130 }
2131 catch (JMSException e)
2132 {
2133 _logger.error("Error closing session: " + e, e);
2134 if (jmse == null)
2135 {
2136 jmse = e;
2137 }
2138 }
2139
2140 if (jmse != null)
2141 {
2142 throw jmse;
2143 }
2144 }
2145
2146 /**
2147 * Register to consume from the queue.
2148 *
2149 * @param queueName
2150 */
2151 private void consumeFromQueue(C consumer, AMQShortString queueName,
2152 AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
2153 {
2154 int tagId = _nextTag++;
2155
2156 consumer.setConsumerTag(tagId);
2157 // we must register the consumer in the map before we actually start listening
2158 _consumers.put(tagId, consumer);
2159
2160 try
2161 {
2162 sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
2163 }
2164 catch (AMQException e)
2165 {
2166 // clean-up the map in the event of an error
2167 _consumers.remove(tagId);
2168 throw e;
2169 }
2170 }
2171
2172 public abstract void sendConsume(C consumer, AMQShortString queueName,
2173 AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
2174
2175 private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
2176 throws JMSException
2177 {
2178 return createProducerImpl(destination, mandatory, immediate, false);
2179 }
2180
2181 private P createProducerImpl(final Destination destination, final boolean mandatory,
2182 final boolean immediate, final boolean waitUntilSent) throws JMSException
2183 {
2184 return new FailoverRetrySupport<P, JMSException>(
2185 new FailoverProtectedOperation<P, JMSException>()
2186 {
2187 public P execute() throws JMSException, FailoverException
2188 {
2189 checkNotClosed();
2190 long producerId = getNextProducerId();
2191 P producer = createMessageProducer(destination, mandatory,
2192 immediate, waitUntilSent, producerId);
2193 registerProducer(producerId, producer);
2194
2195 return producer;
2196 }
2197 }, _connection).execute();
2198 }
2199
2200 public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
2201 final boolean immediate, final boolean waitUntilSent, long producerId);
2202
2203 private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
2204 {
2205 declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
2206 }
2207
2208 /**
2209 * Returns the number of messages currently queued for the given destination.
2210 *
2211 * <p/>Note that this operation automatically retries in the event of fail-over.
2212 *
2213 * @param amqd The destination to be checked
2214 *
2215 * @return the number of queued messages.
2216 *
2217 * @throws AMQException If the queue cannot be declared for any reason.
2218 */
2219 public long getQueueDepth(final AMQDestination amqd)
2220 throws AMQException
2221 {
2222 return new FailoverNoopSupport<Long, AMQException>(
2223 new FailoverProtectedOperation<Long, AMQException>()
2224 {
2225 public Long execute() throws AMQException, FailoverException
2226 {
2227 return requestQueueDepth(amqd);
2228 }
2229 }, _connection).execute();
2230
2231 }
2232
2233 protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
2234
2235 /**
2236 * Declares the named exchange and type of exchange.
2237 *
2238 * <p/>Note that this operation automatically retries in the event of fail-over.
2239 *
2240 * @param name The name of the exchange to declare.
2241 * @param type The type of the exchange to declare.
2242 * @param protocolHandler The protocol handler to process the communication through.
2243 * @param nowait
2244 *
2245 * @throws AMQException If the exchange cannot be declared for any reason.
2246 * @todo Be aware of possible changes to parameter order as versions change.
2247 */
2248 private void declareExchange(final AMQShortString name, final AMQShortString type,
2249 final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
2250 {
2251 new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
2252 {
2253 public Object execute() throws AMQException, FailoverException
2254 {
2255 sendExchangeDeclare(name, type, protocolHandler, nowait);
2256 return null;
2257 }
2258 }, _connection).execute();
2259 }
2260
2261 public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
2262 final boolean nowait) throws AMQException, FailoverException;
2263
2264 /**
2265 * Declares a queue for a JMS destination.
2266 *
2267 * <p/>Note that for queues but not topics the name is generated in the client rather than the server. This allows
2268 * the name to be reused on failover if required. In general, the destination indicates whether it wants a name
2269 * generated or not.
2270 *
2271 * <p/>Note that this operation automatically retries in the event of fail-over.
2272 *
2273 * @param amqd The destination to declare as a queue.
2274 * @param protocolHandler The protocol handler to communicate through.
2275 *
2276 * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of
2277 * the client.
2278 *
2279 * @throws AMQException If the queue cannot be declared for any reason.
2280 * @todo Verify the destiation is valid or throw an exception.
2281 * @todo Be aware of possible changes to parameter order as versions change.
2282 */
2283 protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
2284 final boolean noLocal)
2285 throws AMQException
2286 {
2287 /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
2288 return new FailoverNoopSupport<AMQShortString, AMQException>(
2289 new FailoverProtectedOperation<AMQShortString, AMQException>()
2290 {
2291 public AMQShortString execute() throws AMQException, FailoverException
2292 {
2293 // Generate the queue name if the destination indicates that a client generated name is to be used.
2294 if (amqd.isNameRequired())
2295 {
2296 amqd.setQueueName(protocolHandler.generateQueueName());
2297 }
2298
2299 sendQueueDeclare(amqd, protocolHandler);
2300
2301 return amqd.getAMQQueueName();
2302 }
2303 }, _connection).execute();
2304 }
2305
2306 public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
2307
2308 /**
2309 * Undeclares the specified queue.
2310 *
2311 * <p/>Note that this operation automatically retries in the event of fail-over.
2312 *
2313 * @param queueName The name of the queue to delete.
2314 *
2315 * @throws JMSException If the queue could not be deleted for any reason.
2316 * @todo Be aware of possible changes to parameter order as versions change.
2317 */
2318 protected void deleteQueue(final AMQShortString queueName) throws JMSException
2319 {
2320 try
2321 {
2322 new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
2323 {
2324 public Object execute() throws AMQException, FailoverException
2325 {
2326 sendQueueDelete(queueName);
2327 return null;
2328 }
2329 }, _connection).execute();
2330 }
2331 catch (AMQException e)
2332 {
2333 throw new JMSAMQException("The queue deletion failed: " + e.getMessage(), e);
2334 }
2335 }
2336
2337 public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
2338
2339 private long getNextProducerId()
2340 {
2341 return ++_nextProducerId;
2342 }
2343
2344 protected AMQProtocolHandler getProtocolHandler()
2345 {
2346 return _connection.getProtocolHandler();
2347 }
2348
2349 public byte getProtocolMajorVersion()
2350 {
2351 return getProtocolHandler().getProtocolMajorVersion();
2352 }
2353
2354 public byte getProtocolMinorVersion()
2355 {
2356 return getProtocolHandler().getProtocolMinorVersion();
2357 }
2358
2359 protected boolean hasMessageListeners()
2360 {
2361 return _hasMessageListeners;
2362 }
2363
2364 private void markClosedConsumers() throws JMSException
2365 {
2366 if (_dispatcher != null)
2367 {
2368 _dispatcher.close();
2369 _dispatcher = null;
2370 }
2371 // we need to clone the list of consumers since the close() method updates the _consumers collection
2372 // which would result in a concurrent modification exception
2373 final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
2374
2375 final Iterator<C> it = clonedConsumers.iterator();
2376 while (it.hasNext())
2377 {
2378 final C con = it.next();
2379 con.markClosed();
2380 }
2381 // at this point the _consumers map will be empty
2382 }
2383
2384 private void markClosedProducersAndConsumers()
2385 {
2386 try
2387 {
2388 // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
2389 closeProducers();
2390 }
2391 catch (JMSException e)
2392 {
2393 _logger.error("Error closing session: " + e, e);
2394 }
2395
2396 try
2397 {
2398 markClosedConsumers();
2399 }
2400 catch (JMSException e)
2401 {
2402 _logger.error("Error closing session: " + e, e);
2403 }
2404 }
2405
2406 /**
2407 * Callers must hold the failover mutex before calling this method.
2408 *
2409 * @param consumer
2410 *
2411 * @throws AMQException
2412 */
2413 private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException
2414 {
2415 AMQDestination amqd = consumer.getDestination();
2416
2417 AMQProtocolHandler protocolHandler = getProtocolHandler();
2418
2419 declareExchange(amqd, protocolHandler, false);
2420
2421 AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal());
2422
2423 // store the consumer queue name
2424 consumer.setQueuename(queueName);
2425
2426 bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
2427
2428 // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
2429 if (!_immediatePrefetch)
2430 {
2431 // The dispatcher will be null if we have just created this session
2432 // so suspend the channel before we register our consumer so that we don't
2433 // start prefetching until a receive/mListener is set.
2434 if (_dispatcher == null)
2435 {
2436 if (!isSuspended())
2437 {
2438 try
2439 {
2440 suspendChannel(true);
2441 _logger.info(
2442 "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
2443 }
2444 catch (AMQException e)
2445 {
2446 _logger.info("Suspending channel threw an exception:" + e);
2447 }
2448 }
2449 }
2450 }
2451 else
2452 {
2453 _logger.info("Immediately prefetching existing messages to new consumer.");
2454 }
2455
2456 try
2457 {
2458 consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
2459 }
2460 catch (JMSException e) // thrown by getMessageSelector
2461 {
2462 throw new AMQException(null, e.getMessage(), e);
2463 }
2464 catch (FailoverException e)
2465 {
2466 throw new AMQException(null, "Fail-over exception interrupted basic consume.", e);
2467 }
2468 }
2469
2470 private void registerProducer(long producerId, MessageProducer producer)
2471 {
2472 _producers.put(new Long(producerId), producer);
2473 }
2474
2475 private void rejectAllMessages(boolean requeue)
2476 {
2477 rejectMessagesForConsumerTag(0, requeue, true);
2478 }
2479
2480 /**
2481 * @param consumerTag The consumerTag to prune from queue or all if null
2482 * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
2483 * @param rejectAllConsumers
2484 */
2485
2486 private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
2487 {
2488 Iterator messages = _queue.iterator();
2489 if (_logger.isInfoEnabled())
2490 {
2491 _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
2492 + requeue);
2493
2494 if (messages.hasNext())
2495 {
2496 _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
2497 }
2498 else
2499 {
2500 _logger.info("No messages in _queue to reject");
2501 }
2502 }
2503 while (messages.hasNext())
2504 {
2505 UnprocessedMessage message = (UnprocessedMessage) messages.next();
2506
2507 if (rejectAllConsumers || (message.getConsumerTag() == consumerTag))
2508 {
2509 if (_logger.isDebugEnabled())
2510 {
2511 _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
2512 + message.getDeliveryTag());
2513 }
2514
2515 messages.remove();
2516
2517 rejectMessage(message, requeue);
2518
2519 if (_logger.isDebugEnabled())
2520 {
2521 _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag);
2522 }
2523 }
2524 }
2525 }
2526
2527 private void resubscribeConsumers() throws AMQException
2528 {
2529 ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
2530 _consumers.clear();
2531
2532 for (C consumer : consumers)
2533 {
2534 consumer.failedOver();
2535 registerConsumer(consumer, true);
2536 }
2537 }
2538
2539 private void resubscribeProducers() throws AMQException
2540 {
2541 ArrayList producers = new ArrayList(_producers.values());
2542 _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
2543 for (Iterator it = producers.iterator(); it.hasNext();)
2544 {
2545 P producer = (P) it.next();
2546 producer.resubscribe();
2547 }
2548 }
2549
2550 /**
2551 * Suspends or unsuspends this session.
2552 *
2553 * @param suspend <tt>true</tt> indicates that the session should be suspended, <tt>false<tt> indicates that it
2554 * should be unsuspended.
2555 *
2556 * @throws AMQException If the session cannot be suspended for any reason.
2557 * @todo Be aware of possible changes to parameter order as versions change.
2558 */
2559 protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException
2560 {
2561 synchronized (_suspensionLock)
2562 {
2563 try
2564 {
2565 if (_logger.isDebugEnabled())
2566 {
2567 _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
2568 }
2569
2570 _suspended = suspend;
2571 sendSuspendChannel(suspend);
2572 }
2573 catch (FailoverException e)
2574 {
2575 throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e);
2576 }
2577 }
2578 }
2579
2580 public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException;
2581
2582 Object getMessageDeliveryLock()
2583 {
2584 return _messageDeliveryLock;
2585 }
2586
2587 /**
2588 * Indicates whether this session consumers pre-fetche messages
2589 *
2590 * @return true if this session consumers pre-fetche messages false otherwise
2591 */
2592 public boolean prefetch()
2593 {
2594 return getAMQConnection().getMaxPrefetch() > 0;
2595 }
2596
2597 /** Signifies that the session has pending sends to commit. */
2598 public void markDirty()
2599 {
2600 _dirty = true;
2601 }
2602
2603 /** Signifies that the session has no pending sends to commit. */
2604 public void markClean()
2605 {
2606 _dirty = false;
2607 _failedOverDirty = false;
2608 }
2609
2610 /**
2611 * Check to see if failover has occured since the last call to markClean(commit or rollback).
2612 *
2613 * @return boolean true if failover has occured.
2614 */
2615 public boolean hasFailedOver()
2616 {
2617 return _failedOverDirty;
2618 }
2619
2620 /**
2621 * Check to see if any message have been sent in this transaction and have not been commited.
2622 *
2623 * @return boolean true if a message has been sent but not commited
2624 */
2625 public boolean isDirty()
2626 {
2627 return _dirty;
2628 }
2629
2630 public void setTicket(int ticket)
2631 {
2632 _ticket = ticket;
2633 }
2634
2635 public void setFlowControl(final boolean active)
2636 {
2637 _flowControl.setFlowControl(active);
2638 }
2639
2640 public void checkFlowControl() throws InterruptedException
2641 {
2642 synchronized (_flowControl)
2643 {
2644 while (!_flowControl.getFlowControl())
2645 {
2646 _flowControl.wait();
2647 }
2648 }
2649
2650 }
2651
2652 public interface Dispatchable
2653 {
2654 void dispatch(AMQSession ssn);
2655 }
2656
2657 public void dispatch(UnprocessedMessage message)
2658 {
2659 if (_dispatcher == null)
2660 {
2661 throw new java.lang.IllegalStateException("dispatcher is not started");
2662 }
2663
2664 _dispatcher.dispatchMessage(message);
2665 }
2666
2667 /** Used for debugging in the dispatcher. */
2668 private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
2669
2670 /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
2671 class Dispatcher implements Runnable
2672 {
2673
2674 /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
2675 private final AtomicBoolean _closed = new AtomicBoolean(false);
2676
2677 private final Object _lock = new Object();
2678 private String dispatcherID = "" + System.identityHashCode(this);
2679
2680 public Dispatcher()
2681 {
2682 }
2683
2684 public void close()
2685 {
2686 _closed.set(true);
2687 _dispatcherThread.interrupt();
2688
2689 // fixme awaitTermination
2690
2691 }
2692
2693 public void rejectPending(C consumer)
2694 {
2695 synchronized (_lock)
2696 {
2697 boolean stopped = _dispatcher.connectionStopped();
2698
2699 if (!stopped)
2700 {
2701 _dispatcher.setConnectionStopped(true);
2702 }
2703
2704 // Reject messages on pre-receive queue
2705 consumer.rollbackPendingMessages();
2706
2707 // Reject messages on pre-dispatch queue
2708 rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
2709 //Let the dispatcher deal with this when it gets to them.
2710
2711 // closeConsumer
2712 consumer.markClosed();
2713
2714 _dispatcher.setConnectionStopped(stopped);
2715
2716 }
2717 }
2718
2719 public void rollback()
2720 {
2721
2722 synchronized (_lock)
2723 {
2724 boolean isStopped = connectionStopped();
2725
2726 if (!isStopped)
2727 {
2728 setConnectionStopped(true);
2729 }
2730
2731 _rollbackMark.set(_highestDeliveryTag.get());
2732
2733 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
2734
2735 for (C consumer : _consumers.values())
2736 {
2737 if (!consumer.isNoConsume())
2738 {
2739 consumer.rollback();
2740 }
2741 else
2742 {
2743 // should perhaps clear the _SQ here.
2744 // consumer._synchronousQueue.clear();
2745 consumer.clearReceiveQueue();
2746 }
2747
2748 }
2749
2750 for (int i = 0; i < _removedConsumers.size(); i++)
2751 {
2752 // Sends acknowledgement to server
2753 _removedConsumers.get(i).rollback();
2754 _removedConsumers.remove(i);
2755 }
2756
2757 setConnectionStopped(isStopped);
2758 }
2759
2760 }
2761
2762 public void run()
2763 {
2764 if (_dispatcherLogger.isInfoEnabled())
2765 {
2766 _dispatcherLogger.info(_dispatcherThread.getName() + " started");
2767 }
2768
2769 UnprocessedMessage message;
2770
2771 // Allow disptacher to start stopped
2772 synchronized (_lock)
2773 {
2774 while (!_closed.get() && connectionStopped())
2775 {
2776 try
2777 {
2778 _lock.wait();
2779 }
2780 catch (InterruptedException e)
2781 {
2782 // ignore
2783 }
2784 }
2785 }
2786
2787 try
2788 {
2789 Dispatchable disp;
2790 while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null))
2791 {
2792 disp.dispatch(AMQSession.this);
2793 }
2794 }
2795 catch (InterruptedException e)
2796 {
2797 // ignore
2798 }
2799
2800 if (_dispatcherLogger.isInfoEnabled())
2801 {
2802 _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
2803 }
2804 }
2805
2806 // only call while holding lock
2807 final boolean connectionStopped()
2808 {
2809 return _connectionStopped;
2810 }
2811
2812 boolean setConnectionStopped(boolean connectionStopped)
2813 {
2814 boolean currently;
2815 synchronized (_lock)
2816 {
2817 currently = _connectionStopped;
2818 _connectionStopped = connectionStopped;
2819 _lock.notify();
2820
2821 if (_dispatcherLogger.isDebugEnabled())
2822 {
2823 _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started")
2824 + ": Currently " + (currently ? "Stopped" : "Started"));
2825 }
2826 }
2827
2828 return currently;
2829 }
2830
2831 private void dispatchMessage(UnprocessedMessage message)
2832 {
2833 long deliveryTag = message.getDeliveryTag();
2834
2835 synchronized (_lock)
2836 {
2837
2838 try
2839 {
2840 while (connectionStopped())
2841 {
2842 _lock.wait();
2843 }
2844 }
2845 catch (InterruptedException e)
2846 {
2847 // pass
2848 }
2849
2850 if (!(message instanceof CloseConsumerMessage)
2851 && tagLE(deliveryTag, _rollbackMark.get()))
2852 {
2853 rejectMessage(message, true);
2854 }
2855 else
2856 {
2857 synchronized (_messageDeliveryLock)
2858 {
2859 notifyConsumer(message);
2860 }
2861 }
2862 }
2863
2864 long current = _rollbackMark.get();
2865 if (updateRollbackMark(current, deliveryTag))
2866 {
2867 _rollbackMark.compareAndSet(current, deliveryTag);
2868 }
2869 }
2870
2871 private void notifyConsumer(UnprocessedMessage message)
2872 {
2873 final C consumer = _consumers.get(message.getConsumerTag());
2874
2875 if ((consumer == null) || consumer.isClosed())
2876 {
2877 if (_dispatcherLogger.isInfoEnabled())
2878 {
2879 if (consumer == null)
2880 {
2881 _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message("
2882 + System.identityHashCode(message) + ")" + "["
2883 + message.getDeliveryTag() + "] from queue "
2884 + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
2885 }
2886 else
2887 {
2888 if (consumer.isNoConsume())
2889 {
2890 _dispatcherLogger.info("Received a message("
2891 + System.identityHashCode(message) + ")" + "["
2892 + message.getDeliveryTag() + "] from queue " + " consumer("
2893 + message.getConsumerTag() + ") is closed and a browser so dropping...");
2894 //DROP MESSAGE
2895 return;
2896
2897 }
2898 else
2899 {
2900 _dispatcherLogger.info("Received a message("
2901 + System.identityHashCode(message) + ")" + "["
2902 + message.getDeliveryTag() + "] from queue " + " consumer("
2903 + message.getConsumerTag() + ") is closed rejecting(requeue)...");
2904 }
2905 }
2906 }
2907 // Don't reject if we're already closing
2908 if (!_closed.get())
2909 {
2910 rejectMessage(message, true);
2911 }
2912 }
2913 else
2914 {
2915 consumer.notifyMessage(message);
2916 }
2917 }
2918 }
2919
2920 protected abstract boolean tagLE(long tag1, long tag2);
2921
2922 protected abstract boolean updateRollbackMark(long current, long deliveryTag);
2923
2924 public abstract AMQMessageDelegateFactory getMessageDelegateFactory();
2925
2926 /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
2927 boolean read) throws AMQException
2928 {
2929 getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
2930 getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write),
2931 new BlockingMethodFrameListener(_channelId)
2932 {
2933
2934 public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException
2935 {
2936 if (frame instanceof AccessRequestOkBody)
2937 {
2938 setTicket(((AccessRequestOkBody) frame).getTicket());
2939
2940 return true;
2941 }
2942 else
2943 {
2944 return false;
2945 }
2946 }
2947 });
2948 }*/
2949
2950 private class SuspenderRunner implements Runnable
2951 {
2952 private AtomicBoolean _suspend;
2953
2954 public SuspenderRunner(AtomicBoolean suspend)
2955 {
2956 _suspend = suspend;
2957 }
2958
2959 public void run()
2960 {
2961 try
2962 {
2963 synchronized (_suspensionLock)
2964 {
2965 suspendChannel(_suspend.get());
2966 }
2967 }
2968 catch (AMQException e)
2969 {
2970 _logger.warn("Unable to suspend channel");
2971 }
2972 }
2973 }
2974 }
|