001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.protocol;
022
023 import org.apache.mina.common.IoFilterAdapter;
024 import org.apache.mina.common.IoSession;
025
026 import org.slf4j.Logger;
027 import org.slf4j.LoggerFactory;
028
029 /**
030 * A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message
031 * when a threshold has been exceeded, and has a frequency configuration so that messages are not output
032 * too often.
033 *
034 */
035 public class ProtocolBufferMonitorFilter extends IoFilterAdapter
036 {
037 private static final Logger _logger = LoggerFactory.getLogger(ProtocolBufferMonitorFilter.class);
038
039 public static long DEFAULT_FREQUENCY = 5000;
040
041 public static int DEFAULT_THRESHOLD = 3000;
042
043 private int _bufferedMessages = 0;
044
045 private int _threshold;
046
047 private long _lastMessageOutputTime;
048
049 private long _outputFrequencyInMillis;
050
051 public ProtocolBufferMonitorFilter()
052 {
053 _threshold = DEFAULT_THRESHOLD;
054 _outputFrequencyInMillis = DEFAULT_FREQUENCY;
055 }
056
057 public ProtocolBufferMonitorFilter(int threshold, long frequency)
058 {
059 _threshold = threshold;
060 _outputFrequencyInMillis = frequency;
061 }
062
063 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception
064 {
065 _bufferedMessages++;
066 if (_bufferedMessages > _threshold)
067 {
068 long now = System.currentTimeMillis();
069 if ((now - _lastMessageOutputTime) > _outputFrequencyInMillis)
070 {
071 _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: "
072 + _bufferedMessages);
073 _lastMessageOutputTime = now;
074 }
075 }
076
077 nextFilter.messageReceived(session, message);
078 }
079
080 public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
081 {
082 _bufferedMessages--;
083 nextFilter.messageSent(session, message);
084 }
085
086 public int getBufferedMessages()
087 {
088 return _bufferedMessages;
089 }
090
091 public int getThreshold()
092 {
093 return _threshold;
094 }
095
096 public void setThreshold(int threshold)
097 {
098 _threshold = threshold;
099 }
100
101 public long getOutputFrequencyInMillis()
102 {
103 return _outputFrequencyInMillis;
104 }
105
106 public void setOutputFrequencyInMillis(long outputFrequencyInMillis)
107 {
108 _outputFrequencyInMillis = outputFrequencyInMillis;
109 }
110
111 public long getLastMessageOutputTime()
112 {
113 return _lastMessageOutputTime;
114 }
115 }
|