AMQChannel.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server;
022 
023 import java.util.Collection;
024 import java.util.HashMap;
025 import java.util.LinkedHashMap;
026 import java.util.LinkedList;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 
031 import org.apache.log4j.Logger;
032 import org.apache.qpid.AMQException;
033 import org.apache.qpid.framing.AMQShortString;
034 import org.apache.qpid.framing.ContentBody;
035 import org.apache.qpid.framing.ContentHeaderBody;
036 import org.apache.qpid.framing.FieldTable;
037 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
038 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
039 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
040 import org.apache.qpid.server.exchange.Exchange;
041 import org.apache.qpid.server.exchange.NoRouteException;
042 import org.apache.qpid.server.flow.FlowCreditManager;
043 import org.apache.qpid.server.flow.Pre0_10CreditManager;
044 import org.apache.qpid.server.protocol.AMQProtocolSession;
045 import org.apache.qpid.server.queue.AMQMessage;
046 import org.apache.qpid.server.queue.AMQQueue;
047 import org.apache.qpid.server.queue.IncomingMessage;
048 import org.apache.qpid.server.queue.QueueEntry;
049 import org.apache.qpid.server.queue.UnauthorizedAccessException;
050 import org.apache.qpid.server.queue.AMQMessage;
051 import org.apache.qpid.server.subscription.Subscription;
052 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
053 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
054 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
055 import org.apache.qpid.server.store.StoreContext;
056 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
057 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
058 import org.apache.qpid.server.subscription.Subscription;
059 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
060 import org.apache.qpid.server.txn.LocalTransactionalContext;
061 import org.apache.qpid.server.txn.NonTransactionalContext;
062 import org.apache.qpid.server.txn.TransactionalContext;
063 import org.apache.qpid.server.transactionlog.TransactionLog;
064 
065 public class AMQChannel
066 {
067     public static final int DEFAULT_PREFETCH = 5000;
068 
069     private static final Logger _log = Logger.getLogger(AMQChannel.class);
070 
071     private final int _channelId;
072 
073 
074     private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
075 
076     /**
077      * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
078      * value of this represents the <b>last</b> tag sent out
079      */
080     private long _deliveryTag = 0;
081 
082     /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
083     private AMQQueue _defaultQueue;
084 
085     /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
086     private int _consumerTag;
087 
088     /**
089      * The current message - which may be partial in the sense that not all frames have been received yet - which has
090      * been received by this channel. As the frames are received the message gets updated and once all frames have been
091      * received the message can then be routed.
092      */
093     private IncomingMessage _currentMessage;
094 
095     /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
096     protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
097 
098     private final TransactionLog _transactionLog;
099 
100     private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
101 
102     private final AtomicBoolean _suspended = new AtomicBoolean(false);
103 
104     private TransactionalContext _txnContext;
105 
106     /**
107      * A context used by the message store enabling it to track context for a given channel even across thread
108      * boundaries
109      */
110     private final StoreContext _storeContext;
111 
112     private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
113 
114     // Why do we need this reference ? - ritchiem
115     private final AMQProtocolSession _session;
116     private boolean _closing;
117 
118     public AMQChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog)
119             throws AMQException
120     {
121         _session = session;
122         _channelId = channelId;
123         _storeContext = new StoreContext("Session: " + session.getClientIdentifier() "; channel: " + channelId);
124 
125 
126         _transactionLog = transactionLog;
127 
128         // by default the session is non-transactional
129         _txnContext = new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
130     }
131 
132     /** Sets this channel to be part of a local transaction */
133     public void setLocalTransactional()
134     {
135         _txnContext = new LocalTransactionalContext(this);
136     }
137 
138     public boolean isTransactional()
139     {
140         // this does not look great but there should only be one "non-transactional"
141         // transactional context, while there could be several transactional ones in
142         // theory
143         return !(_txnContext instanceof NonTransactionalContext);
144     }
145 
146     public int getChannelId()
147     {
148         return _channelId;
149     }
150 
151     public void setPublishFrame(MessagePublishInfo info, final Exchange ethrows AMQException
152     {
153 
154         _currentMessage = new IncomingMessage(info, _txnContext, _session, _transactionLog);
155         _currentMessage.setExchange(e);
156     }
157 
158     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
159             throws AMQException
160     {
161         if (_currentMessage == null)
162         {
163             throw new AMQException("Received content header without previously receiving a BasicPublish frame");
164         }
165         else
166         {
167             if (_log.isDebugEnabled())
168             {
169                 _log.debug("Content header received on channel " + _channelId);
170             }
171 
172             _currentMessage.setContentHeaderBody(contentHeaderBody);
173 
174             _currentMessage.setExpiration();
175 
176             routeCurrentMessage();
177 
178             _currentMessage.routingComplete(_transactionLog);
179 
180             deliverCurrentMessageIfComplete();
181 
182         }
183     }
184 
185     private void deliverCurrentMessageIfComplete()
186             throws AMQException
187     {
188         // check and deliver if header says body length is zero
189         if (_currentMessage.allContentReceived())
190         {
191             try
192             {
193                 _currentMessage.deliverToQueues();
194             }
195             catch (NoRouteException e)
196             {
197                 _returnMessages.add(e);
198             }
199             catch(UnauthorizedAccessException ex)
200             {
201                 _returnMessages.add(ex);
202             }
203             finally
204             {
205                 // callback to allow the context to do any post message processing
206                 // primary use is to allow message return processing in the non-tx case
207                 _txnContext.messageProcessed(_session);
208                 _currentMessage = null;
209             }
210         }
211 
212     }
213 
214     public void publishContentBody(ContentBody contentBodythrows AMQException
215     {
216         if (_currentMessage == null)
217         {
218             throw new AMQException("Received content body without previously receiving a JmsPublishBody");
219         }
220 
221         if (_log.isDebugEnabled())
222         {
223             _log.debug(debugIdentity() "Content body received on channel " + _channelId);
224         }
225 
226         try
227         {
228 
229             // returns true iff the message was delivered (i.e. if all data was
230             // received
231             _currentMessage.addContentBodyFrame(
232                     _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
233                             contentBody));
234 
235             deliverCurrentMessageIfComplete();
236         }
237         catch (AMQException e)
238         {
239             // we want to make sure we don't keep a reference to the message in the
240             // event of an error
241             _currentMessage = null;
242             throw e;
243         }
244     }
245 
246     protected void routeCurrentMessage() throws AMQException
247     {
248         try
249         {
250             _currentMessage.route();            
251         }
252         catch (NoRouteException e)
253         {
254             //_currentMessage.takeReference();
255             _returnMessages.add(e);
256         }
257     }
258 
259     public long getNextDeliveryTag()
260     {
261         return ++_deliveryTag;
262     }
263 
264     public int getNextConsumerTag()
265     {
266         return ++_consumerTag;
267     }
268 
269     /**
270      * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
271      * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
272      *
273      @param tag       the tag chosen by the client (if null, server will generate one)
274      @param queue     the queue to subscribe to
275      @param acks      Are acks enabled for this subscriber
276      @param filters   Filters to apply to this subscriber
277      *
278      @param noLocal   Flag stopping own messages being receivied.
279      @param exclusive Flag requesting exclusive access to the queue
280      @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
281      *
282      @throws ConsumerTagNotUniqueException if the tag is not unique
283      @throws AMQException                  if something goes wrong
284      */
285     public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
286                                            FieldTable filters, boolean noLocal, boolean exclusivethrows AMQException, ConsumerTagNotUniqueException
287     {
288         if (tag == null)
289         {
290             tag = new AMQShortString("sgen_" + getNextConsumerTag());
291         }
292 
293         if (_tag2SubscriptionMap.containsKey(tag))
294         {
295             throw new ConsumerTagNotUniqueException();
296         }
297 
298          Subscription subscription =
299                 SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
300 
301 
302         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
303         // We add before we register as the Async Delivery process may AutoClose the subscriber
304         // so calling _cT2QM.remove before we have done put which was after the register succeeded.
305         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
306 
307         _tag2SubscriptionMap.put(tag, subscription);
308 
309         try
310         {
311             queue.registerSubscription(subscription, exclusive);
312         }
313         catch (AMQException e)
314         {
315             _tag2SubscriptionMap.remove(tag);
316             throw e;
317         }
318         return tag;
319     }
320 
321     /**
322      * Unsubscribe a consumer from a queue.
323      @param consumerTag
324      @return true if the consumerTag had a mapped queue that could be unregistered.
325      @throws AMQException
326      */
327     public boolean unsubscribeConsumer(AMQShortString consumerTagthrows AMQException
328     {
329 
330         Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
331         if (sub != null)
332         {
333             try 
334             {
335                 sub.getSendLock();
336                 sub.getQueue().unregisterSubscription(sub);
337             }
338             finally 
339             {
340                 sub.releaseSendLock();
341             }
342             return true;
343         }
344         else
345         {
346             _log.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
347         }
348         return false;
349     }
350 
351     /**
352      * Called from the protocol session to close this channel and clean up. T
353      *
354      @throws AMQException if there is an error during closure
355      */
356     public void close() throws AMQException
357     {
358         _txnContext.rollback();
359         unsubscribeAllConsumers();
360         try
361         {
362             requeue();
363         }
364         catch (AMQException e)
365         {
366             _log.error("Caught AMQException whilst attempting to reque:" + e);        
367         }
368 
369         setClosing(true);
370     }
371 
372     private void setClosing(boolean closing)
373     {
374         _closing = closing;
375     }
376 
377     private void unsubscribeAllConsumers() throws AMQException
378     {
379         if (_log.isInfoEnabled())
380         {
381             if (!_tag2SubscriptionMap.isEmpty())
382             {
383                 _log.info("Unsubscribing all consumers on channel " + toString());
384             }
385             else
386             {
387                 _log.info("No consumers to unsubscribe on channel " + toString());
388             }
389         }
390 
391         for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
392         {
393             if (_log.isInfoEnabled())
394             {
395                 _log.info("Unsubscribing consumer '" + me.getKey() "' on channel " + toString());
396             }
397 
398             Subscription sub = me.getValue();
399 
400             try
401             {
402                 sub.getSendLock();
403                 sub.getQueue().unregisterSubscription(sub);
404             }
405             finally
406             {
407                 sub.releaseSendLock();
408             }
409             
410         }
411 
412         _tag2SubscriptionMap.clear();
413     }
414 
415     /**
416      * Add a message to the channel-based list of unacknowledged messages
417      *
418      @param entry       the record of the message on the queue that was delivered
419      @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
420      *                    delivery tag)
421      @param subscription The consumer that is to acknowledge this message.
422      */
423     public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
424     {
425         if (_log.isDebugEnabled())
426         {
427             if (entry.getQueue() == null)
428             {
429                 _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity());
430             }
431             else
432             {
433                 if (_log.isDebugEnabled())
434                 {
435                     _log.debug(debugIdentity() " Adding unacked message(" + entry.getMessage().toString() " DT:" + deliveryTag
436                                ") with a queue(" + entry.getQueue() ") for " + subscription);
437                 }
438             }
439         }
440 
441         _unacknowledgedMessageMap.add(deliveryTag, entry);
442 
443     }
444 
445     private final String id = "(" + System.identityHashCode(this")";
446 
447     public String debugIdentity()
448     {
449         return _channelId + id;
450     }
451 
452     /**
453      * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to
454      * this same channel or to other subscribers.
455      *
456      @throws org.apache.qpid.AMQException if the requeue fails
457      */
458     public void requeue() throws AMQException
459     {
460         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
461         Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
462 
463         // Deliver these messages out of the transaction as their delivery was never
464         // part of the transaction only the receive.
465         TransactionalContext deliveryContext = null;
466 
467         if (!messagesToBeDelivered.isEmpty())
468         {
469             if (_log.isInfoEnabled())
470             {
471                 _log.info("Requeuing " + messagesToBeDelivered.size() " unacked messages. for " + toString());
472             }
473 
474             if (!(_txnContext instanceof NonTransactionalContext))
475             {
476 
477                     deliveryContext =
478                             new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
479             }
480             else
481             {
482                 deliveryContext = _txnContext;
483             }
484         }
485 
486         for (QueueEntry unacked : messagesToBeDelivered)
487         {
488             if (!unacked.isQueueDeleted())
489             {
490                 // Mark message redelivered
491                 unacked.setRedelivered(true);
492 
493                 // Ensure message is released for redelivery
494                 unacked.release();
495 
496                 // Deliver Message
497                 deliveryContext.requeue(unacked);
498 
499             }
500             else
501             {
502                 unacked.dequeueAndDelete(_storeContext);
503             }
504         }
505 
506     }
507 
508     /**
509      * Requeue a single message
510      *
511      @param deliveryTag The message to requeue
512      *
513      @throws AMQException If something goes wrong.
514      */
515     public void requeue(long deliveryTagthrows AMQException
516     {
517         QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
518 
519         if (unacked != null)
520         {
521             // Mark message redelivered
522             unacked.setRedelivered(true);
523 
524             // Ensure message is released for redelivery
525             if (!unacked.isQueueDeleted())
526             {
527                 unacked.release();
528             }
529 
530 
531             // Deliver these messages out of the transaction as their delivery was never
532             // part of the transaction only the receive.
533             TransactionalContext deliveryContext;
534             if (!(_txnContext instanceof NonTransactionalContext))
535             {
536 
537                 deliveryContext =
538                             new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
539 
540             }
541             else
542             {
543                 deliveryContext = _txnContext;
544             }
545 
546             if (!unacked.isQueueDeleted())
547             {
548                 // Redeliver the messages to the front of the queue
549                 deliveryContext.requeue(unacked);
550                 // Deliver increments the message count but we have already deliverted this once so don't increment it again
551                 // this was because deliver did an increment changed this.
552             }
553             else
554             {
555                 _log.warn(System.identityHashCode(this" Requested requeue of message(" + unacked.getMessage().debugIdentity()
556                           "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
557 
558                 unacked.dequeueAndDelete(_storeContext);
559             }
560         }
561         else
562         {
563             _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
564                       + _unacknowledgedMessageMap.size());
565 
566         }
567 
568     }
569 
570     /**
571      * Called to resend all outstanding unacknowledged messages to this same channel.
572      *
573      @param requeue Are the messages to be requeued or dropped.
574      *
575      @throws AMQException When something goes wrong.
576      */
577     public void resend(final boolean requeuethrows AMQException
578     {
579 
580 
581         final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
582         final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
583 
584         if (_log.isDebugEnabled())
585         {
586             _log.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
587         }
588 
589         // Process the Unacked-Map.
590         // Marking messages who still have a consumer for to be resent
591         // and those that don't to be requeued.
592         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
593                                                                     msgToResend, requeue, _storeContext));
594 
595 
596         // Process Messages to Resend
597         if (_log.isDebugEnabled())
598         {
599             if (!msgToResend.isEmpty())
600             {
601                 _log.debug("Preparing (" + msgToResend.size() ") message to resend.");
602             }
603             else
604             {
605                 _log.debug("No message to resend.");
606             }
607         }
608 
609         for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
610         {
611             QueueEntry queueEntry = entry.getValue();
612             long deliveryTag = entry.getKey();
613 
614             AMQQueue queue = queueEntry.getQueue();
615 
616             // Our Java Client will always suspend the channel when resending!
617             // If the client has requested the messages be resent then it is
618             // their responsibility to ensure that thay are capable of receiving them
619             // i.e. The channel hasn't been server side suspended.
620             // if (isSuspended())
621             // {
622             // _log.info("Channel is suspended so requeuing");
623             // //move this message to requeue
624             // msgToRequeue.add(message);
625             // }
626             // else
627             // {
628             // release to allow it to be delivered
629 
630             // Without any details from the client about what has been processed we have to mark
631             // all messages in the unacked map as redelivered.
632             queueEntry.setRedelivered(true);
633 
634             Subscription sub = queueEntry.getDeliveredSubscription();
635 
636             if (sub != null)
637             {
638                 
639                 if(!queue.resend(queueEntry, sub))
640                 {
641                     msgToRequeue.put(deliveryTag, queueEntry);
642                 }
643             }
644             else
645             {
646 
647                 if (_log.isInfoEnabled())
648                 {
649                     _log.info("DeliveredSubscription not recorded so just requeueing(" + queueEntry.toString()
650                               ")to prevent loss");
651                 }
652                 // move this message to requeue
653                 msgToRequeue.put(deliveryTag, queueEntry);
654             }
655         // for all messages
656         // } else !isSuspend
657 
658         if (_log.isInfoEnabled())
659         {
660             if (!msgToRequeue.isEmpty())
661             {
662                 _log.info("Preparing (" + msgToRequeue.size() ") message to requeue to.");
663             }
664         }
665 
666         // Deliver these messages out of the transaction as their delivery was never
667         // part of the transaction only the receive.
668         TransactionalContext deliveryContext;
669         if (!(_txnContext instanceof NonTransactionalContext))
670         {
671 
672             deliveryContext =
673                         new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
674         }
675         else
676         {
677             deliveryContext = _txnContext;
678         }
679 
680         // Process Messages to Requeue at the front of the queue
681         for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
682         {
683             QueueEntry message = entry.getValue();
684             long deliveryTag = entry.getKey();
685             
686             message.release();
687             message.setRedelivered(true);
688 
689             deliveryContext.requeue(message);
690 
691             _unacknowledgedMessageMap.remove(deliveryTag);
692         }
693     }
694 
695     /**
696      * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to
697      * remove the queue reference and also decrement any message reference counts, without actually removing the item
698      * since we may get an ack for a delivery tag that was generated from the deleted queue.
699      *
700      @param queue the queue that has been deleted
701      *
702      */
703  /*   public void queueDeleted(final AMQQueue queue)
704     {
705         try
706         {
707             _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
708             {
709                 public boolean callback(UnacknowledgedMessage message)
710                 {
711                     if (message.getQueue() == queue)
712                     {
713                         try
714                         {
715                             message.dequeueAndDelete(_storeContext);
716                             message.setQueueDeleted(true);
717 
718                         }
719                         catch (AMQException e)
720                         {
721                             _log.error(
722                                     "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
723                             throw new RuntimeException(e);
724                         }
725                     }
726 
727                     return false;
728                 }
729 
730                 public void visitComplete()
731                 {
732                 }
733             });
734         }
735         catch (AMQException e)
736         {
737             _log.error("Unexpected Error while handling deletion of queue", e);
738             throw new RuntimeException(e);
739         }
740 
741     }
742 */
743     /**
744      * Acknowledge one or more messages.
745      *
746      @param deliveryTag the last delivery tag
747      @param multiple    if true will acknowledge all messages up to an including the delivery tag. if false only
748      *                    acknowledges the single message specified by the delivery tag
749      *
750      @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
751      */
752     public void acknowledgeMessage(long deliveryTag, boolean multiplethrows AMQException
753     {
754         _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
755     }
756 
757     /**
758      * Used only for testing purposes.
759      *
760      @return the map of unacknowledged messages
761      */
762     public UnacknowledgedMessageMap getUnacknowledgedMessageMap()
763     {
764         return _unacknowledgedMessageMap;
765     }
766 
767 
768     public void setSuspended(boolean suspended)
769     {
770 
771 
772         boolean wasSuspended = _suspended.getAndSet(suspended);
773         if (wasSuspended != suspended)
774         {
775             if (wasSuspended)
776             {
777                 // may need to deliver queued messages
778                 for (Subscription s : _tag2SubscriptionMap.values())
779                 {
780                     s.getQueue().deliverAsync(s);
781                 }
782             }
783         }
784     }
785 
786     public boolean isSuspended()
787     {
788         return _suspended.get();
789     }
790 
791     public void commit() throws AMQException
792     {
793         if (!isTransactional())
794         {
795             throw new AMQException("Fatal error: commit called on non-transactional channel");
796         }
797 
798         _txnContext.commit();
799     }
800 
801     public void rollback() throws AMQException
802     {
803         _txnContext.rollback();
804     }
805 
806     public String toString()
807     {
808         return "["+_session.toString()+":"+_channelId+"]";
809     }
810 
811     public void setDefaultQueue(AMQQueue queue)
812     {
813         _defaultQueue = queue;
814     }
815 
816     public AMQQueue getDefaultQueue()
817     {
818         return _defaultQueue;
819     }
820 
821     public StoreContext getStoreContext()
822     {
823         return _storeContext;
824     }
825 
826     public void processReturns() throws AMQException
827     {
828         if (!_returnMessages.isEmpty())
829         {
830             for (RequiredDeliveryException bouncedMessage : _returnMessages)
831             {
832                 AMQMessage message = bouncedMessage.getAMQMessage();
833                 _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
834                                                                  new AMQShortString(bouncedMessage.getMessage()));                
835             }
836 
837             _returnMessages.clear();
838         }
839     }
840 
841 
842     public TransactionalContext getTransactionalContext()
843     {
844         return _txnContext;
845     }
846 
847     public boolean isClosing()
848     {
849         return _closing;
850     }
851 
852     public AMQProtocolSession getProtocolSession()
853     {
854         return _session;
855     }
856 
857     public FlowCreditManager getCreditManager()
858     {
859         return _creditManager;
860     }
861 
862     public void setCredit(final long prefetchSize, final int prefetchCount)
863     {
864         _creditManager.setCreditLimits(prefetchSize, prefetchCount);
865     }
866 
867     public List<RequiredDeliveryException> getReturnMessages()
868     {
869         return _returnMessages;
870     }
871 
872     public TransactionLog getTransactionLog()
873     {
874         return _transactionLog;
875     }
876 
877     private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
878         {
879 
880             public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
881                     throws AMQException
882             {
883                getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag());
884             }
885         };
886 
887     public ClientDeliveryMethod getClientDeliveryMethod()
888     {
889         return _clientDeliveryMethod;
890     }
891 
892     private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
893         {
894 
895             public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
896             {
897                 addUnacknowledgedMessage(entry, deliveryTag, sub);
898             }
899         };
900 
901     public RecordDeliveryMethod getRecordDeliveryMethod()
902     {
903         return _recordDeliveryMethod;
904     }
905 }