ConcurrentLinkedMessageQueueAtomicSize.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.
018  *
019  *
020  */
021 package org.apache.qpid.util;
022 
023 import org.slf4j.Logger;
024 import org.slf4j.LoggerFactory;
025 
026 import java.util.Collection;
027 import java.util.Iterator;
028 import java.util.Queue;
029 import java.util.concurrent.atomic.AtomicInteger;
030 
031 public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E>
032 {
033     private static final Logger _logger = LoggerFactory.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class);
034 
035     protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>();
036 
037     protected AtomicInteger _messageHeadSize = new AtomicInteger(0);
038 
039     @Override
040     public int size()
041     {
042         return super.size() + _messageHeadSize.get();
043     }
044 
045     public int headSize()
046     {
047         return _messageHeadSize.get();
048     }
049 
050     @Override
051     public E poll()
052     {
053         if (_messageHead.isEmpty())
054         {
055             return super.poll();
056         }
057         else
058         {
059             E e = _messageHead.poll();
060 
061             if (_logger.isDebugEnabled())
062             {
063                 _logger.debug("Providing item(" + e + ")from message head");
064             }
065 
066             if (e != null)
067             {
068                 _messageHeadSize.decrementAndGet();
069             }
070 
071             return e;
072         }
073     }
074 
075     @Override
076     public boolean remove(Object o)
077     {
078 
079         if (_messageHead.isEmpty())
080         {
081             return super.remove(o);
082         }
083         else
084         {
085             if (_messageHead.remove(o))
086             {
087                 _messageHeadSize.decrementAndGet();
088 
089                 return true;
090             }
091 
092             return super.remove(o);
093         }
094     }
095 
096     @Override
097     public boolean removeAll(Collection<?> c)
098     {
099         if (_messageHead.isEmpty())
100         {
101             return super.removeAll(c);
102         }
103         else
104         {
105             // fixme this is super.removeAll but iterator here doesn't work
106             // we need to be able to correctly decrement _messageHeadSize
107             // boolean modified = false;
108             // Iterator<?> e = iterator();
109             // while (e.hasNext())
110             // {
111             // if (c.contains(e.next()))
112             // {
113             // e.remove();
114             // modified = true;
115             // _size.decrementAndGet();
116             // }
117             // }
118             // return modified;
119 
120             throw new RuntimeException("Not implemented");
121         }
122     }
123 
124     @Override
125     public boolean isEmpty()
126     {
127         return (_messageHead.isEmpty() && super.isEmpty());
128     }
129 
130     @Override
131     public void clear()
132     {
133         super.clear();
134         _messageHead.clear();
135     }
136 
137     @Override
138     public boolean contains(Object o)
139     {
140         return _messageHead.contains(o|| super.contains(o);
141     }
142 
143     @Override
144     public boolean containsAll(Collection<?> o)
145     {
146         return _messageHead.containsAll(o|| super.containsAll(o);
147     }
148 
149     @Override
150     public E element()
151     {
152         if (_messageHead.isEmpty())
153         {
154             return super.element();
155         }
156         else
157         {
158             return _messageHead.element();
159         }
160     }
161 
162     @Override
163     public E peek()
164     {
165         if (_messageHead.isEmpty())
166         {
167             return super.peek();
168         }
169         else
170         {
171             E o = _messageHead.peek();
172             if (_logger.isDebugEnabled())
173             {
174                 _logger.debug("Peeking item (" + o + ") from message head");
175             }
176 
177             return o;
178         }
179 
180     }
181 
182     @Override
183     public Iterator<E> iterator()
184     {
185         final Iterator<E> mainMessageIterator = super.iterator();
186 
187         return new Iterator<E>()
188             {
189                 final Iterator<E> _headIterator = _messageHead.iterator();
190                 final Iterator<E> _mainIterator = mainMessageIterator;
191 
192                 Iterator<E> last;
193 
194                 public boolean hasNext()
195                 {
196                     return _headIterator.hasNext() || _mainIterator.hasNext();
197                 }
198 
199                 public E next()
200                 {
201                     if (_headIterator.hasNext())
202                     {
203                         last = _headIterator;
204 
205                         return _headIterator.next();
206                     }
207                     else
208                     {
209                         last = _mainIterator;
210 
211                         return _mainIterator.next();
212                     }
213                 }
214 
215                 public void remove()
216                 {
217                     last.remove();
218                     if(last == _mainIterator)
219                     {
220                         _size.decrementAndGet();
221                     }
222                     else
223                     {
224                         _messageHeadSize.decrementAndGet();                        
225                     }
226                 }
227             };
228     }
229 
230     @Override
231     public boolean retainAll(Collection<?> c)
232     {
233         throw new RuntimeException("Not Implemented");
234     }
235 
236     @Override
237     public Object[] toArray()
238     {
239         throw new RuntimeException("Not Implemented");
240     }
241 
242     public boolean pushHead(E o)
243     {
244         if (_logger.isDebugEnabled())
245         {
246             _logger.debug("Adding item(" + o + ") to head of queue");
247         }
248 
249         if (_messageHead.offer(o))
250         {
251             _messageHeadSize.incrementAndGet();
252 
253             return true;
254         }
255 
256         return false;
257     }
258 }