001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.util;
022
023 import java.util.Iterator;
024 import java.util.Queue;
025 import java.util.concurrent.BlockingQueue;
026 import java.util.concurrent.LinkedBlockingQueue;
027 import java.util.concurrent.ConcurrentLinkedQueue;
028
029 /**
030 * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
031 * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
032 * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single
033 * thread adding items and a single (different) thread removing items.
034 *
035 * @todo Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted.
036 */
037 public class FlowControllingBlockingQueue
038 {
039 /** This queue is bounded and is used to store messages before being dispatched to the consumer */
040 private final Queue _queue = new ConcurrentLinkedQueue();
041
042 private final int _flowControlHighThreshold;
043 private final int _flowControlLowThreshold;
044
045 private final ThresholdListener _listener;
046
047 /** We require a separate count so we can track whether we have reached the threshold */
048 private int _count;
049
050 public boolean isEmpty()
051 {
052 return _queue.isEmpty();
053 }
054
055 public interface ThresholdListener
056 {
057 void aboveThreshold(int currentValue);
058
059 void underThreshold(int currentValue);
060 }
061
062 public FlowControllingBlockingQueue(int threshold, ThresholdListener listener)
063 {
064 this(threshold, threshold, listener);
065 }
066
067 public FlowControllingBlockingQueue(int highThreshold, int lowThreshold, ThresholdListener listener)
068 {
069 _flowControlHighThreshold = highThreshold;
070 _flowControlLowThreshold = lowThreshold;
071 _listener = listener;
072 }
073
074 public Object take() throws InterruptedException
075 {
076 Object o = _queue.poll();
077 if(o == null)
078 {
079 synchronized(this)
080 {
081 while((o = _queue.poll())==null)
082 {
083 wait();
084 }
085 }
086 }
087 if (_listener != null)
088 {
089 synchronized (_listener)
090 {
091 if (_count-- == _flowControlLowThreshold)
092 {
093 _listener.underThreshold(_count);
094 }
095 }
096 }
097
098 return o;
099 }
100
101 public void add(Object o)
102 {
103 synchronized(this)
104 {
105 _queue.add(o);
106
107 notifyAll();
108 }
109 if (_listener != null)
110 {
111 synchronized (_listener)
112 {
113 if (++_count == _flowControlHighThreshold)
114 {
115 _listener.aboveThreshold(_count);
116 }
117 }
118 }
119 }
120
121 public Iterator iterator()
122 {
123 return _queue.iterator();
124 }
125 }
|