001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 *
020 */
021 package org.apache.qpid.util;
022
023 import org.slf4j.Logger;
024 import org.slf4j.LoggerFactory;
025
026 import java.util.Collection;
027 import java.util.Iterator;
028 import java.util.Queue;
029 import java.util.concurrent.atomic.AtomicInteger;
030
031 public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E>
032 {
033 private static final Logger _logger = LoggerFactory.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class);
034
035 protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>();
036
037 protected AtomicInteger _messageHeadSize = new AtomicInteger(0);
038
039 @Override
040 public int size()
041 {
042 return super.size() + _messageHeadSize.get();
043 }
044
045 public int headSize()
046 {
047 return _messageHeadSize.get();
048 }
049
050 @Override
051 public E poll()
052 {
053 if (_messageHead.isEmpty())
054 {
055 return super.poll();
056 }
057 else
058 {
059 E e = _messageHead.poll();
060
061 if (_logger.isDebugEnabled())
062 {
063 _logger.debug("Providing item(" + e + ")from message head");
064 }
065
066 if (e != null)
067 {
068 _messageHeadSize.decrementAndGet();
069 }
070
071 return e;
072 }
073 }
074
075 @Override
076 public boolean remove(Object o)
077 {
078
079 if (_messageHead.isEmpty())
080 {
081 return super.remove(o);
082 }
083 else
084 {
085 if (_messageHead.remove(o))
086 {
087 _messageHeadSize.decrementAndGet();
088
089 return true;
090 }
091
092 return super.remove(o);
093 }
094 }
095
096 @Override
097 public boolean removeAll(Collection<?> c)
098 {
099 if (_messageHead.isEmpty())
100 {
101 return super.removeAll(c);
102 }
103 else
104 {
105 // fixme this is super.removeAll but iterator here doesn't work
106 // we need to be able to correctly decrement _messageHeadSize
107 // boolean modified = false;
108 // Iterator<?> e = iterator();
109 // while (e.hasNext())
110 // {
111 // if (c.contains(e.next()))
112 // {
113 // e.remove();
114 // modified = true;
115 // _size.decrementAndGet();
116 // }
117 // }
118 // return modified;
119
120 throw new RuntimeException("Not implemented");
121 }
122 }
123
124 @Override
125 public boolean isEmpty()
126 {
127 return (_messageHead.isEmpty() && super.isEmpty());
128 }
129
130 @Override
131 public void clear()
132 {
133 super.clear();
134 _messageHead.clear();
135 }
136
137 @Override
138 public boolean contains(Object o)
139 {
140 return _messageHead.contains(o) || super.contains(o);
141 }
142
143 @Override
144 public boolean containsAll(Collection<?> o)
145 {
146 return _messageHead.containsAll(o) || super.containsAll(o);
147 }
148
149 @Override
150 public E element()
151 {
152 if (_messageHead.isEmpty())
153 {
154 return super.element();
155 }
156 else
157 {
158 return _messageHead.element();
159 }
160 }
161
162 @Override
163 public E peek()
164 {
165 if (_messageHead.isEmpty())
166 {
167 return super.peek();
168 }
169 else
170 {
171 E o = _messageHead.peek();
172 if (_logger.isDebugEnabled())
173 {
174 _logger.debug("Peeking item (" + o + ") from message head");
175 }
176
177 return o;
178 }
179
180 }
181
182 @Override
183 public Iterator<E> iterator()
184 {
185 final Iterator<E> mainMessageIterator = super.iterator();
186
187 return new Iterator<E>()
188 {
189 final Iterator<E> _headIterator = _messageHead.iterator();
190 final Iterator<E> _mainIterator = mainMessageIterator;
191
192 Iterator<E> last;
193
194 public boolean hasNext()
195 {
196 return _headIterator.hasNext() || _mainIterator.hasNext();
197 }
198
199 public E next()
200 {
201 if (_headIterator.hasNext())
202 {
203 last = _headIterator;
204
205 return _headIterator.next();
206 }
207 else
208 {
209 last = _mainIterator;
210
211 return _mainIterator.next();
212 }
213 }
214
215 public void remove()
216 {
217 last.remove();
218 if(last == _mainIterator)
219 {
220 _size.decrementAndGet();
221 }
222 else
223 {
224 _messageHeadSize.decrementAndGet();
225 }
226 }
227 };
228 }
229
230 @Override
231 public boolean retainAll(Collection<?> c)
232 {
233 throw new RuntimeException("Not Implemented");
234 }
235
236 @Override
237 public Object[] toArray()
238 {
239 throw new RuntimeException("Not Implemented");
240 }
241
242 public boolean pushHead(E o)
243 {
244 if (_logger.isDebugEnabled())
245 {
246 _logger.debug("Adding item(" + o + ") to head of queue");
247 }
248
249 if (_messageHead.offer(o))
250 {
251 _messageHeadSize.incrementAndGet();
252
253 return true;
254 }
255
256 return false;
257 }
258 }
|