001 package org.apache.qpid.util.concurrent;
002 /*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 *
021 */
022
023
024 import java.util.*;
025 import java.util.concurrent.TimeUnit;
026 import java.util.concurrent.locks.Condition;
027 import java.util.concurrent.locks.ReentrantLock;
028
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031
032
033 /**
034 * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data.
035 * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being
036 * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and
037 * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is
038 * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch.
039 * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls.
040 *
041 * <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete
042 * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract},
043 * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement
044 * the buffer other than by a queue, for example, by using an array.
045 *
046 * <p/>Normal queue methods to work asynchronously.
047 * <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately
048 * when their data is taken.
049 * <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the
050 * option to keep producers blocked until the consumer decides to release them.
051 *
052 * <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to
053 * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency
 * gain of the hand off design comes in being able to batch consume requests, amortizing latency (such as caused by io)
 * across many producers. The only advantage of the single blocking take method is that it did take advantage of the
 * queue ordering, which may be useful, for example to apply a priority ordering amongst producers. This is also an
057 * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to
058 * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item.
059 *
060 * <p/><table id="crc"><caption>CRC Card</caption>
061 * <tr><th> Responsibilities <th> Collaborations
062 * </table>
063 */
064 public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E>
065 {
066 /** Used for logging. */
067 private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class);
068
069 /** Holds a reference to the queue implementation that holds the buffer. */
070 Queue<SynchRecordImpl<E>> buffer;
071
072 /** Holds the number of items in the queue */
073 private int count;
074
075 /** Main lock guarding all access */
076 private ReentrantLock lock;
077
078 /** Condition for waiting takes */
079 private Condition notEmpty;
080
081 /** Condition for waiting puts */
082 private Condition notFull;
083
084 /**
085 * Creates a batch synch queue without fair thread scheduling.
086 */
087 public BatchSynchQueueBase()
088 {
089 this(false);
090 }
091
092 /**
093 * Ensures that the underlying buffer implementation is created.
094 *
095 * @param fair <tt>true</tt> if fairness is to be applied to threads waiting to access the buffer.
096 */
097 public BatchSynchQueueBase(boolean fair)
098 {
099 buffer = this.createQueue();
100
101 // Create the buffer lock with the fairness flag set accordingly.
102 lock = new ReentrantLock(fair);
103
104 // Create the non-empty and non-full condition monitors on the buffer lock.
105 notEmpty = lock.newCondition();
106 notFull = lock.newCondition();
107 }
108
109 /**
110 * Returns an iterator over the elements contained in this collection.
111 *
112 * @return An iterator over the elements contained in this collection.
113 */
114 public Iterator<E> iterator()
115 {
116 throw new RuntimeException("Not implemented.");
117 }
118
119 /**
120 * Returns the number of elements in this collection. If the collection contains more than
121 * <tt>Integer.MAX_VALUE</tt> elements, returns <tt>Integer.MAX_VALUE</tt>.
122 *
123 * @return The number of elements in this collection.
124 */
125 public int size()
126 {
127 final ReentrantLock lock = this.lock;
128 lock.lock();
129
130 try
131 {
132 return count;
133 }
134 finally
135 {
136 lock.unlock();
137 }
138 }
139
140 /**
141 * Inserts the specified element into this queue, if possible. When using queues that may impose insertion
142 * restrictions (for example capacity bounds), method <tt>offer</tt> is generally preferable to method
143 * {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception.
144 *
145 * @param e The element to insert.
146 *
147 * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt>
148 */
149 public boolean offer(E e)
150 {
151 if (e == null)
152 {
153 throw new NullPointerException();
154 }
155
156 final ReentrantLock lock = this.lock;
157 lock.lock();
158
159 try
160 {
161 return insert(e, false);
162 }
163 finally
164 {
165 lock.unlock();
166 }
167 }
168
169 /**
170 * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to
171 * become available.
172 *
173 * @param e The element to add.
174 * @param timeout How long to wait before giving up, in units of <tt>unit</tt>
175 * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
176 *
177 * @return <tt>true</tt> if successful, or <tt>false</tt> if the specified waiting time elapses before space is
178 * available.
179 *
180 * @throws InterruptedException If interrupted while waiting.
181 * @throws NullPointerException If the specified element is <tt>null</tt>.
182 */
183 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
184 {
185 if (e == null)
186 {
187 throw new NullPointerException();
188 }
189
190 final ReentrantLock lock = this.lock;
191 lock.lockInterruptibly();
192
193 long nanos = unit.toNanos(timeout);
194
195 try
196 {
197 do
198 {
199 if (insert(e, false))
200 {
201 return true;
202 }
203
204 try
205 {
206 nanos = notFull.awaitNanos(nanos);
207 }
208 catch (InterruptedException ie)
209 {
210 notFull.signal(); // propagate to non-interrupted thread
211 throw ie;
212 }
213 }
214 while (nanos > 0);
215
216 return false;
217 }
218 finally
219 {
220 lock.unlock();
221 }
222 }
223
224 /**
225 * Retrieves and removes the head of this queue, or <tt>null</tt> if this queue is empty.
226 *
227 * @return The head of this queue, or <tt>null</tt> if this queue is empty.
228 */
229 public E poll()
230 {
231 final ReentrantLock lock = this.lock;
232
233 lock.lock();
234 try
235 {
236 if (count == 0)
237 {
238 return null;
239 }
240
241 E x = extract(true, true).getElement();
242
243 return x;
244 }
245 finally
246 {
247 lock.unlock();
248 }
249 }
250
251 /**
252 * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements
253 * are present on this queue.
254 *
255 * @param timeout How long to wait before giving up, in units of <tt>unit</tt>.
256 * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
257 *
258 * @return The head of this queue, or <tt>null</tt> if the specified waiting time elapses before an element is present.
259 *
260 * @throws InterruptedException If interrupted while waiting.
261 */
262 public E poll(long timeout, TimeUnit unit) throws InterruptedException
263 {
264 final ReentrantLock lock = this.lock;
265 lock.lockInterruptibly();
266 try
267 {
268 long nanos = unit.toNanos(timeout);
269
270 do
271 {
272 if (count != 0)
273 {
274 E x = extract(true, true).getElement();
275
276 return x;
277 }
278
279 try
280 {
281 nanos = notEmpty.awaitNanos(nanos);
282 }
283 catch (InterruptedException ie)
284 {
285 notEmpty.signal(); // propagate to non-interrupted thread
286 throw ie;
287 }
288 }
289 while (nanos > 0);
290
291 return null;
292 }
293 finally
294 {
295 lock.unlock();
296 }
297 }
298
299 /**
300 * Retrieves, but does not remove, the head of this queue, returning <tt>null</tt> if this queue is empty.
301 *
302 * @return The head of this queue, or <tt>null</tt> if this queue is empty.
303 */
304 public E peek()
305 {
306 final ReentrantLock lock = this.lock;
307 lock.lock();
308
309 try
310 {
311 return peekAtBufferHead();
312 }
313 finally
314 {
315 lock.unlock();
316 }
317 }
318
319 /**
320 * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints)
321 * accept without blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic limit.
322 *
323 * <p>Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt> an element will succeed by
324 * inspecting <tt>remainingCapacity</tt> because it may be the case that another thread is about to <tt>put</tt>
325 * or <tt>take</tt> an element.
326 *
327 * @return The remaining capacity.
328 */
329 public int remainingCapacity()
330 {
331 final ReentrantLock lock = this.lock;
332 lock.lock();
333
334 try
335 {
336 return getBufferCapacity() - count;
337 }
338 finally
339 {
340 lock.unlock();
341 }
342 }
343
344 /**
345 * Adds the specified element to this queue, waiting if necessary for space to become available.
346 *
347 * <p/>This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised
348 * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these
349 * exceptions.
350 *
351 * @param e The element to add.
352 *
353 * @throws InterruptedException If interrupted while waiting.
354 */
355 public void put(E e) throws InterruptedException
356 {
357 try
358 {
359 tryPut(e);
360 }
361 catch (SynchException ex)
362 {
363 // This exception is deliberately ignored. See the method comment for information about this.
364 }
365 }
366
367 /**
368 * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
369 * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
370 *
371 * @param e The data element to put into the queue. Cannot be null.
372 *
373 * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
374 * on its entry in the queue being consumed.
375 * @throws SynchException If a consumer encounters an error whilst processing the data element.
376 */
377 public void tryPut(E e) throws InterruptedException, SynchException
378 {
379 if (e == null)
380 {
381 throw new NullPointerException();
382 }
383
384 // final Queue<E> items = this.buffer;
385 final ReentrantLock lock = this.lock;
386 lock.lockInterruptibly();
387
388 try
389 {
390 while (count == getBufferCapacity())
391 {
392 // Release the lock and wait until the queue is not full.
393 notFull.await();
394 }
395 }
396 catch (InterruptedException ie)
397 {
398 notFull.signal(); // propagate to non-interrupted thread
399 throw ie;
400 }
401
402 // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block
403 // the producer until its data is taken.
404 insert(e, true);
405 }
406
407 /**
408 * Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
409 * Any producer that has its data element taken by this call will be immediately unblocked. To keep the
410 * producer blocked whilst taking just a single item, use the
411 * {@link #drainTo(java.util.Collection<org.apache.qpid.util.concurrent.SynchRecord<E>>, int, boolean)}
412 * method. There is no take method to do that because there is not usually any advantage in a synchronous hand
413 * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption
414 * latencies accross many producers where possible.
415 *
416 * @return The head of this queue.
417 *
418 * @throws InterruptedException if interrupted while waiting.
419 */
420 public E take() throws InterruptedException
421 {
422 final ReentrantLock lock = this.lock;
423 lock.lockInterruptibly();
424
425 try
426 {
427 try
428 {
429 while (count == 0)
430 {
431 // Release the lock and wait until the queue becomes non-empty.
432 notEmpty.await();
433 }
434 }
435 catch (InterruptedException ie)
436 {
437 notEmpty.signal(); // propagate to non-interrupted thread
438 throw ie;
439 }
440
441 // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is
442 // not full, and unblock the producer that owns the data item that is taken.
443 E x = extract(true, true).getElement();
444
445 return x;
446 }
447 finally
448 {
449 lock.unlock();
450 }
451 }
452
453 /**
454 * Removes all available elements from this queue and adds them into the given collection. This operation may be
455 * more efficient than repeatedly polling this queue. A failure encountered while attempting to <tt>add</tt> elements
456 * to collection <tt>c</tt> may result in elements being in neither, either or both collections when the associated
457 * exception is thrown. Attempts to drain a queue to itself result in <tt>IllegalArgumentException</tt>. Further,
458 * the behavior of this operation is undefined if the specified collection is modified while the operation is in
459 * progress.
460 *
461 * @param objects The collection to transfer elements into.
462 *
463 * @return The number of elements transferred.
464 *
465 * @throws NullPointerException If objects is null.
466 * @throws IllegalArgumentException If objects is this queue.
467 */
468 public int drainTo(Collection<? super E> objects)
469 {
470 return drainTo(objects, -1);
471 }
472
473 /**
474 * Removes at most the given number of available elements from this queue and adds them into the given collection.
475 * A failure encountered while attempting to <tt>add</tt> elements to collection <tt>c</tt> may result in elements
476 * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue
477 * to itself result in <tt>IllegalArgumentException</tt>. Further, the behavior of this operation is undefined if
478 * the specified collection is modified while the operation is in progress.
479 *
480 * @param objects The collection to transfer elements into.
481 * @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning
482 * all elements.
483 *
484 * @return The number of elements transferred.
485 *
486 * @throws NullPointerException If c is null.
487 * @throws IllegalArgumentException If c is this queue.
488 */
489 public int drainTo(Collection<? super E> objects, int maxElements)
490 {
491 if (objects == null)
492 {
493 throw new NullPointerException();
494 }
495
496 if (objects == this)
497 {
498 throw new IllegalArgumentException();
499 }
500
501 // final Queue<E> items = this.buffer;
502 final ReentrantLock lock = this.lock;
503 lock.lock();
504
505 try
506 {
507 int n = 0;
508
509 for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
510 {
511 // Take items from the queue, do unblock the producers, but don't send not full signals yet.
512 objects.add(extract(true, false).getElement());
513 }
514
515 if (n > 0)
516 {
517 // count -= n;
518 notFull.signalAll();
519 }
520
521 return n;
522 }
523 finally
524 {
525 lock.unlock();
526 }
527 }
528
529 /**
530 * Takes all available data items from the queue or blocks until some become available. The returned items
531 * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
532 * producers, where the producers are still blocked.
533 *
534 * @param c The collection to drain the data items into.
535 * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
536 *
537 * @return A count of the number of elements that were drained from the queue.
538 */
539 public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
540 {
541 return drainTo(c, -1, unblock);
542 }
543
544 /**
545 * Takes up to maxElements available data items from the queue or blocks until some become available. The returned
546 * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
547 * producers, where the producers are still blocked.
548 *
549 * @param coll The collection to drain the data items into.
550 * @param maxElements The maximum number of elements to drain.
551 * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
552 *
553 * @return A count of the number of elements that were drained from the queue.
554 */
555 public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock)
556 {
557 if (coll == null)
558 {
559 throw new NullPointerException();
560 }
561
562 // final Queue<E> items = this.buffer;
563 final ReentrantLock lock = this.lock;
564 lock.lock();
565
566 try
567 {
568 int n = 0;
569
570 for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
571 {
572 // Extract the next record from the queue, don't signall the not full condition yet and release
573 // producers depending on whether the caller wants to or not.
574 coll.add(extract(false, unblock));
575 }
576
577 if (n > 0)
578 {
579 // count -= n;
580 notFull.signalAll();
581 }
582
583 return new SynchRefImpl(n, coll);
584 }
585 finally
586 {
587 lock.unlock();
588 }
589 }
590
/**
 * This abstract method should be overridden to return an empty queue. Different implementations of producer
 * consumer buffers can control the order in which data is accessed using different queue implementations.
 * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete
 * implementations.
 *
 * <p/>It is called from the constructor of this class, so an implementation runs before the constructor of its
 * own class has completed.
 *
 * @param <T> The type of element held by the returned queue.
 *
 * @return An empty queue.
 */
protected abstract <T> Queue<T> createQueue();
600
601 /**
602 * Insert element into the queue, then possibly signal that the queue is not empty and block the producer
603 * on the element until permission to procede is given.
604 *
605 * <p/>If the producer is to be blocked then the lock must be released first, otherwise no other process
606 * will be able to get access to the queue. Hence, unlock and block are always set together.
607 *
608 * <p/>Call only when holding the global lock.
609 *
610 * @param unlockAndBlock <tt>true</tt>If the global queue lock should be released and the producer should be blocked.
611 *
612 * @return <tt>true</tt> if the operation succeeded, <tt>false</tt> otherwise. If the result is <tt>true</tt> this
613 * method may not return straight away, but only after the producer is unblocked by having its data
614 * consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no
615 * matter what value the unlockAndBlock flag has, leaving the global lock on.
616 */
617 protected boolean insert(E x, boolean unlockAndBlock)
618 {
619 // Create a new record for the data item.
620 SynchRecordImpl<E> record = new SynchRecordImpl<E>(x);
621
622 boolean result = buffer.offer(record);
623
624 if (result)
625 {
626 count++;
627
628 // Tell any waiting consumers that the queue is not empty.
629 notEmpty.signal();
630
631 if (unlockAndBlock)
632 {
633 // Allow other threads to read/write the queue.
634 lock.unlock();
635
636 // Wait until a consumer takes this data item.
637 record.waitForConsumer();
638 }
639
640 return true;
641 }
642 else
643 {
644 return false;
645 }
646 }
647
648 /**
649 * Extract element at current take position, advance, and signal.
650 *
651 * <p/>Call only when holding lock.
652 */
653 protected SynchRecordImpl<E> extract(boolean unblock, boolean signal)
654 {
655 SynchRecordImpl<E> result = buffer.remove();
656 count--;
657
658 if (signal)
659 {
660 notFull.signal();
661 }
662
663 if (unblock)
664 {
665 result.releaseImmediately();
666 }
667
668 return result;
669 }
670
671 /**
672 * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned.
673 *
674 * <p/>Call only when holding lock.
675 *
676 * @return The maximum capacity of the buffer.
677 */
678 protected int getBufferCapacity()
679 {
680 if (buffer instanceof Capacity)
681 {
682 return ((Capacity) buffer).getCapacity();
683 }
684 else
685 {
686 return Integer.MAX_VALUE;
687 }
688 }
689
690 /**
691 * Return the head element from the buffer.
692 *
693 * <p/>Call only when holding lock.
694 *
695 * @return The head element from the buffer.
696 */
697 protected E peekAtBufferHead()
698 {
699 return buffer.peek().getElement();
700 }
701
702 public class SynchRefImpl implements SynchRef
703 {
704 /** Holds the number of synch records associated with this reference. */
705 int numRecords;
706
707 /** Holds a reference to the collection of synch records managed by this. */
708 Collection<SynchRecord<E>> records;
709
710 public SynchRefImpl(int n, Collection<SynchRecord<E>> records)
711 {
712 this.numRecords = n;
713 this.records = records;
714 }
715
716 public int getNumRecords()
717 {
718 return numRecords;
719 }
720
721 /**
722 * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked
723 * when this method is called. The exception to this is producers that have had their data put back onto the queue
724 * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked
725 * but will not return from their put call normally, but with an exception instead.
726 */
727 public void unblockProducers()
728 {
729 log.debug("public void unblockProducers(): called");
730
731 if (records != null)
732 {
733 for (SynchRecord<E> record : records)
734 {
735 // This call takes account of items that have already been released, are to be requeued or are in
736 // error.
737 record.releaseImmediately();
738 }
739 }
740
741 records = null;
742 }
743 }
744
745 /**
746 * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows
747 * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when
748 * its data cannot be consumed.
749 */
750 public class SynchRecordImpl<E> implements SynchRecord<E>
751 {
752 /** A boolean latch that determines when the producer for this data item will be allowed to continue. */
753 BooleanLatch latch = new BooleanLatch();
754
755 /** The data element associated with this item. */
756 E element;
757
758 /**
759 * Create a new synch record.
760 *
761 * @param e The data element that the record encapsulates.
762 */
763 public SynchRecordImpl(E e)
764 {
765 // Keep the data element.
766 element = e;
767 }
768
769 /**
770 * Waits until the producer is given permission to proceded by a consumer.
771 */
772 public void waitForConsumer()
773 {
774 latch.await();
775 }
776
777 /**
778 * Gets the data element contained by this record.
779 *
780 * @return The data element contained by this record.
781 */
782 public E getElement()
783 {
784 return element;
785 }
786
787 /**
788 * Immediately releases the producer of this data record. Consumers can bring the synchronization time of
789 * producers to a minimum by using this method to release them at the earliest possible moment when batch
790 * consuming records from sychronized producers.
791 */
792 public void releaseImmediately()
793 {
794 // Check that the record has not already been released, is in error or is to be requeued.
795 latch.signal();
796
797 // Propagate errors to the producer.
798
799 // Requeue items to be requeued.
800 }
801
802 /**
803 * Tells the synch queue to put this element back onto the queue instead of releasing its producer.
804 * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or
805 * the {@link #releaseImmediately()} method.
806 *
807 * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
808 * element has already been unblocked.
809 */
810 public void reQueue()
811 {
812 throw new RuntimeException("Not implemented.");
813 }
814
815 /**
816 * Tells the synch queue to raise an exception with this elements producer. The exception is not raised
817 * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the
818 * {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is
819 * raised on the producer.
820 *
821 * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
822 * because the exception is to be passed onto a different thread.
823 *
824 * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
825 * element has already been unblocked.
826 *
827 * @param e The exception to raise on the producer.
828 */
829 public void inError(Exception e)
830 {
831 throw new RuntimeException("Not implemented.");
832 }
833 }
834 }
|