FlowControllingBlockingQueue.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.util;
022 
023 import java.util.Iterator;
024 import java.util.Queue;
025 import java.util.concurrent.BlockingQueue;
026 import java.util.concurrent.LinkedBlockingQueue;
027 import java.util.concurrent.ConcurrentLinkedQueue;
028 
029 /**
030  * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
031  * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
032  * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single
033  * thread adding items and a single (different) thread removing items.
034  *
035  * @todo Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted.
036  */
037 public class FlowControllingBlockingQueue
038 {
039     /** This queue is bounded and is used to store messages before being dispatched to the consumer */
040     private final Queue _queue = new ConcurrentLinkedQueue();
041 
042     private final int _flowControlHighThreshold;
043     private final int _flowControlLowThreshold;
044 
045     private final ThresholdListener _listener;
046 
047     /** We require a separate count so we can track whether we have reached the threshold */
048     private int _count;
049 
050     public boolean isEmpty()
051     {
052         return _queue.isEmpty();
053     }
054 
055     public interface ThresholdListener
056     {
057         void aboveThreshold(int currentValue);
058 
059         void underThreshold(int currentValue);
060     }
061 
062     public FlowControllingBlockingQueue(int threshold, ThresholdListener listener)
063     {
064         this(threshold, threshold, listener);
065     }
066 
067     public FlowControllingBlockingQueue(int highThreshold, int lowThreshold, ThresholdListener listener)
068     {
069         _flowControlHighThreshold = highThreshold;
070         _flowControlLowThreshold = lowThreshold;
071         _listener = listener;
072     }
073 
074     public Object take() throws InterruptedException
075     {
076         Object o = _queue.poll();
077         if(o == null)
078         {
079             synchronized(this)
080             {
081                 while((o = _queue.poll())==null)
082                 {
083                     wait();
084                 }
085             }
086         }
087         if (_listener != null)
088         {
089             synchronized (_listener)
090             {
091                 if (_count-- == _flowControlLowThreshold)
092                 {
093                     _listener.underThreshold(_count);
094                 }
095             }
096         }
097 
098         return o;
099     }
100 
101     public void add(Object o)
102     {
103         synchronized(this)
104         {
105             _queue.add(o);
106 
107             notifyAll();
108         }
109         if (_listener != null)
110         {
111             synchronized (_listener)
112             {
113                 if (++_count == _flowControlHighThreshold)
114                 {
115                     _listener.aboveThreshold(_count);
116                 }
117             }
118         }
119     }
120 
121     public Iterator iterator()
122     {
123         return _queue.iterator();
124     }
125 }