001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.management.wsdm.capabilities;
022
023 import java.lang.management.ManagementFactory;
024 import java.lang.reflect.Method;
025 import java.net.URI;
026 import java.util.ArrayList;
027 import java.util.Collection;
028 import java.util.HashMap;
029 import java.util.Map;
030 import java.util.concurrent.ArrayBlockingQueue;
031 import java.util.concurrent.ThreadPoolExecutor;
032 import java.util.concurrent.TimeUnit;
033
034 import javax.management.InstanceNotFoundException;
035 import javax.management.MBeanServer;
036 import javax.management.Notification;
037 import javax.management.NotificationFilter;
038 import javax.management.NotificationListener;
039 import javax.management.ObjectName;
040 import javax.xml.namespace.QName;
041
042 import org.apache.muse.core.AbstractCapability;
043 import org.apache.muse.core.Resource;
044 import org.apache.muse.core.ResourceManager;
045 import org.apache.muse.core.routing.MessageHandler;
046 import org.apache.muse.core.serializer.SerializerRegistry;
047 import org.apache.muse.ws.addressing.EndpointReference;
048 import org.apache.muse.ws.addressing.soap.SoapFault;
049 import org.apache.muse.ws.notification.NotificationProducer;
050 import org.apache.muse.ws.notification.WsnConstants;
051 import org.apache.qpid.management.Messages;
052 import org.apache.qpid.management.Names;
053 import org.apache.qpid.management.configuration.Configuration;
054 import org.apache.qpid.management.jmx.EntityLifecycleNotification;
055 import org.apache.qpid.management.wsdm.common.ThreadSessionManager;
056 import org.apache.qpid.management.wsdm.common.UnableToConnectWithBrokerFault;
057 import org.apache.qpid.management.wsdm.muse.engine.WSDMAdapterEnvironment;
058 import org.apache.qpid.management.wsdm.muse.serializer.ByteArraySerializer;
059 import org.apache.qpid.management.wsdm.notifications.LifeCycleEvent;
060 import org.apache.qpid.transport.util.Logger;
061
062 /**
063 * QMan Adapter capability.
064 * Basically it acts as a lifecycle manager of all ws resource that correspond to entities on JMX side.
065 *
066 * @author Andrea Gazzarini
067 */
068 @SuppressWarnings("serial")
069 public class QManAdapterCapability extends AbstractCapability
070 {
071 private final static Logger LOGGER = Logger.get(QManAdapterCapability.class);
072
073 private MBeanServer _mxServer;
074 private WsArtifactsFactory _artifactsFactory;
075 private URI _resourceURI;
076 private NotificationProducer _publisherCapability;
077 private ThreadPoolExecutor _workManager;
078 private Map<String, QName> _lifeCycleTopics = new HashMap<String, QName>();
079
080 /**
081 * Runnable wrapper used for sending asynchronous
082 * notifications.
083 *
084 * @author Andrea Gazzarini
085 */
086 private final class AsynchNotificationTask implements Runnable
087 {
088 private final QName topicName;
089 private final LifeCycleEvent event;
090
091 AsynchNotificationTask(QName tName, LifeCycleEvent evt)
092 {
093 topicName = tName;
094 event = evt;
095 }
096
097 public void run()
098 {
099 try
100 {
101 _publisherCapability.publish(topicName,event);
102 } catch (SoapFault exception)
103 {
104 LOGGER.error(
105 exception,
106 Messages.QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION);
107 }
108 }
109 };
110
111 /**
112 * NotificationFilter for "create" only events.
113 */
114 private final NotificationFilter _filterForNewInstances = new NotificationFilter(){
115
116 /**
117 * Returns true when the notification is related to a creation of a new instance.
118 *
119 * @return true when the notification is related to a creation of a new instance.
120 */
121 public boolean isNotificationEnabled(Notification notification)
122 {
123 return EntityLifecycleNotification.INSTANCE_ADDED_NOTIFICATION_TYPE.equals(notification.getType());
124 }
125
126 };
127
128 /**
129 * NotificationFilter for "remove" only events.
130 */
131 private final NotificationFilter _filterForRemovedInstances = new NotificationFilter(){
132
133 /**
134 * Returns true when the notification is related to a deletion of an existing instance.
135 *
136 * @return true when the notification is related to a deletion of an existing instance.
137 */
138 public boolean isNotificationEnabled(Notification notification)
139 {
140 return EntityLifecycleNotification.INSTANCE_REMOVED_NOTIFICATION_TYPE.equals(notification.getType());
141 }
142 };
143
144 /**
145 * This listener handles "create" mbean events and therefore provides procedure to create and initialize
146 * corresponding ws resources.
147 */
148 private final NotificationListener _listenerForNewInstances = new NotificationListener()
149 {
150 /**
151 * Handles JMX "create" notification type.
152 *
153 * @param notification the entity lifecycle notification.
154 * @param data user data associated with the incoming notifiication : it is not used at the moment.
155 */
156 public void handleNotification(Notification notification, Object data)
157 {
158 ObjectName eventSourceName = null;
159 try
160 {
161 EntityLifecycleNotification lifecycleNotification = (EntityLifecycleNotification) notification;
162 eventSourceName = lifecycleNotification.getObjectName();
163
164 ThreadSessionManager.getInstance().getSession().setObjectName(eventSourceName);
165
166 LOGGER.debug(Messages.QMAN_200039_DEBUG_JMX_NOTIFICATION, notification);
167
168 ResourceManager resourceManager = getResource().getResourceManager();
169 Resource resource = resourceManager.createResource(Names.QMAN_RESOURCE_NAME);
170
171 WsArtifacts artifacts = _artifactsFactory.getArtifactsFor(resource,eventSourceName);
172 MBeanCapability capability = _artifactsFactory.createCapability(
173 artifacts.getCapabilityClass(),
174 eventSourceName);
175
176 ThreadSessionManager.getInstance().getSession().setWsdlDocument(artifacts.getWsdl());
177 ThreadSessionManager.getInstance().getSession().setResourceMetadataDescriptor(artifacts.getResourceMetadataDescriptor());
178
179 resource.setWsdlPortType(Names.QMAN_RESOURCE_PORT_TYPE_NAME);
180 capability.setCapabilityURI(Names.NAMESPACE_URI+"/"+capability.getClass().getSimpleName());
181 capability.setMessageHandlers(createMessageHandlers(capability));
182
183 resource.addCapability(capability);
184 resource.initialize();
185 resourceManager.addResource(resource.getEndpointReference(), resource);
186
187 LOGGER.info(
188 Messages.QMAN_000030_RESOURCE_HAS_BEEN_CREATED,
189 eventSourceName);
190
191 AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask(
192 getTopicName(lifecycleNotification.getClassKind()),
193 LifeCycleEvent.newCreateEvent(
194 eventSourceName.getKeyProperty(Names.OBJECT_ID),
195 lifecycleNotification.getPackageName(),
196 lifecycleNotification.getClassName()));
197
198 _workManager.execute(asynchNotificationTask);
199
200 } catch (ArtifactsNotAvailableException exception)
201 {
202 LOGGER.error(
203 exception,
204 Messages.QMAN_100023_BUILD_WS_ARTIFACTS_FAILURE);
205 } catch (IllegalAccessException exception)
206 {
207 LOGGER.error(
208 exception,
209 Messages.QMAN_100024_CAPABILITY_INSTANTIATION_FAILURE,
210 eventSourceName);
211 } catch (InstantiationException exception)
212 {
213 LOGGER.error(
214 exception,
215 Messages.QMAN_100024_CAPABILITY_INSTANTIATION_FAILURE,
216 eventSourceName);
217 } catch (SoapFault exception)
218 {
219 LOGGER.error(
220 exception,Messages.QMAN_100025_WSRF_FAILURE,
221 eventSourceName);
222 } catch (Exception exception)
223 {
224 LOGGER.error(
225 exception,
226 Messages.QMAN_100025_WSRF_FAILURE,
227 eventSourceName);
228 }
229 }
230 };
231
232 /**
233 * This listener handles "remove" mbean events and therefore provides procedure to shutdown and remove
234 * corresponding ws resources.
235 */
236 private final NotificationListener _listenerForRemovedInstances = new NotificationListener()
237 {
238 /**
239 * Handles JMX "remove" notification type.
240 *
241 * @param notification the entity lifecycle notification.
242 * @param data user data associated with the incoming notifiication : it is not used at the moment.
243 */
244 public void handleNotification(Notification notification, Object data)
245 {
246 EntityLifecycleNotification lifecycleNotification = (EntityLifecycleNotification) notification;
247 ObjectName eventSourceName = lifecycleNotification.getObjectName();
248
249 LOGGER.debug(Messages.QMAN_200042_REMOVING_RESOURCE, eventSourceName);
250
251 EndpointReference endpointPointReference = new EndpointReference(_resourceURI);
252 endpointPointReference.addParameter(
253 Names.RESOURCE_ID_QNAME,
254 eventSourceName.getKeyProperty(Names.OBJECT_ID));
255
256 ResourceManager resourceManager = getResource().getResourceManager();
257 try
258 {
259 Resource resource = resourceManager.getResource(endpointPointReference);
260 resource.shutdown();
261
262 LOGGER.info(
263 Messages.QMAN_000031_RESOURCE_HAS_BEEN_REMOVED,
264 eventSourceName);
265
266 AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask(
267 getTopicName(lifecycleNotification.getClassKind()),
268 LifeCycleEvent.newRemoveEvent(
269 eventSourceName.getKeyProperty(Names.OBJECT_ID),
270 lifecycleNotification.getPackageName(),
271 lifecycleNotification.getClassName()));
272
273 _workManager.execute(asynchNotificationTask);
274
275 }
276 catch(Exception exception)
277 {
278 LOGGER.error(
279 exception,
280 Messages.QMAN_100027_RESOURCE_SHUTDOWN_FAILURE,
281 eventSourceName);
282 }
283 }
284 };
285
286 /**
287 * Initializes this capability.
288 *
289 * @throws SoapFault when the initialization fails..
290 */
291 @Override
292 public void initialize() throws SoapFault
293 {
294 super.initialize();
295
296 registerByteArraySerializer();
297
298 createLifeCycleTopics();
299
300 initializeWorkManager();
301
302 createQManResourceURI();
303
304 _mxServer = ManagementFactory.getPlatformMBeanServer();
305 _artifactsFactory = new WsArtifactsFactory(getEnvironment(),_mxServer);
306
307 registerQManLifecycleListeners();
308 }
309
310 /**
311 * Connects QMan with a broker with the given connection data.
312 *
313 * @param host the host where the broker is running.
314 * @param port the port number where the broker is running.
315 * @param username username for estabilshing connection.
316 * @param password password for estabilshing connection.
317 * @param virtualHost the virtualHost name.
318 * @param initialPoolCapacity the initial size of broker connection pool.
319 * @param maxPoolCapacity the max allowed size of broker connection pool.
320 * @param maxWaitTimeout the max wait timeout for retrieving connections.
321 * @throws SoapFault when the connection with broker cannot be estabilished.
322 */
323 @SuppressWarnings("unchecked")
324 public void connect(
325 String host,
326 int port,
327 String username,
328 String password,
329 String virtualHost,
330 int initialPoolCapacity,
331 int maxPoolCapacity,
332 long maxWaitTimeout) throws SoapFault
333 {
334 try
335 {
336 _mxServer.invoke(
337 Names.QMAN_OBJECT_NAME,
338 "addBroker",
339 new Object[]{host,port,username,password,virtualHost,initialPoolCapacity,maxPoolCapacity,maxWaitTimeout},
340 new String[]{
341 String.class.getName(),
342 int.class.getName(),
343 String.class.getName(),
344 String.class.getName(),
345 String.class.getName(),
346 int.class.getName(),
347 int.class.getName(),
348 long.class.getName()});
349 } catch(Exception exception)
350 {
351 LOGGER.error(Messages.QMAN_100017_UNABLE_TO_CONNECT,host,port);
352 throw new UnableToConnectWithBrokerFault(
353 getResource().getEndpointReference(),
354 host,
355 port,
356 username,
357 virtualHost,
358 exception.getMessage());
359 }
360 }
361
362 /**
363 * Creates the message handlers for the given capability.
364 *
365 * @param capability the QMan capability.
366 * @return a collection with message handlers for the given capability.
367 */
368 protected Collection<MessageHandler> createMessageHandlers(MBeanCapability capability)
369 {
370 Collection<MessageHandler> handlers = new ArrayList<MessageHandler>();
371
372 for (Method method : capability.getClass().getDeclaredMethods())
373 {
374 String name = method.getName();
375
376 QName requestName = new QName(
377 Names.NAMESPACE_URI,
378 name,
379 Names.PREFIX);
380
381 QName returnValueName = new QName(
382 Names.NAMESPACE_URI,
383 name+"Response",
384 Names.PREFIX);
385
386 String actionURI = Names.NAMESPACE_URI+"/"+name;
387
388 MessageHandler handler = new QManMessageHandler(
389 actionURI,
390 requestName,
391 returnValueName);
392
393 handler.setMethod(method);
394 handlers.add(handler);
395 }
396 return handlers;
397 }
398
399 /**
400 * Returns the publisher capability associated with the owner resource.
401 *
402 * @return the publisher capability associated with the owner resource.
403 */
404 NotificationProducer getPublisherCapability()
405 {
406 return (NotificationProducer) getResource().getCapability(WsnConstants.PRODUCER_URI);
407 }
408
409 /**
410 * Creates events & objects lifecycle topic that will be used to publish lifecycle event
411 * messages..
412 */
413 void createLifeCycleTopics()
414 {
415 try
416 {
417 _publisherCapability = getPublisherCapability();
418
419 _publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME);
420 _lifeCycleTopics.put(Names.EVENT,Names.EVENTS_LIFECYLE_TOPIC_NAME);
421
422 LOGGER.info(
423 Messages.QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED,
424 Names.OBJECTS_LIFECYLE_TOPIC_NAME);
425
426 _publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME);
427 _lifeCycleTopics.put(Names.CLASS,Names.OBJECTS_LIFECYLE_TOPIC_NAME);
428
429 LOGGER.info(
430 Messages.QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED,
431 Names.OBJECTS_LIFECYLE_TOPIC_NAME);
432
433 _publisherCapability.addTopic(Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME);
434 LOGGER.info(
435 Messages.QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED,
436 Names.OBJECTS_LIFECYLE_TOPIC_NAME);
437 } catch(Exception exception)
438 {
439 LOGGER.error(exception, Messages.QMAN_100036_TOPIC_DECLARATION_FAILURE);
440 }
441 }
442
443 /**
444 * Starting from an object type (i.e. event or class) returns the name of the
445 * corresponding topic where the lifecycle message must be published.
446 * Note that if the given object type is unknown then the "Unclassified Object Types" topic
447 * will be returned (and therefore the message will be published there).
448 *
449 * @param objectType the type of the object.
450 * @return the name of the topic associated with the given object type.
451 */
452 QName getTopicName(String objectType)
453 {
454 QName topicName = _lifeCycleTopics.get(objectType);
455 return (topicName != null)
456 ? topicName
457 : Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME;
458 }
459
460 /**
461 * Workaround : it seems that is not possibile to declare a serializer
462 * for a byte array using muse descriptor...
463 * What is the stringified name of the class?
464 * byte[].getClass().getName() is [B but is not working (ClassNotFound).
465 * So, at the end, this is hard-coded here!
466 */
467 private void registerByteArraySerializer()
468 {
469 SerializerRegistry.getInstance().registerSerializer(
470 byte[].class,
471 new ByteArraySerializer());
472 }
473
474 /**
475 * Creates the URI that will be later used to identify a QMan WS-Resource.
476 * Note that the resources that will be created are identified also with their resource id.
477 * Briefly we could say that this is the soap:address of the WS-Resource definition.
478 *
479 * @throws SoapFault when the URI cannot be built (probably it is malformed).
480 */
481 private void createQManResourceURI() throws SoapFault
482 {
483 WSDMAdapterEnvironment environment = (WSDMAdapterEnvironment) getEnvironment();
484 String resourceURI = environment.getDefaultURIPrefix()+Names.QMAN_RESOURCE_NAME;
485 try
486 {
487 _resourceURI = URI.create(resourceURI);
488
489 } catch(IllegalArgumentException exception)
490 {
491 LOGGER.info(
492 exception,
493 Messages.QMAN_100029_MALFORMED_RESOURCE_URI_FAILURE,
494 resourceURI);
495 throw new SoapFault(exception);
496 }
497 }
498
499 /**
500 * Initializes the work manager used for asynchronous notifications.
501 */
502 private void initializeWorkManager()
503 {
504 Configuration configuration = Configuration.getInstance();
505 _workManager = new ThreadPoolExecutor(
506 configuration.getWorkerManagerPoolSize(),
507 configuration.getWorkerManagerMaxPoolSize(),
508 configuration.getWorkerManagerKeepAliveTime(),
509 TimeUnit.MILLISECONDS,
510 new ArrayBlockingQueue<Runnable>(30));
511 }
512
513 /**
514 * This adapter capability needs to be an event listener of QMan JMX core
515 * in order to detect relevant lifecycle events and therefore create WS artifacts & notification(s).
516 *
517 * @throws SoapFault when it's not possible to register event listener : is QMan running?
518 */
519 @SuppressWarnings("serial")
520 private void registerQManLifecycleListeners() throws SoapFault
521 {
522 try
523 {
524 _mxServer.addNotificationListener(
525 Names.QMAN_OBJECT_NAME,
526 _listenerForNewInstances,
527 _filterForNewInstances,
528 null);
529
530 _mxServer.addNotificationListener(
531 Names.QMAN_OBJECT_NAME,
532 _listenerForRemovedInstances,
533 _filterForRemovedInstances,
534 null);
535
536 try
537 {
538 _mxServer.addNotificationListener(
539 Names.QPID_EMULATOR_OBJECT_NAME,
540 _listenerForNewInstances,
541 _filterForNewInstances, null);
542
543 _mxServer.addNotificationListener(
544 Names.QPID_EMULATOR_OBJECT_NAME,
545 _listenerForRemovedInstances,
546 _filterForRemovedInstances, null);
547
548 } catch (Exception exception)
549 {
550 LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND);
551 }
552 } catch(InstanceNotFoundException exception)
553 {
554 throw new SoapFault(exception);
555 }
556 }
557 }
|