BatchSynchQueueBase.java
001 package org.apache.qpid.util.concurrent;
002 /*
003  
004  * Licensed to the Apache Software Foundation (ASF) under one
005  * or more contributor license agreements.  See the NOTICE file
006  * distributed with this work for additional information
007  * regarding copyright ownership.  The ASF licenses this file
008  * to you under the Apache License, Version 2.0 (the
009  * "License"); you may not use this file except in compliance
010  * with the License.  You may obtain a copy of the License at
011  
012  *   http://www.apache.org/licenses/LICENSE-2.0
013  
014  * Unless required by applicable law or agreed to in writing,
015  * software distributed under the License is distributed on an
016  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017  * KIND, either express or implied.  See the License for the
018  * specific language governing permissions and limitations
019  * under the License.
020  
021  */
022 
023 
024 import java.util.*;
025 import java.util.concurrent.TimeUnit;
026 import java.util.concurrent.locks.Condition;
027 import java.util.concurrent.locks.ReentrantLock;
028 
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031 
032 
033 /**
034  * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data.
035  * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being
036  * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and
037  * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is
038  * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch.
039  * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls.
040  *
041  <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete
042  * implementation of this. This queue is only accessed through the methods {@link #insert}{@link #extract},
043  {@link #getBufferCapacity()}{@link #peekAtBufferHead()}. An implementation can override these methods to implement
044  * the buffer other than by a queue, for example, by using an array.
045  *
046  <p/>Normal queue methods to work asynchronously.
047  <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately
048  * when their data is taken.
049  <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the
050  * option to keep producers blocked until the consumer decides to release them.
051  *
052  <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to
053  * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency
054  * gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io)
055  * accross many producers. The only advantage of the single blocking take method is that it did take advantage of the
056  * queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an
057  * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to
058  * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item.
059  *
060  <p/><table id="crc"><caption>CRC Card</caption>
061  <tr><th> Responsibilities <th> Collaborations
062  </table>
063  */
064 public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E>
065 {
066     /** Used for logging. */
067     private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class);
068 
069     /** Holds a reference to the queue implementation that holds the buffer. */
070     Queue<SynchRecordImpl<E>> buffer;
071 
072     /** Holds the number of items in the queue */
073     private int count;
074 
075     /** Main lock guarding all access */
076     private ReentrantLock lock;
077 
078     /** Condition for waiting takes */
079     private Condition notEmpty;
080 
081     /** Condition for waiting puts */
082     private Condition notFull;
083 
084     /**
085      * Creates a batch synch queue without fair thread scheduling.
086      */
087     public BatchSynchQueueBase()
088     {
089         this(false);
090     }
091 
092     /**
093      * Ensures that the underlying buffer implementation is created.
094      *
095      @param fair <tt>true</tt> if fairness is to be applied to threads waiting to access the buffer.
096      */
097     public BatchSynchQueueBase(boolean fair)
098     {
099         buffer = this.createQueue();
100 
101         // Create the buffer lock with the fairness flag set accordingly.
102         lock = new ReentrantLock(fair);
103 
104         // Create the non-empty and non-full condition monitors on the buffer lock.
105         notEmpty = lock.newCondition();
106         notFull = lock.newCondition();
107     }
108 
109     /**
110      * Returns an iterator over the elements contained in this collection.
111      *
112      @return An iterator over the elements contained in this collection.
113      */
114     public Iterator<E> iterator()
115     {
116         throw new RuntimeException("Not implemented.");
117     }
118 
119     /**
120      * Returns the number of elements in this collection.  If the collection contains more than
121      <tt>Integer.MAX_VALUE</tt> elements, returns <tt>Integer.MAX_VALUE</tt>.
122      *
123      @return The number of elements in this collection.
124      */
125     public int size()
126     {
127         final ReentrantLock lock = this.lock;
128         lock.lock();
129 
130         try
131         {
132             return count;
133         }
134         finally
135         {
136             lock.unlock();
137         }
138     }
139 
140     /**
141      * Inserts the specified element into this queue, if possible. When using queues that may impose insertion
142      * restrictions (for example capacity bounds), method <tt>offer</tt> is generally preferable to method
143      {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception.
144      *
145      @param e The element to insert.
146      *
147      @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt>
148      */
149     public boolean offer(E e)
150     {
151         if (e == null)
152         {
153             throw new NullPointerException();
154         }
155 
156         final ReentrantLock lock = this.lock;
157         lock.lock();
158 
159         try
160         {
161             return insert(e, false);
162         }
163         finally
164         {
165             lock.unlock();
166         }
167     }
168 
169     /**
170      * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to
171      * become available.
172      *
173      @param e       The element to add.
174      @param timeout How long to wait before giving up, in units of <tt>unit</tt>
175      @param unit    A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
176      *
177      @return <tt>true</tt> if successful, or <tt>false</tt> if the specified waiting time elapses before space is
178      *         available.
179      *
180      @throws InterruptedException If interrupted while waiting.
181      @throws NullPointerException If the specified element is <tt>null</tt>.
182      */
183     public boolean offer(E e, long timeout, TimeUnit unitthrows InterruptedException
184     {
185         if (e == null)
186         {
187             throw new NullPointerException();
188         }
189 
190         final ReentrantLock lock = this.lock;
191         lock.lockInterruptibly();
192 
193         long nanos = unit.toNanos(timeout);
194 
195         try
196         {
197             do
198             {
199                 if (insert(e, false))
200                 {
201                     return true;
202                 }
203 
204                 try
205                 {
206                     nanos = notFull.awaitNanos(nanos);
207                 }
208                 catch (InterruptedException ie)
209                 {
210                     notFull.signal()// propagate to non-interrupted thread
211                     throw ie;
212                 }
213             }
214             while (nanos > 0);
215 
216             return false;
217         }
218         finally
219         {
220             lock.unlock();
221         }
222     }
223 
224     /**
225      * Retrieves and removes the head of this queue, or <tt>null</tt> if this queue is empty.
226      *
227      @return The head of this queue, or <tt>null</tt> if this queue is empty.
228      */
229     public E poll()
230     {
231         final ReentrantLock lock = this.lock;
232 
233         lock.lock();
234         try
235         {
236             if (count == 0)
237             {
238                 return null;
239             }
240 
241             E x = extract(true, true).getElement();
242 
243             return x;
244         }
245         finally
246         {
247             lock.unlock();
248         }
249     }
250 
251     /**
252      * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements
253      * are present on this queue.
254      *
255      @param timeout How long to wait before giving up, in units of <tt>unit</tt>.
256      @param unit    A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
257      *
258      @return The head of this queue, or <tt>null</tt> if the specified waiting time elapses before an element is present.
259      *
260      @throws InterruptedException If interrupted while waiting.
261      */
262     public E poll(long timeout, TimeUnit unitthrows InterruptedException
263     {
264         final ReentrantLock lock = this.lock;
265         lock.lockInterruptibly();
266         try
267         {
268             long nanos = unit.toNanos(timeout);
269 
270             do
271             {
272                 if (count != 0)
273                 {
274                     E x = extract(true, true).getElement();
275 
276                     return x;
277                 }
278 
279                 try
280                 {
281                     nanos = notEmpty.awaitNanos(nanos);
282                 }
283                 catch (InterruptedException ie)
284                 {
285                     notEmpty.signal()// propagate to non-interrupted thread
286                     throw ie;
287                 }
288             }
289             while (nanos > 0);
290 
291             return null;
292         }
293         finally
294         {
295             lock.unlock();
296         }
297     }
298 
299     /**
300      * Retrieves, but does not remove, the head of this queue, returning <tt>null</tt> if this queue is empty.
301      *
302      @return The head of this queue, or <tt>null</tt> if this queue is empty.
303      */
304     public E peek()
305     {
306         final ReentrantLock lock = this.lock;
307         lock.lock();
308 
309         try
310         {
311             return peekAtBufferHead();
312         }
313         finally
314         {
315             lock.unlock();
316         }
317     }
318 
319     /**
320      * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints)
321      * accept without blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic limit.
322      *
323      <p>Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt> an element will succeed by
324      * inspecting <tt>remainingCapacity</tt> because it may be the case that another thread is about to <tt>put</tt>
325      * or <tt>take</tt> an element.
326      *
327      @return The remaining capacity.
328      */
329     public int remainingCapacity()
330     {
331         final ReentrantLock lock = this.lock;
332         lock.lock();
333 
334         try
335         {
336             return getBufferCapacity() - count;
337         }
338         finally
339         {
340             lock.unlock();
341         }
342     }
343 
344     /**
345      * Adds the specified element to this queue, waiting if necessary for space to become available.
346      *
347      <p/>This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised
348      * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these
349      * exceptions.
350      *
351      @param e The element to add.
352      *
353      @throws InterruptedException If interrupted while waiting.
354      */
355     public void put(E ethrows InterruptedException
356     {
357         try
358         {
359             tryPut(e);
360         }
361         catch (SynchException ex)
362         {
363             // This exception is deliberately ignored. See the method comment for information about this.
364         }
365     }
366 
367     /**
368      * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
369      * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
370      *
371      @param e The data element to put into the queue. Cannot be null.
372      *
373      @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
374      *                              on its entry in the queue being consumed.
375      @throws SynchException       If a consumer encounters an error whilst processing the data element.
376      */
377     public void tryPut(E ethrows InterruptedException, SynchException
378     {
379         if (e == null)
380         {
381             throw new NullPointerException();
382         }
383 
384         // final Queue<E> items = this.buffer;
385         final ReentrantLock lock = this.lock;
386         lock.lockInterruptibly();
387 
388         try
389         {
390             while (count == getBufferCapacity())
391             {
392                 // Release the lock and wait until the queue is not full.
393                 notFull.await();
394             }
395         }
396         catch (InterruptedException ie)
397         {
398             notFull.signal()// propagate to non-interrupted thread
399             throw ie;
400         }
401 
402         // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block
403         // the producer until its data is taken.
404         insert(e, true);
405     }
406 
407     /**
408      * Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
409      * Any producer that has its data element taken by this call will be immediately unblocked. To keep the
410      * producer blocked whilst taking just a single item, use the
411      * {@link #drainTo(java.util.Collection<org.apache.qpid.util.concurrent.SynchRecord<E>>, int, boolean)}
412      * method. There is no take method to do that because there is not usually any advantage in a synchronous hand
413      * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption
414      * latencies accross many producers where possible.
415      *
416      @return The head of this queue.
417      *
418      @throws InterruptedException if interrupted while waiting.
419      */
420     public E take() throws InterruptedException
421     {
422         final ReentrantLock lock = this.lock;
423         lock.lockInterruptibly();
424 
425         try
426         {
427             try
428             {
429                 while (count == 0)
430                 {
431                     // Release the lock and wait until the queue becomes non-empty.
432                     notEmpty.await();
433                 }
434             }
435             catch (InterruptedException ie)
436             {
437                 notEmpty.signal()// propagate to non-interrupted thread
438                 throw ie;
439             }
440 
441             // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is
442             // not full, and unblock the producer that owns the data item that is taken.
443             E x = extract(true, true).getElement();
444 
445             return x;
446         }
447         finally
448         {
449             lock.unlock();
450         }
451     }
452 
453     /**
454      * Removes all available elements from this queue and adds them into the given collection.  This operation may be
455      * more efficient than repeatedly polling this queue. A failure encountered while attempting to <tt>add</tt> elements
456      * to collection <tt>c</tt> may result in elements being in neither, either or both collections when the associated
457      * exception is thrown. Attempts to drain a queue to itself result in <tt>IllegalArgumentException</tt>. Further,
458      * the behavior of this operation is undefined if the specified collection is modified while the operation is in
459      * progress.
460      *
461      @param objects The collection to transfer elements into.
462      *
463      @return The number of elements transferred.
464      *
465      @throws NullPointerException     If objects is null.
466      @throws IllegalArgumentException If objects is this queue.
467      */
468     public int drainTo(Collection<? super E> objects)
469     {
470         return drainTo(objects, -1);
471     }
472 
473     /**
474      * Removes at most the given number of available elements from this queue and adds them into the given collection.
475      * A failure encountered while attempting to <tt>add</tt> elements to collection <tt>c</tt> may result in elements
476      * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue
477      * to itself result in <tt>IllegalArgumentException</tt>. Further, the behavior of this operation is undefined if
478      * the specified collection is modified while the operation is in progress.
479      *
480      @param objects     The collection to transfer elements into.
481      @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning
482      *                    all elements.
483      *
484      @return The number of elements transferred.
485      *
486      @throws NullPointerException     If c is null.
487      @throws IllegalArgumentException If c is this queue.
488      */
489     public int drainTo(Collection<? super E> objects, int maxElements)
490     {
491         if (objects == null)
492         {
493             throw new NullPointerException();
494         }
495 
496         if (objects == this)
497         {
498             throw new IllegalArgumentException();
499         }
500 
501         // final Queue<E> items = this.buffer;
502         final ReentrantLock lock = this.lock;
503         lock.lock();
504 
505         try
506         {
507             int n = 0;
508 
509             for (int max = ((maxElements >= count|| (maxElements < 0)) ? count : maxElements; n < max; n++)
510             {
511                 // Take items from the queue, do unblock the producers, but don't send not full signals yet.
512                 objects.add(extract(true, false).getElement());
513             }
514 
515             if (n > 0)
516             {
517                 // count -= n;
518                 notFull.signalAll();
519             }
520 
521             return n;
522         }
523         finally
524         {
525             lock.unlock();
526         }
527     }
528 
529     /**
530      * Takes all available data items from the queue or blocks until some become available. The returned items
531      * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
532      * producers, where the producers are still blocked.
533      *
534      @param c       The collection to drain the data items into.
535      @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
536      *
537      @return A count of the number of elements that were drained from the queue.
538      */
539     public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
540     {
541         return drainTo(c, -1, unblock);
542     }
543 
544     /**
545      * Takes up to maxElements available data items from the queue or blocks until some become available. The returned
546      * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
547      * producers, where the producers are still blocked.
548      *
549      @param coll        The collection to drain the data items into.
550      @param maxElements The maximum number of elements to drain.
551      @param unblock     If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
552      *
553      @return A count of the number of elements that were drained from the queue.
554      */
555     public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock)
556     {
557         if (coll == null)
558         {
559             throw new NullPointerException();
560         }
561 
562         // final Queue<E> items = this.buffer;
563         final ReentrantLock lock = this.lock;
564         lock.lock();
565 
566         try
567         {
568             int n = 0;
569 
570             for (int max = ((maxElements >= count|| (maxElements < 0)) ? count : maxElements; n < max; n++)
571             {
572                 // Extract the next record from the queue, don't signall the not full condition yet and release
573                 // producers depending on whether the caller wants to or not.
574                 coll.add(extract(false, unblock));
575             }
576 
577             if (n > 0)
578             {
579                 // count -= n;
580                 notFull.signalAll();
581             }
582 
583             return new SynchRefImpl(n, coll);
584         }
585         finally
586         {
587             lock.unlock();
588         }
589     }
590 
591     /**
592      * This abstract method should be overriden to return an empty queue. Different implementations of producer
593      * consumer buffers can control the order in which data is accessed using different queue implementations.
594      * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete
595      * implementations.
596      *
597      @return An empty queue.
598      */
599     protected abstract <T> Queue<T> createQueue();
600 
601     /**
602      * Insert element into the queue, then possibly signal that the queue is not empty and block the producer
603      * on the element until permission to procede is given.
604      *
605      <p/>If the producer is to be blocked then the lock must be released first, otherwise no other process
606      * will be able to get access to the queue. Hence, unlock and block are always set together.
607      *
608      <p/>Call only when holding the global lock.
609      *
610      @param unlockAndBlock <tt>true</tt>If the global queue lock should be released and the producer should be blocked.
611      *
612      @return <tt>true</tt> if the operation succeeded, <tt>false</tt> otherwise. If the result is <tt>true</tt> this
613      *         method may not return straight away, but only after the producer is unblocked by having its data
614      *         consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no
615      *         matter what value the unlockAndBlock flag has, leaving the global lock on.
616      */
617     protected boolean insert(E x, boolean unlockAndBlock)
618     {
619         // Create a new record for the data item.
620         SynchRecordImpl<E> record = new SynchRecordImpl<E>(x);
621 
622         boolean result = buffer.offer(record);
623 
624         if (result)
625         {
626             count++;
627 
628             // Tell any waiting consumers that the queue is not empty.
629             notEmpty.signal();
630 
631             if (unlockAndBlock)
632             {
633                 // Allow other threads to read/write the queue.
634                 lock.unlock();
635 
636                 // Wait until a consumer takes this data item.
637                 record.waitForConsumer();
638             }
639 
640             return true;
641         }
642         else
643         {
644             return false;
645         }
646     }
647 
648     /**
649      * Extract element at current take position, advance, and signal.
650      *
651      <p/>Call only when holding lock.
652      */
653     protected SynchRecordImpl<E> extract(boolean unblock, boolean signal)
654     {
655         SynchRecordImpl<E> result = buffer.remove();
656         count--;
657 
658         if (signal)
659         {
660             notFull.signal();
661         }
662 
663         if (unblock)
664         {
665             result.releaseImmediately();
666         }
667 
668         return result;
669     }
670 
671     /**
672      * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned.
673      *
674      <p/>Call only when holding lock.
675      *
676      @return The maximum capacity of the buffer.
677      */
678     protected int getBufferCapacity()
679     {
680         if (buffer instanceof Capacity)
681         {
682             return ((Capacitybuffer).getCapacity();
683         }
684         else
685         {
686             return Integer.MAX_VALUE;
687         }
688     }
689 
690     /**
691      * Return the head element from the buffer.
692      *
693      <p/>Call only when holding lock.
694      *
695      @return The head element from the buffer.
696      */
697     protected E peekAtBufferHead()
698     {
699         return buffer.peek().getElement();
700     }
701 
702     public class SynchRefImpl implements SynchRef
703     {
704         /** Holds the number of synch records associated with this reference. */
705         int numRecords;
706 
707         /** Holds a reference to the collection of synch records managed by this. */
708         Collection<SynchRecord<E>> records;
709 
710         public SynchRefImpl(int n, Collection<SynchRecord<E>> records)
711         {
712             this.numRecords = n;
713             this.records = records;
714         }
715 
716         public int getNumRecords()
717         {
718             return numRecords;
719         }
720 
721         /**
722          * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked
723          * when this method is called. The exception to this is producers that have had their data put back onto the queue
724          * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked
725          * but will not return from their put call normally, but with an exception instead.
726          */
727         public void unblockProducers()
728         {
729             log.debug("public void unblockProducers(): called");
730 
731             if (records != null)
732             {
733                 for (SynchRecord<E> record : records)
734                 {
735                     // This call takes account of items that have already been released, are to be requeued or are in
736                     // error.
737                     record.releaseImmediately();
738                 }
739             }
740 
741             records = null;
742         }
743     }
744 
745     /**
746      * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows
747      * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when
748      * its data cannot be consumed.
749      */
750     public class SynchRecordImpl<E> implements SynchRecord<E>
751     {
752         /** A boolean latch that determines when the producer for this data item will be allowed to continue. */
753         BooleanLatch latch = new BooleanLatch();
754 
755         /** The data element associated with this item. */
756         E element;
757 
758         /**
759          * Create a new synch record.
760          *
761          @param e The data element that the record encapsulates.
762          */
763         public SynchRecordImpl(E e)
764         {
765             // Keep the data element.
766             element = e;
767         }
768 
769         /**
770          * Waits until the producer is given permission to proceded by a consumer.
771          */
772         public void waitForConsumer()
773         {
774             latch.await();
775         }
776 
777         /**
778          * Gets the data element contained by this record.
779          *
780          @return The data element contained by this record.
781          */
782         public E getElement()
783         {
784             return element;
785         }
786 
787         /**
788          * Immediately releases the producer of this data record. Consumers can bring the synchronization time of
789          * producers to a minimum by using this method to release them at the earliest possible moment when batch
790          * consuming records from sychronized producers.
791          */
792         public void releaseImmediately()
793         {
794             // Check that the record has not already been released, is in error or is to be requeued.
795             latch.signal();
796 
797             // Propagate errors to the producer.
798 
799             // Requeue items to be requeued.
800         }
801 
802         /**
803          * Tells the synch queue to put this element back onto the queue instead of releasing its producer.
804          * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or
805          * the {@link #releaseImmediately()} method.
806          *
807          <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
808          * element has already been unblocked.
809          */
810         public void reQueue()
811         {
812             throw new RuntimeException("Not implemented.");
813         }
814 
815         /**
816          * Tells the synch queue to raise an exception with this elements producer. The exception is not raised
817          * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the
818          {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is
819          * raised on the producer.
820          *
821          <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
822          * because the exception is to be passed onto a different thread.
823          *
824          <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
825          * element has already been unblocked.
826          *
827          @param e The exception to raise on the producer.
828          */
829         public void inError(Exception e)
830         {
831             throw new RuntimeException("Not implemented.");
832         }
833     }
834 }