LocalTransactionalContext.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.txn;
022 
023 import org.apache.log4j.Logger;
024 
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.server.RequiredDeliveryException;
027 import org.apache.qpid.server.AMQChannel;
028 import org.apache.qpid.server.transactionlog.TransactionLog;
029 import org.apache.qpid.server.ack.TxAck;
030 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
031 import org.apache.qpid.server.protocol.AMQProtocolSession;
032 import org.apache.qpid.server.queue.*;
033 import org.apache.qpid.server.store.StoreContext;
034 
035 import java.util.List;
036 import java.util.ArrayList;
037 
038 /** A transactional context that only supports local transactions. */
039 public class LocalTransactionalContext implements TransactionalContext
040 {
041     private static final Logger _log = Logger.getLogger(LocalTransactionalContext.class);
042 
043     private final TxnBuffer _txnBuffer = new TxnBuffer();
044 
045     private final List<DeliveryAction> _postCommitDeliveryList = new ArrayList<DeliveryAction>();
046 
047     /**
048      * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are
049      * consolidated into a single operation
050      */
051     private TxAck _ackOp;
052 
053     private boolean _inTran = false;
054 
055     /** Are there messages to deliver. NOT Has the message been delivered */
056     private boolean _messageDelivered = false;
057     private final AMQChannel _channel;
058 
059     private abstract class DeliveryAction
060     {
061 
062         abstract public void process() throws AMQException;
063 
064     }
065 
066     private class RequeueAction extends DeliveryAction
067     {
068         public QueueEntry entry;
069 
070         public RequeueAction(QueueEntry entry)
071         {
072             this.entry = entry;
073         }
074 
075         public void process() throws AMQException
076         {
077             entry.requeue(getStoreContext());
078         }
079     }
080 
081     private class PublishAction extends DeliveryAction
082     {
083         private final AMQQueue _queue;
084         private final AMQMessage _message;
085 
086         public PublishAction(final AMQQueue queue, final AMQMessage message)
087         {
088             _queue = queue;
089             _message = message;
090         }
091 
092         public void process() throws AMQException
093         {
094 
095                 QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
096 
097                 if(entry.immediateAndNotDelivered())
098                 {
099                     getReturnMessages().add(new NoConsumersException(_message));
100                 }
101         }
102     }
103 
104     public LocalTransactionalContext(final AMQChannel channel)
105     {
106         _channel = channel;
107     }
108 
109     public StoreContext getStoreContext()
110     {
111         return _channel.getStoreContext();
112     }
113 
114     public List<RequiredDeliveryException> getReturnMessages()
115     {
116         return _channel.getReturnMessages();
117     }
118 
119     public TransactionLog getTransactionLog()
120     {
121         return _channel.getTransactionLog();
122     }
123 
124 
125     public void rollback() throws AMQException
126     {
127         _txnBuffer.rollback(getStoreContext());
128         // Hack to deal with uncommitted non-transactional writes
129         if (getTransactionLog().inTran(getStoreContext()))
130         {
131             getTransactionLog().abortTran(getStoreContext());
132             _inTran = false;
133         }
134 
135         _postCommitDeliveryList.clear();
136     }
137 
138     public void deliver(final AMQQueue queue, AMQMessage messagethrows AMQException
139     {
140         // A publication will result in the enlisting of several
141         // TxnOps. The first is an op that will store the message.
142         // Following that (and ordering is important), an op will
143         // be added for every queue onto which the message is
144         // enqueued.
145         _postCommitDeliveryList.add(new PublishAction(queue, message));
146         _messageDelivered = true;
147 
148     }
149 
150     public void requeue(QueueEntry entrythrows AMQException
151     {
152         _postCommitDeliveryList.add(new RequeueAction(entry));
153         _messageDelivered = true;
154 
155     }
156 
157 
158     private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMapthrows AMQException
159     {
160         if (!unacknowledgedMessageMap.contains(deliveryTag))
161         {
162             throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
163         }
164     }
165 
166     public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
167         UnacknowledgedMessageMap unacknowledgedMessageMapthrows AMQException
168     {
169         // check that the tag exists to give early failure
170         if (!multiple || (deliveryTag > 0))
171         {
172             checkAck(deliveryTag, unacknowledgedMessageMap);
173         }
174         // we use a single txn op for all acks and update this op
175         // as new acks come in. If this is the first ack in the txn
176         // we will need to create and enlist the op.
177         if (_ackOp == null)
178         {            
179             _ackOp = new TxAck(unacknowledgedMessageMap);
180             _txnBuffer.enlist(_ackOp);
181         }
182         // update the op to include this ack request
183         if (multiple && (deliveryTag == 0))
184         {
185             // if have signalled to ack all, that refers only
186             // to all at this time
187             _ackOp.update(lastDeliveryTag, multiple);
188         }
189         else
190         {
191             _ackOp.update(deliveryTag, multiple);
192         }
193         if(!_inTran && _ackOp.checkPersistent())
194         {
195             beginTranIfNecessary();
196         }
197     }
198 
199     public void messageFullyReceived(boolean persistentthrows AMQException
200     {
201         // Not required in this transactional context
202     }
203 
204     public void messageProcessed(AMQProtocolSession protocolSessionthrows AMQException
205     {
206         // Not required in this transactional context
207     }
208 
209     public void beginTranIfNecessary() throws AMQException
210     {
211         if (!_inTran)
212         {
213             if (_log.isDebugEnabled())
214             {
215                 _log.debug("Starting transaction on message store: " this);
216             }
217 
218             getTransactionLog().beginTran(getStoreContext());
219             _inTran = true;
220         }
221     }
222 
223     public void commit() throws AMQException
224     {
225         if (_log.isDebugEnabled())
226         {
227             _log.debug("Committing transactional context: " this);
228         }
229 
230         if (_ackOp != null)
231         {
232 
233             _messageDelivered = true;
234             _ackOp.consolidate();
235             // already enlisted, after commit will reset regardless of outcome
236             _ackOp = null;
237         }
238 
239         if (_messageDelivered && _inTran)
240         {
241             _txnBuffer.enlist(new StoreMessageOperation(getTransactionLog()));
242         }
243         // fixme fail commit here ... QPID-440
244         try
245         {
246             _txnBuffer.commit(getStoreContext());
247         }
248         finally
249         {
250             _messageDelivered = false;
251             _inTran = getTransactionLog().inTran(getStoreContext());
252         }
253 
254         try
255         {
256             postCommitDelivery();
257         }
258         catch (AMQException e)
259         {
260             // OK so what do we do now...?
261             _log.error("Failed to deliver messages following txn commit: " + e, e);
262         }
263     }
264 
265     private void postCommitDelivery() throws AMQException
266     {
267         if (_log.isDebugEnabled())
268         {
269             _log.debug("Performing post commit delivery");
270         }
271 
272         try
273         {
274             for (DeliveryAction dd : _postCommitDeliveryList)
275             {
276                 dd.process();
277             }
278         }
279         finally
280         {
281             _postCommitDeliveryList.clear();
282         }
283     }
284 }