ProtocolBufferMonitorFilter.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.protocol;
022 
023 import org.apache.mina.common.IoFilterAdapter;
024 import org.apache.mina.common.IoSession;
025 
026 import org.slf4j.Logger;
027 import org.slf4j.LoggerFactory;
028 
029 /**
030  * A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message
031  * when a threshold has been exceeded, and has a frequency configuration so that messages are not output
032  * too often.
033  *
034  */
035 public class ProtocolBufferMonitorFilter extends IoFilterAdapter
036 {
037     private static final Logger _logger = LoggerFactory.getLogger(ProtocolBufferMonitorFilter.class);
038 
039     public static long DEFAULT_FREQUENCY = 5000;
040 
041     public static int DEFAULT_THRESHOLD = 3000;
042 
043     private int _bufferedMessages = 0;
044 
045     private int _threshold;
046 
047     private long _lastMessageOutputTime;
048 
049     private long _outputFrequencyInMillis;
050 
051     public ProtocolBufferMonitorFilter()
052     {
053         _threshold = DEFAULT_THRESHOLD;
054         _outputFrequencyInMillis = DEFAULT_FREQUENCY;
055     }
056 
057     public ProtocolBufferMonitorFilter(int threshold, long frequency)
058     {
059         _threshold = threshold;
060         _outputFrequencyInMillis = frequency;
061     }
062 
063     public void messageReceived(NextFilter nextFilter, IoSession session, Object messagethrows Exception
064     {
065         _bufferedMessages++;
066         if (_bufferedMessages > _threshold)
067         {
068             long now = System.currentTimeMillis();
069             if ((now - _lastMessageOutputTime> _outputFrequencyInMillis)
070             {
071                 _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: "
072                     + _bufferedMessages);
073                 _lastMessageOutputTime = now;
074             }
075         }
076 
077         nextFilter.messageReceived(session, message);
078     }
079 
080     public void messageSent(NextFilter nextFilter, IoSession session, Object messagethrows Exception
081     {
082         _bufferedMessages--;
083         nextFilter.messageSent(session, message);
084     }
085 
086     public int getBufferedMessages()
087     {
088         return _bufferedMessages;
089     }
090 
091     public int getThreshold()
092     {
093         return _threshold;
094     }
095 
096     public void setThreshold(int threshold)
097     {
098         _threshold = threshold;
099     }
100 
101     public long getOutputFrequencyInMillis()
102     {
103         return _outputFrequencyInMillis;
104     }
105 
106     public void setOutputFrequencyInMillis(long outputFrequencyInMillis)
107     {
108         _outputFrequencyInMillis = outputFrequencyInMillis;
109     }
110 
111     public long getLastMessageOutputTime()
112     {
113         return _lastMessageOutputTime;
114     }
115 }