001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.txn;
022
023 import org.apache.log4j.Logger;
024
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.server.RequiredDeliveryException;
027 import org.apache.qpid.server.AMQChannel;
028 import org.apache.qpid.server.transactionlog.TransactionLog;
029 import org.apache.qpid.server.ack.TxAck;
030 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
031 import org.apache.qpid.server.protocol.AMQProtocolSession;
032 import org.apache.qpid.server.queue.*;
033 import org.apache.qpid.server.store.StoreContext;
034
035 import java.util.List;
036 import java.util.ArrayList;
037
038 /** A transactional context that only supports local transactions. */
039 public class LocalTransactionalContext implements TransactionalContext
040 {
041 private static final Logger _log = Logger.getLogger(LocalTransactionalContext.class);
042
043 private final TxnBuffer _txnBuffer = new TxnBuffer();
044
045 private final List<DeliveryAction> _postCommitDeliveryList = new ArrayList<DeliveryAction>();
046
047 /**
048 * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are
049 * consolidated into a single operation
050 */
051 private TxAck _ackOp;
052
053 private boolean _inTran = false;
054
055 /** Are there messages to deliver. NOT Has the message been delivered */
056 private boolean _messageDelivered = false;
057 private final AMQChannel _channel;
058
059 private abstract class DeliveryAction
060 {
061
062 abstract public void process() throws AMQException;
063
064 }
065
066 private class RequeueAction extends DeliveryAction
067 {
068 public QueueEntry entry;
069
070 public RequeueAction(QueueEntry entry)
071 {
072 this.entry = entry;
073 }
074
075 public void process() throws AMQException
076 {
077 entry.requeue(getStoreContext());
078 }
079 }
080
081 private class PublishAction extends DeliveryAction
082 {
083 private final AMQQueue _queue;
084 private final AMQMessage _message;
085
086 public PublishAction(final AMQQueue queue, final AMQMessage message)
087 {
088 _queue = queue;
089 _message = message;
090 }
091
092 public void process() throws AMQException
093 {
094
095 QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
096
097 if(entry.immediateAndNotDelivered())
098 {
099 getReturnMessages().add(new NoConsumersException(_message));
100 }
101 }
102 }
103
104 public LocalTransactionalContext(final AMQChannel channel)
105 {
106 _channel = channel;
107 }
108
109 public StoreContext getStoreContext()
110 {
111 return _channel.getStoreContext();
112 }
113
114 public List<RequiredDeliveryException> getReturnMessages()
115 {
116 return _channel.getReturnMessages();
117 }
118
119 public TransactionLog getTransactionLog()
120 {
121 return _channel.getTransactionLog();
122 }
123
124
125 public void rollback() throws AMQException
126 {
127 _txnBuffer.rollback(getStoreContext());
128 // Hack to deal with uncommitted non-transactional writes
129 if (getTransactionLog().inTran(getStoreContext()))
130 {
131 getTransactionLog().abortTran(getStoreContext());
132 _inTran = false;
133 }
134
135 _postCommitDeliveryList.clear();
136 }
137
138 public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
139 {
140 // A publication will result in the enlisting of several
141 // TxnOps. The first is an op that will store the message.
142 // Following that (and ordering is important), an op will
143 // be added for every queue onto which the message is
144 // enqueued.
145 _postCommitDeliveryList.add(new PublishAction(queue, message));
146 _messageDelivered = true;
147
148 }
149
150 public void requeue(QueueEntry entry) throws AMQException
151 {
152 _postCommitDeliveryList.add(new RequeueAction(entry));
153 _messageDelivered = true;
154
155 }
156
157
158 private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
159 {
160 if (!unacknowledgedMessageMap.contains(deliveryTag))
161 {
162 throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
163 }
164 }
165
166 public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
167 UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
168 {
169 // check that the tag exists to give early failure
170 if (!multiple || (deliveryTag > 0))
171 {
172 checkAck(deliveryTag, unacknowledgedMessageMap);
173 }
174 // we use a single txn op for all acks and update this op
175 // as new acks come in. If this is the first ack in the txn
176 // we will need to create and enlist the op.
177 if (_ackOp == null)
178 {
179 _ackOp = new TxAck(unacknowledgedMessageMap);
180 _txnBuffer.enlist(_ackOp);
181 }
182 // update the op to include this ack request
183 if (multiple && (deliveryTag == 0))
184 {
185 // if have signalled to ack all, that refers only
186 // to all at this time
187 _ackOp.update(lastDeliveryTag, multiple);
188 }
189 else
190 {
191 _ackOp.update(deliveryTag, multiple);
192 }
193 if(!_inTran && _ackOp.checkPersistent())
194 {
195 beginTranIfNecessary();
196 }
197 }
198
199 public void messageFullyReceived(boolean persistent) throws AMQException
200 {
201 // Not required in this transactional context
202 }
203
204 public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
205 {
206 // Not required in this transactional context
207 }
208
209 public void beginTranIfNecessary() throws AMQException
210 {
211 if (!_inTran)
212 {
213 if (_log.isDebugEnabled())
214 {
215 _log.debug("Starting transaction on message store: " + this);
216 }
217
218 getTransactionLog().beginTran(getStoreContext());
219 _inTran = true;
220 }
221 }
222
223 public void commit() throws AMQException
224 {
225 if (_log.isDebugEnabled())
226 {
227 _log.debug("Committing transactional context: " + this);
228 }
229
230 if (_ackOp != null)
231 {
232
233 _messageDelivered = true;
234 _ackOp.consolidate();
235 // already enlisted, after commit will reset regardless of outcome
236 _ackOp = null;
237 }
238
239 if (_messageDelivered && _inTran)
240 {
241 _txnBuffer.enlist(new StoreMessageOperation(getTransactionLog()));
242 }
243 // fixme fail commit here ... QPID-440
244 try
245 {
246 _txnBuffer.commit(getStoreContext());
247 }
248 finally
249 {
250 _messageDelivered = false;
251 _inTran = getTransactionLog().inTran(getStoreContext());
252 }
253
254 try
255 {
256 postCommitDelivery();
257 }
258 catch (AMQException e)
259 {
260 // OK so what do we do now...?
261 _log.error("Failed to deliver messages following txn commit: " + e, e);
262 }
263 }
264
265 private void postCommitDelivery() throws AMQException
266 {
267 if (_log.isDebugEnabled())
268 {
269 _log.debug("Performing post commit delivery");
270 }
271
272 try
273 {
274 for (DeliveryAction dd : _postCommitDeliveryList)
275 {
276 dd.process();
277 }
278 }
279 finally
280 {
281 _postCommitDeliveryList.clear();
282 }
283 }
284 }
|