001 package org.apache.qpid.pool;
002
003 import java.util.AbstractQueue;
004 import java.util.Iterator;
005 import java.util.Collection;
006 import java.util.concurrent.BlockingQueue;
007 import java.util.concurrent.TimeUnit;
008 import java.util.concurrent.ConcurrentLinkedQueue;
009 import java.util.concurrent.locks.ReentrantLock;
010 import java.util.concurrent.locks.Condition;
011 import java.util.concurrent.atomic.AtomicInteger;
012
013 /*
014 *
015 * Licensed to the Apache Software Foundation (ASF) under one
016 * or more contributor license agreements. See the NOTICE file
017 * distributed with this work for additional information
018 * regarding copyright ownership. The ASF licenses this file
019 * to you under the Apache License, Version 2.0 (the
020 * "License"); you may not use this file except in compliance
021 * with the License. You may obtain a copy of the License at
022 *
023 * http://www.apache.org/licenses/LICENSE-2.0
024 *
025 * Unless required by applicable law or agreed to in writing,
026 * software distributed under the License is distributed on an
027 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
028 * KIND, either express or implied. See the License for the
029 * specific language governing permissions and limitations
030 * under the License.
031 *
032 */
033 public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
034 {
035
036 private final AtomicInteger _count = new AtomicInteger(0);
037
038 private final ReentrantLock _takeLock = new ReentrantLock();
039
040 private final Condition _notEmpty = _takeLock.newCondition();
041
042 private final ReentrantLock _putLock = new ReentrantLock();
043
044 private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
045
046 private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
047
048
049 private class ReadWriteJobIterator implements Iterator<Runnable>
050 {
051
052 private boolean _onReads;
053 private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
054
055 public boolean hasNext()
056 {
057 if(!_iter.hasNext())
058 {
059 if(_onReads)
060 {
061 _iter = _readJobQueue.iterator();
062 _onReads = true;
063 return _iter.hasNext();
064 }
065 else
066 {
067 return false;
068 }
069 }
070 else
071 {
072 return true;
073 }
074 }
075
076 public Runnable next()
077 {
078 if(_iter.hasNext())
079 {
080 return _iter.next();
081 }
082 else
083 {
084 return null;
085 }
086 }
087
088 public void remove()
089 {
090 _takeLock.lock();
091 try
092 {
093 _iter.remove();
094 _count.decrementAndGet();
095 }
096 finally
097 {
098 _takeLock.unlock();
099 }
100 }
101 }
102
103 public Iterator<Runnable> iterator()
104 {
105 return new ReadWriteJobIterator();
106 }
107
108 public int size()
109 {
110 return _count.get();
111 }
112
113 public boolean offer(final Runnable runnable)
114 {
115 final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
116 final ReentrantLock putLock = _putLock;
117 putLock.lock();
118 try
119 {
120 if(job.isRead())
121 {
122 _readJobQueue.offer(job);
123 }
124 else
125 {
126 _writeJobQueue.offer(job);
127 }
128 if(_count.getAndIncrement() == 0)
129 {
130 _takeLock.lock();
131 try
132 {
133 _notEmpty.signal();
134 }
135 finally
136 {
137 _takeLock.unlock();
138 }
139 }
140 return true;
141 }
142 finally
143 {
144 putLock.unlock();
145 }
146 }
147
148 public void put(final Runnable runnable) throws InterruptedException
149 {
150 final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
151 final ReentrantLock putLock = _putLock;
152 putLock.lock();
153
154 try
155 {
156 if(job.isRead())
157 {
158 _readJobQueue.offer(job);
159 }
160 else
161 {
162 _writeJobQueue.offer(job);
163 }
164 if(_count.getAndIncrement() == 0)
165 {
166 _takeLock.lock();
167 try
168 {
169 _notEmpty.signal();
170 }
171 finally
172 {
173 _takeLock.unlock();
174 }
175 }
176
177 }
178 finally
179 {
180 putLock.unlock();
181 }
182 }
183
184
185
186 public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
187 {
188 final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
189 final ReentrantLock putLock = _putLock;
190 putLock.lock();
191
192 try
193 {
194 if(job.isRead())
195 {
196 _readJobQueue.offer(job);
197 }
198 else
199 {
200 _writeJobQueue.offer(job);
201 }
202 if(_count.getAndIncrement() == 0)
203 {
204 _takeLock.lock();
205 try
206 {
207 _notEmpty.signal();
208 }
209 finally
210 {
211 _takeLock.unlock();
212 }
213 }
214
215 return true;
216 }
217 finally
218 {
219 putLock.unlock();
220 }
221
222 }
223
224 public Runnable take() throws InterruptedException
225 {
226 final ReentrantLock takeLock = _takeLock;
227 takeLock.lockInterruptibly();
228 try
229 {
230 try
231 {
232 while (_count.get() == 0)
233 {
234 _notEmpty.await();
235 }
236 }
237 catch (InterruptedException ie)
238 {
239 _notEmpty.signal();
240 throw ie;
241 }
242
243 ReadWriteRunnable job = _writeJobQueue.poll();
244 if(job == null)
245 {
246 job = _readJobQueue.poll();
247 }
248 int c = _count.getAndDecrement();
249 if (c > 1)
250 {
251 _notEmpty.signal();
252 }
253 return job;
254 }
255 finally
256 {
257 takeLock.unlock();
258 }
259
260
261 }
262
263 public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException
264 {
265 final ReentrantLock takeLock = _takeLock;
266 final AtomicInteger count = _count;
267 long nanos = unit.toNanos(timeout);
268 takeLock.lockInterruptibly();
269 ReadWriteRunnable job = null;
270 try
271 {
272
273 for (;;)
274 {
275 if (count.get() > 0)
276 {
277 job = _writeJobQueue.poll();
278 if(job == null)
279 {
280 job = _readJobQueue.poll();
281 }
282 int c = count.getAndDecrement();
283 if (c > 1)
284 {
285 _notEmpty.signal();
286 }
287 break;
288 }
289 if (nanos <= 0)
290 {
291 return null;
292 }
293 try
294 {
295 nanos = _notEmpty.awaitNanos(nanos);
296 }
297 catch (InterruptedException ie)
298 {
299 _notEmpty.signal();
300 throw ie;
301 }
302 }
303 }
304 finally
305 {
306 takeLock.unlock();
307 }
308
309 return job;
310 }
311
312 public int remainingCapacity()
313 {
314 return Integer.MAX_VALUE;
315 }
316
317 public int drainTo(final Collection<? super Runnable> c)
318 {
319 int total = 0;
320
321 _putLock.lock();
322 _takeLock.lock();
323 try
324 {
325 ReadWriteRunnable job;
326 while((job = _writeJobQueue.peek())!= null)
327 {
328 c.add(job);
329 _writeJobQueue.poll();
330 _count.decrementAndGet();
331 total++;
332 }
333
334 while((job = _readJobQueue.peek())!= null)
335 {
336 c.add(job);
337 _readJobQueue.poll();
338 _count.decrementAndGet();
339 total++;
340 }
341
342 }
343 finally
344 {
345 _takeLock.unlock();
346 _putLock.unlock();
347 }
348 return total;
349 }
350
351 public int drainTo(final Collection<? super Runnable> c, final int maxElements)
352 {
353 int total = 0;
354
355 _putLock.lock();
356 _takeLock.lock();
357 try
358 {
359 ReadWriteRunnable job;
360 while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
361 {
362 c.add(job);
363 _writeJobQueue.poll();
364 _count.decrementAndGet();
365 total++;
366 }
367
368 while(total<=maxElements && (job = _readJobQueue.peek())!= null)
369 {
370 c.add(job);
371 _readJobQueue.poll();
372 _count.decrementAndGet();
373 total++;
374 }
375
376 }
377 finally
378 {
379 _takeLock.unlock();
380 _putLock.unlock();
381 }
382 return total;
383
384 }
385
386 public Runnable poll()
387 {
388 final ReentrantLock takeLock = _takeLock;
389 takeLock.lock();
390 try
391 {
392 if(_count.get() > 0)
393 {
394 ReadWriteRunnable job = _writeJobQueue.poll();
395 if(job == null)
396 {
397 job = _readJobQueue.poll();
398 }
399 _count.decrementAndGet();
400 return job;
401 }
402 else
403 {
404 return null;
405 }
406 }
407 finally
408 {
409 takeLock.unlock();
410 }
411
412 }
413
414 public Runnable peek()
415 {
416 final ReentrantLock takeLock = _takeLock;
417 takeLock.lock();
418 try
419 {
420 ReadWriteRunnable job = _writeJobQueue.peek();
421 if(job == null)
422 {
423 job = _readJobQueue.peek();
424 }
425 return job;
426 }
427 finally
428 {
429 takeLock.unlock();
430 }
431 }
432 }
|