001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.ack;
022
023 import org.apache.qpid.server.store.StoreContext;
024 import java.util.Collection;
025 import java.util.Iterator;
026 import java.util.LinkedHashMap;
027 import java.util.Map;
028 import java.util.Set;
029
030 import org.apache.qpid.AMQException;
031 import org.apache.qpid.server.queue.QueueEntry;
032 import org.apache.qpid.server.txn.TransactionalContext;
033
034 public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
035 {
036 private final Object _lock = new Object();
037
038 private long _unackedSize;
039
040 private Map<Long, QueueEntry> _map;
041
042 private long _lastDeliveryTag;
043
044 private final int _prefetchLimit;
045
046 public UnacknowledgedMessageMapImpl(int prefetchLimit)
047 {
048 _prefetchLimit = prefetchLimit;
049 _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
050 }
051
052 public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
053 {
054 if (multiple)
055 {
056 collect(deliveryTag, msgs);
057 }
058 else
059 {
060 msgs.put(deliveryTag, get(deliveryTag));
061 }
062
063 }
064
065 public boolean contains(long deliveryTag) throws AMQException
066 {
067 synchronized (_lock)
068 {
069 return _map.containsKey(deliveryTag);
070 }
071 }
072
073 public void remove(Map<Long,QueueEntry> msgs)
074 {
075 synchronized (_lock)
076 {
077 for (Long deliveryTag : msgs.keySet())
078 {
079 remove(deliveryTag);
080 }
081 }
082 }
083
084 public QueueEntry remove(long deliveryTag)
085 {
086 synchronized (_lock)
087 {
088
089 QueueEntry message = _map.remove(deliveryTag);
090 if(message != null)
091 {
092 _unackedSize -= message.getMessage().getSize();
093
094 }
095
096 return message;
097 }
098 }
099
100 public void visit(Visitor visitor) throws AMQException
101 {
102 synchronized (_lock)
103 {
104 Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
105 for (Map.Entry<Long, QueueEntry> entry : currentEntries)
106 {
107 visitor.callback(entry.getKey().longValue(), entry.getValue());
108 }
109 visitor.visitComplete();
110 }
111 }
112
113 public void add(long deliveryTag, QueueEntry message)
114 {
115 synchronized (_lock)
116 {
117 _map.put(deliveryTag, message);
118 _unackedSize += message.getMessage().getSize();
119 _lastDeliveryTag = deliveryTag;
120 }
121 }
122
123 public Collection<QueueEntry> cancelAllMessages()
124 {
125 synchronized (_lock)
126 {
127 Collection<QueueEntry> currentEntries = _map.values();
128 _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
129 _unackedSize = 0l;
130 return currentEntries;
131 }
132 }
133
134 public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext)
135 throws AMQException
136 {
137 synchronized (_lock)
138 {
139 txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this);
140 }
141 }
142
143 public int size()
144 {
145 synchronized (_lock)
146 {
147 return _map.size();
148 }
149 }
150
151 public void clear()
152 {
153 synchronized (_lock)
154 {
155 _map.clear();
156 _unackedSize = 0l;
157 }
158 }
159
160 public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
161
162 {
163 synchronized (_lock)
164 {
165 Iterator<Map.Entry<Long, QueueEntry>> it = _map.entrySet().iterator();
166 while (it.hasNext())
167 {
168 Map.Entry<Long, QueueEntry> unacked = it.next();
169
170 if (unacked.getKey() > deliveryTag)
171 {
172 //This should not occur now.
173 throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
174 " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
175 }
176
177 //Message has been ack so dequeueAndDelete it.
178 // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
179 // from the transaciton log
180 unacked.getValue().dequeueAndDelete(storeContext);
181
182 it.remove();
183
184 _unackedSize -= unacked.getValue().getMessage().getSize();
185
186
187 if (unacked.getKey() == deliveryTag)
188 {
189 break;
190 }
191 }
192 }
193 }
194
195 public QueueEntry get(long key)
196 {
197 synchronized (_lock)
198 {
199 return _map.get(key);
200 }
201 }
202
203 public Set<Long> getDeliveryTags()
204 {
205 synchronized (_lock)
206 {
207 return _map.keySet();
208 }
209 }
210
211 private void collect(long key, Map<Long, QueueEntry> msgs)
212 {
213 synchronized (_lock)
214 {
215 for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
216 {
217 msgs.put(entry.getKey(),entry.getValue());
218 if (entry.getKey() == key)
219 {
220 break;
221 }
222 }
223 }
224 }
225
226 public long getUnacknowledgeBytes()
227 {
228 return _unackedSize;
229 }
230 }
|