BasicMessageConsumer.java
0001 /*
0002  *
0003  * Licensed to the Apache Software Foundation (ASF) under one
0004  * or more contributor license agreements.  See the NOTICE file
0005  * distributed with this work for additional information
0006  * regarding copyright ownership.  The ASF licenses this file
0007  * to you under the Apache License, Version 2.0 (the
0008  * "License"); you may not use this file except in compliance
0009  * with the License.  You may obtain a copy of the License at
0010  *
0011  *   http://www.apache.org/licenses/LICENSE-2.0
0012  *
0013  * Unless required by applicable law or agreed to in writing,
0014  * software distributed under the License is distributed on an
0015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016  * KIND, either express or implied.  See the License for the
0017  * specific language governing permissions and limitations
0018  * under the License.
0019  *
0020  */
0021 package org.apache.qpid.client;
0022 
0023 import org.apache.qpid.AMQException;
0024 import org.apache.qpid.client.failover.FailoverException;
0025 import org.apache.qpid.client.message.*;
0026 import org.apache.qpid.client.protocol.AMQProtocolHandler;
0027 import org.apache.qpid.framing.*;
0028 import org.apache.qpid.jms.MessageConsumer;
0029 import org.apache.qpid.jms.Session;
0030 import org.slf4j.Logger;
0031 import org.slf4j.LoggerFactory;
0032 
0033 import javax.jms.JMSException;
0034 import javax.jms.Message;
0035 import javax.jms.MessageListener;
0036 import java.util.Arrays;
0037 import java.util.Iterator;
0038 import java.util.List;
0039 import java.util.SortedSet;
0040 import java.util.ArrayList;
0041 import java.util.Collections;
0042 import java.util.TreeSet;
0043 import java.util.concurrent.BlockingQueue;
0044 import java.util.concurrent.ConcurrentLinkedQueue;
0045 import java.util.concurrent.LinkedBlockingQueue;
0046 import java.util.concurrent.TimeUnit;
0047 import java.util.concurrent.atomic.AtomicBoolean;
0048 import java.util.concurrent.atomic.AtomicReference;
0049 
0050 public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
0051 {
0052     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
0053 
0054     /** The connection being used by this consumer */
0055     protected final AMQConnection _connection;
0056 
0057     protected final String _messageSelector;
0058 
0059     private final boolean _noLocal;
0060 
0061     private final AMQDestination _destination;
0062 
0063     /**
0064      * When true indicates that a blocking receive call is in progress
0065      */
0066     private final AtomicBoolean _receiving = new AtomicBoolean(false);
0067     /**
0068      * Holds an atomic reference to the listener installed.
0069      */
0070     private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
0071 
0072     /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
0073     protected int _consumerTag;
0074 
0075     /** We need to know the channel id when constructing frames */
0076     protected final int _channelId;
0077 
0078     /**
0079      * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
0080      <p/> Argument true indicates we want strict FIFO semantics
0081      */
0082     protected final BlockingQueue _synchronousQueue;
0083 
0084     protected final MessageFactoryRegistry _messageFactory;
0085 
0086     protected final AMQSession _session;
0087 
0088     protected final AMQProtocolHandler _protocolHandler;
0089 
0090     /**
0091      * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
0092      */
0093     private final FieldTable _arguments;
0094 
0095     /**
0096      * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
0097      * failover
0098      */
0099     private final int _prefetchHigh;
0100 
0101     /**
0102      * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
0103      * failover
0104      */
0105     private final int _prefetchLow;
0106 
0107     /**
0108      * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
0109      */
0110     private final boolean _exclusive;
0111 
0112     /**
0113      * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
0114      * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
0115      * implementation.
0116      */
0117     protected final int _acknowledgeMode;
0118 
0119     /**
0120      * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
0121      */
0122     private int _outstanding;
0123 
0124     /**
0125      * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
0126      * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
0127      */
0128     private boolean _dups_ok_acknowledge_send;
0129 
0130     /**
0131      * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
0132      */
0133     private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
0134 
0135     /** The last tag that was "multiple" acknowledged on this session (if transacted) */
0136     private long _lastAcked;
0137 
0138     /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
0139     private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
0140 
0141     private final Object _commitLock = new Object();
0142 
0143     /**
0144      * The thread that was used to call receive(). This is important for being able to interrupt that thread if a
0145      * receive() is in progress.
0146      */
0147     private Thread _receivingThread;
0148 
0149 
0150     /**
0151      * Used to store this consumer queue name
0152      * Usefull when more than binding key should be used
0153      */
0154     private AMQShortString _queuename;
0155 
0156     /**
0157      * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
0158      * on the queue.  This is used for queue browsing.
0159      */
0160     private final boolean _autoClose;
0161 
0162     private final boolean _noConsume;
0163     private List<StackTraceElement> _closedStack = null;
0164 
0165 
0166 
0167     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
0168                                    String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
0169                                    AMQSession session, AMQProtocolHandler protocolHandler,
0170                                    FieldTable arguments, int prefetchHigh, int prefetchLow,
0171                                    boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
0172     {
0173         _channelId = channelId;
0174         _connection = connection;
0175         _messageSelector = messageSelector;
0176         _noLocal = noLocal;
0177         _destination = destination;
0178         _messageFactory = messageFactory;
0179         _session = session;
0180         _protocolHandler = protocolHandler;
0181         _arguments = arguments;
0182         _prefetchHigh = prefetchHigh;
0183         _prefetchLow = prefetchLow;
0184         _exclusive = exclusive;
0185 
0186         _synchronousQueue = new LinkedBlockingQueue();
0187         _autoClose = autoClose;
0188         _noConsume = noConsume;
0189 
0190         // Force queue browsers not to use acknowledge modes.
0191         if (_noConsume)
0192         {
0193             _acknowledgeMode = Session.NO_ACKNOWLEDGE;
0194         }
0195         else
0196         {
0197             _acknowledgeMode = acknowledgeMode;
0198         }
0199     }
0200 
0201     public AMQDestination getDestination()
0202     {
0203         return _destination;
0204     }
0205 
0206     public String getMessageSelector() throws JMSException
0207     {
0208         checkPreConditions();
0209 
0210         return _messageSelector;
0211     }
0212 
0213     public MessageListener getMessageListener() throws JMSException
0214     {
0215         checkPreConditions();
0216 
0217         return _messageListener.get();
0218     }
0219 
0220     public int getAcknowledgeMode()
0221     {
0222         return _acknowledgeMode;
0223     }
0224 
0225     protected boolean isMessageListenerSet()
0226     {
0227         return _messageListener.get() != null;
0228     }
0229 
0230     public void setMessageListener(final MessageListener messageListenerthrows JMSException
0231     {
0232         checkPreConditions();
0233 
0234         // if the current listener is non-null and the session is not stopped, then
0235         // it is an error to call this method.
0236 
0237         // i.e. it is only valid to call this method if
0238         //
0239         // (a) the connection is stopped, in which case the dispatcher is not running
0240         // OR
0241         // (b) the listener is null AND we are not receiving synchronously at present
0242         //
0243 
0244         if (!_session.getAMQConnection().started())
0245         {
0246             _messageListener.set(messageListener);
0247             _session.setHasMessageListeners();
0248 
0249             if (_logger.isDebugEnabled())
0250             {
0251                 _logger.debug(
0252                         "Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
0253             }
0254         }
0255         else
0256         {
0257             if (_receiving.get())
0258             {
0259                 throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
0260             }
0261 
0262             if (!_messageListener.compareAndSet(null, messageListener))
0263             {
0264                 throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
0265             }
0266 
0267             _logger.debug("Message listener set for destination " + _destination);
0268 
0269             if (messageListener != null)
0270             {
0271                 //todo: handle case where connection has already been started, and the dispatcher has alreaded started
0272                 // putting values on the _synchronousQueue
0273 
0274                 synchronized (_session)
0275                 {
0276                     _messageListener.set(messageListener);
0277                     _session.setHasMessageListeners();
0278                     _session.startDispatcherIfNecessary();
0279                     
0280                     // If we already have messages on the queue, deliver them to the listener
0281                     Object o = _synchronousQueue.poll();
0282                     while (o != null)
0283                     {
0284                         notifyMessage((AbstractJMSMessageo);
0285                         o = _synchronousQueue.poll();
0286                     }
0287                 }
0288             }
0289         }
0290     }
0291 
0292     protected void preApplicationProcessing(AbstractJMSMessage jmsMsgthrows JMSException
0293     {
0294         if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
0295         {
0296             _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
0297         }
0298         
0299         _session.setInRecovery(false);
0300         preDeliver(jmsMsg);
0301     }
0302 
0303     /**
0304      @param immediate if true then return immediately if the connection is failing over
0305      *
0306      @return boolean if the acquisition was successful
0307      *
0308      @throws JMSException if a listener has already been set or another thread is receiving
0309      @throws InterruptedException if interrupted
0310      */
0311     private boolean acquireReceiving(boolean immediatethrows JMSException, InterruptedException
0312     {
0313         if (_connection.isFailingOver())
0314         {
0315             if (immediate)
0316             {
0317                 return false;
0318             }
0319             else
0320             {
0321                 _connection.blockUntilNotFailingOver();
0322             }
0323         }
0324 
0325         if (!_receiving.compareAndSet(false, true))
0326         {
0327             throw new javax.jms.IllegalStateException("Another thread is already receiving.");
0328         }
0329 
0330         if (isMessageListenerSet())
0331         {
0332             throw new javax.jms.IllegalStateException("A listener has already been set.");
0333         }
0334 
0335         _receivingThread = Thread.currentThread();
0336         return true;
0337     }
0338 
0339     private void releaseReceiving()
0340     {
0341         _receiving.set(false);
0342         _receivingThread = null;
0343     }
0344 
0345     public FieldTable getArguments()
0346     {
0347         return _arguments;
0348     }
0349 
0350     public int getPrefetch()
0351     {
0352         return _prefetchHigh;
0353     }
0354 
0355     public int getPrefetchHigh()
0356     {
0357         return _prefetchHigh;
0358     }
0359 
0360     public int getPrefetchLow()
0361     {
0362         return _prefetchLow;
0363     }
0364 
0365     public boolean isNoLocal()
0366     {
0367         return _noLocal;
0368     }
0369 
0370     public boolean isExclusive()
0371     {
0372         return _exclusive;
0373     }
0374 
0375     public boolean isReceiving()
0376     {
0377         return _receiving.get();
0378     }
0379 
0380     public Message receive() throws JMSException
0381     {
0382         return receive(0);
0383     }
0384 
0385     public Message receive(long lthrows JMSException
0386     {
0387 
0388         checkPreConditions();
0389 
0390         try
0391         {
0392             acquireReceiving(false);
0393         }
0394         catch (InterruptedException e)
0395         {
0396             _logger.warn("Interrupted acquire: " + e);
0397             if (isClosed())
0398             {
0399                 return null;
0400             }
0401         }
0402 
0403         _session.startDispatcherIfNecessary();
0404 
0405         try
0406         {
0407             Object o = getMessageFromQueue(l);
0408             final AbstractJMSMessage m = returnMessageOrThrow(o);
0409             if (m != null)
0410             {
0411                 preApplicationProcessing(m);
0412                 postDeliver(m);
0413             }
0414             return m;
0415         }
0416         catch (InterruptedException e)
0417         {
0418             _logger.warn("Interrupted: " + e);
0419 
0420             return null;
0421         }
0422         finally
0423         {
0424             releaseReceiving();
0425         }
0426     }
0427 
0428     public  Object getMessageFromQueue(long lthrows InterruptedException
0429     {
0430          Object o;
0431          if (l > 0)
0432          {
0433              o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
0434          }
0435          else if (l < 0)
0436          {
0437              o = _synchronousQueue.poll();
0438          }
0439          else
0440          {
0441              o = _synchronousQueue.take();
0442          }
0443          return o;
0444      }
0445 
0446     public Message receiveNoWait() throws JMSException
0447     {
0448         checkPreConditions();
0449 
0450         try
0451         {
0452             if (!acquireReceiving(true))
0453             {
0454                 //If we couldn't acquire the receiving thread then return null.
0455                 // This will occur if failing over.
0456                 return null;
0457             }
0458         }
0459         catch (InterruptedException e)
0460         {
0461             /*
0462              *  This seems slightly shoddy but should never actually be executed
0463              *  since we told acquireReceiving to return immediately and it shouldn't
0464              *  block on anything.
0465              */
0466 
0467             return null;
0468         }
0469 
0470         _session.startDispatcherIfNecessary();
0471 
0472         try
0473         {
0474             Object o = getMessageFromQueue(-1);
0475             final AbstractJMSMessage m = returnMessageOrThrow(o);
0476             if (m != null)
0477             {
0478                 preApplicationProcessing(m);
0479                 postDeliver(m);
0480             }
0481 
0482             return m;
0483         }
0484         catch (InterruptedException e)
0485         {
0486             _logger.warn("Interrupted: " + e);
0487 
0488             return null;
0489         }
0490         finally
0491         {
0492             releaseReceiving();
0493         }
0494     }
0495 
0496     /**
0497      * We can get back either a Message or an exception from the queue. This method examines the argument and deals with
0498      * it by throwing it (if an exception) or returning it (in any other case).
0499      *
0500      @param o the object to return or throw
0501      @return a message only if o is a Message
0502      @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
0503      *                      JMSException is created with the linked exception set appropriately
0504      */
0505     private AbstractJMSMessage returnMessageOrThrow(Object othrows JMSException
0506     {
0507         // errors are passed via the queue too since there is no way of interrupting the poll() via the API.
0508         if (instanceof Throwable)
0509         {
0510             JMSException e = new JMSException("Message consumer forcibly closed due to error: " + o);
0511             if (instanceof Exception)
0512             {
0513                 e.setLinkedException((Exceptiono);
0514             }
0515 
0516             throw e;
0517         }
0518         else if (instanceof CloseConsumerMessage)
0519         {
0520             _closed.set(true);
0521             deregisterConsumer();
0522             return null;
0523         }
0524         else
0525         {
0526             return (AbstractJMSMessageo;
0527         }
0528     }
0529 
0530     public void close() throws JMSException
0531     {
0532         close(true);
0533     }
0534 
0535     public void close(boolean sendClosethrows JMSException
0536     {
0537         if (_logger.isInfoEnabled())
0538         {
0539             _logger.info("Closing consumer:" + debugIdentity());
0540         }
0541 
0542         if (!_closed.getAndSet(true))
0543         {
0544             if (_logger.isDebugEnabled())
0545             {
0546                 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0547                 if (_closedStack != null)
0548                 {
0549                     _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
0550                 }
0551                 else
0552                 {
0553                     _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
0554                 }
0555             }
0556 
0557             if (sendClose)
0558             {
0559                 // The Synchronized block only needs to protect network traffic.
0560                 synchronized (_connection.getFailoverMutex())
0561                 {
0562                     try
0563                     {
0564                         sendCancel();
0565                     }
0566                     catch (AMQException e)
0567                     {
0568                         throw new JMSAMQException("Error closing consumer: " + e, e);
0569                     }
0570                     catch (FailoverException e)
0571                     {
0572                         throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
0573                     }
0574                 }
0575             }
0576             else
0577             {
0578               // FIXME: wow this is ugly
0579                 // //fixme this probably is not right
0580                 // if (!isNoConsume())
0581                 // done in BasicCancelOK Handler but not sending one so just deregister.
0582                     deregisterConsumer();
0583                 }
0584             }
0585 
0586             // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive
0587             // so we need to let it know it is time to close.
0588             if ((_messageListener != null&& _receiving.get())
0589             {
0590                 if (_logger.isInfoEnabled())
0591                 {
0592                     _logger.info("Interrupting thread: " + _receivingThread);
0593                 }
0594 
0595                 _receivingThread.interrupt();
0596             }
0597         }
0598     }
0599 
0600     abstract void sendCancel() throws AMQException, FailoverException;
0601 
0602     /**
0603      * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
0604      * vetoed automatic resubscription. The caller must hold the failover mutex.
0605      */
0606     void markClosed()
0607     {
0608         // synchronized (_closed)
0609         {
0610             _closed.set(true);
0611 
0612             if (_logger.isDebugEnabled())
0613             {
0614                 if (_closedStack != null)
0615                 {
0616                     StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0617                     _logger.debug(_consumerTag + " markClosed():"
0618                                   + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
0619                     _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
0620                 }
0621                 else
0622                 {
0623                   StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0624                     _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
0625                 }
0626             }
0627         }
0628 
0629         deregisterConsumer();
0630     }
0631 
0632     /**
0633      @param closeMessage
0634      *            this message signals that we should close the browser
0635      */
0636     public void notifyCloseMessage(CloseConsumerMessage closeMessage)
0637     {
0638         if (isMessageListenerSet())
0639         {
0640             // Currently only possible to get this msg type with a browser.
0641             // If we get the message here then we should probably just close
0642             // this consumer.
0643             // Though an AutoClose consumer with message listener is quite odd..
0644             // Just log out the fact so we know where we are
0645             _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
0646         }
0647         else
0648         {
0649             try
0650             {
0651                 _synchronousQueue.put(closeMessage);
0652             }
0653             catch (InterruptedException e)
0654             {
0655                 _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,"
0656                         "but we shouldn't have close yet");
0657             }
0658         }
0659     }
0660 
0661     
0662     /**
0663      * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a
0664      * message listener or a synchronous receive() caller.
0665      *
0666      @param messageFrame the raw unprocessed mesage
0667      */
0668     void notifyMessage(U messageFrame)
0669     {
0670         if (messageFrame instanceof CloseConsumerMessage)
0671         {
0672             notifyCloseMessage((CloseConsumerMessagemessageFrame);
0673             return;
0674         }
0675 
0676 
0677 
0678         try
0679         {
0680             AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
0681 
0682             if (_logger.isDebugEnabled())
0683             {
0684                 _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
0685             }
0686             // synchronized (_closed)
0687 
0688             {
0689                 // if (!_closed.get())
0690                 {
0691 
0692                     //preDeliver(jmsMessage);
0693 
0694                     notifyMessage(jmsMessage);
0695                 }
0696                 // else
0697                 // {
0698                 // _logger.error("MESSAGE REJECTING!");
0699                 // _session.rejectMessage(jmsMessage, true);
0700                 // //_logger.error("MESSAGE JUST DROPPED!");
0701                 // }
0702             }
0703         }
0704         catch (Exception e)
0705         {
0706             if (instanceof InterruptedException)
0707             {
0708                 _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
0709             }
0710             else
0711             {
0712                 _logger.error("Caught exception (dump follows) - ignoring...", e);
0713             }
0714         }
0715     }
0716 
0717     public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
0718             throws Exception;
0719 
0720     /** @param jmsMessage this message has already been processed so can't redo preDeliver */
0721     public void notifyMessage(AbstractJMSMessage jmsMessage)
0722     {
0723         try
0724         {
0725             if (isMessageListenerSet())
0726             {
0727                 preApplicationProcessing(jmsMessage);
0728                 getMessageListener().onMessage(jmsMessage);
0729                 postDeliver(jmsMessage);
0730             }
0731             else
0732             {
0733                 // we should not be allowed to add a message is the
0734                 // consumer is closed
0735                 _synchronousQueue.put(jmsMessage);
0736             }
0737         }
0738         catch (Exception e)
0739         {
0740             if (instanceof InterruptedException)
0741             {
0742                 _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
0743             }
0744             else
0745             {
0746                 _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e);
0747             }
0748         }
0749     }
0750 
0751     void preDeliver(AbstractJMSMessage msg)
0752     {
0753         switch (_acknowledgeMode)
0754         {
0755 
0756             case Session.PRE_ACKNOWLEDGE:
0757                 _session.acknowledgeMessage(msg.getDeliveryTag()false);
0758                 break;
0759 
0760             case Session.CLIENT_ACKNOWLEDGE:
0761                 // we set the session so that when the user calls acknowledge() it can call the method on session
0762                 // to send out the appropriate frame
0763                 msg.setAMQSession(_session);
0764                 break;
0765             case Session.SESSION_TRANSACTED:
0766                 if (isNoConsume())
0767                 {
0768                     _session.acknowledgeMessage(msg.getDeliveryTag()false);
0769                 }
0770                 else
0771                 {
0772                     _session.addDeliveredMessage(msg.getDeliveryTag());
0773                 }
0774 
0775                 break;
0776         }
0777     }
0778 
0779     void postDeliver(AbstractJMSMessage msgthrows JMSException
0780     {
0781         msg.setJMSDestination(_destination);
0782         switch (_acknowledgeMode)
0783         {
0784 
0785             case Session.CLIENT_ACKNOWLEDGE:
0786                 if (isNoConsume())
0787                 {
0788                     _session.acknowledgeMessage(msg.getDeliveryTag()false);
0789                 }
0790                 _session.markDirty();
0791                 break;
0792 
0793             case Session.DUPS_OK_ACKNOWLEDGE:
0794             case Session.AUTO_ACKNOWLEDGE:
0795                 // we do not auto ack a message if the application code called recover()
0796                 if (!_session.isInRecovery())
0797                 {
0798                     _session.acknowledgeMessage(msg.getDeliveryTag()false);
0799                 }
0800 
0801                 break;
0802         }
0803     }
0804 
0805 
0806     /**
0807      * Acknowledge up to last message delivered (if any). Used when commiting.
0808      *
0809      @return the lastDeliveryTag to acknowledge
0810      */
0811     Long getLastDelivered()
0812     {
0813         if (!_receivedDeliveryTags.isEmpty())
0814         {
0815             Long lastDeliveryTag = _receivedDeliveryTags.poll();
0816 
0817             while (!_receivedDeliveryTags.isEmpty())
0818             {
0819                 lastDeliveryTag = _receivedDeliveryTags.poll();
0820             }
0821 
0822             assert _receivedDeliveryTags.isEmpty();
0823 
0824             return lastDeliveryTag;
0825         }
0826 
0827         return null;
0828     }
0829 
0830     /**
0831      * Acknowledge up to last message delivered (if any). Used when commiting.
0832      */
0833     void acknowledgeDelivered()
0834     {
0835         synchronized(_commitLock)
0836         {
0837             ArrayList<Long> tagsToAck = new ArrayList<Long>();
0838 
0839             while (!_receivedDeliveryTags.isEmpty())
0840             {
0841                 tagsToAck.add(_receivedDeliveryTags.poll());
0842             }
0843 
0844             Collections.sort(tagsToAck);
0845 
0846             long prevAcked = _lastAcked;
0847             long oldAckPoint = -1;
0848 
0849             while(oldAckPoint != prevAcked)
0850             {
0851                 oldAckPoint = prevAcked;
0852 
0853                 Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
0854 
0855                 while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
0856                 {
0857                     tagsToAckIterator.remove();
0858                     prevAcked++;
0859                 }
0860 
0861                 Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
0862                 while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
0863                 {
0864                     previousAckIterator.remove();
0865                     prevAcked++;
0866                 }
0867 
0868             }
0869             if(prevAcked != _lastAcked)
0870             {
0871                 _session.acknowledgeMessage(prevAcked, true);
0872                 _lastAcked = prevAcked;
0873             }
0874 
0875             Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
0876 
0877             while(tagsToAckIterator.hasNext())
0878             {
0879                 Long tag = tagsToAckIterator.next();
0880                 _session.acknowledgeMessage(tag, false);
0881                 _previouslyAcked.add(tag);
0882             }
0883         }
0884     }
0885 
0886 
0887     void notifyError(Throwable cause)
0888     {
0889         // synchronized (_closed)
0890         {
0891             _closed.set(true);
0892             if (_logger.isDebugEnabled())
0893             {
0894                 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
0895                 if (_closedStack != null)
0896                 {
0897                     _logger.debug(_consumerTag + " notifyError():"
0898                                   + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
0899                     _logger.debug(_consumerTag + " previously" + _closedStack.toString());
0900                 }
0901                 else
0902                 {
0903                     _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
0904                 }
0905             }
0906         }
0907         // QPID-293 can "request redelivery of this error through dispatcher"
0908 
0909         // we have no way of propagating the exception to a message listener - a JMS limitation - so we
0910         // deal with the case where we have a synchronous receive() waiting for a message to arrive
0911         if (!isMessageListenerSet())
0912         {
0913             // offer only succeeds if there is a thread waiting for an item from the queue
0914             if (_synchronousQueue.offer(cause))
0915             {
0916                 _logger.debug("Passed exception to synchronous queue for propagation to receive()");
0917             }
0918         }
0919 
0920         deregisterConsumer();
0921     }
0922 
0923     /**
0924      * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in
0925      * the case of an error occurring.
0926      */
0927     private void deregisterConsumer()
0928     {
0929         _session.deregisterConsumer(this);
0930     }
0931 
0932     public int getConsumerTag()
0933     {
0934         return _consumerTag;
0935     }
0936 
0937     public void setConsumerTag(int consumerTag)
0938     {
0939         _consumerTag = consumerTag;
0940     }
0941 
0942     public AMQSession getSession()
0943     {
0944         return _session;
0945     }
0946 
0947     private void checkPreConditions() throws JMSException
0948     {
0949 
0950         this.checkNotClosed();
0951 
0952         if ((_session == null|| _session.isClosed())
0953         {
0954             throw new javax.jms.IllegalStateException("Invalid Session");
0955         }
0956     }
0957 
0958     public boolean isAutoClose()
0959     {
0960         return _autoClose;
0961     }
0962 
0963     public boolean isNoConsume()
0964     {
0965         return _noConsume;
0966     }
0967 
0968     public void rollback()
0969     {
0970         rollbackPendingMessages();
0971     }
0972 
0973     public void rollbackPendingMessages()
0974     {
0975         if (_synchronousQueue.size() 0)
0976         {
0977             if (_logger.isDebugEnabled())
0978             {
0979                 _logger.debug("Rejecting the messages(" + _synchronousQueue
0980                         .size() ") in _syncQueue (PRQ)" "for consumer with tag:" + _consumerTag);
0981             }
0982 
0983             Iterator iterator = _synchronousQueue.iterator();
0984 
0985             int initialSize = _synchronousQueue.size();
0986 
0987             boolean removed = false;
0988             while (iterator.hasNext())
0989             {
0990 
0991                 Object o = iterator.next();
0992                 if (instanceof AbstractJMSMessage)
0993                 {
0994                     _session.rejectMessage(((AbstractJMSMessageo)true);
0995 
0996                     if (_logger.isDebugEnabled())
0997                     {
0998                         _logger.debug("Rejected message:" ((AbstractJMSMessageo).getDeliveryTag());
0999                     }
1000 
1001                     iterator.remove();
1002                     removed = true;
1003 
1004                 }
1005                 else
1006                 {
1007                     _logger.error("Queue contained a :" + o.getClass()
1008                                   " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
1009                     iterator.remove();
1010                     removed = true;
1011                 }
1012                 }
1013 
1014             if (removed && (initialSize == _synchronousQueue.size()))
1015             {
1016                 _logger.error("Queue had content removed but didn't change in size." + initialSize);
1017             }
1018 
1019 
1020             if (_synchronousQueue.size() != 0)
1021             {
1022                 _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
1023                 rollback();
1024             }
1025 
1026             clearReceiveQueue();
1027         }
1028     }
1029 
1030     public String debugIdentity()
1031     {
1032         return String.valueOf(_consumerTag"[" + System.identityHashCode(this"]";
1033     }
1034 
1035     public void clearReceiveQueue()
1036     {
1037         _synchronousQueue.clear();
1038     }
1039 
1040     public void start()
1041     {
1042         // do nothing as this is a 0_10 feature
1043     }
1044 
1045 
1046     public void stop()
1047     {
1048         // do nothing as this is a 0_10 feature
1049     }
1050 
1051     public boolean isStrated()
1052     {
1053         // do nothing as this is a 0_10 feature
1054         return false;
1055     }
1056 
1057     public AMQShortString getQueuename()
1058     {
1059         return _queuename;
1060     }
1061 
1062     public void setQueuename(AMQShortString queuename)
1063     {
1064         this._queuename = queuename;
1065     }
1066 
1067     public void addBindingKey(AMQDestination amqd, String routingKeythrows AMQException
1068     {
1069         _session.addBindingKey(this,amqd,routingKey);
1070     }
1071 
1072     /** to be called when a failover has occured */
1073     public void failedOver()
1074     {
1075         clearReceiveQueue();
1076         // TGM FIXME: think this should just be removed
1077         // clearUnackedMessages();
1078     }
1079 }