001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server;
022
023 import java.util.Collection;
024 import java.util.HashMap;
025 import java.util.LinkedHashMap;
026 import java.util.LinkedList;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.concurrent.atomic.AtomicBoolean;
030
031 import org.apache.log4j.Logger;
032 import org.apache.qpid.AMQException;
033 import org.apache.qpid.framing.AMQShortString;
034 import org.apache.qpid.framing.ContentBody;
035 import org.apache.qpid.framing.ContentHeaderBody;
036 import org.apache.qpid.framing.FieldTable;
037 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
038 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
039 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
040 import org.apache.qpid.server.exchange.Exchange;
041 import org.apache.qpid.server.exchange.NoRouteException;
042 import org.apache.qpid.server.flow.FlowCreditManager;
043 import org.apache.qpid.server.flow.Pre0_10CreditManager;
044 import org.apache.qpid.server.protocol.AMQProtocolSession;
045 import org.apache.qpid.server.queue.AMQMessage;
046 import org.apache.qpid.server.queue.AMQQueue;
047 import org.apache.qpid.server.queue.IncomingMessage;
048 import org.apache.qpid.server.queue.QueueEntry;
049 import org.apache.qpid.server.queue.UnauthorizedAccessException;
050 import org.apache.qpid.server.queue.AMQMessage;
051 import org.apache.qpid.server.subscription.Subscription;
052 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
053 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
054 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
055 import org.apache.qpid.server.store.StoreContext;
056 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
057 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
058 import org.apache.qpid.server.subscription.Subscription;
059 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
060 import org.apache.qpid.server.txn.LocalTransactionalContext;
061 import org.apache.qpid.server.txn.NonTransactionalContext;
062 import org.apache.qpid.server.txn.TransactionalContext;
063 import org.apache.qpid.server.transactionlog.TransactionLog;
064
065 public class AMQChannel
066 {
067 public static final int DEFAULT_PREFETCH = 5000;
068
069 private static final Logger _log = Logger.getLogger(AMQChannel.class);
070
071 private final int _channelId;
072
073
074 private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
075
076 /**
077 * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
078 * value of this represents the <b>last</b> tag sent out
079 */
080 private long _deliveryTag = 0;
081
082 /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
083 private AMQQueue _defaultQueue;
084
085 /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
086 private int _consumerTag;
087
088 /**
089 * The current message - which may be partial in the sense that not all frames have been received yet - which has
090 * been received by this channel. As the frames are received the message gets updated and once all frames have been
091 * received the message can then be routed.
092 */
093 private IncomingMessage _currentMessage;
094
095 /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
096 protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
097
098 private final TransactionLog _transactionLog;
099
100 private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
101
102 private final AtomicBoolean _suspended = new AtomicBoolean(false);
103
104 private TransactionalContext _txnContext;
105
106 /**
107 * A context used by the message store enabling it to track context for a given channel even across thread
108 * boundaries
109 */
110 private final StoreContext _storeContext;
111
112 private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
113
114 // Why do we need this reference ? - ritchiem
115 private final AMQProtocolSession _session;
116 private boolean _closing;
117
118 public AMQChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog)
119 throws AMQException
120 {
121 _session = session;
122 _channelId = channelId;
123 _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
124
125
126 _transactionLog = transactionLog;
127
128 // by default the session is non-transactional
129 _txnContext = new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
130 }
131
132 /** Sets this channel to be part of a local transaction */
133 public void setLocalTransactional()
134 {
135 _txnContext = new LocalTransactionalContext(this);
136 }
137
138 public boolean isTransactional()
139 {
140 // this does not look great but there should only be one "non-transactional"
141 // transactional context, while there could be several transactional ones in
142 // theory
143 return !(_txnContext instanceof NonTransactionalContext);
144 }
145
146 public int getChannelId()
147 {
148 return _channelId;
149 }
150
151 public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
152 {
153
154 _currentMessage = new IncomingMessage(info, _txnContext, _session, _transactionLog);
155 _currentMessage.setExchange(e);
156 }
157
158 public void publishContentHeader(ContentHeaderBody contentHeaderBody)
159 throws AMQException
160 {
161 if (_currentMessage == null)
162 {
163 throw new AMQException("Received content header without previously receiving a BasicPublish frame");
164 }
165 else
166 {
167 if (_log.isDebugEnabled())
168 {
169 _log.debug("Content header received on channel " + _channelId);
170 }
171
172 _currentMessage.setContentHeaderBody(contentHeaderBody);
173
174 _currentMessage.setExpiration();
175
176 routeCurrentMessage();
177
178 _currentMessage.routingComplete(_transactionLog);
179
180 deliverCurrentMessageIfComplete();
181
182 }
183 }
184
185 private void deliverCurrentMessageIfComplete()
186 throws AMQException
187 {
188 // check and deliver if header says body length is zero
189 if (_currentMessage.allContentReceived())
190 {
191 try
192 {
193 _currentMessage.deliverToQueues();
194 }
195 catch (NoRouteException e)
196 {
197 _returnMessages.add(e);
198 }
199 catch(UnauthorizedAccessException ex)
200 {
201 _returnMessages.add(ex);
202 }
203 finally
204 {
205 // callback to allow the context to do any post message processing
206 // primary use is to allow message return processing in the non-tx case
207 _txnContext.messageProcessed(_session);
208 _currentMessage = null;
209 }
210 }
211
212 }
213
214 public void publishContentBody(ContentBody contentBody) throws AMQException
215 {
216 if (_currentMessage == null)
217 {
218 throw new AMQException("Received content body without previously receiving a JmsPublishBody");
219 }
220
221 if (_log.isDebugEnabled())
222 {
223 _log.debug(debugIdentity() + "Content body received on channel " + _channelId);
224 }
225
226 try
227 {
228
229 // returns true iff the message was delivered (i.e. if all data was
230 // received
231 _currentMessage.addContentBodyFrame(
232 _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
233 contentBody));
234
235 deliverCurrentMessageIfComplete();
236 }
237 catch (AMQException e)
238 {
239 // we want to make sure we don't keep a reference to the message in the
240 // event of an error
241 _currentMessage = null;
242 throw e;
243 }
244 }
245
246 protected void routeCurrentMessage() throws AMQException
247 {
248 try
249 {
250 _currentMessage.route();
251 }
252 catch (NoRouteException e)
253 {
254 //_currentMessage.takeReference();
255 _returnMessages.add(e);
256 }
257 }
258
259 public long getNextDeliveryTag()
260 {
261 return ++_deliveryTag;
262 }
263
264 public int getNextConsumerTag()
265 {
266 return ++_consumerTag;
267 }
268
269 /**
270 * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
271 * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
272 *
273 * @param tag the tag chosen by the client (if null, server will generate one)
274 * @param queue the queue to subscribe to
275 * @param acks Are acks enabled for this subscriber
276 * @param filters Filters to apply to this subscriber
277 *
278 * @param noLocal Flag stopping own messages being receivied.
279 * @param exclusive Flag requesting exclusive access to the queue
280 * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
281 *
282 * @throws ConsumerTagNotUniqueException if the tag is not unique
283 * @throws AMQException if something goes wrong
284 */
285 public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
286 FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
287 {
288 if (tag == null)
289 {
290 tag = new AMQShortString("sgen_" + getNextConsumerTag());
291 }
292
293 if (_tag2SubscriptionMap.containsKey(tag))
294 {
295 throw new ConsumerTagNotUniqueException();
296 }
297
298 Subscription subscription =
299 SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
300
301
302 // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
303 // We add before we register as the Async Delivery process may AutoClose the subscriber
304 // so calling _cT2QM.remove before we have done put which was after the register succeeded.
305 // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
306
307 _tag2SubscriptionMap.put(tag, subscription);
308
309 try
310 {
311 queue.registerSubscription(subscription, exclusive);
312 }
313 catch (AMQException e)
314 {
315 _tag2SubscriptionMap.remove(tag);
316 throw e;
317 }
318 return tag;
319 }
320
321 /**
322 * Unsubscribe a consumer from a queue.
323 * @param consumerTag
324 * @return true if the consumerTag had a mapped queue that could be unregistered.
325 * @throws AMQException
326 */
327 public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
328 {
329
330 Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
331 if (sub != null)
332 {
333 try
334 {
335 sub.getSendLock();
336 sub.getQueue().unregisterSubscription(sub);
337 }
338 finally
339 {
340 sub.releaseSendLock();
341 }
342 return true;
343 }
344 else
345 {
346 _log.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
347 }
348 return false;
349 }
350
351 /**
352 * Called from the protocol session to close this channel and clean up. T
353 *
354 * @throws AMQException if there is an error during closure
355 */
356 public void close() throws AMQException
357 {
358 _txnContext.rollback();
359 unsubscribeAllConsumers();
360 try
361 {
362 requeue();
363 }
364 catch (AMQException e)
365 {
366 _log.error("Caught AMQException whilst attempting to reque:" + e);
367 }
368
369 setClosing(true);
370 }
371
372 private void setClosing(boolean closing)
373 {
374 _closing = closing;
375 }
376
377 private void unsubscribeAllConsumers() throws AMQException
378 {
379 if (_log.isInfoEnabled())
380 {
381 if (!_tag2SubscriptionMap.isEmpty())
382 {
383 _log.info("Unsubscribing all consumers on channel " + toString());
384 }
385 else
386 {
387 _log.info("No consumers to unsubscribe on channel " + toString());
388 }
389 }
390
391 for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
392 {
393 if (_log.isInfoEnabled())
394 {
395 _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
396 }
397
398 Subscription sub = me.getValue();
399
400 try
401 {
402 sub.getSendLock();
403 sub.getQueue().unregisterSubscription(sub);
404 }
405 finally
406 {
407 sub.releaseSendLock();
408 }
409
410 }
411
412 _tag2SubscriptionMap.clear();
413 }
414
415 /**
416 * Add a message to the channel-based list of unacknowledged messages
417 *
418 * @param entry the record of the message on the queue that was delivered
419 * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
420 * delivery tag)
421 * @param subscription The consumer that is to acknowledge this message.
422 */
423 public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
424 {
425 if (_log.isDebugEnabled())
426 {
427 if (entry.getQueue() == null)
428 {
429 _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity());
430 }
431 else
432 {
433 if (_log.isDebugEnabled())
434 {
435 _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
436 + ") with a queue(" + entry.getQueue() + ") for " + subscription);
437 }
438 }
439 }
440
441 _unacknowledgedMessageMap.add(deliveryTag, entry);
442
443 }
444
445 private final String id = "(" + System.identityHashCode(this) + ")";
446
447 public String debugIdentity()
448 {
449 return _channelId + id;
450 }
451
452 /**
453 * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to
454 * this same channel or to other subscribers.
455 *
456 * @throws org.apache.qpid.AMQException if the requeue fails
457 */
458 public void requeue() throws AMQException
459 {
460 // we must create a new map since all the messages will get a new delivery tag when they are redelivered
461 Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
462
463 // Deliver these messages out of the transaction as their delivery was never
464 // part of the transaction only the receive.
465 TransactionalContext deliveryContext = null;
466
467 if (!messagesToBeDelivered.isEmpty())
468 {
469 if (_log.isInfoEnabled())
470 {
471 _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString());
472 }
473
474 if (!(_txnContext instanceof NonTransactionalContext))
475 {
476
477 deliveryContext =
478 new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
479 }
480 else
481 {
482 deliveryContext = _txnContext;
483 }
484 }
485
486 for (QueueEntry unacked : messagesToBeDelivered)
487 {
488 if (!unacked.isQueueDeleted())
489 {
490 // Mark message redelivered
491 unacked.setRedelivered(true);
492
493 // Ensure message is released for redelivery
494 unacked.release();
495
496 // Deliver Message
497 deliveryContext.requeue(unacked);
498
499 }
500 else
501 {
502 unacked.dequeueAndDelete(_storeContext);
503 }
504 }
505
506 }
507
508 /**
509 * Requeue a single message
510 *
511 * @param deliveryTag The message to requeue
512 *
513 * @throws AMQException If something goes wrong.
514 */
515 public void requeue(long deliveryTag) throws AMQException
516 {
517 QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
518
519 if (unacked != null)
520 {
521 // Mark message redelivered
522 unacked.setRedelivered(true);
523
524 // Ensure message is released for redelivery
525 if (!unacked.isQueueDeleted())
526 {
527 unacked.release();
528 }
529
530
531 // Deliver these messages out of the transaction as their delivery was never
532 // part of the transaction only the receive.
533 TransactionalContext deliveryContext;
534 if (!(_txnContext instanceof NonTransactionalContext))
535 {
536
537 deliveryContext =
538 new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
539
540 }
541 else
542 {
543 deliveryContext = _txnContext;
544 }
545
546 if (!unacked.isQueueDeleted())
547 {
548 // Redeliver the messages to the front of the queue
549 deliveryContext.requeue(unacked);
550 // Deliver increments the message count but we have already deliverted this once so don't increment it again
551 // this was because deliver did an increment changed this.
552 }
553 else
554 {
555 _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
556 + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
557
558 unacked.dequeueAndDelete(_storeContext);
559 }
560 }
561 else
562 {
563 _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
564 + _unacknowledgedMessageMap.size());
565
566 }
567
568 }
569
570 /**
571 * Called to resend all outstanding unacknowledged messages to this same channel.
572 *
573 * @param requeue Are the messages to be requeued or dropped.
574 *
575 * @throws AMQException When something goes wrong.
576 */
577 public void resend(final boolean requeue) throws AMQException
578 {
579
580
581 final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
582 final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
583
584 if (_log.isDebugEnabled())
585 {
586 _log.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
587 }
588
589 // Process the Unacked-Map.
590 // Marking messages who still have a consumer for to be resent
591 // and those that don't to be requeued.
592 _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
593 msgToResend, requeue, _storeContext));
594
595
596 // Process Messages to Resend
597 if (_log.isDebugEnabled())
598 {
599 if (!msgToResend.isEmpty())
600 {
601 _log.debug("Preparing (" + msgToResend.size() + ") message to resend.");
602 }
603 else
604 {
605 _log.debug("No message to resend.");
606 }
607 }
608
609 for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
610 {
611 QueueEntry queueEntry = entry.getValue();
612 long deliveryTag = entry.getKey();
613
614 AMQQueue queue = queueEntry.getQueue();
615
616 // Our Java Client will always suspend the channel when resending!
617 // If the client has requested the messages be resent then it is
618 // their responsibility to ensure that thay are capable of receiving them
619 // i.e. The channel hasn't been server side suspended.
620 // if (isSuspended())
621 // {
622 // _log.info("Channel is suspended so requeuing");
623 // //move this message to requeue
624 // msgToRequeue.add(message);
625 // }
626 // else
627 // {
628 // release to allow it to be delivered
629
630 // Without any details from the client about what has been processed we have to mark
631 // all messages in the unacked map as redelivered.
632 queueEntry.setRedelivered(true);
633
634 Subscription sub = queueEntry.getDeliveredSubscription();
635
636 if (sub != null)
637 {
638
639 if(!queue.resend(queueEntry, sub))
640 {
641 msgToRequeue.put(deliveryTag, queueEntry);
642 }
643 }
644 else
645 {
646
647 if (_log.isInfoEnabled())
648 {
649 _log.info("DeliveredSubscription not recorded so just requeueing(" + queueEntry.toString()
650 + ")to prevent loss");
651 }
652 // move this message to requeue
653 msgToRequeue.put(deliveryTag, queueEntry);
654 }
655 } // for all messages
656 // } else !isSuspend
657
658 if (_log.isInfoEnabled())
659 {
660 if (!msgToRequeue.isEmpty())
661 {
662 _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
663 }
664 }
665
666 // Deliver these messages out of the transaction as their delivery was never
667 // part of the transaction only the receive.
668 TransactionalContext deliveryContext;
669 if (!(_txnContext instanceof NonTransactionalContext))
670 {
671
672 deliveryContext =
673 new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
674 }
675 else
676 {
677 deliveryContext = _txnContext;
678 }
679
680 // Process Messages to Requeue at the front of the queue
681 for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
682 {
683 QueueEntry message = entry.getValue();
684 long deliveryTag = entry.getKey();
685
686 message.release();
687 message.setRedelivered(true);
688
689 deliveryContext.requeue(message);
690
691 _unacknowledgedMessageMap.remove(deliveryTag);
692 }
693 }
694
695 /**
696 * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to
697 * remove the queue reference and also decrement any message reference counts, without actually removing the item
698 * since we may get an ack for a delivery tag that was generated from the deleted queue.
699 *
700 * @param queue the queue that has been deleted
701 *
702 */
703 /* public void queueDeleted(final AMQQueue queue)
704 {
705 try
706 {
707 _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
708 {
709 public boolean callback(UnacknowledgedMessage message)
710 {
711 if (message.getQueue() == queue)
712 {
713 try
714 {
715 message.dequeueAndDelete(_storeContext);
716 message.setQueueDeleted(true);
717
718 }
719 catch (AMQException e)
720 {
721 _log.error(
722 "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
723 throw new RuntimeException(e);
724 }
725 }
726
727 return false;
728 }
729
730 public void visitComplete()
731 {
732 }
733 });
734 }
735 catch (AMQException e)
736 {
737 _log.error("Unexpected Error while handling deletion of queue", e);
738 throw new RuntimeException(e);
739 }
740
741 }
742 */
743 /**
744 * Acknowledge one or more messages.
745 *
746 * @param deliveryTag the last delivery tag
747 * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
748 * acknowledges the single message specified by the delivery tag
749 *
750 * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
751 */
752 public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
753 {
754 _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
755 }
756
757 /**
758 * Used only for testing purposes.
759 *
760 * @return the map of unacknowledged messages
761 */
762 public UnacknowledgedMessageMap getUnacknowledgedMessageMap()
763 {
764 return _unacknowledgedMessageMap;
765 }
766
767
768 public void setSuspended(boolean suspended)
769 {
770
771
772 boolean wasSuspended = _suspended.getAndSet(suspended);
773 if (wasSuspended != suspended)
774 {
775 if (wasSuspended)
776 {
777 // may need to deliver queued messages
778 for (Subscription s : _tag2SubscriptionMap.values())
779 {
780 s.getQueue().deliverAsync(s);
781 }
782 }
783 }
784 }
785
786 public boolean isSuspended()
787 {
788 return _suspended.get();
789 }
790
791 public void commit() throws AMQException
792 {
793 if (!isTransactional())
794 {
795 throw new AMQException("Fatal error: commit called on non-transactional channel");
796 }
797
798 _txnContext.commit();
799 }
800
801 public void rollback() throws AMQException
802 {
803 _txnContext.rollback();
804 }
805
806 public String toString()
807 {
808 return "["+_session.toString()+":"+_channelId+"]";
809 }
810
811 public void setDefaultQueue(AMQQueue queue)
812 {
813 _defaultQueue = queue;
814 }
815
816 public AMQQueue getDefaultQueue()
817 {
818 return _defaultQueue;
819 }
820
821 public StoreContext getStoreContext()
822 {
823 return _storeContext;
824 }
825
826 public void processReturns() throws AMQException
827 {
828 if (!_returnMessages.isEmpty())
829 {
830 for (RequiredDeliveryException bouncedMessage : _returnMessages)
831 {
832 AMQMessage message = bouncedMessage.getAMQMessage();
833 _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
834 new AMQShortString(bouncedMessage.getMessage()));
835 }
836
837 _returnMessages.clear();
838 }
839 }
840
841
842 public TransactionalContext getTransactionalContext()
843 {
844 return _txnContext;
845 }
846
847 public boolean isClosing()
848 {
849 return _closing;
850 }
851
852 public AMQProtocolSession getProtocolSession()
853 {
854 return _session;
855 }
856
857 public FlowCreditManager getCreditManager()
858 {
859 return _creditManager;
860 }
861
862 public void setCredit(final long prefetchSize, final int prefetchCount)
863 {
864 _creditManager.setCreditLimits(prefetchSize, prefetchCount);
865 }
866
867 public List<RequiredDeliveryException> getReturnMessages()
868 {
869 return _returnMessages;
870 }
871
872 public TransactionLog getTransactionLog()
873 {
874 return _transactionLog;
875 }
876
877 private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
878 {
879
880 public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
881 throws AMQException
882 {
883 getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag());
884 }
885 };
886
887 public ClientDeliveryMethod getClientDeliveryMethod()
888 {
889 return _clientDeliveryMethod;
890 }
891
892 private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
893 {
894
895 public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
896 {
897 addUnacknowledgedMessage(entry, deliveryTag, sub);
898 }
899 };
900
901 public RecordDeliveryMethod getRecordDeliveryMethod()
902 {
903 return _recordDeliveryMethod;
904 }
905 }
|