001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022
023 import org.apache.qpid.AMQException;
024
025 public enum NotificationCheck
026 {
027
028 MESSAGE_COUNT_ALERT
029 {
030 boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
031 {
032 int msgCount;
033 final long maximumMessageCount = queue.getMaximumMessageCount();
034 if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
035 {
036 listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
037 return true;
038 }
039 return false;
040 }
041 },
042 MESSAGE_SIZE_ALERT(true)
043 {
044 boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
045 {
046 final long maximumMessageSize = queue.getMaximumMessageSize();
047 if(maximumMessageSize != 0)
048 {
049 // Check for threshold message size
050 long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
051
052 if (messageSize >= maximumMessageSize)
053 {
054 listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
055 maximumMessageSize + ") breached. [Message ID=" +
056 (msg == null ? "null" : msg.getMessageId()) + "]");
057 return true;
058 }
059 }
060 return false;
061 }
062
063 },
064 QUEUE_DEPTH_ALERT
065 {
066 boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
067 {
068 // Check for threshold queue depth in bytes
069 final long maximumQueueDepth = queue.getMaximumQueueDepth();
070
071 if(maximumQueueDepth != 0)
072 {
073 final long queueDepth = queue.getQueueDepth();
074
075 if (queueDepth >= maximumQueueDepth)
076 {
077 listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
078 return true;
079 }
080 }
081 return false;
082 }
083
084 },
085 MESSAGE_AGE_ALERT
086 {
087 boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
088 {
089
090 final long maxMessageAge = queue.getMaximumMessageAge();
091 if(maxMessageAge != 0)
092 {
093 final long currentTime = System.currentTimeMillis();
094 final long thresholdTime = currentTime - maxMessageAge;
095 final long firstArrivalTime = queue.getOldestMessageArrivalTime();
096
097 if(firstArrivalTime < thresholdTime)
098 {
099 long oldestAge = currentTime - firstArrivalTime;
100 listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
101
102 return true;
103 }
104 }
105 return false;
106
107 }
108
109 }
110 ;
111
112 private final boolean _messageSpecific;
113
114 NotificationCheck()
115 {
116 this(false);
117 }
118
119 NotificationCheck(boolean messageSpecific)
120 {
121 _messageSpecific = messageSpecific;
122 }
123
124 public boolean isMessageSpecific()
125 {
126 return _messageSpecific;
127 }
128
129 abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
130
131 }
|