TxAck.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.ack;
022 
023 import java.util.List;
024 import java.util.Map;
025 import java.util.HashMap;
026 import java.util.ArrayList;
027 
028 import org.apache.qpid.AMQException;
029 import org.apache.qpid.server.store.StoreContext;
030 import org.apache.qpid.server.txn.TxnOp;
031 import org.apache.qpid.server.queue.QueueEntry;
032 
033 /**
034  * A TxnOp implementation for handling accumulated acks
035  */
036 public class TxAck implements TxnOp
037 {
038     private final UnacknowledgedMessageMap _map;
039     private final Map<Long, QueueEntry> _unacked = new HashMap<Long,QueueEntry>();
040     private List<Long> _individual;
041     private long _deliveryTag;
042     private boolean _multiple;
043 
044     public TxAck(UnacknowledgedMessageMap map)
045     {
046         _map = map;
047     }
048 
049     public void update(long deliveryTag, boolean multiple)
050     {
051         _unacked.clear();
052         if (!multiple)
053         {
054             if(_individual == null)
055             {
056                 _individual = new ArrayList<Long>();
057             }
058             //have acked a single message that is not part of
059             //the previously acked region so record
060             //individually
061             _individual.add(deliveryTag);//_multiple && !multiple
062         }
063         else if (deliveryTag > _deliveryTag)
064         {
065             //have simply moved the last acked message on a
066             //bit
067             _deliveryTag = deliveryTag;
068             _multiple = true;
069         }
070     }
071 
072     public void consolidate()
073     {
074         if(_unacked.isEmpty())
075         {
076             //lookup all the unacked messages that have been acked in this transaction
077             if (_multiple)
078             {
079                 //get all the unacked messages for the accumulated
080                 //multiple acks
081                 _map.collect(_deliveryTag, true, _unacked);
082             }
083             if(_individual != null)
084             {
085                 //get any unacked messages for individual acks outside the
086                 //range covered by multiple acks
087                 for (long tag : _individual)
088                 {
089                     if(_deliveryTag < tag)
090                     {
091                         _map.collect(tag, false, _unacked);
092                     }
093                 }
094             }
095         }
096     }
097 
098     public boolean checkPersistent() throws AMQException
099     {
100         consolidate();
101         //if any of the messages in unacked are persistent the txn
102         //buffer must be marked as persistent:
103         for (QueueEntry msg : _unacked.values())
104         {
105             if (msg.getMessage().isPersistent())
106             {
107                 return true;
108             }
109         }
110         return false;
111     }
112 
113     public void prepare(StoreContext storeContextthrows AMQException
114     {
115         //make persistent changes, i.e. dequeue and decrementReference
116         for (QueueEntry msg : _unacked.values())
117         {
118             //Message has been ack so dequeueAndDelete it.
119             // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
120             // from the transaciton log
121             msg.dequeueAndDelete(storeContext);
122         }
123     }
124 
125     public void undoPrepare()
126     {
127         //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the
128         // transactionLog. Only when the transaction succesfully completes will it perform any
129         // update of the internal transactionLog reference counting and any resulting message data deletion.
130         // The success or failure of the data deletion is not important to this transaction only that the ack has been
131         // successfully recorded.
132     }
133 
134     public void commit(StoreContext storeContext)
135     {
136         //remove the unacked messages from the channels map
137         _map.remove(_unacked);        
138     }
139 
140     public void rollback(StoreContext storeContext)
141     {
142     }
143 }
144