0001 /*
0002 *
0003 * Licensed to the Apache Software Foundation (ASF) under one
0004 * or more contributor license agreements. See the NOTICE file
0005 * distributed with this work for additional information
0006 * regarding copyright ownership. The ASF licenses this file
0007 * to you under the Apache License, Version 2.0 (the
0008 * "License"); you may not use this file except in compliance
0009 * with the License. You may obtain a copy of the License at
0010 *
0011 * http://www.apache.org/licenses/LICENSE-2.0
0012 *
0013 * Unless required by applicable law or agreed to in writing,
0014 * software distributed under the License is distributed on an
0015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016 * KIND, either express or implied. See the License for the
0017 * specific language governing permissions and limitations
0018 * under the License.
0019 *
0020 */
0021 package org.apache.qpid.client;
0022
0023 import org.apache.qpid.AMQException;
0024 import org.apache.qpid.client.failover.FailoverException;
0025 import org.apache.qpid.client.message.*;
0026 import org.apache.qpid.client.protocol.AMQProtocolHandler;
0027 import org.apache.qpid.framing.*;
0028 import org.apache.qpid.jms.MessageConsumer;
0029 import org.apache.qpid.jms.Session;
0030 import org.slf4j.Logger;
0031 import org.slf4j.LoggerFactory;
0032
0033 import javax.jms.JMSException;
0034 import javax.jms.Message;
0035 import javax.jms.MessageListener;
0036 import java.util.Arrays;
0037 import java.util.Iterator;
0038 import java.util.List;
0039 import java.util.SortedSet;
0040 import java.util.ArrayList;
0041 import java.util.Collections;
0042 import java.util.TreeSet;
0043 import java.util.concurrent.BlockingQueue;
0044 import java.util.concurrent.ConcurrentLinkedQueue;
0045 import java.util.concurrent.LinkedBlockingQueue;
0046 import java.util.concurrent.TimeUnit;
0047 import java.util.concurrent.atomic.AtomicBoolean;
0048 import java.util.concurrent.atomic.AtomicReference;
0049
0050 public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
0051 {
0052 private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
0053
0054 /** The connection being used by this consumer */
0055 protected final AMQConnection _connection;
0056
0057 protected final String _messageSelector;
0058
0059 private final boolean _noLocal;
0060
0061 private final AMQDestination _destination;
0062
0063 /**
0064 * When true indicates that a blocking receive call is in progress
0065 */
0066 private final AtomicBoolean _receiving = new AtomicBoolean(false);
0067 /**
0068 * Holds an atomic reference to the listener installed.
0069 */
0070 private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
0071
0072 /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
0073 protected int _consumerTag;
0074
0075 /** We need to know the channel id when constructing frames */
0076 protected final int _channelId;
0077
0078 /**
0079 * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
0080 * <p/> Argument true indicates we want strict FIFO semantics
0081 */
0082 protected final BlockingQueue _synchronousQueue;
0083
0084 protected final MessageFactoryRegistry _messageFactory;
0085
0086 protected final AMQSession _session;
0087
0088 protected final AMQProtocolHandler _protocolHandler;
0089
0090 /**
0091 * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
0092 */
0093 private final FieldTable _arguments;
0094
0095 /**
0096 * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
0097 * failover
0098 */
0099 private final int _prefetchHigh;
0100
0101 /**
0102 * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
0103 * failover
0104 */
0105 private final int _prefetchLow;
0106
0107 /**
0108 * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
0109 */
0110 private final boolean _exclusive;
0111
0112 /**
0113 * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
0114 * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
0115 * implementation.
0116 */
0117 protected final int _acknowledgeMode;
0118
0119 /**
0120 * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
0121 */
0122 private int _outstanding;
0123
0124 /**
0125 * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
0126 * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
0127 */
0128 private boolean _dups_ok_acknowledge_send;
0129
0130 /**
0131 * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
0132 */
0133 private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
0134
0135 /** The last tag that was "multiple" acknowledged on this session (if transacted) */
0136 private long _lastAcked;
0137
0138 /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
0139 private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
0140
0141 private final Object _commitLock = new Object();
0142
0143 /**
0144 * The thread that was used to call receive(). This is important for being able to interrupt that thread if a
0145 * receive() is in progress.
0146 */
0147 private Thread _receivingThread;
0148
0149
0150 /**
0151 * Used to store this consumer queue name
0152 * Usefull when more than binding key should be used
0153 */
0154 private AMQShortString _queuename;
0155
0156 /**
0157 * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
0158 * on the queue. This is used for queue browsing.
0159 */
0160 private final boolean _autoClose;
0161
0162 private final boolean _noConsume;
0163 private List<StackTraceElement> _closedStack = null;
0164
0165
0166
0167 protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
0168 String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
0169 AMQSession session, AMQProtocolHandler protocolHandler,
0170 FieldTable arguments, int prefetchHigh, int prefetchLow,
0171 boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
0172 {
0173 _channelId = channelId;
0174 _connection = connection;
0175 _messageSelector = messageSelector;
0176 _noLocal = noLocal;
0177 _destination = destination;
0178 _messageFactory = messageFactory;
0179 _session = session;
0180 _protocolHandler = protocolHandler;
0181 _arguments = arguments;
0182 _prefetchHigh = prefetchHigh;
0183 _prefetchLow = prefetchLow;
0184 _exclusive = exclusive;
0185
0186 _synchronousQueue = new LinkedBlockingQueue();
0187 _autoClose = autoClose;
0188 _noConsume = noConsume;
0189
0190 // Force queue browsers not to use acknowledge modes.
0191 if (_noConsume)
0192 {
0193 _acknowledgeMode = Session.NO_ACKNOWLEDGE;
0194 }
0195 else
0196 {
0197 _acknowledgeMode = acknowledgeMode;
0198 }
0199 }
0200
0201 public AMQDestination getDestination()
0202 {
0203 return _destination;
0204 }
0205
0206 public String getMessageSelector() throws JMSException
0207 {
0208 checkPreConditions();
0209
0210 return _messageSelector;
0211 }
0212
0213 public MessageListener getMessageListener() throws JMSException
0214 {
0215 checkPreConditions();
0216
0217 return _messageListener.get();
0218 }
0219
0220 public int getAcknowledgeMode()
0221 {
0222 return _acknowledgeMode;
0223 }
0224
0225 protected boolean isMessageListenerSet()
0226 {
0227 return _messageListener.get() != null;
0228 }
0229
0230 public void setMessageListener(final MessageListener messageListener) throws JMSException
0231 {
0232 checkPreConditions();
0233
0234 // if the current listener is non-null and the session is not stopped, then
0235 // it is an error to call this method.
0236
0237 // i.e. it is only valid to call this method if
0238 //
0239 // (a) the connection is stopped, in which case the dispatcher is not running
0240 // OR
0241 // (b) the listener is null AND we are not receiving synchronously at present
0242 //
0243
0244 if (!_session.getAMQConnection().started())
0245 {
0246 _messageListener.set(messageListener);
0247 _session.setHasMessageListeners();
0248
0249 if (_logger.isDebugEnabled())
0250 {
0251 _logger.debug(
0252 "Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
0253 }
0254 }
0255 else
0256 {
0257 if (_receiving.get())
0258 {
0259 throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
0260 }
0261
0262 if (!_messageListener.compareAndSet(null, messageListener))
0263 {
0264 throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
0265 }
0266
0267 _logger.debug("Message listener set for destination " + _destination);
0268
0269 if (messageListener != null)
0270 {
0271 //todo: handle case where connection has already been started, and the dispatcher has alreaded started
0272 // putting values on the _synchronousQueue
0273
0274 synchronized (_session)
0275 {
0276 _messageListener.set(messageListener);
0277 _session.setHasMessageListeners();
0278 _session.startDispatcherIfNecessary();
0279
0280 // If we already have messages on the queue, deliver them to the listener
0281 Object o = _synchronousQueue.poll();
0282 while (o != null)
0283 {
0284 notifyMessage((AbstractJMSMessage) o);
0285 o = _synchronousQueue.poll();
0286 }
0287 }
0288 }
0289 }
0290 }
0291
0292 protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
0293 {
0294 if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
0295 {
0296 _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
0297 }
0298
0299 _session.setInRecovery(false);
0300 preDeliver(jmsMsg);
0301 }
0302
0303 /**
0304 * @param immediate if true then return immediately if the connection is failing over
0305 *
0306 * @return boolean if the acquisition was successful
0307 *
0308 * @throws JMSException if a listener has already been set or another thread is receiving
0309 * @throws InterruptedException if interrupted
0310 */
0311 private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
0312 {
0313 if (_connection.isFailingOver())
0314 {
0315 if (immediate)
0316 {
0317 return false;
0318 }
0319 else
0320 {
0321 _connection.blockUntilNotFailingOver();
0322 }
0323 }
0324
0325 if (!_receiving.compareAndSet(false, true))
0326 {
0327 throw new javax.jms.IllegalStateException("Another thread is already receiving.");
0328 }
0329
0330 if (isMessageListenerSet())
0331 {
0332 throw new javax.jms.IllegalStateException("A listener has already been set.");
0333 }
0334
0335 _receivingThread = Thread.currentThread();
0336 return true;
0337 }
0338
0339 private void releaseReceiving()
0340 {
0341 _receiving.set(false);
0342 _receivingThread = null;
0343 }
0344
0345 public FieldTable getArguments()
0346 {
0347 return _arguments;
0348 }
0349
0350 public int getPrefetch()
0351 {
0352 return _prefetchHigh;
0353 }
0354
0355 public int getPrefetchHigh()
0356 {
0357 return _prefetchHigh;
0358 }
0359
0360 public int getPrefetchLow()
0361 {
0362 return _prefetchLow;
0363 }
0364
0365 public boolean isNoLocal()
0366 {
0367 return _noLocal;
0368 }
0369
0370 public boolean isExclusive()
0371 {
0372 return _exclusive;
0373 }
0374
0375 public boolean isReceiving()
0376 {
0377 return _receiving.get();
0378 }
0379
0380 public Message receive() throws JMSException
0381 {
0382 return receive(0);
0383 }
0384
0385 public Message receive(long l) throws JMSException
0386 {
0387
0388 checkPreConditions();
0389
0390 try
0391 {
0392 acquireReceiving(false);
0393 }
0394 catch (InterruptedException e)
0395 {
0396 _logger.warn("Interrupted acquire: " + e);
0397 if (isClosed())
0398 {
0399 return null;
0400 }
0401 }
0402
0403 _session.startDispatcherIfNecessary();
0404
0405 try
0406 {
0407 Object o = getMessageFromQueue(l);
0408 final AbstractJMSMessage m = returnMessageOrThrow(o);
0409 if (m != null)
0410 {
0411 preApplicationProcessing(m);
0412 postDeliver(m);
0413 }
0414 return m;
0415 }
0416 catch (InterruptedException e)
0417 {
0418 _logger.warn("Interrupted: " + e);
0419
0420 return null;
0421 }
0422 finally
0423 {
0424 releaseReceiving();
0425 }
0426 }
0427
0428 public Object getMessageFromQueue(long l) throws InterruptedException
0429 {
0430 Object o;
0431 if (l > 0)
0432 {
0433 o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
0434 }
0435 else if (l < 0)
0436 {
0437 o = _synchronousQueue.poll();
0438 }
0439 else
0440 {
0441 o = _synchronousQueue.take();
0442 }
0443 return o;
0444 }
0445
0446 public Message receiveNoWait() throws JMSException
0447 {
0448 checkPreConditions();
0449
0450 try
0451 {
0452 if (!acquireReceiving(true))
0453 {
0454 //If we couldn't acquire the receiving thread then return null.
0455 // This will occur if failing over.
0456 return null;
0457 }
0458 }
0459 catch (InterruptedException e)
0460 {
0461 /*
0462 * This seems slightly shoddy but should never actually be executed
0463 * since we told acquireReceiving to return immediately and it shouldn't
0464 * block on anything.
0465 */
0466
0467 return null;
0468 }
0469
0470 _session.startDispatcherIfNecessary();
0471
0472 try
0473 {
0474 Object o = getMessageFromQueue(-1);
0475 final AbstractJMSMessage m = returnMessageOrThrow(o);
0476 if (m != null)
0477 {
0478 preApplicationProcessing(m);
0479 postDeliver(m);
0480 }
0481
0482 return m;
0483 }
0484 catch (InterruptedException e)
0485 {
0486 _logger.warn("Interrupted: " + e);
0487
0488 return null;
0489 }
0490 finally
0491 {
0492 releaseReceiving();
0493 }
0494 }
0495
0496 /**
0497 * We can get back either a Message or an exception from the queue. This method examines the argument and deals with
0498 * it by throwing it (if an exception) or returning it (in any other case).
0499 *
0500 * @param o the object to return or throw
0501 * @return a message only if o is a Message
0502 * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
0503 * JMSException is created with the linked exception set appropriately
0504 */
0505 private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException
0506 {
0507 // errors are passed via the queue too since there is no way of interrupting the poll() via the API.
0508 if (o instanceof Throwable)
0509 {
0510 JMSException e = new JMSException("Message consumer forcibly closed due to error: " + o);
0511 if (o instanceof Exception)
0512 {
0513 e.setLinkedException((Exception) o);
0514 }
0515
0516 throw e;
0517 }
0518 else if (o instanceof CloseConsumerMessage)
0519 {
0520 _closed.set(true);
0521 deregisterConsumer();
0522 return null;
0523 }
0524 else
0525 {
0526 return (AbstractJMSMessage) o;
0527 }
0528 }
0529
0530 public void close() throws JMSException
0531 {
0532 close(true);
0533 }
0534
0535 public void close(boolean sendClose) throws JMSException
0536 {
0537 if (_logger.isInfoEnabled())
0538 {
0539 _logger.info("Closing consumer:" + debugIdentity());
0540 }
0541
0542 if (!_closed.getAndSet(true))
0543 {
0544 if (_logger.isDebugEnabled())
0545 {
0546 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0547 if (_closedStack != null)
0548 {
0549 _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
0550 }
0551 else
0552 {
0553 _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
0554 }
0555 }
0556
0557 if (sendClose)
0558 {
0559 // The Synchronized block only needs to protect network traffic.
0560 synchronized (_connection.getFailoverMutex())
0561 {
0562 try
0563 {
0564 sendCancel();
0565 }
0566 catch (AMQException e)
0567 {
0568 throw new JMSAMQException("Error closing consumer: " + e, e);
0569 }
0570 catch (FailoverException e)
0571 {
0572 throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
0573 }
0574 }
0575 }
0576 else
0577 {
0578 // FIXME: wow this is ugly
0579 // //fixme this probably is not right
0580 // if (!isNoConsume())
0581 { // done in BasicCancelOK Handler but not sending one so just deregister.
0582 deregisterConsumer();
0583 }
0584 }
0585
0586 // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive
0587 // so we need to let it know it is time to close.
0588 if ((_messageListener != null) && _receiving.get())
0589 {
0590 if (_logger.isInfoEnabled())
0591 {
0592 _logger.info("Interrupting thread: " + _receivingThread);
0593 }
0594
0595 _receivingThread.interrupt();
0596 }
0597 }
0598 }
0599
0600 abstract void sendCancel() throws AMQException, FailoverException;
0601
0602 /**
0603 * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
0604 * vetoed automatic resubscription. The caller must hold the failover mutex.
0605 */
0606 void markClosed()
0607 {
0608 // synchronized (_closed)
0609 {
0610 _closed.set(true);
0611
0612 if (_logger.isDebugEnabled())
0613 {
0614 if (_closedStack != null)
0615 {
0616 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0617 _logger.debug(_consumerTag + " markClosed():"
0618 + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
0619 _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
0620 }
0621 else
0622 {
0623 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0624 _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
0625 }
0626 }
0627 }
0628
0629 deregisterConsumer();
0630 }
0631
0632 /**
0633 * @param closeMessage
0634 * this message signals that we should close the browser
0635 */
0636 public void notifyCloseMessage(CloseConsumerMessage closeMessage)
0637 {
0638 if (isMessageListenerSet())
0639 {
0640 // Currently only possible to get this msg type with a browser.
0641 // If we get the message here then we should probably just close
0642 // this consumer.
0643 // Though an AutoClose consumer with message listener is quite odd..
0644 // Just log out the fact so we know where we are
0645 _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
0646 }
0647 else
0648 {
0649 try
0650 {
0651 _synchronousQueue.put(closeMessage);
0652 }
0653 catch (InterruptedException e)
0654 {
0655 _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,"
0656 + "but we shouldn't have close yet");
0657 }
0658 }
0659 }
0660
0661
0662 /**
0663 * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a
0664 * message listener or a synchronous receive() caller.
0665 *
0666 * @param messageFrame the raw unprocessed mesage
0667 */
0668 void notifyMessage(U messageFrame)
0669 {
0670 if (messageFrame instanceof CloseConsumerMessage)
0671 {
0672 notifyCloseMessage((CloseConsumerMessage) messageFrame);
0673 return;
0674 }
0675
0676
0677
0678 try
0679 {
0680 AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
0681
0682 if (_logger.isDebugEnabled())
0683 {
0684 _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
0685 }
0686 // synchronized (_closed)
0687
0688 {
0689 // if (!_closed.get())
0690 {
0691
0692 //preDeliver(jmsMessage);
0693
0694 notifyMessage(jmsMessage);
0695 }
0696 // else
0697 // {
0698 // _logger.error("MESSAGE REJECTING!");
0699 // _session.rejectMessage(jmsMessage, true);
0700 // //_logger.error("MESSAGE JUST DROPPED!");
0701 // }
0702 }
0703 }
0704 catch (Exception e)
0705 {
0706 if (e instanceof InterruptedException)
0707 {
0708 _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
0709 }
0710 else
0711 {
0712 _logger.error("Caught exception (dump follows) - ignoring...", e);
0713 }
0714 }
0715 }
0716
0717 public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
0718 throws Exception;
0719
0720 /** @param jmsMessage this message has already been processed so can't redo preDeliver */
0721 public void notifyMessage(AbstractJMSMessage jmsMessage)
0722 {
0723 try
0724 {
0725 if (isMessageListenerSet())
0726 {
0727 preApplicationProcessing(jmsMessage);
0728 getMessageListener().onMessage(jmsMessage);
0729 postDeliver(jmsMessage);
0730 }
0731 else
0732 {
0733 // we should not be allowed to add a message is the
0734 // consumer is closed
0735 _synchronousQueue.put(jmsMessage);
0736 }
0737 }
0738 catch (Exception e)
0739 {
0740 if (e instanceof InterruptedException)
0741 {
0742 _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
0743 }
0744 else
0745 {
0746 _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e);
0747 }
0748 }
0749 }
0750
0751 void preDeliver(AbstractJMSMessage msg)
0752 {
0753 switch (_acknowledgeMode)
0754 {
0755
0756 case Session.PRE_ACKNOWLEDGE:
0757 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
0758 break;
0759
0760 case Session.CLIENT_ACKNOWLEDGE:
0761 // we set the session so that when the user calls acknowledge() it can call the method on session
0762 // to send out the appropriate frame
0763 msg.setAMQSession(_session);
0764 break;
0765 case Session.SESSION_TRANSACTED:
0766 if (isNoConsume())
0767 {
0768 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
0769 }
0770 else
0771 {
0772 _session.addDeliveredMessage(msg.getDeliveryTag());
0773 }
0774
0775 break;
0776 }
0777 }
0778
0779 void postDeliver(AbstractJMSMessage msg) throws JMSException
0780 {
0781 msg.setJMSDestination(_destination);
0782 switch (_acknowledgeMode)
0783 {
0784
0785 case Session.CLIENT_ACKNOWLEDGE:
0786 if (isNoConsume())
0787 {
0788 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
0789 }
0790 _session.markDirty();
0791 break;
0792
0793 case Session.DUPS_OK_ACKNOWLEDGE:
0794 case Session.AUTO_ACKNOWLEDGE:
0795 // we do not auto ack a message if the application code called recover()
0796 if (!_session.isInRecovery())
0797 {
0798 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
0799 }
0800
0801 break;
0802 }
0803 }
0804
0805
0806 /**
0807 * Acknowledge up to last message delivered (if any). Used when commiting.
0808 *
0809 * @return the lastDeliveryTag to acknowledge
0810 */
0811 Long getLastDelivered()
0812 {
0813 if (!_receivedDeliveryTags.isEmpty())
0814 {
0815 Long lastDeliveryTag = _receivedDeliveryTags.poll();
0816
0817 while (!_receivedDeliveryTags.isEmpty())
0818 {
0819 lastDeliveryTag = _receivedDeliveryTags.poll();
0820 }
0821
0822 assert _receivedDeliveryTags.isEmpty();
0823
0824 return lastDeliveryTag;
0825 }
0826
0827 return null;
0828 }
0829
0830 /**
0831 * Acknowledge up to last message delivered (if any). Used when commiting.
0832 */
0833 void acknowledgeDelivered()
0834 {
0835 synchronized(_commitLock)
0836 {
0837 ArrayList<Long> tagsToAck = new ArrayList<Long>();
0838
0839 while (!_receivedDeliveryTags.isEmpty())
0840 {
0841 tagsToAck.add(_receivedDeliveryTags.poll());
0842 }
0843
0844 Collections.sort(tagsToAck);
0845
0846 long prevAcked = _lastAcked;
0847 long oldAckPoint = -1;
0848
0849 while(oldAckPoint != prevAcked)
0850 {
0851 oldAckPoint = prevAcked;
0852
0853 Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
0854
0855 while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
0856 {
0857 tagsToAckIterator.remove();
0858 prevAcked++;
0859 }
0860
0861 Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
0862 while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
0863 {
0864 previousAckIterator.remove();
0865 prevAcked++;
0866 }
0867
0868 }
0869 if(prevAcked != _lastAcked)
0870 {
0871 _session.acknowledgeMessage(prevAcked, true);
0872 _lastAcked = prevAcked;
0873 }
0874
0875 Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
0876
0877 while(tagsToAckIterator.hasNext())
0878 {
0879 Long tag = tagsToAckIterator.next();
0880 _session.acknowledgeMessage(tag, false);
0881 _previouslyAcked.add(tag);
0882 }
0883 }
0884 }
0885
0886
0887 void notifyError(Throwable cause)
0888 {
0889 // synchronized (_closed)
0890 {
0891 _closed.set(true);
0892 if (_logger.isDebugEnabled())
0893 {
0894 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0895 if (_closedStack != null)
0896 {
0897 _logger.debug(_consumerTag + " notifyError():"
0898 + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
0899 _logger.debug(_consumerTag + " previously" + _closedStack.toString());
0900 }
0901 else
0902 {
0903 _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
0904 }
0905 }
0906 }
0907 // QPID-293 can "request redelivery of this error through dispatcher"
0908
0909 // we have no way of propagating the exception to a message listener - a JMS limitation - so we
0910 // deal with the case where we have a synchronous receive() waiting for a message to arrive
0911 if (!isMessageListenerSet())
0912 {
0913 // offer only succeeds if there is a thread waiting for an item from the queue
0914 if (_synchronousQueue.offer(cause))
0915 {
0916 _logger.debug("Passed exception to synchronous queue for propagation to receive()");
0917 }
0918 }
0919
0920 deregisterConsumer();
0921 }
0922
0923 /**
0924 * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in
0925 * the case of an error occurring.
0926 */
0927 private void deregisterConsumer()
0928 {
0929 _session.deregisterConsumer(this);
0930 }
0931
0932 public int getConsumerTag()
0933 {
0934 return _consumerTag;
0935 }
0936
0937 public void setConsumerTag(int consumerTag)
0938 {
0939 _consumerTag = consumerTag;
0940 }
0941
0942 public AMQSession getSession()
0943 {
0944 return _session;
0945 }
0946
0947 private void checkPreConditions() throws JMSException
0948 {
0949
0950 this.checkNotClosed();
0951
0952 if ((_session == null) || _session.isClosed())
0953 {
0954 throw new javax.jms.IllegalStateException("Invalid Session");
0955 }
0956 }
0957
0958 public boolean isAutoClose()
0959 {
0960 return _autoClose;
0961 }
0962
0963 public boolean isNoConsume()
0964 {
0965 return _noConsume;
0966 }
0967
0968 public void rollback()
0969 {
0970 rollbackPendingMessages();
0971 }
0972
0973 public void rollbackPendingMessages()
0974 {
0975 if (_synchronousQueue.size() > 0)
0976 {
0977 if (_logger.isDebugEnabled())
0978 {
0979 _logger.debug("Rejecting the messages(" + _synchronousQueue
0980 .size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + _consumerTag);
0981 }
0982
0983 Iterator iterator = _synchronousQueue.iterator();
0984
0985 int initialSize = _synchronousQueue.size();
0986
0987 boolean removed = false;
0988 while (iterator.hasNext())
0989 {
0990
0991 Object o = iterator.next();
0992 if (o instanceof AbstractJMSMessage)
0993 {
0994 _session.rejectMessage(((AbstractJMSMessage) o), true);
0995
0996 if (_logger.isDebugEnabled())
0997 {
0998 _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
0999 }
1000
1001 iterator.remove();
1002 removed = true;
1003
1004 }
1005 else
1006 {
1007 _logger.error("Queue contained a :" + o.getClass()
1008 + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
1009 iterator.remove();
1010 removed = true;
1011 }
1012 }
1013
1014 if (removed && (initialSize == _synchronousQueue.size()))
1015 {
1016 _logger.error("Queue had content removed but didn't change in size." + initialSize);
1017 }
1018
1019
1020 if (_synchronousQueue.size() != 0)
1021 {
1022 _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
1023 rollback();
1024 }
1025
1026 clearReceiveQueue();
1027 }
1028 }
1029
1030 public String debugIdentity()
1031 {
1032 return String.valueOf(_consumerTag) + "[" + System.identityHashCode(this) + "]";
1033 }
1034
1035 public void clearReceiveQueue()
1036 {
1037 _synchronousQueue.clear();
1038 }
1039
1040 public void start()
1041 {
1042 // do nothing as this is a 0_10 feature
1043 }
1044
1045
1046 public void stop()
1047 {
1048 // do nothing as this is a 0_10 feature
1049 }
1050
1051 public boolean isStrated()
1052 {
1053 // do nothing as this is a 0_10 feature
1054 return false;
1055 }
1056
1057 public AMQShortString getQueuename()
1058 {
1059 return _queuename;
1060 }
1061
1062 public void setQueuename(AMQShortString queuename)
1063 {
1064 this._queuename = queuename;
1065 }
1066
1067 public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException
1068 {
1069 _session.addBindingKey(this,amqd,routingKey);
1070 }
1071
1072 /** to be called when a failover has occured */
1073 public void failedOver()
1074 {
1075 clearReceiveQueue();
1076 // TGM FIXME: think this should just be removed
1077 // clearUnackedMessages();
1078 }
1079 }
|