ReadWriteJobQueue.java
001 package org.apache.qpid.pool;
002 
003 import java.util.AbstractQueue;
004 import java.util.Iterator;
005 import java.util.Collection;
006 import java.util.concurrent.BlockingQueue;
007 import java.util.concurrent.TimeUnit;
008 import java.util.concurrent.ConcurrentLinkedQueue;
009 import java.util.concurrent.locks.ReentrantLock;
010 import java.util.concurrent.locks.Condition;
011 import java.util.concurrent.atomic.AtomicInteger;
012 
013 /*
014 *
015 * Licensed to the Apache Software Foundation (ASF) under one
016 * or more contributor license agreements.  See the NOTICE file
017 * distributed with this work for additional information
018 * regarding copyright ownership.  The ASF licenses this file
019 * to you under the Apache License, Version 2.0 (the
020 * "License"); you may not use this file except in compliance
021 * with the License.  You may obtain a copy of the License at
022 *
023 *   http://www.apache.org/licenses/LICENSE-2.0
024 *
025 * Unless required by applicable law or agreed to in writing,
026 * software distributed under the License is distributed on an
027 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
028 * KIND, either express or implied.  See the License for the
029 * specific language governing permissions and limitations
030 * under the License.
031 *
032 */
033 public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
034 {
035 
036     private final AtomicInteger _count = new AtomicInteger(0);
037 
038     private final ReentrantLock _takeLock = new ReentrantLock();
039 
040     private final Condition _notEmpty = _takeLock.newCondition();
041 
042     private final ReentrantLock _putLock = new ReentrantLock();
043 
044     private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
045 
046     private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
047 
048 
049     private class ReadWriteJobIterator implements Iterator<Runnable>
050     {
051 
052         private boolean _onReads;
053         private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
054 
055         public boolean hasNext()
056         {
057             if(!_iter.hasNext())
058             {
059                 if(_onReads)
060                 {
061                     _iter = _readJobQueue.iterator();
062                     _onReads = true;
063                     return _iter.hasNext();
064                 }
065                 else
066                 {
067                     return false;
068                 }
069             }
070             else
071             {
072                 return true;
073             }
074         }
075 
076         public Runnable next()
077         {
078             if(_iter.hasNext())
079             {
080                 return _iter.next();
081             }
082             else
083             {
084                 return null;
085             }
086         }
087 
088         public void remove()
089         {
090             _takeLock.lock();
091             try
092             {
093                 _iter.remove();
094                 _count.decrementAndGet();
095             }
096             finally
097             {
098                 _takeLock.unlock();
099             }
100         }
101     }
102 
103     public Iterator<Runnable> iterator()
104     {
105         return new ReadWriteJobIterator();
106     }
107 
108     public int size()
109     {
110         return _count.get();
111     }
112 
113     public boolean offer(final Runnable runnable)
114     {
115         final ReadWriteRunnable job = (ReadWriteRunnablerunnable;
116         final ReentrantLock putLock = _putLock;
117         putLock.lock();
118         try
119         {
120             if(job.isRead())
121             {
122                 _readJobQueue.offer(job);
123             }
124             else
125             {
126                 _writeJobQueue.offer(job);
127             }
128             if(_count.getAndIncrement() == 0)
129             {
130                 _takeLock.lock();
131                 try
132                 {
133                     _notEmpty.signal();
134                 }
135                 finally
136                 {
137                     _takeLock.unlock();
138                 }
139             }
140             return true;
141         }
142         finally
143         {
144             putLock.unlock();
145         }
146     }
147 
148     public void put(final Runnable runnablethrows InterruptedException
149     {
150         final ReadWriteRunnable job = (ReadWriteRunnablerunnable;
151         final ReentrantLock putLock = _putLock;
152         putLock.lock();
153 
154         try
155         {
156             if(job.isRead())
157             {
158                 _readJobQueue.offer(job);
159             }
160             else
161             {
162                 _writeJobQueue.offer(job);
163             }
164             if(_count.getAndIncrement() == 0)
165             {
166                                 _takeLock.lock();
167                 try
168                 {
169                     _notEmpty.signal();
170                 }
171                 finally
172                 {
173                     _takeLock.unlock();
174                 }
175             }
176 
177         }
178         finally
179         {
180             putLock.unlock();
181         }
182     }
183 
184 
185 
186     public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unitthrows InterruptedException
187     {
188         final ReadWriteRunnable job = (ReadWriteRunnablerunnable;
189         final ReentrantLock putLock = _putLock;
190         putLock.lock();
191 
192         try
193         {
194             if(job.isRead())
195             {
196                 _readJobQueue.offer(job);
197             }
198             else
199             {
200                 _writeJobQueue.offer(job);
201             }
202             if(_count.getAndIncrement() == 0)
203             {
204                 _takeLock.lock();
205                 try
206                 {
207                     _notEmpty.signal();
208                 }
209                 finally
210                 {
211                     _takeLock.unlock();
212                 }
213             }
214 
215             return true;
216         }
217         finally
218         {
219             putLock.unlock();
220         }
221 
222     }
223 
224     public Runnable take() throws InterruptedException
225     {
226         final ReentrantLock takeLock = _takeLock;
227         takeLock.lockInterruptibly();
228         try
229         {
230             try
231             {
232                 while (_count.get() == 0)
233                 {
234                     _notEmpty.await();
235                 }
236             }
237             catch (InterruptedException ie)
238             {
239                 _notEmpty.signal();
240                 throw ie;
241             }
242 
243             ReadWriteRunnable job = _writeJobQueue.poll();
244             if(job == null)
245             {
246                 job = _readJobQueue.poll();
247             }
248             int c = _count.getAndDecrement();
249             if (c > 1)
250             {
251                 _notEmpty.signal();
252             }
253             return job;
254         }
255         finally
256         {
257             takeLock.unlock();
258         }
259 
260 
261     }
262 
263     public Runnable poll(final long timeout, final TimeUnit unitthrows InterruptedException
264     {
265         final ReentrantLock takeLock = _takeLock;
266         final AtomicInteger count = _count;
267         long nanos = unit.toNanos(timeout);
268         takeLock.lockInterruptibly();
269         ReadWriteRunnable job = null;
270         try
271         {
272 
273             for (;;)
274             {
275                 if (count.get() 0)
276                 {
277                     job = _writeJobQueue.poll();
278                     if(job == null)
279                     {
280                         job = _readJobQueue.poll();
281                     }
282                     int c = count.getAndDecrement();
283                     if (c > 1)
284                     {
285                         _notEmpty.signal();
286                     }
287                     break;
288                 }
289                 if (nanos <= 0)
290                 {
291                     return null;
292                 }
293                 try
294                 {
295                     nanos = _notEmpty.awaitNanos(nanos);
296                 }
297                 catch (InterruptedException ie)
298                 {
299                     _notEmpty.signal();
300                     throw ie;
301                 }
302             }
303         }
304         finally
305         {
306             takeLock.unlock();
307         }
308 
309         return job;
310     }
311 
312     public int remainingCapacity()
313     {
314         return Integer.MAX_VALUE;
315     }
316 
317     public int drainTo(final Collection<? super Runnable> c)
318     {
319         int total = 0;
320 
321         _putLock.lock();
322         _takeLock.lock();
323         try
324         {
325             ReadWriteRunnable job;
326             while((job = _writeJobQueue.peek())!= null)
327             {
328                 c.add(job);
329                 _writeJobQueue.poll();
330                 _count.decrementAndGet();
331                 total++;
332             }
333 
334             while((job = _readJobQueue.peek())!= null)
335             {
336                 c.add(job);
337                 _readJobQueue.poll();
338                 _count.decrementAndGet();
339                 total++;
340             }
341 
342         }
343         finally
344         {
345             _takeLock.unlock();
346             _putLock.unlock();
347         }
348         return total;
349     }
350 
351     public int drainTo(final Collection<? super Runnable> c, final int maxElements)
352     {
353         int total = 0;
354 
355         _putLock.lock();
356         _takeLock.lock();
357         try
358         {
359             ReadWriteRunnable job;
360             while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
361             {
362                 c.add(job);
363                 _writeJobQueue.poll();
364                 _count.decrementAndGet();
365                 total++;
366             }
367 
368             while(total<=maxElements && (job = _readJobQueue.peek())!= null)
369             {
370                 c.add(job);
371                 _readJobQueue.poll();
372                 _count.decrementAndGet();
373                 total++;
374             }
375 
376         }
377         finally
378         {
379             _takeLock.unlock();
380             _putLock.unlock();
381         }
382         return total;
383 
384     }
385 
386     public Runnable poll()
387     {
388         final ReentrantLock takeLock = _takeLock;
389         takeLock.lock();
390         try
391         {
392             if(_count.get() 0)
393             {
394                 ReadWriteRunnable job = _writeJobQueue.poll();
395                 if(job == null)
396                 {
397                     job = _readJobQueue.poll();
398                 }
399                 _count.decrementAndGet();
400                 return job;
401             }
402             else
403             {
404                 return null;
405             }
406         }
407         finally
408         {
409             takeLock.unlock();
410         }
411 
412     }
413 
414     public Runnable peek()
415     {
416         final ReentrantLock takeLock = _takeLock;
417         takeLock.lock();
418         try
419         {
420             ReadWriteRunnable job = _writeJobQueue.peek();
421             if(job == null)
422             {
423                 job = _readJobQueue.peek();
424             }
425             return job;
426         }
427         finally
428         {
429             takeLock.unlock();
430         }
431     }
432 }