001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.txn;
022
023 import java.util.List;
024
025 import org.apache.log4j.Logger;
026 import org.apache.qpid.AMQException;
027 import org.apache.qpid.server.AMQChannel;
028 import org.apache.qpid.server.RequiredDeliveryException;
029 import org.apache.qpid.server.transactionlog.TransactionLog;
030 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
031 import org.apache.qpid.server.protocol.AMQProtocolSession;
032 import org.apache.qpid.server.queue.*;
033 import org.apache.qpid.server.store.StoreContext;
034
035 /** @author Apache Software Foundation */
036 public class NonTransactionalContext implements TransactionalContext
037 {
038 private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);
039
040 /** Channel is useful for logging */
041 private final AMQChannel _channel;
042
043 /** Where to put undeliverable messages */
044 private final List<RequiredDeliveryException> _returnMessages;
045
046
047
048 private final TransactionLog _transactionLog;
049
050 private final StoreContext _storeContext;
051
052 /** Whether we are in a transaction */
053 private boolean _inTran;
054
055 public NonTransactionalContext(TransactionLog transactionLog, StoreContext storeContext, AMQChannel channel,
056 List<RequiredDeliveryException> returnMessages)
057 {
058 _channel = channel;
059 _storeContext = storeContext;
060 _returnMessages = returnMessages;
061 _transactionLog = transactionLog;
062
063 }
064
065
066 public StoreContext getStoreContext()
067 {
068 return _storeContext;
069 }
070
071 public void beginTranIfNecessary() throws AMQException
072 {
073 if (!_inTran)
074 {
075 _transactionLog.beginTran(_storeContext);
076 _inTran = true;
077 }
078 }
079
080 public void commit() throws AMQException
081 {
082 // Does not apply to this context
083 }
084
085 public void rollback() throws AMQException
086 {
087 // Does not apply to this context
088 }
089
090 public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
091 {
092 QueueEntry entry = queue.enqueue(_storeContext, message);
093
094 //following check implements the functionality
095 //required by the 'immediate' flag:
096 if(entry.immediateAndNotDelivered())
097 {
098 _returnMessages.add(new NoConsumersException(entry.getMessage()));
099 }
100
101 }
102
103 public void requeue(QueueEntry entry) throws AMQException
104 {
105 entry.requeue(_storeContext);
106 }
107
108 public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag,
109 boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
110 throws AMQException
111 {
112
113 final boolean debug = _log.isDebugEnabled();
114 ;
115 if (multiple)
116 {
117 if (deliveryTag == 0)
118 {
119
120 //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
121 // tells the server to acknowledge all outstanding mesages.
122 _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
123 unacknowledgedMessageMap.size());
124 unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
125 {
126 public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException
127 {
128 if (debug)
129 {
130 _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
131 }
132 if(queueEntry.getMessage().isPersistent())
133 {
134 beginTranIfNecessary();
135 }
136 //Message has been ack so dequeueAndDelete it.
137 queueEntry.dequeueAndDelete(_storeContext);
138
139 return false;
140 }
141
142 public void visitComplete()
143 {
144 unacknowledgedMessageMap.clear();
145 }
146 });
147 }
148 else
149 {
150 if (!unacknowledgedMessageMap.contains(deliveryTag))
151 {
152 throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
153 }
154
155 unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
156 }
157 }
158 else
159 {
160 QueueEntry queueEntry;
161 queueEntry = unacknowledgedMessageMap.get(deliveryTag);
162
163 if (debug)
164 {
165 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
166 }
167
168 if (queueEntry == null)
169 {
170 _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
171 _channel.getChannelId());
172 throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
173 _channel.getChannelId());
174 }
175
176 if (debug)
177 {
178 _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
179 }
180 if(queueEntry.getMessage().isPersistent())
181 {
182 beginTranIfNecessary();
183 }
184
185 //Message has been ack so dequeueAndDelete it.
186 // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
187 // from the transaciton log
188 queueEntry.dequeueAndDelete(_storeContext);
189
190 unacknowledgedMessageMap.remove(deliveryTag);
191
192
193 }
194 if(_inTran)
195 {
196 _transactionLog.commitTran(_storeContext);
197 _inTran = false;
198 }
199 }
200
201 public void messageFullyReceived(boolean persistent) throws AMQException
202 {
203 if (persistent)
204 {
205 _transactionLog.commitTran(_storeContext);
206 _inTran = false;
207 }
208 }
209
210 public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
211 {
212 _channel.processReturns();
213 }
214 }
|