001 /* Licensed to the Apache Software Foundation (ASF) under one
002 * or more contributor license agreements. See the NOTICE file
003 * distributed with this work for additional information
004 * regarding copyright ownership. The ASF licenses this file
005 * to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance
007 * with the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing,
012 * software distributed under the License is distributed on an
013 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
014 * KIND, either express or implied. See the License for the
015 * specific language governing permissions and limitations
016 * under the License.
017 */
018 package org.apache.qpid.client;
019
020 import org.slf4j.Logger;
021 import org.slf4j.LoggerFactory;
022 import org.apache.qpid.client.message.*;
023 import org.apache.qpid.client.protocol.AMQProtocolHandler;
024 import org.apache.qpid.framing.FieldTable;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.protocol.AMQConstant;
027 import org.apache.qpid.transport.*;
028 import org.apache.qpid.QpidException;
029 import org.apache.qpid.filter.MessageFilter;
030 import org.apache.qpid.filter.JMSSelectorFilter;
031
032 import javax.jms.InvalidSelectorException;
033 import javax.jms.JMSException;
034 import javax.jms.MessageListener;
035 import java.util.Iterator;
036 import java.util.concurrent.atomic.AtomicBoolean;
037
038 /**
039 * This is a 0.10 message consumer.
040 */
041 public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
042 {
043
044 /**
045 * This class logger
046 */
047 protected final Logger _logger = LoggerFactory.getLogger(getClass());
048
049 /**
050 * The message selector filter associated with this consumer message selector
051 */
052 private MessageFilter _filter = null;
053
054 /**
055 * The underlying QpidSession
056 */
057 private AMQSession_0_10 _0_10session;
058
059 /**
060 * Indicates whether this consumer receives pre-acquired messages
061 */
062 private boolean _preAcquire = true;
063
064 /**
065 * Indicate whether this consumer is started.
066 */
067 private boolean _isStarted = false;
068
069 /**
070 * Specify whether this consumer is performing a sync receive
071 */
072 private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
073 private String _consumerTagString;
074
075 //--- constructor
076 protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
077 String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
078 AMQSession session, AMQProtocolHandler protocolHandler,
079 FieldTable arguments, int prefetchHigh, int prefetchLow,
080 boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
081 throws JMSException
082 {
083 super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
084 arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
085 _0_10session = (AMQSession_0_10) session;
086 if (messageSelector != null && !messageSelector.equals(""))
087 {
088 try
089 {
090 _filter = new JMSSelectorFilter(messageSelector);
091 }
092 catch (QpidException e)
093 {
094 throw new InvalidSelectorException("cannot create consumer because of selector issue");
095 }
096 if (destination instanceof AMQQueue)
097 {
098 _preAcquire = false;
099 }
100 }
101 _isStarted = connection.started();
102 }
103
104
105 @Override public void setConsumerTag(int consumerTag)
106 {
107 super.setConsumerTag(consumerTag);
108 _consumerTagString = String.valueOf(consumerTag);
109 }
110
111 public String getConsumerTagString()
112 {
113 return _consumerTagString;
114 }
115
116 /**
117 *
118 * This is invoked by the session thread when emptying the session message queue.
119 * We first check if the message is valid (match the selector) and then deliver it to the
120 * message listener or to the sync consumer queue.
121 *
122 * @param jmsMessage this message has already been processed so can't redo preDeliver
123 */
124 @Override public void notifyMessage(AbstractJMSMessage jmsMessage)
125 {
126 boolean messageOk = false;
127 try
128 {
129 messageOk = checkPreConditions(jmsMessage);
130 }
131 catch (AMQException e)
132 {
133 _logger.error("Receivecd an Exception when receiving message",e);
134 try
135 {
136
137 getSession().getAMQConnection().getExceptionListener()
138 .onException(new JMSAMQException("Error when receiving message", e));
139 }
140 catch (Exception e1)
141 {
142 // we should silently log thie exception as it only hanppens when the connection is closed
143 _logger.error("Exception when receiving message", e1);
144 }
145 }
146 if (messageOk)
147 {
148 if (isMessageListenerSet() && ! getSession().prefetch())
149 {
150 _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
151 MessageCreditUnit.MESSAGE, 1);
152 }
153 _logger.debug("messageOk, trying to notify");
154 super.notifyMessage(jmsMessage);
155 }
156 }
157
158 //----- overwritten methods
159
160 /**
161 * This method is invoked when this consumer is stopped.
162 * It tells the broker to stop delivering messages to this consumer.
163 */
164 @Override void sendCancel() throws AMQException
165 {
166 ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
167 ((AMQSession_0_10) getSession()).getQpidSession().sync();
168 // confirm cancel
169 getSession().confirmConsumerCancelled(getConsumerTag());
170 ((AMQSession_0_10) getSession()).getCurrentException();
171 }
172
173 @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
174 {
175
176 super.notifyMessage(messageFrame);
177 }
178
179 @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
180 {
181 super.preApplicationProcessing(jmsMsg);
182 if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
183 {
184 _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
185 }
186 }
187
188 @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
189 AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
190 {
191 AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
192 return _messageFactory.createMessage(msg.getMessageTransfer());
193 }
194
195 // private methods
196 /**
197 * Check whether a message can be delivered to this consumer.
198 *
199 * @param message The message to be checked.
200 * @return true if the message matches the selector and can be acquired, false otherwise.
201 * @throws AMQException If the message preConditions cannot be checked due to some internal error.
202 */
203 private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
204 {
205 boolean messageOk = true;
206 // TODO Use a tag for fiding out if message filtering is done here or by the broker.
207 try
208 {
209 if (_messageSelector != null && !_messageSelector.equals(""))
210 {
211 messageOk = _filter.matches(message);
212 }
213 }
214 catch (Exception e)
215 {
216 throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
217 }
218
219 if (_logger.isDebugEnabled())
220 {
221 _logger.debug("messageOk " + messageOk);
222 _logger.debug("_preAcquire " + _preAcquire);
223 }
224 if (!messageOk)
225 {
226 if (_preAcquire)
227 {
228 // this is the case for topics
229 // We need to ack this message
230 if (_logger.isDebugEnabled())
231 {
232 _logger.debug("filterMessage - trying to ack message");
233 }
234 acknowledgeMessage(message);
235 }
236 else
237 {
238 if (_logger.isDebugEnabled())
239 {
240 _logger.debug("Message not OK, releasing");
241 }
242 releaseMessage(message);
243 }
244 // if we are syncrhonously waiting for a message
245 // and messages are not prefetched we then need to request another one
246 if(! getSession().prefetch())
247 {
248 _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
249 MessageCreditUnit.MESSAGE, 1);
250 }
251 }
252 // now we need to acquire this message if needed
253 // this is the case of queue with a message selector set
254 if (!_preAcquire && messageOk && !isNoConsume())
255 {
256 if (_logger.isDebugEnabled())
257 {
258 _logger.debug("filterMessage - trying to acquire message");
259 }
260 messageOk = acquireMessage(message);
261 _logger.debug("filterMessage - *************************************");
262 _logger.debug("filterMessage - message acquire status : " + messageOk);
263 _logger.debug("filterMessage - *************************************");
264 }
265 return messageOk;
266 }
267
268
269 /**
270 * Acknowledge a message
271 *
272 * @param message The message to be acknowledged
273 * @throws AMQException If the message cannot be acquired due to some internal error.
274 */
275 private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
276 {
277 if (!_preAcquire)
278 {
279 RangeSet ranges = new RangeSet();
280 ranges.add((int) message.getDeliveryTag());
281 _0_10session.messageAcknowledge
282 (ranges,
283 _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
284 _0_10session.getCurrentException();
285 }
286 }
287
288 /**
289 * Release a message
290 *
291 * @param message The message to be released
292 * @throws AMQException If the message cannot be released due to some internal error.
293 */
294 private void releaseMessage(AbstractJMSMessage message) throws AMQException
295 {
296 if (_preAcquire)
297 {
298 RangeSet ranges = new RangeSet();
299 ranges.add((int) message.getDeliveryTag());
300 _0_10session.getQpidSession().messageRelease(ranges);
301 _0_10session.getCurrentException();
302 }
303 }
304
305 /**
306 * Acquire a message
307 *
308 * @param message The message to be acquired
309 * @return true if the message has been acquired, false otherwise.
310 * @throws AMQException If the message cannot be acquired due to some internal error.
311 */
312 private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
313 {
314 boolean result = false;
315 if (!_preAcquire)
316 {
317 RangeSet ranges = new RangeSet();
318 ranges.add((int) message.getDeliveryTag());
319
320 Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
321
322 RangeSet acquired = acq.getTransfers();
323 if (acquired != null && acquired.size() > 0)
324 {
325 result = true;
326 }
327 }
328 return result;
329 }
330
331
332 public void setMessageListener(final MessageListener messageListener) throws JMSException
333 {
334 super.setMessageListener(messageListener);
335 if (messageListener != null && ! getSession().prefetch())
336 {
337 _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
338 MessageCreditUnit.MESSAGE, 1);
339 }
340 if (messageListener != null && !_synchronousQueue.isEmpty())
341 {
342 Iterator messages=_synchronousQueue.iterator();
343 while (messages.hasNext())
344 {
345 AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
346 messages.remove();
347 _session.rejectMessage(message, true);
348 }
349 }
350 }
351
352 public boolean isStrated()
353 {
354 return _isStarted;
355 }
356
357 public void start()
358 {
359 _isStarted = true;
360 if (_syncReceive.get())
361 {
362 _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
363 MessageCreditUnit.MESSAGE, 1);
364 }
365 }
366
367 public void stop()
368 {
369 _isStarted = false;
370 }
371
372 /**
373 * When messages are not prefetched we need to request a message from the
374 * broker.
375 * Note that if the timeout is too short a message may be queued in _synchronousQueue until
376 * this consumer closes or request it.
377 * @param l
378 * @return
379 * @throws InterruptedException
380 */
381 public Object getMessageFromQueue(long l) throws InterruptedException
382 {
383 if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
384 {
385 _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
386 MessageCreditUnit.MESSAGE, 1);
387 }
388 if (! getSession().prefetch())
389 {
390 _syncReceive.set(true);
391 }
392 Object o = super.getMessageFromQueue(l);
393 if (! getSession().prefetch())
394 {
395 _syncReceive.set(false);
396 }
397 return o;
398 }
399
400 void postDeliver(AbstractJMSMessage msg) throws JMSException
401 {
402 super.postDeliver(msg);
403 if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
404 {
405 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
406 }
407 }
408
409 }
|