AMQProtocolSessionMBean.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 /*
022  *
023  * Copyright (c) 2006 The Apache Software Foundation
024  *
025  * Licensed under the Apache License, Version 2.0 (the "License");
026  * you may not use this file except in compliance with the License.
027  * You may obtain a copy of the License at
028  *
029  *    http://www.apache.org/licenses/LICENSE-2.0
030  *
031  * Unless required by applicable law or agreed to in writing, software
032  * distributed under the License is distributed on an "AS IS" BASIS,
033  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
034  * See the License for the specific language governing permissions and
035  * limitations under the License.
036  *
037  */
038 package org.apache.qpid.server.protocol;
039 
040 import java.security.Principal;
041 import java.util.Date;
042 import java.util.List;
043 
044 import javax.management.JMException;
045 import javax.management.MBeanException;
046 import javax.management.MBeanNotificationInfo;
047 import javax.management.NotCompliantMBeanException;
048 import javax.management.Notification;
049 import javax.management.monitor.MonitorNotification;
050 import javax.management.openmbean.CompositeData;
051 import javax.management.openmbean.CompositeDataSupport;
052 import javax.management.openmbean.CompositeType;
053 import javax.management.openmbean.OpenDataException;
054 import javax.management.openmbean.OpenType;
055 import javax.management.openmbean.SimpleType;
056 import javax.management.openmbean.TabularData;
057 import javax.management.openmbean.TabularDataSupport;
058 import javax.management.openmbean.TabularType;
059 
060 import org.apache.qpid.AMQException;
061 import org.apache.qpid.framing.AMQFrame;
062 import org.apache.qpid.framing.AMQShortString;
063 import org.apache.qpid.framing.ConnectionCloseBody;
064 import org.apache.qpid.framing.MethodRegistry;
065 import org.apache.qpid.protocol.AMQConstant;
066 import org.apache.qpid.server.AMQChannel;
067 import org.apache.qpid.server.management.AMQManagedObject;
068 import org.apache.qpid.server.management.MBeanConstructor;
069 import org.apache.qpid.server.management.MBeanDescription;
070 import org.apache.qpid.server.management.ManagedObject;
071 
072 /**
073  * This MBean class implements the management interface. In order to make more attributes, operations and notifications
074  * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
075  */
076 @MBeanDescription("Management Bean for an AMQ Broker Connection")
077 public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
078 {
079     private AMQMinaProtocolSession _session = null;
080     private String _name = null;
081 
082     // openmbean data types for representing the channel attributes
083     private static final String[] _channelAtttibuteNames =
084         "Channel Id""Transactional""Default Queue""Unacknowledged Message Count" };
085     private static final String[] _indexNames = _channelAtttibuteNames[0] };
086     private static final OpenType[] _channelAttributeTypes =
087         SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER };
088     private static CompositeType _channelType = null// represents the data type for channel data
089     private static TabularType _channelsType = null// Data type for list of channels type
090     private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
091         new AMQShortString("Broker Management Console has closed the connection.");
092 
093     @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
094     public AMQProtocolSessionMBean(AMQMinaProtocolSession sessionthrows NotCompliantMBeanException, OpenDataException
095     {
096         super(ManagedConnection.class, ManagedConnection.TYPE);
097         _session = session;
098         String remote = getRemoteAddress();
099         remote = "anonymous".equals(remote(remote + hashCode()) : remote;
100         _name = jmxEncode(new StringBuffer(remote)0).toString();
101         init();
102     }
103 
104     static
105     {
106         try
107         {
108             init();
109         }
110         catch (JMException ex)
111         {
112             // This is not expected to ever occur.
113             throw new RuntimeException("Got JMException in static initializer.", ex);
114         }
115     }
116 
117     /**
118      * initialises the openmbean data types
119      */
120     private static void init() throws OpenDataException
121     {
122         _channelType =
123             new CompositeType("Channel""Channel Details", _channelAtttibuteNames, _channelAtttibuteNames,
124                 _channelAttributeTypes);
125         _channelsType = new TabularType("Channels""Channels", _channelType, _indexNames);
126     }
127 
128     public String getClientId()
129     {
130         return (_session.getContextKey() == nullnull : _session.getContextKey().toString();
131     }
132 
133     public String getAuthorizedId()
134     {
135         return (_session.getAuthorizedID() != null ? _session.getAuthorizedID().getName() null;
136     }
137 
138     public String getVersion()
139     {
140         return (_session.getClientVersion() == nullnull : _session.getClientVersion().toString();
141     }
142 
143     public Date getLastIoTime()
144     {
145         return new Date(_session.getIOSession().getLastIoTime());
146     }
147 
148     public String getRemoteAddress()
149     {
150         return _session.getIOSession().getRemoteAddress().toString();
151     }
152 
153     public ManagedObject getParentObject()
154     {
155         return _session.getVirtualHost().getManagedObject();
156     }
157 
158     public Long getWrittenBytes()
159     {
160         return _session.getIOSession().getWrittenBytes();
161     }
162 
163     public Long getReadBytes()
164     {
165         return _session.getIOSession().getReadBytes();
166     }
167 
168     public Long getMaximumNumberOfChannels()
169     {
170         return _session.getMaximumNumberOfChannels();
171     }
172 
173     public void setMaximumNumberOfChannels(Long value)
174     {
175         _session.setMaximumNumberOfChannels(value);
176     }
177 
178     public String getObjectInstanceName()
179     {
180         return _name;
181     }
182 
183     /**
184      * commits transactions for a transactional channel
185      *
186      @param channelId
187      @throws JMException if channel with given id doesn't exist or if commit fails
188      */
189     public void commitTransactions(int channelIdthrows JMException
190     {
191         try
192         {
193             AMQChannel channel = _session.getChannel(channelId);
194             if (channel == null)
195             {
196                 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
197             }
198 
199             _session.commitTransactions(channel);
200         }
201         catch (AMQException ex)
202         {
203             throw new MBeanException(ex, ex.toString());
204         }
205     }
206 
207     /**
208      * rollsback the transactions for a transactional channel
209      *
210      @param channelId
211      @throws JMException if channel with given id doesn't exist or if rollback fails
212      */
213     public void rollbackTransactions(int channelIdthrows JMException
214     {
215         try
216         {
217             AMQChannel channel = _session.getChannel(channelId);
218             if (channel == null)
219             {
220                 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
221             }
222 
223             _session.rollbackTransactions(channel);
224         }
225         catch (AMQException ex)
226         {
227             throw new MBeanException(ex, ex.toString());
228         }
229     }
230 
231     /**
232      * Creates the list of channels in tabular form from the _channelMap.
233      *
234      @return list of channels in tabular form.
235      @throws OpenDataException
236      */
237     public TabularData channels() throws OpenDataException
238     {
239         TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
240         List<AMQChannel> list = _session.getChannels();
241 
242         for (AMQChannel channel : list)
243         {
244             Object[] itemValues =
245                 {
246                     channel.getChannelId(), channel.isTransactional(),
247                     (channel.getDefaultQueue() != null? channel.getDefaultQueue().getName().asString() : null,
248                     channel.getUnacknowledgedMessageMap().size()
249                 };
250 
251             CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
252             channelsList.put(channelData);
253         }
254 
255         return channelsList;
256     }
257 
258     /**
259      * closes the connection. The administrator can use this management operation to close connection to free up
260      * resources.
261      @throws JMException
262      */
263     public void closeConnection() throws JMException
264     {
265 
266         MethodRegistry methodRegistry = _session.getMethodRegistry();
267         ConnectionCloseBody responseBody =
268                 methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
269                                                          // replyCode
270                                                          BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION,
271                                                          // replyText,
272                                                          0,
273                                                          0);
274 
275         _session.writeFrame(responseBody.generateFrame(0));
276 
277         try
278         {
279             _session.closeSession();
280         }
281         catch (AMQException ex)
282         {
283             throw new MBeanException(ex, ex.toString());
284         }
285     }
286 
287     @Override
288     public MBeanNotificationInfo[] getNotificationInfo()
289     {
290         String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
291         String name = MonitorNotification.class.getName();
292         String description = "Channel count has reached threshold value";
293         MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
294 
295         return new MBeanNotificationInfo[] { info1 };
296     }
297 
298     public void notifyClients(String notificationMsg)
299     {
300         Notification n =
301             new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
302                 System.currentTimeMillis(), notificationMsg);
303         _broadcaster.sendNotification(n);
304     }
305 
306 // End of MBean class