001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 /*
022 *
023 * Copyright (c) 2006 The Apache Software Foundation
024 *
025 * Licensed under the Apache License, Version 2.0 (the "License");
026 * you may not use this file except in compliance with the License.
027 * You may obtain a copy of the License at
028 *
029 * http://www.apache.org/licenses/LICENSE-2.0
030 *
031 * Unless required by applicable law or agreed to in writing, software
032 * distributed under the License is distributed on an "AS IS" BASIS,
033 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
034 * See the License for the specific language governing permissions and
035 * limitations under the License.
036 *
037 */
038 package org.apache.qpid.server.protocol;
039
040 import java.security.Principal;
041 import java.util.Date;
042 import java.util.List;
043
044 import javax.management.JMException;
045 import javax.management.MBeanException;
046 import javax.management.MBeanNotificationInfo;
047 import javax.management.NotCompliantMBeanException;
048 import javax.management.Notification;
049 import javax.management.monitor.MonitorNotification;
050 import javax.management.openmbean.CompositeData;
051 import javax.management.openmbean.CompositeDataSupport;
052 import javax.management.openmbean.CompositeType;
053 import javax.management.openmbean.OpenDataException;
054 import javax.management.openmbean.OpenType;
055 import javax.management.openmbean.SimpleType;
056 import javax.management.openmbean.TabularData;
057 import javax.management.openmbean.TabularDataSupport;
058 import javax.management.openmbean.TabularType;
059
060 import org.apache.qpid.AMQException;
061 import org.apache.qpid.framing.AMQFrame;
062 import org.apache.qpid.framing.AMQShortString;
063 import org.apache.qpid.framing.ConnectionCloseBody;
064 import org.apache.qpid.framing.MethodRegistry;
065 import org.apache.qpid.protocol.AMQConstant;
066 import org.apache.qpid.server.AMQChannel;
067 import org.apache.qpid.server.management.AMQManagedObject;
068 import org.apache.qpid.server.management.MBeanConstructor;
069 import org.apache.qpid.server.management.MBeanDescription;
070 import org.apache.qpid.server.management.ManagedObject;
071
072 /**
073 * This MBean class implements the management interface. In order to make more attributes, operations and notifications
074 * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
075 */
076 @MBeanDescription("Management Bean for an AMQ Broker Connection")
077 public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
078 {
079 private AMQMinaProtocolSession _session = null;
080 private String _name = null;
081
082 // openmbean data types for representing the channel attributes
083 private static final String[] _channelAtttibuteNames =
084 { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count" };
085 private static final String[] _indexNames = { _channelAtttibuteNames[0] };
086 private static final OpenType[] _channelAttributeTypes =
087 { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER };
088 private static CompositeType _channelType = null; // represents the data type for channel data
089 private static TabularType _channelsType = null; // Data type for list of channels type
090 private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
091 new AMQShortString("Broker Management Console has closed the connection.");
092
093 @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
094 public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
095 {
096 super(ManagedConnection.class, ManagedConnection.TYPE);
097 _session = session;
098 String remote = getRemoteAddress();
099 remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
100 _name = jmxEncode(new StringBuffer(remote), 0).toString();
101 init();
102 }
103
104 static
105 {
106 try
107 {
108 init();
109 }
110 catch (JMException ex)
111 {
112 // This is not expected to ever occur.
113 throw new RuntimeException("Got JMException in static initializer.", ex);
114 }
115 }
116
117 /**
118 * initialises the openmbean data types
119 */
120 private static void init() throws OpenDataException
121 {
122 _channelType =
123 new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, _channelAtttibuteNames,
124 _channelAttributeTypes);
125 _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
126 }
127
128 public String getClientId()
129 {
130 return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
131 }
132
133 public String getAuthorizedId()
134 {
135 return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
136 }
137
138 public String getVersion()
139 {
140 return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
141 }
142
143 public Date getLastIoTime()
144 {
145 return new Date(_session.getIOSession().getLastIoTime());
146 }
147
148 public String getRemoteAddress()
149 {
150 return _session.getIOSession().getRemoteAddress().toString();
151 }
152
153 public ManagedObject getParentObject()
154 {
155 return _session.getVirtualHost().getManagedObject();
156 }
157
158 public Long getWrittenBytes()
159 {
160 return _session.getIOSession().getWrittenBytes();
161 }
162
163 public Long getReadBytes()
164 {
165 return _session.getIOSession().getReadBytes();
166 }
167
168 public Long getMaximumNumberOfChannels()
169 {
170 return _session.getMaximumNumberOfChannels();
171 }
172
173 public void setMaximumNumberOfChannels(Long value)
174 {
175 _session.setMaximumNumberOfChannels(value);
176 }
177
178 public String getObjectInstanceName()
179 {
180 return _name;
181 }
182
183 /**
184 * commits transactions for a transactional channel
185 *
186 * @param channelId
187 * @throws JMException if channel with given id doesn't exist or if commit fails
188 */
189 public void commitTransactions(int channelId) throws JMException
190 {
191 try
192 {
193 AMQChannel channel = _session.getChannel(channelId);
194 if (channel == null)
195 {
196 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
197 }
198
199 _session.commitTransactions(channel);
200 }
201 catch (AMQException ex)
202 {
203 throw new MBeanException(ex, ex.toString());
204 }
205 }
206
207 /**
208 * rollsback the transactions for a transactional channel
209 *
210 * @param channelId
211 * @throws JMException if channel with given id doesn't exist or if rollback fails
212 */
213 public void rollbackTransactions(int channelId) throws JMException
214 {
215 try
216 {
217 AMQChannel channel = _session.getChannel(channelId);
218 if (channel == null)
219 {
220 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
221 }
222
223 _session.rollbackTransactions(channel);
224 }
225 catch (AMQException ex)
226 {
227 throw new MBeanException(ex, ex.toString());
228 }
229 }
230
231 /**
232 * Creates the list of channels in tabular form from the _channelMap.
233 *
234 * @return list of channels in tabular form.
235 * @throws OpenDataException
236 */
237 public TabularData channels() throws OpenDataException
238 {
239 TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
240 List<AMQChannel> list = _session.getChannels();
241
242 for (AMQChannel channel : list)
243 {
244 Object[] itemValues =
245 {
246 channel.getChannelId(), channel.isTransactional(),
247 (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName().asString() : null,
248 channel.getUnacknowledgedMessageMap().size()
249 };
250
251 CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
252 channelsList.put(channelData);
253 }
254
255 return channelsList;
256 }
257
258 /**
259 * closes the connection. The administrator can use this management operation to close connection to free up
260 * resources.
261 * @throws JMException
262 */
263 public void closeConnection() throws JMException
264 {
265
266 MethodRegistry methodRegistry = _session.getMethodRegistry();
267 ConnectionCloseBody responseBody =
268 methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
269 // replyCode
270 BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION,
271 // replyText,
272 0,
273 0);
274
275 _session.writeFrame(responseBody.generateFrame(0));
276
277 try
278 {
279 _session.closeSession();
280 }
281 catch (AMQException ex)
282 {
283 throw new MBeanException(ex, ex.toString());
284 }
285 }
286
287 @Override
288 public MBeanNotificationInfo[] getNotificationInfo()
289 {
290 String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
291 String name = MonitorNotification.class.getName();
292 String description = "Channel count has reached threshold value";
293 MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
294
295 return new MBeanNotificationInfo[] { info1 };
296 }
297
298 public void notifyClients(String notificationMsg)
299 {
300 Notification n =
301 new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
302 System.currentTimeMillis(), notificationMsg);
303 _broadcaster.sendNotification(n);
304 }
305
306 } // End of MBean class
|