UnacknowledgedMessageMapImpl.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.ack;
022 
023 import org.apache.qpid.server.store.StoreContext;
024 import java.util.Collection;
025 import java.util.Iterator;
026 import java.util.LinkedHashMap;
027 import java.util.Map;
028 import java.util.Set;
029 
030 import org.apache.qpid.AMQException;
031 import org.apache.qpid.server.queue.QueueEntry;
032 import org.apache.qpid.server.txn.TransactionalContext;
033 
034 public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
035 {
036     private final Object _lock = new Object();
037 
038     private long _unackedSize;
039 
040     private Map<Long, QueueEntry> _map;
041 
042     private long _lastDeliveryTag;
043 
044     private final int _prefetchLimit;
045 
046     public UnacknowledgedMessageMapImpl(int prefetchLimit)
047     {
048         _prefetchLimit = prefetchLimit;
049         _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
050     }
051 
052     public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
053     {
054         if (multiple)
055         {
056             collect(deliveryTag, msgs);
057         }
058         else
059         {
060             msgs.put(deliveryTag, get(deliveryTag));
061         }
062 
063     }
064 
065     public boolean contains(long deliveryTagthrows AMQException
066     {
067         synchronized (_lock)
068         {
069             return _map.containsKey(deliveryTag);
070         }
071     }
072 
073     public void remove(Map<Long,QueueEntry> msgs)
074     {
075         synchronized (_lock)
076         {
077             for (Long deliveryTag : msgs.keySet())
078             {
079                 remove(deliveryTag);
080             }
081         }
082     }
083 
084     public QueueEntry remove(long deliveryTag)
085     {
086         synchronized (_lock)
087         {
088 
089             QueueEntry message = _map.remove(deliveryTag);
090             if(message != null)
091             {
092                 _unackedSize -= message.getMessage().getSize();
093 
094             }
095 
096             return message;
097         }
098     }
099 
100     public void visit(Visitor visitorthrows AMQException
101     {
102         synchronized (_lock)
103         {
104             Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
105             for (Map.Entry<Long, QueueEntry> entry : currentEntries)
106             {
107                 visitor.callback(entry.getKey().longValue(), entry.getValue());
108             }
109             visitor.visitComplete();
110         }
111     }
112 
113     public void add(long deliveryTag, QueueEntry message)
114     {
115         synchronized (_lock)
116         {
117             _map.put(deliveryTag, message);
118             _unackedSize += message.getMessage().getSize();
119             _lastDeliveryTag = deliveryTag;
120         }
121     }
122 
123     public Collection<QueueEntry> cancelAllMessages()
124     {
125         synchronized (_lock)
126         {
127             Collection<QueueEntry> currentEntries = _map.values();
128             _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
129             _unackedSize = 0l;
130             return currentEntries;
131         }
132     }
133 
134     public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext)
135             throws AMQException
136     {
137         synchronized (_lock)
138         {
139             txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this);
140         }
141     }
142 
143     public int size()
144     {
145         synchronized (_lock)
146         {
147             return _map.size();
148         }
149     }
150 
151     public void clear()
152     {
153         synchronized (_lock)
154         {
155             _map.clear();
156             _unackedSize = 0l;
157         }
158     }
159 
160     public void drainTo(long deliveryTag, StoreContext storeContextthrows AMQException
161    
162     {
163         synchronized (_lock)
164         {
165             Iterator<Map.Entry<Long, QueueEntry>> it = _map.entrySet().iterator();
166             while (it.hasNext())
167             {
168                 Map.Entry<Long, QueueEntry> unacked = it.next();
169 
170                 if (unacked.getKey() > deliveryTag)
171                 {
172                     //This should not occur now.
173                     throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
174                                            " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
175                 }
176 
177                 //Message has been ack so dequeueAndDelete it.
178                 // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
179                 // from the transaciton log                
180                 unacked.getValue().dequeueAndDelete(storeContext);
181 
182                 it.remove();
183 
184                 _unackedSize -= unacked.getValue().getMessage().getSize();
185 
186 
187                 if (unacked.getKey() == deliveryTag)
188                 {
189                     break;
190                 }
191             }
192         }
193     }
194     
195     public QueueEntry get(long key)
196     {
197         synchronized (_lock)
198         {
199             return _map.get(key);
200         }
201     }
202 
203     public Set<Long> getDeliveryTags()
204     {
205         synchronized (_lock)
206         {
207             return _map.keySet();
208         }
209     }
210 
211     private void collect(long key, Map<Long, QueueEntry> msgs)
212     {
213         synchronized (_lock)
214         {
215             for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
216             {
217                 msgs.put(entry.getKey(),entry.getValue());
218                 if (entry.getKey() == key)
219                 {
220                     break;
221                 }
222             }
223         }
224     }
225 
226     public long getUnacknowledgeBytes()
227     {
228         return _unackedSize;
229     }
230 }