/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
021 package org.apache.qpid.server.ack;
022
023 import java.util.List;
024 import java.util.Map;
025 import java.util.HashMap;
026 import java.util.ArrayList;
027
028 import org.apache.qpid.AMQException;
029 import org.apache.qpid.server.store.StoreContext;
030 import org.apache.qpid.server.txn.TxnOp;
031 import org.apache.qpid.server.queue.QueueEntry;
032
033 /**
034 * A TxnOp implementation for handling accumulated acks
035 */
036 public class TxAck implements TxnOp
037 {
038 private final UnacknowledgedMessageMap _map;
039 private final Map<Long, QueueEntry> _unacked = new HashMap<Long,QueueEntry>();
040 private List<Long> _individual;
041 private long _deliveryTag;
042 private boolean _multiple;
043
044 public TxAck(UnacknowledgedMessageMap map)
045 {
046 _map = map;
047 }
048
049 public void update(long deliveryTag, boolean multiple)
050 {
051 _unacked.clear();
052 if (!multiple)
053 {
054 if(_individual == null)
055 {
056 _individual = new ArrayList<Long>();
057 }
058 //have acked a single message that is not part of
059 //the previously acked region so record
060 //individually
061 _individual.add(deliveryTag);//_multiple && !multiple
062 }
063 else if (deliveryTag > _deliveryTag)
064 {
065 //have simply moved the last acked message on a
066 //bit
067 _deliveryTag = deliveryTag;
068 _multiple = true;
069 }
070 }
071
072 public void consolidate()
073 {
074 if(_unacked.isEmpty())
075 {
076 //lookup all the unacked messages that have been acked in this transaction
077 if (_multiple)
078 {
079 //get all the unacked messages for the accumulated
080 //multiple acks
081 _map.collect(_deliveryTag, true, _unacked);
082 }
083 if(_individual != null)
084 {
085 //get any unacked messages for individual acks outside the
086 //range covered by multiple acks
087 for (long tag : _individual)
088 {
089 if(_deliveryTag < tag)
090 {
091 _map.collect(tag, false, _unacked);
092 }
093 }
094 }
095 }
096 }
097
098 public boolean checkPersistent() throws AMQException
099 {
100 consolidate();
101 //if any of the messages in unacked are persistent the txn
102 //buffer must be marked as persistent:
103 for (QueueEntry msg : _unacked.values())
104 {
105 if (msg.getMessage().isPersistent())
106 {
107 return true;
108 }
109 }
110 return false;
111 }
112
113 public void prepare(StoreContext storeContext) throws AMQException
114 {
115 //make persistent changes, i.e. dequeue and decrementReference
116 for (QueueEntry msg : _unacked.values())
117 {
118 //Message has been ack so dequeueAndDelete it.
119 // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
120 // from the transaciton log
121 msg.dequeueAndDelete(storeContext);
122 }
123 }
124
125 public void undoPrepare()
126 {
127 //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the
128 // transactionLog. Only when the transaction succesfully completes will it perform any
129 // update of the internal transactionLog reference counting and any resulting message data deletion.
130 // The success or failure of the data deletion is not important to this transaction only that the ack has been
131 // successfully recorded.
132 }
133
134 public void commit(StoreContext storeContext)
135 {
136 //remove the unacked messages from the channels map
137 _map.remove(_unacked);
138 }
139
140 public void rollback(StoreContext storeContext)
141 {
142 }
143 }
144
|