BasicMessageConsumer_0_10.java
001 /* Licensed to the Apache Software Foundation (ASF) under one
002  * or more contributor license agreements.  See the NOTICE file
003  * distributed with this work for additional information
004  * regarding copyright ownership.  The ASF licenses this file
005  * to you under the Apache License, Version 2.0 (the
006  * "License"); you may not use this file except in compliance
007  * with the License.  You may obtain a copy of the License at
008  *
009  *   http://www.apache.org/licenses/LICENSE-2.0
010  *
011  * Unless required by applicable law or agreed to in writing,
012  * software distributed under the License is distributed on an
013  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
014  * KIND, either express or implied.  See the License for the
015  * specific language governing permissions and limitations
016  * under the License.
017  */
018 package org.apache.qpid.client;
019 
020 import org.slf4j.Logger;
021 import org.slf4j.LoggerFactory;
022 import org.apache.qpid.client.message.*;
023 import org.apache.qpid.client.protocol.AMQProtocolHandler;
024 import org.apache.qpid.framing.FieldTable;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.protocol.AMQConstant;
027 import org.apache.qpid.transport.*;
028 import org.apache.qpid.QpidException;
029 import org.apache.qpid.filter.MessageFilter;
030 import org.apache.qpid.filter.JMSSelectorFilter;
031 
032 import javax.jms.InvalidSelectorException;
033 import javax.jms.JMSException;
034 import javax.jms.MessageListener;
035 import java.util.Iterator;
036 import java.util.concurrent.atomic.AtomicBoolean;
037 
038 /**
039  * This is a 0.10 message consumer.
040  */
041 public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
042 {
043 
044     /**
045      * This class logger
046      */
047     protected final Logger _logger = LoggerFactory.getLogger(getClass());
048 
049     /**
050      * The message selector filter associated with this consumer message selector
051      */
052     private MessageFilter _filter = null;
053 
054     /**
055      * The underlying QpidSession
056      */
057     private AMQSession_0_10 _0_10session;
058 
059     /**
060      * Indicates whether this consumer receives pre-acquired messages
061      */
062     private boolean _preAcquire = true;
063 
064     /**
065      * Indicate whether this consumer is started.
066      */
067     private boolean _isStarted = false;
068 
069     /**
070      * Specify whether this consumer is performing a sync receive
071      */
072     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
073     private String _consumerTagString;
074 
075     //--- constructor
076     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
077                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
078                                         AMQSession session, AMQProtocolHandler protocolHandler,
079                                         FieldTable arguments, int prefetchHigh, int prefetchLow,
080                                         boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
081             throws JMSException
082     {
083         super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
084                 arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
085         _0_10session = (AMQSession_0_10session;
086         if (messageSelector != null && !messageSelector.equals(""))
087         {
088             try
089             {
090                 _filter = new JMSSelectorFilter(messageSelector);
091             }
092             catch (QpidException e)
093             {
094                 throw new InvalidSelectorException("cannot create consumer because of selector issue");
095             }
096             if (destination instanceof AMQQueue)
097             {
098                 _preAcquire = false;
099             }
100         }
101         _isStarted = connection.started();
102     }
103 
104 
105     @Override public void setConsumerTag(int consumerTag)
106     {
107         super.setConsumerTag(consumerTag);
108         _consumerTagString = String.valueOf(consumerTag);
109     }
110 
111     public String getConsumerTagString()
112     {
113         return _consumerTagString;
114     }
115 
116     /**
117      *
118      * This is invoked by the session thread when emptying the session message queue.
119      * We first check if the message is valid (match the selector) and then deliver it to the
120      * message listener or to the sync consumer queue.
121      *
122      @param jmsMessage this message has already been processed so can't redo preDeliver
123      */
124     @Override public void notifyMessage(AbstractJMSMessage jmsMessage)
125     {
126         boolean messageOk = false;
127         try
128         {
129             messageOk = checkPreConditions(jmsMessage);
130         }
131         catch (AMQException e)
132         {
133             _logger.error("Receivecd an Exception when receiving message",e);
134             try
135             {
136 
137                 getSession().getAMQConnection().getExceptionListener()
138                         .onException(new JMSAMQException("Error when receiving message", e));
139             }
140             catch (Exception e1)
141             {
142                 // we should silently log thie exception as it only hanppens when the connection is closed
143                 _logger.error("Exception when receiving message", e1);
144             }
145         }
146         if (messageOk)
147         {
148             if (isMessageListenerSet() && ! getSession().prefetch())
149             {
150                 _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
151                                                           MessageCreditUnit.MESSAGE, 1);
152             }
153             _logger.debug("messageOk, trying to notify");
154             super.notifyMessage(jmsMessage);
155         }
156     }
157 
158     //----- overwritten methods
159 
160     /**
161      * This method is invoked when this consumer is stopped.
162      * It tells the broker to stop delivering messages to this consumer.
163      */
164     @Override void sendCancel() throws AMQException
165     {
166         ((AMQSession_0_10getSession()).getQpidSession().messageCancel(getConsumerTagString());
167         ((AMQSession_0_10getSession()).getQpidSession().sync();
168         // confirm cancel
169         getSession().confirmConsumerCancelled(getConsumerTag());
170         ((AMQSession_0_10getSession()).getCurrentException();
171     }
172 
173     @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
174     {
175 
176         super.notifyMessage(messageFrame);
177     }
178 
179     @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsgthrows JMSException
180     {
181         super.preApplicationProcessing(jmsMsg);
182         if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
183         {
184             _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
185         }
186     }
187 
188     @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
189             AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msgthrows Exception
190     {
191         AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader()((AMQSession_0_10)getSession()).getQpidSession());
192         return _messageFactory.createMessage(msg.getMessageTransfer());
193     }
194 
195     // private methods
196     /**
197      * Check whether a message can be delivered to this consumer.
198      *
199      @param message The message to be checked.
200      @return true if the message matches the selector and can be acquired, false otherwise.
201      @throws AMQException If the message preConditions cannot be checked due to some internal error.
202      */
203     private boolean checkPreConditions(AbstractJMSMessage messagethrows AMQException
204     {
205         boolean messageOk = true;
206         // TODO Use a tag for fiding out if message filtering is done here or by the broker.
207         try
208         {
209             if (_messageSelector != null && !_messageSelector.equals(""))
210             {
211                 messageOk = _filter.matches(message);
212             }
213         }
214         catch (Exception e)
215         {
216             throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
217         }
218 
219         if (_logger.isDebugEnabled())
220         {
221             _logger.debug("messageOk " + messageOk);
222             _logger.debug("_preAcquire " + _preAcquire);
223         }
224         if (!messageOk)
225         {
226             if (_preAcquire)
227             {
228                 // this is the case for topics
229                 // We need to ack this message
230                 if (_logger.isDebugEnabled())
231                 {
232                     _logger.debug("filterMessage - trying to ack message");
233                 }
234                 acknowledgeMessage(message);
235             }
236             else
237             {
238                 if (_logger.isDebugEnabled())
239                 {
240                     _logger.debug("Message not OK, releasing");
241                 }
242                 releaseMessage(message);
243             }
244             // if we are syncrhonously waiting for a message
245             // and messages are not prefetched we then need to request another one
246             if(! getSession().prefetch())
247             {
248                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
249                                                          MessageCreditUnit.MESSAGE, 1);
250             }
251         }
252         // now we need to acquire this message if needed
253         // this is the case of queue with a message selector set
254         if (!_preAcquire && messageOk && !isNoConsume())
255         {
256             if (_logger.isDebugEnabled())
257             {
258                 _logger.debug("filterMessage - trying to acquire message");
259             }
260             messageOk = acquireMessage(message);
261             _logger.debug("filterMessage - *************************************");
262             _logger.debug("filterMessage - message acquire status : " + messageOk);
263             _logger.debug("filterMessage - *************************************");
264         }
265         return messageOk;
266     }
267 
268 
269     /**
270      * Acknowledge a message
271      *
272      @param message The message to be acknowledged
273      @throws AMQException If the message cannot be acquired due to some internal error.
274      */
275     private void acknowledgeMessage(AbstractJMSMessage messagethrows AMQException
276     {
277         if (!_preAcquire)
278         {
279             RangeSet ranges = new RangeSet();
280             ranges.add((intmessage.getDeliveryTag());
281             _0_10session.messageAcknowledge
282                 (ranges,
283                  _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
284             _0_10session.getCurrentException();
285         }
286     }
287 
288     /**
289      * Release a message
290      *
291      @param message The message to be released
292      @throws AMQException If the message cannot be released due to some internal error.
293      */
294     private void releaseMessage(AbstractJMSMessage messagethrows AMQException
295     {
296         if (_preAcquire)
297         {
298             RangeSet ranges = new RangeSet();
299             ranges.add((intmessage.getDeliveryTag());
300             _0_10session.getQpidSession().messageRelease(ranges);
301             _0_10session.getCurrentException();
302         }
303     }
304 
305     /**
306      * Acquire a message
307      *
308      @param message The message to be acquired
309      @return true if the message has been acquired, false otherwise.
310      @throws AMQException If the message cannot be acquired due to some internal error.
311      */
312     private boolean acquireMessage(AbstractJMSMessage messagethrows AMQException
313     {
314         boolean result = false;
315         if (!_preAcquire)
316         {
317             RangeSet ranges = new RangeSet();
318             ranges.add((intmessage.getDeliveryTag());
319 
320             Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
321 
322             RangeSet acquired = acq.getTransfers();
323             if (acquired != null && acquired.size() 0)
324             {
325                 result = true;
326             }
327         }
328         return result;
329     }
330 
331 
332     public void setMessageListener(final MessageListener messageListenerthrows JMSException
333     {
334         super.setMessageListener(messageListener);
335         if (messageListener != null && ! getSession().prefetch())
336         {
337             _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
338                                                       MessageCreditUnit.MESSAGE, 1);
339         }
340         if (messageListener != null && !_synchronousQueue.isEmpty())
341         {
342             Iterator messages=_synchronousQueue.iterator();
343             while (messages.hasNext())
344             {
345                 AbstractJMSMessage message=(AbstractJMSMessagemessages.next();
346                 messages.remove();
347                 _session.rejectMessage(message, true);
348             }
349         }
350     }
351 
352     public boolean isStrated()
353     {
354         return _isStarted;
355     }
356 
357     public void start()
358     {
359         _isStarted = true;
360         if (_syncReceive.get())
361         {
362             _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
363                                                       MessageCreditUnit.MESSAGE, 1);
364         }
365     }
366 
367     public void stop()
368     {
369         _isStarted = false;
370     }
371 
372     /**
373      * When messages are not prefetched we need to request a message from the
374      * broker.
375      * Note that if the timeout is too short a message may be queued in _synchronousQueue until
376      * this consumer closes or request it.
377      @param l
378      @return
379      @throws InterruptedException
380      */
381     public Object getMessageFromQueue(long lthrows InterruptedException
382     {
383         if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
384         {
385             _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
386                                                       MessageCreditUnit.MESSAGE, 1);
387         }
388         if (! getSession().prefetch())
389         {
390             _syncReceive.set(true);
391         }
392         Object o = super.getMessageFromQueue(l);
393         if (! getSession().prefetch())
394         {
395             _syncReceive.set(false);
396         }
397         return o;
398     }
399 
400     void postDeliver(AbstractJMSMessage msgthrows JMSException
401     {
402         super.postDeliver(msg);
403         if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
404         {
405           _session.acknowledgeMessage(msg.getDeliveryTag()false);
406         }
407     }
408 
409 }