001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.txn;
022
023 import org.apache.qpid.AMQException;
024 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
025 import org.apache.qpid.server.protocol.AMQProtocolSession;
026 import org.apache.qpid.server.queue.AMQMessage;
027 import org.apache.qpid.server.queue.QueueEntry;
028 import org.apache.qpid.server.queue.AMQQueue;
029 import org.apache.qpid.server.store.StoreContext;
030
031 /**
032 * TransactionalContext provides a context in which transactional operations on {@link AMQMessage}s are performed.
033 * Different levels of transactional support for the delivery of messages may be provided by different implementations
034 * of this interface.
035 *
036 * <p/>The fundamental transactional operations that can be performed on a message queue are 'enqueue' and 'dequeue'.
037 * In this interface, these have been recast as the {@link #messageFullyReceived} and {@link #acknowledgeMessage}
038 * operations. This interface essentially provides a way to make enqueueing and dequeuing transactional.
039 *
040 * <p/><table id="crc"><caption>CRC Card</caption>
041 * <tr><th> Responsibilities
042 * <tr><td> Explicitly accept a transaction start notification.
043 * <tr><td> Commit all pending operations in a transaction.
044 * <tr><td> Rollback all pending operations in a transaction.
045 * <tr><td> Deliver a message to a queue as part of a transaction.
046 * <tr><td> Redeliver a message to a queue as part of a transaction.
047 * <tr><td> Mark a message as acknowledged as part of a transaction.
048 * <tr><td> Accept notification that a message has been completely received as part of a transaction.
049 * <tr><td> Accept notification that a message has been fully processed as part of a transaction.
050 * <tr><td> Associate a message store context with this transaction context.
051 * </table>
052 *
053 * @todo The 'fullyReceived' and 'messageProcessed' events sit uncomfortably in the responsibilities of a transactional
054 * context. They are non-transactional operations, used to trigger other side-effects. Consider moving them
055 * somewhere else, a seperate interface for example.
056 *
057 * @todo This transactional context could be written as a wrapper extension to a Queue implementation, that provides
058 * transactional management of the enqueue and dequeue operations, with added commit/rollback methods. Any
059 * queue implementation could be made transactional by wrapping it as a transactional queue. This would mean
060 * that the enqueue/dequeue operations do not need to be recast as deliver/acknowledge operations, which may be
061 * conceptually neater.
062 *
063 * For example:
064 * <pre>
065 * public interface Transactional
066 * {
067 * public void commit();
068 * public void rollback();
069 * }
070 *
071 * public interface TransactionalQueue<E> extends Transactional, SizeableQueue<E>
072 * {}
073 *
074 * public class Queues
075 * {
076 * ...
077 * // For transactional messaging, take a transactional view onto the queue.
078 * public static <E> TransactionalQueue<E> getTransactionalQueue(SizeableQueue<E> queue) { ... }
079 *
080 * // For non-transactional messaging, take a non-transactional view onto the queue.
081 * public static <E> TransactionalQueue<E> getNonTransactionalQueue(SizeableQueue<E> queue) { ... }
082 * }
083 * </pre>
084 */
085 public interface TransactionalContext
086 {
087 /**
088 * Explicitly begins the transaction, if it has not already been started. {@link #commit} or {@link #rollback}
089 * should automatically begin the next transaction in the chain.
090 *
091 * @throws AMQException If the transaction cannot be started for any reason.
092 */
093 void beginTranIfNecessary() throws AMQException;
094
095 /**
096 * Makes all pending operations on the transaction permanent and visible.
097 *
098 * @throws AMQException If the transaction cannot be committed for any reason.
099 */
100 void commit() throws AMQException;
101
102 /**
103 * Erases all pending operations on the transaction.
104 *
105 * @throws AMQException If the transaction cannot be committed for any reason.
106 */
107 void rollback() throws AMQException;
108
109 /**
110 * Delivers the specified message to the specified queue.
111 *
112 * <p/>This is an 'enqueue' operation.
113 *
114 * @param queue
115 * @param message The message to deliver
116 * @throws AMQException If the message cannot be delivered for any reason.
117 */
118 void deliver(final AMQQueue queue, AMQMessage message) throws AMQException;
119
120 /**
121 * Requeues the specified message entry (message queue pair)
122 *
123 *
124 * @param queueEntry The message,queue pair
125 *
126 * @throws AMQException If the message cannot be delivered for any reason.
127 */
128 void requeue(QueueEntry queueEntry) throws AMQException;
129
130
131 /**
132 * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by
133 * setting the 'multiple' flag. It is also possible for the acknowledged message id to be zero, when the 'multiple'
134 * flag is set, in which case an acknowledgement up to the latest delivered message should be done.
135 *
136 * <p/>This is a 'dequeue' operation.
137 *
138 * @param deliveryTag The id of the message to acknowledge, or zero, if using multiple acknowledgement
139 * up to the latest message.
140 * @param lastDeliveryTag The latest message delivered.
141 * @param multiple <tt>true</tt> if all message ids up the acknowledged one or latest delivered, are
142 * to be acknowledged, <tt>false</tt> otherwise.
143 * @param unacknowledgedMessageMap The unacknowledged messages in the transaction, to remove the acknowledged message
144 * from.
145 *
146 * @throws AMQException If the message cannot be acknowledged for any reason.
147 */
148 void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
149 UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
150
151 /**
152 * Notifies the transactional context that a message has been fully received. The actual message that was received
153 * is not specified. This event may be used to trigger a process related to the receipt of the message, for example,
154 * flushing its data to disk.
155 *
156 * @param persistent <tt>true</tt> if the received message is persistent, <tt>false</tt> otherwise.
157 *
158 * @throws AMQException If the fully received event cannot be processed for any reason.
159 */
160 void messageFullyReceived(boolean persistent) throws AMQException;
161
162 /**
163 * Notifies the transactional context that a message has been delivered, succesfully or otherwise. The actual
164 * message that was delivered is not specified. This event may be used to trigger a process related to the
165 * outcome of the delivery of the message, for example, cleaning up failed deliveries.
166 *
167 * @param protocolSession The protocol session of the deliverable message.
168 *
169 * @throws AMQException If the message processed event cannot be handled for any reason.
170 */
171 void messageProcessed(AMQProtocolSession protocolSession) throws AMQException;
172
173 /**
174 * Gets the message store context associated with this transactional context.
175 *
176 * @return The message store context associated with this transactional context.
177 */
178 StoreContext getStoreContext();
179 }
|