NonTransactionalContext.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.txn;
022 
023 import java.util.List;
024 
025 import org.apache.log4j.Logger;
026 import org.apache.qpid.AMQException;
027 import org.apache.qpid.server.AMQChannel;
028 import org.apache.qpid.server.RequiredDeliveryException;
029 import org.apache.qpid.server.transactionlog.TransactionLog;
030 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
031 import org.apache.qpid.server.protocol.AMQProtocolSession;
032 import org.apache.qpid.server.queue.*;
033 import org.apache.qpid.server.store.StoreContext;
034 
035 /** @author Apache Software Foundation */
036 public class NonTransactionalContext implements TransactionalContext
037 {
038     private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);
039 
040     /** Channel is useful for logging */
041     private final AMQChannel _channel;
042 
043     /** Where to put undeliverable messages */
044     private final List<RequiredDeliveryException> _returnMessages;
045 
046 
047 
048     private final TransactionLog _transactionLog;
049 
050     private final StoreContext _storeContext;
051 
052     /** Whether we are in a transaction */
053     private boolean _inTran;
054 
055     public NonTransactionalContext(TransactionLog transactionLog, StoreContext storeContext, AMQChannel channel,
056                                    List<RequiredDeliveryException> returnMessages)
057     {
058         _channel = channel;
059         _storeContext = storeContext;
060         _returnMessages = returnMessages;
061         _transactionLog = transactionLog;
062 
063     }
064 
065 
066     public StoreContext getStoreContext()
067     {
068         return _storeContext;
069     }
070 
071     public void beginTranIfNecessary() throws AMQException
072     {
073         if (!_inTran)
074         {
075             _transactionLog.beginTran(_storeContext);
076             _inTran = true;
077         }
078     }
079 
080     public void commit() throws AMQException
081     {
082         // Does not apply to this context
083     }
084 
085     public void rollback() throws AMQException
086     {
087         // Does not apply to this context
088     }
089 
090     public void deliver(final AMQQueue queue, AMQMessage messagethrows AMQException
091     {
092         QueueEntry entry = queue.enqueue(_storeContext, message);
093         
094         //following check implements the functionality
095         //required by the 'immediate' flag:
096         if(entry.immediateAndNotDelivered())
097         {
098             _returnMessages.add(new NoConsumersException(entry.getMessage()));
099         }
100 
101     }
102 
103     public void requeue(QueueEntry entrythrows AMQException
104     {
105         entry.requeue(_storeContext);
106     }
107 
108     public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag,
109                                    boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
110             throws AMQException
111     {
112 
113         final boolean debug = _log.isDebugEnabled();
114         ;
115         if (multiple)
116         {
117             if (deliveryTag == 0)
118             {
119 
120                 //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
121                 // tells the server to acknowledge all outstanding mesages.
122                 _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
123                           unacknowledgedMessageMap.size());
124                 unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
125                 {
126                     public boolean callback(final long deliveryTag, QueueEntry queueEntrythrows AMQException
127                     {
128                         if (debug)
129                         {
130                             _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
131                         }
132                         if(queueEntry.getMessage().isPersistent())
133                         {
134                             beginTranIfNecessary();
135                         }
136                         //Message has been ack so dequeueAndDelete it.
137                         queueEntry.dequeueAndDelete(_storeContext);
138 
139                         return false;
140                     }
141 
142                     public void visitComplete()
143                     {
144                         unacknowledgedMessageMap.clear();
145                     }
146                 });
147             }
148             else
149             {
150                 if (!unacknowledgedMessageMap.contains(deliveryTag))
151                 {
152                     throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
153                 }
154 
155                 unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
156             }
157         }
158         else
159         {
160             QueueEntry queueEntry;
161             queueEntry = unacknowledgedMessageMap.get(deliveryTag);
162 
163             if (debug)
164             {
165                 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
166             }
167 
168             if (queueEntry == null)
169             {
170                 _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
171                           _channel.getChannelId());
172                 throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
173                                        _channel.getChannelId());
174             }
175 
176             if (debug)
177             {
178                 _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
179             }
180             if(queueEntry.getMessage().isPersistent())
181             {
182                 beginTranIfNecessary();
183             }
184 
185             //Message has been ack so dequeueAndDelete it.
186             // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
187             // from the transaciton log
188             queueEntry.dequeueAndDelete(_storeContext);
189 
190             unacknowledgedMessageMap.remove(deliveryTag);
191 
192 
193         }
194         if(_inTran)
195         {
196             _transactionLog.commitTran(_storeContext);
197             _inTran = false;
198         }
199     }
200 
201     public void messageFullyReceived(boolean persistentthrows AMQException
202     {
203         if (persistent)
204         {
205             _transactionLog.commitTran(_storeContext);
206             _inTran = false;
207         }
208     }
209 
210     public void messageProcessed(AMQProtocolSession protocolSessionthrows AMQException
211     {
212         _channel.processReturns();
213     }
214 }