001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022
023 import org.apache.log4j.Logger;
024
025 import org.apache.mina.common.ByteBuffer;
026
027 import org.apache.qpid.AMQException;
028 import org.apache.qpid.framing.AMQShortString;
029 import org.apache.qpid.framing.BasicContentHeaderProperties;
030 import org.apache.qpid.framing.CommonContentHeaderProperties;
031 import org.apache.qpid.framing.ContentHeaderBody;
032 import org.apache.qpid.framing.abstraction.ContentChunk;
033 import org.apache.qpid.server.management.AMQManagedObject;
034 import org.apache.qpid.server.management.MBeanConstructor;
035 import org.apache.qpid.server.management.MBeanDescription;
036 import org.apache.qpid.server.management.ManagedObject;
037 import org.apache.qpid.server.store.StoreContext;
038
039 import javax.management.JMException;
040 import javax.management.MBeanException;
041 import javax.management.MBeanNotificationInfo;
042 import javax.management.Notification;
043 import javax.management.OperationsException;
044 import javax.management.monitor.MonitorNotification;
045 import javax.management.openmbean.ArrayType;
046 import javax.management.openmbean.CompositeData;
047 import javax.management.openmbean.CompositeDataSupport;
048 import javax.management.openmbean.CompositeType;
049 import javax.management.openmbean.OpenDataException;
050 import javax.management.openmbean.OpenType;
051 import javax.management.openmbean.SimpleType;
052 import javax.management.openmbean.TabularData;
053 import javax.management.openmbean.TabularDataSupport;
054 import javax.management.openmbean.TabularType;
055
056 import java.text.SimpleDateFormat;
057 import java.util.*;
058
059 /**
060 * AMQQueueMBean is the management bean for an {@link AMQQueue}.
061 *
062 * <p/><tablse id="crc"><caption>CRC Caption</caption>
063 * <tr><th> Responsibilities <th> Collaborations
064 * </table>
065 */
066 @MBeanDescription("Management Interface for AMQQueue")
067 public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
068 {
069 /** Used for debugging purposes. */
070 private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
071
072 private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
073
074 /**
075 * Since the MBean is not associated with a real channel we can safely create our own store context
076 * for use in the few methods that require one.
077 */
078 private StoreContext _storeContext = new StoreContext();
079
080 private AMQQueue _queue = null;
081 private String _queueName = null;
082 // OpenMBean data types for viewMessages method
083 private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" };
084 private static String[] _msgAttributeIndex = { _msgAttributeNames[0] };
085 private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
086 private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
087 private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
088
089 // OpenMBean data types for viewMessageContent method
090 private static CompositeType _msgContentType = null;
091 private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" };
092 private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
093
094 private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
095 private Notification _lastNotification = null;
096
097
098
099
100 @MBeanConstructor("Creates an MBean exposing an AMQQueue")
101 public AMQQueueMBean(AMQQueue queue) throws JMException
102 {
103 super(ManagedQueue.class, ManagedQueue.TYPE);
104 _queue = queue;
105 _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
106 }
107
108 public ManagedObject getParentObject()
109 {
110 return _queue.getVirtualHost().getManagedObject();
111 }
112
113 static
114 {
115 try
116 {
117 init();
118 }
119 catch (JMException ex)
120 {
121 // This is not expected to ever occur.
122 throw new RuntimeException("Got JMException in static initializer.", ex);
123 }
124 }
125
126 /**
127 * initialises the openmbean data types
128 */
129 private static void init() throws OpenDataException
130 {
131 _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
132 _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
133 _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
134 _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
135 _msgContentType =
136 new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes,
137 _msgContentAttributeTypes);
138
139 _msgAttributeTypes[0] = SimpleType.LONG; // For message id
140 _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
141 _msgAttributeTypes[2] = SimpleType.LONG; // For size
142 _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
143
144 _messageDataType =
145 new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
146 _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
147 }
148
149 public String getObjectInstanceName()
150 {
151 return _queueName;
152 }
153
154 public String getName()
155 {
156 return _queueName;
157 }
158
159 public boolean isDurable()
160 {
161 return _queue.isDurable();
162 }
163
164 public String getOwner()
165 {
166 return String.valueOf(_queue.getOwner());
167 }
168
169 public boolean isAutoDelete()
170 {
171 return _queue.isAutoDelete();
172 }
173
174 public Integer getMessageCount()
175 {
176 return _queue.getMessageCount();
177 }
178
179 public Long getMaximumMessageSize()
180 {
181 return _queue.getMaximumMessageSize();
182 }
183
184 public Long getMaximumMessageAge()
185 {
186 return _queue.getMaximumMessageAge();
187 }
188
189 public void setMaximumMessageAge(Long maximumMessageAge)
190 {
191 _queue.setMaximumMessageAge(maximumMessageAge);
192 }
193
194 public void setMaximumMessageSize(Long value)
195 {
196 _queue.setMaximumMessageSize(value);
197 }
198
199 public Integer getConsumerCount()
200 {
201 return _queue.getConsumerCount();
202 }
203
204 public Integer getActiveConsumerCount()
205 {
206 return _queue.getActiveConsumerCount();
207 }
208
209 public Long getReceivedMessageCount()
210 {
211 return _queue.getReceivedMessageCount();
212 }
213
214 public Long getMaximumMessageCount()
215 {
216 return _queue.getMaximumMessageCount();
217 }
218
219 public void setMaximumMessageCount(Long value)
220 {
221 _queue.setMaximumMessageCount(value);
222 }
223
224 public Long getMaximumQueueDepth()
225 {
226 long queueDepthInBytes = _queue.getMaximumQueueDepth();
227
228 return queueDepthInBytes >> 10;
229 }
230
231 public void setMaximumQueueDepth(Long value)
232 {
233 _queue.setMaximumQueueDepth(value);
234 }
235
236 /**
237 * returns the size of messages(KB) in the queue.
238 */
239 public Long getQueueDepth() throws JMException
240 {
241 long queueBytesSize = _queue.getQueueDepth();
242
243 return queueBytesSize >> 10;
244 }
245
246 /**
247 * Checks if there is any notification to be send to the listeners
248 */
249 public void checkForNotification(AMQMessage msg) throws AMQException
250 {
251
252 final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
253
254 if(!notificationChecks.isEmpty())
255 {
256 final long currentTime = System.currentTimeMillis();
257 final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
258
259 for (NotificationCheck check : notificationChecks)
260 {
261 if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
262 {
263 if (check.notifyIfNecessary(msg, _queue, this))
264 {
265 _lastNotificationTimes[check.ordinal()] = currentTime;
266 }
267 }
268 }
269 }
270
271 }
272
273 /**
274 * Sends the notification to the listeners
275 */
276 public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
277 {
278 // important : add log to the log file - monitoring tools may be looking for this
279 _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
280 notificationMsg = notification.name() + " " + notificationMsg;
281
282 _lastNotification =
283 new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
284 System.currentTimeMillis(), notificationMsg);
285
286 _broadcaster.sendNotification(_lastNotification);
287 }
288
289 public Notification getLastNotification()
290 {
291 return _lastNotification;
292 }
293
294 /**
295 * @see AMQQueue#deleteMessageFromTop
296 */
297 public void deleteMessageFromTop() throws JMException
298 {
299 try
300 {
301 _queue.deleteMessageFromTop(_storeContext);
302 }
303 catch (AMQException ex)
304 {
305 throw new MBeanException(ex, ex.toString());
306 }
307 }
308
309 /**
310 * @see AMQQueue#clearQueue
311 */
312 public void clearQueue() throws JMException
313 {
314 try
315 {
316 _queue.clearQueue(_storeContext);
317 }
318 catch (AMQException ex)
319 {
320 throw new MBeanException(ex, ex.toString());
321 }
322 }
323
324 /**
325 * returns message content as byte array and related attributes for the given message id.
326 */
327 public CompositeData viewMessageContent(long msgId) throws JMException
328 {
329 QueueEntry entry = _queue.getMessageOnTheQueue(msgId);
330
331 if (entry == null)
332 {
333 throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
334 }
335
336 AMQMessage msg = entry.getMessage();
337 // get message content
338 Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
339 List<Byte> msgContent = new ArrayList<Byte>();
340 while (cBodies.hasNext())
341 {
342 ContentChunk body = cBodies.next();
343 if (body.getSize() != 0)
344 {
345 if (body.getSize() != 0)
346 {
347 ByteBuffer slice = body.getData().slice();
348 for (int j = 0; j < slice.limit(); j++)
349 {
350 msgContent.add(slice.get());
351 }
352 }
353 }
354 }
355
356 // Create header attributes list
357 CommonContentHeaderProperties headerProperties =
358 (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
359 String mimeType = null, encoding = null;
360 if (headerProperties != null)
361 {
362 AMQShortString mimeTypeShortSting = headerProperties.getContentType();
363 mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
364 encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
365 }
366
367 Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
368
369 return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
370 }
371
372 /**
373 * Returns the header contents of the messages stored in this queue in tabular form.
374 */
375 public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
376 {
377 if ((beginIndex > endIndex) || (beginIndex < 1))
378 {
379 throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex
380 + "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
381 }
382
383 List<QueueEntry> list = _queue.getMessagesOnTheQueue();
384 TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
385
386 // Create the tabular list of message header contents
387 for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
388 {
389 QueueEntry queueEntry = list.get(i - 1);
390 AMQMessage msg = queueEntry.getMessage();
391 ContentHeaderBody headerBody = msg.getContentHeaderBody();
392 // Create header attributes list
393 String[] headerAttributes = getMessageHeaderProperties(headerBody);
394 Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
395 queueEntry.isRedelivered() };
396 CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
397 _messageList.put(messageData);
398 }
399
400 return _messageList;
401 }
402
403 private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
404 {
405 List<String> list = new ArrayList<String>();
406 BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
407 list.add("reply-to = " + headerProperties.getReplyToAsString());
408 list.add("propertyFlags = " + headerProperties.getPropertyFlags());
409 list.add("ApplicationID = " + headerProperties.getAppIdAsString());
410 list.add("ClusterID = " + headerProperties.getClusterIdAsString());
411 list.add("UserId = " + headerProperties.getUserIdAsString());
412 list.add("JMSMessageID = " + headerProperties.getMessageIdAsString());
413 list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
414
415 int delMode = headerProperties.getDeliveryMode();
416 list.add("JMSDeliveryMode = " + ((delMode == 1) ? "Persistent" : "Non_Persistent"));
417
418 list.add("JMSPriority = " + headerProperties.getPriority());
419 list.add("JMSType = " + headerProperties.getType());
420
421 long longDate = headerProperties.getExpiration();
422 String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
423 list.add("JMSExpiration = " + strDate);
424
425 longDate = headerProperties.getTimestamp();
426 strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
427 list.add("JMSTimestamp = " + strDate);
428
429 return list.toArray(new String[list.size()]);
430 }
431
432 /**
433 * @see ManagedQueue#moveMessages
434 * @param fromMessageId
435 * @param toMessageId
436 * @param toQueueName
437 * @throws JMException
438 */
439 public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
440 {
441 if ((fromMessageId > toMessageId) || (fromMessageId < 1))
442 {
443 throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
444 }
445
446 _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
447 }
448
449 /**
450 * returns Notifications sent by this MBean.
451 */
452 @Override
453 public MBeanNotificationInfo[] getNotificationInfo()
454 {
455 String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
456 String name = MonitorNotification.class.getName();
457 String description = "Either Message count or Queue depth or Message size has reached threshold high value";
458 MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
459
460 return new MBeanNotificationInfo[] { info1 };
461 }
462
463 } // End of AMQQueueMBean class
|