AMQQueueMBean.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.queue;
022 
023 import org.apache.log4j.Logger;
024 
025 import org.apache.mina.common.ByteBuffer;
026 
027 import org.apache.qpid.AMQException;
028 import org.apache.qpid.framing.AMQShortString;
029 import org.apache.qpid.framing.BasicContentHeaderProperties;
030 import org.apache.qpid.framing.CommonContentHeaderProperties;
031 import org.apache.qpid.framing.ContentHeaderBody;
032 import org.apache.qpid.framing.abstraction.ContentChunk;
033 import org.apache.qpid.server.management.AMQManagedObject;
034 import org.apache.qpid.server.management.MBeanConstructor;
035 import org.apache.qpid.server.management.MBeanDescription;
036 import org.apache.qpid.server.management.ManagedObject;
037 import org.apache.qpid.server.store.StoreContext;
038 
039 import javax.management.JMException;
040 import javax.management.MBeanException;
041 import javax.management.MBeanNotificationInfo;
042 import javax.management.Notification;
043 import javax.management.OperationsException;
044 import javax.management.monitor.MonitorNotification;
045 import javax.management.openmbean.ArrayType;
046 import javax.management.openmbean.CompositeData;
047 import javax.management.openmbean.CompositeDataSupport;
048 import javax.management.openmbean.CompositeType;
049 import javax.management.openmbean.OpenDataException;
050 import javax.management.openmbean.OpenType;
051 import javax.management.openmbean.SimpleType;
052 import javax.management.openmbean.TabularData;
053 import javax.management.openmbean.TabularDataSupport;
054 import javax.management.openmbean.TabularType;
055 
056 import java.text.SimpleDateFormat;
057 import java.util.*;
058 
059 /**
060  * AMQQueueMBean is the management bean for an {@link AMQQueue}.
061  *
062  <p/><tablse id="crc"><caption>CRC Caption</caption>
063  <tr><th> Responsibilities <th> Collaborations
064  </table>
065  */
066 @MBeanDescription("Management Interface for AMQQueue")
067 public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
068 {
069     /** Used for debugging purposes. */
070     private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
071 
072     private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
073 
074     /**
075      * Since the MBean is not associated with a real channel we can safely create our own store context
076      * for use in the few methods that require one.
077      */
078     private StoreContext _storeContext = new StoreContext();
079 
080     private AMQQueue _queue = null;
081     private String _queueName = null;
082     // OpenMBean data types for viewMessages method
083     private static final String[] _msgAttributeNames = "AMQ MessageId""Header""Size(bytes)""Redelivered" };
084     private static String[] _msgAttributeIndex = _msgAttributeNames[0] };
085     private static OpenType[] _msgAttributeTypes = new OpenType[4]// AMQ message attribute types.
086     private static CompositeType _messageDataType = null// Composite type for representing AMQ Message data.
087     private static TabularType _messagelistDataType = null// Datatype for representing AMQ messages list.
088 
089     // OpenMBean data types for viewMessageContent method
090     private static CompositeType _msgContentType = null;
091     private static final String[] _msgContentAttributes = "AMQ MessageId""MimeType""Encoding""Content" };
092     private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
093 
094     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
095     private Notification _lastNotification = null;
096 
097 
098 
099 
100     @MBeanConstructor("Creates an MBean exposing an AMQQueue")
101     public AMQQueueMBean(AMQQueue queuethrows JMException
102     {
103         super(ManagedQueue.class, ManagedQueue.TYPE);
104         _queue = queue;
105         _queueName = jmxEncode(new StringBuffer(queue.getName())0).toString();
106     }
107 
108     public ManagedObject getParentObject()
109     {
110         return _queue.getVirtualHost().getManagedObject();
111     }
112 
113     static
114     {
115         try
116         {
117             init();
118         }
119         catch (JMException ex)
120         {
121             // This is not expected to ever occur.
122             throw new RuntimeException("Got JMException in static initializer.", ex);
123         }
124     }
125 
126     /**
127      * initialises the openmbean data types
128      */
129     private static void init() throws OpenDataException
130     {
131         _msgContentAttributeTypes[0= SimpleType.LONG; // For message id
132         _msgContentAttributeTypes[1= SimpleType.STRING; // For MimeType
133         _msgContentAttributeTypes[2= SimpleType.STRING; // For Encoding
134         _msgContentAttributeTypes[3new ArrayType(1, SimpleType.BYTE)// For message content
135         _msgContentType =
136             new CompositeType("Message Content""AMQ Message Content", _msgContentAttributes, _msgContentAttributes,
137                 _msgContentAttributeTypes);
138 
139         _msgAttributeTypes[0= SimpleType.LONG; // For message id
140         _msgAttributeTypes[1new ArrayType(1, SimpleType.STRING)// For header attributes
141         _msgAttributeTypes[2= SimpleType.LONG; // For size
142         _msgAttributeTypes[3= SimpleType.BOOLEAN; // For redelivered
143 
144         _messageDataType =
145             new CompositeType("Message""AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
146         _messagelistDataType = new TabularType("Messages""List of messages", _messageDataType, _msgAttributeIndex);
147     }
148 
149     public String getObjectInstanceName()
150     {
151         return _queueName;
152     }
153 
154     public String getName()
155     {
156         return _queueName;
157     }
158 
159     public boolean isDurable()
160     {
161         return _queue.isDurable();
162     }
163 
164     public String getOwner()
165     {
166         return String.valueOf(_queue.getOwner());
167     }
168 
169     public boolean isAutoDelete()
170     {
171         return _queue.isAutoDelete();
172     }
173 
174     public Integer getMessageCount()
175     {
176         return _queue.getMessageCount();
177     }
178 
179     public Long getMaximumMessageSize()
180     {
181         return _queue.getMaximumMessageSize();
182     }
183 
184     public Long getMaximumMessageAge()
185     {
186         return _queue.getMaximumMessageAge();
187     }
188 
189     public void setMaximumMessageAge(Long maximumMessageAge)
190     {
191         _queue.setMaximumMessageAge(maximumMessageAge);
192     }
193 
194     public void setMaximumMessageSize(Long value)
195     {
196         _queue.setMaximumMessageSize(value);
197     }
198 
199     public Integer getConsumerCount()
200     {
201         return _queue.getConsumerCount();
202     }
203 
204     public Integer getActiveConsumerCount()
205     {
206         return _queue.getActiveConsumerCount();
207     }
208 
209     public Long getReceivedMessageCount()
210     {
211         return _queue.getReceivedMessageCount();
212     }
213 
214     public Long getMaximumMessageCount()
215     {
216         return _queue.getMaximumMessageCount();
217     }
218 
219     public void setMaximumMessageCount(Long value)
220     {
221         _queue.setMaximumMessageCount(value);
222     }
223 
224     public Long getMaximumQueueDepth()
225     {
226         long queueDepthInBytes = _queue.getMaximumQueueDepth();
227 
228         return queueDepthInBytes >> 10;
229     }
230 
231     public void setMaximumQueueDepth(Long value)
232     {
233         _queue.setMaximumQueueDepth(value);
234     }
235 
236     /**
237      * returns the size of messages(KB) in the queue.
238      */
239     public Long getQueueDepth() throws JMException
240     {
241         long queueBytesSize = _queue.getQueueDepth();
242 
243         return queueBytesSize >> 10;
244     }
245 
246     /**
247      * Checks if there is any notification to be send to the listeners
248      */
249     public void checkForNotification(AMQMessage msgthrows AMQException
250     {
251 
252         final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
253 
254         if(!notificationChecks.isEmpty())
255         {
256             final long currentTime = System.currentTimeMillis();
257             final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
258 
259             for (NotificationCheck check : notificationChecks)
260             {
261                 if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
262                 {
263                     if (check.notifyIfNecessary(msg, _queue, this))
264                     {
265                         _lastNotificationTimes[check.ordinal()] = currentTime;
266                     }
267                 }
268             }
269         }
270 
271     }
272 
273     /**
274      * Sends the notification to the listeners
275      */
276     public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
277     {
278         // important : add log to the log file - monitoring tools may be looking for this
279         _logger.info(notification.name() " On Queue " + queue.getName() " - " + notificationMsg);
280         notificationMsg = notification.name() " " + notificationMsg;
281 
282         _lastNotification =
283             new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
284                 System.currentTimeMillis(), notificationMsg);
285 
286         _broadcaster.sendNotification(_lastNotification);
287     }
288 
289     public Notification getLastNotification()
290     {
291         return _lastNotification;
292     }
293 
294     /**
295      @see AMQQueue#deleteMessageFromTop
296      */
297     public void deleteMessageFromTop() throws JMException
298     {
299         try
300         {
301             _queue.deleteMessageFromTop(_storeContext);
302         }
303         catch (AMQException ex)
304         {
305             throw new MBeanException(ex, ex.toString());
306         }
307     }
308 
309     /**
310      @see AMQQueue#clearQueue
311      */
312     public void clearQueue() throws JMException
313     {
314         try
315         {
316             _queue.clearQueue(_storeContext);
317         }
318         catch (AMQException ex)
319         {
320             throw new MBeanException(ex, ex.toString());
321         }
322     }
323 
324     /**
325      * returns message content as byte array and related attributes for the given message id.
326      */
327     public CompositeData viewMessageContent(long msgIdthrows JMException
328     {
329         QueueEntry entry = _queue.getMessageOnTheQueue(msgId);
330 
331         if (entry == null)
332         {
333             throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
334         }
335 
336         AMQMessage msg = entry.getMessage();
337         // get message content
338         Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
339         List<Byte> msgContent = new ArrayList<Byte>();
340         while (cBodies.hasNext())
341         {
342             ContentChunk body = cBodies.next();
343             if (body.getSize() != 0)
344             {
345                 if (body.getSize() != 0)
346                 {
347                     ByteBuffer slice = body.getData().slice();
348                     for (int j = 0; j < slice.limit(); j++)
349                     {
350                         msgContent.add(slice.get());
351                     }
352                 }
353             }
354         }
355 
356         // Create header attributes list
357         CommonContentHeaderProperties headerProperties =
358             (CommonContentHeaderPropertiesmsg.getContentHeaderBody().properties;
359         String mimeType = null, encoding = null;
360         if (headerProperties != null)
361         {
362             AMQShortString mimeTypeShortSting = headerProperties.getContentType();
363             mimeType = (mimeTypeShortSting == nullnull : mimeTypeShortSting.toString();
364             encoding = (headerProperties.getEncoding() == null"" : headerProperties.getEncoding().toString();
365         }
366 
367         Object[] itemValues = msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
368 
369         return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
370     }
371 
372     /**
373      * Returns the header contents of the messages stored in this queue in tabular form.
374      */
375     public TabularData viewMessages(int beginIndex, int endIndexthrows JMException
376     {
377         if ((beginIndex > endIndex|| (beginIndex < 1))
378         {
379             throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex
380                 "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
381         }
382 
383         List<QueueEntry> list = _queue.getMessagesOnTheQueue();
384         TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
385 
386         // Create the tabular list of message header contents
387         for (int i = beginIndex; (i <= endIndex&& (i <= list.size()); i++)
388         {
389             QueueEntry queueEntry = list.get(i - 1);
390             AMQMessage msg = queueEntry.getMessage();
391             ContentHeaderBody headerBody = msg.getContentHeaderBody();
392             // Create header attributes list
393             String[] headerAttributes = getMessageHeaderProperties(headerBody);
394             Object[] itemValues = msg.getMessageId(), headerAttributes, headerBody.bodySize,
395                                     queueEntry.isRedelivered() };
396             CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
397             _messageList.put(messageData);
398         }
399 
400         return _messageList;
401     }
402 
403     private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
404     {
405         List<String> list = new ArrayList<String>();
406         BasicContentHeaderProperties headerProperties = (BasicContentHeaderPropertiesheaderBody.properties;
407         list.add("reply-to = " + headerProperties.getReplyToAsString());
408         list.add("propertyFlags = " + headerProperties.getPropertyFlags());
409         list.add("ApplicationID = " + headerProperties.getAppIdAsString());
410         list.add("ClusterID = " + headerProperties.getClusterIdAsString());
411         list.add("UserId = " + headerProperties.getUserIdAsString());
412         list.add("JMSMessageID = " + headerProperties.getMessageIdAsString());
413         list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
414 
415         int delMode = headerProperties.getDeliveryMode();
416         list.add("JMSDeliveryMode = " ((delMode == 1"Persistent" "Non_Persistent"));
417 
418         list.add("JMSPriority = " + headerProperties.getPriority());
419         list.add("JMSType = " + headerProperties.getType());
420 
421         long longDate = headerProperties.getExpiration();
422         String strDate = (longDate != 0? _dateFormat.format(new Date(longDate)) null;
423         list.add("JMSExpiration = " + strDate);
424 
425         longDate = headerProperties.getTimestamp();
426         strDate = (longDate != 0? _dateFormat.format(new Date(longDate)) null;
427         list.add("JMSTimestamp = " + strDate);
428 
429         return list.toArray(new String[list.size()]);
430     }
431 
432     /**
433      @see ManagedQueue#moveMessages
434      @param fromMessageId
435      @param toMessageId
436      @param toQueueName
437      @throws JMException
438      */
439     public void moveMessages(long fromMessageId, long toMessageId, String toQueueNamethrows JMException
440     {
441         if ((fromMessageId > toMessageId|| (fromMessageId < 1))
442         {
443             throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
444         }
445 
446         _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
447     }
448 
449     /**
450      * returns Notifications sent by this MBean.
451      */
452     @Override
453     public MBeanNotificationInfo[] getNotificationInfo()
454     {
455         String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
456         String name = MonitorNotification.class.getName();
457         String description = "Either Message count or Queue depth or Message size has reached threshold high value";
458         MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
459 
460         return new MBeanNotificationInfo[] { info1 };
461     }
462 
463 // End of AMQQueueMBean class