QManAdapterCapability.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.management.wsdm.capabilities;
022 
023 import java.lang.management.ManagementFactory;
024 import java.lang.reflect.Method;
025 import java.net.URI;
026 import java.util.ArrayList;
027 import java.util.Collection;
028 import java.util.HashMap;
029 import java.util.Map;
030 import java.util.concurrent.ArrayBlockingQueue;
031 import java.util.concurrent.ThreadPoolExecutor;
032 import java.util.concurrent.TimeUnit;
033 
034 import javax.management.InstanceNotFoundException;
035 import javax.management.MBeanServer;
036 import javax.management.Notification;
037 import javax.management.NotificationFilter;
038 import javax.management.NotificationListener;
039 import javax.management.ObjectName;
040 import javax.xml.namespace.QName;
041 
042 import org.apache.muse.core.AbstractCapability;
043 import org.apache.muse.core.Resource;
044 import org.apache.muse.core.ResourceManager;
045 import org.apache.muse.core.routing.MessageHandler;
046 import org.apache.muse.core.serializer.SerializerRegistry;
047 import org.apache.muse.ws.addressing.EndpointReference;
048 import org.apache.muse.ws.addressing.soap.SoapFault;
049 import org.apache.muse.ws.notification.NotificationProducer;
050 import org.apache.muse.ws.notification.WsnConstants;
051 import org.apache.qpid.management.Messages;
052 import org.apache.qpid.management.Names;
053 import org.apache.qpid.management.configuration.Configuration;
054 import org.apache.qpid.management.jmx.EntityLifecycleNotification;
055 import org.apache.qpid.management.wsdm.common.ThreadSessionManager;
056 import org.apache.qpid.management.wsdm.common.UnableToConnectWithBrokerFault;
057 import org.apache.qpid.management.wsdm.muse.engine.WSDMAdapterEnvironment;
058 import org.apache.qpid.management.wsdm.muse.serializer.ByteArraySerializer;
059 import org.apache.qpid.management.wsdm.notifications.LifeCycleEvent;
060 import org.apache.qpid.transport.util.Logger;
061 
062 /**
063  * QMan Adapter capability.
064  * Basically it acts as a lifecycle manager of all ws resource that correspond to entities on JMX side.
065  
066  @author Andrea Gazzarini
067 */
068 @SuppressWarnings("serial")
069 public class QManAdapterCapability extends AbstractCapability
070 {  
071   private final static Logger LOGGER = Logger.get(QManAdapterCapability.class);
072 
073   private MBeanServer _mxServer;
074   private WsArtifactsFactory _artifactsFactory; 
075   private URI _resourceURI;
076   private NotificationProducer _publisherCapability;
077   private ThreadPoolExecutor _workManager;
078   private Map<String, QName> _lifeCycleTopics = new HashMap<String, QName>();
079   
080   /**
081    * Runnable wrapper used for sending asynchronous 
082    * notifications.
083    
084    @author Andrea Gazzarini
085    */
086   private final class AsynchNotificationTask implements Runnable 
087   {
088     private final QName topicName;
089     private final LifeCycleEvent event;
090     
091     AsynchNotificationTask(QName tName, LifeCycleEvent evt)
092     {
093       topicName = tName;
094       event = evt;
095     }
096     
097     public void run()
098     {
099       try
100       {
101         _publisherCapability.publish(topicName,event);
102       catch (SoapFault exception)
103       {
104         LOGGER.error(
105             exception,
106             Messages.QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION);
107       }      
108     }
109   };
110   
111   /**
112    * NotificationFilter for "create" only events.
113    */
114   private final NotificationFilter _filterForNewInstances = new NotificationFilter(){
115 
116     /**
117      * Returns true when the notification is related to a creation of a new instance. 
118      
119      @return true when the notification is related to a creation of a new instance.
120      */
121     public boolean isNotificationEnabled(Notification notification)
122     {
123       return EntityLifecycleNotification.INSTANCE_ADDED_NOTIFICATION_TYPE.equals(notification.getType());
124     }
125     
126   };
127 
128   /**
129    * NotificationFilter for "remove" only events.
130    */
131   private final NotificationFilter _filterForRemovedInstances = new NotificationFilter(){
132 
133     /**
134      * Returns true when the notification is related to a deletion of an existing instance. 
135      
136      @return true when the notification is related to a deletion of an existing instance.
137      */
138     public boolean isNotificationEnabled(Notification notification)
139     {
140       return EntityLifecycleNotification.INSTANCE_REMOVED_NOTIFICATION_TYPE.equals(notification.getType());
141     }
142   };
143   
144   /**
145    * This listener handles "create" mbean events and therefore provides procedure to create and initialize
146    * corresponding ws resources.
147    */
148   private final NotificationListener _listenerForNewInstances = new NotificationListener() 
149   {
150     /**
151      * Handles JMX "create" notification type.
152      
153      @param notification the entity lifecycle notification.
154      @param data user data associated with the incoming notifiication : it is not used at the moment.
155      */
156     public void handleNotification(Notification notification, Object data
157     {
158       ObjectName eventSourceName = null;
159       try 
160       {        
161         EntityLifecycleNotification lifecycleNotification = (EntityLifecycleNotificationnotification;
162         eventSourceName = lifecycleNotification.getObjectName();
163         
164         ThreadSessionManager.getInstance().getSession().setObjectName(eventSourceName);
165       
166         LOGGER.debug(Messages.QMAN_200039_DEBUG_JMX_NOTIFICATION, notification);
167 
168         ResourceManager resourceManager = getResource().getResourceManager();
169         Resource resource = resourceManager.createResource(Names.QMAN_RESOURCE_NAME);
170         
171         WsArtifacts artifacts = _artifactsFactory.getArtifactsFor(resource,eventSourceName);
172         MBeanCapability capability = _artifactsFactory.createCapability(
173             artifacts.getCapabilityClass()
174             eventSourceName);
175         
176         ThreadSessionManager.getInstance().getSession().setWsdlDocument(artifacts.getWsdl());
177         ThreadSessionManager.getInstance().getSession().setResourceMetadataDescriptor(artifacts.getResourceMetadataDescriptor());
178         
179         resource.setWsdlPortType(Names.QMAN_RESOURCE_PORT_TYPE_NAME);
180         capability.setCapabilityURI(Names.NAMESPACE_URI+"/"+capability.getClass().getSimpleName());
181         capability.setMessageHandlers(createMessageHandlers(capability));
182         
183         resource.addCapability(capability);
184         resource.initialize();
185         resourceManager.addResource(resource.getEndpointReference(), resource);
186         
187         LOGGER.info(
188             Messages.QMAN_000030_RESOURCE_HAS_BEEN_CREATED,
189             eventSourceName);
190         
191         AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask(
192             getTopicName(lifecycleNotification.getClassKind()),
193             LifeCycleEvent.newCreateEvent(
194                 eventSourceName.getKeyProperty(Names.OBJECT_ID)
195                 lifecycleNotification.getPackageName(),
196                 lifecycleNotification.getClassName()));
197         
198         _workManager.execute(asynchNotificationTask);
199         
200       catch (ArtifactsNotAvailableException exception
201       {
202         LOGGER.error(
203             exception,
204             Messages.QMAN_100023_BUILD_WS_ARTIFACTS_FAILURE);
205       catch (IllegalAccessException exception
206       {
207         LOGGER.error(
208             exception,
209             Messages.QMAN_100024_CAPABILITY_INSTANTIATION_FAILURE,
210             eventSourceName);
211       catch (InstantiationException exception
212       {
213         LOGGER.error(
214             exception,
215             Messages.QMAN_100024_CAPABILITY_INSTANTIATION_FAILURE,
216             eventSourceName);
217       catch (SoapFault exception
218       {
219         LOGGER.error(
220             exception,Messages.QMAN_100025_WSRF_FAILURE,
221             eventSourceName);  
222       catch (Exception exception
223       {
224         LOGGER.error(
225             exception,
226             Messages.QMAN_100025_WSRF_FAILURE,
227             eventSourceName);  
228       
229     }
230   };
231   
232   /**
233    * This listener handles "remove" mbean events and therefore provides procedure to shutdown and remove
234    * corresponding ws resources.
235    */
236   private final NotificationListener _listenerForRemovedInstances = new NotificationListener() 
237   {
238     /**
239      * Handles JMX "remove" notification type.
240      
241      @param notification the entity lifecycle notification.
242      @param data user data associated with the incoming notifiication : it is not used at the moment.
243      */
244     public void handleNotification(Notification notification, Object data
245     {
246       EntityLifecycleNotification lifecycleNotification = (EntityLifecycleNotificationnotification;
247       ObjectName eventSourceName = lifecycleNotification.getObjectName();
248 
249       LOGGER.debug(Messages.QMAN_200042_REMOVING_RESOURCE, eventSourceName);
250 
251       EndpointReference endpointPointReference = new EndpointReference(_resourceURI);      
252       endpointPointReference.addParameter(
253           Names.RESOURCE_ID_QNAME, 
254           eventSourceName.getKeyProperty(Names.OBJECT_ID));
255       
256       ResourceManager resourceManager = getResource().getResourceManager();
257       try 
258       {
259         Resource resource = resourceManager.getResource(endpointPointReference);
260         resource.shutdown();
261           
262         LOGGER.info(
263             Messages.QMAN_000031_RESOURCE_HAS_BEEN_REMOVED, 
264             eventSourceName);
265 
266         AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask(
267             getTopicName(lifecycleNotification.getClassKind()),
268             LifeCycleEvent.newRemoveEvent(
269                 eventSourceName.getKeyProperty(Names.OBJECT_ID)
270                 lifecycleNotification.getPackageName(),
271                 lifecycleNotification.getClassName()));
272         
273         _workManager.execute(asynchNotificationTask);
274 
275       }
276       catch(Exception exception
277       {
278         LOGGER.error(
279             exception, 
280             Messages.QMAN_100027_RESOURCE_SHUTDOWN_FAILURE, 
281             eventSourceName);
282       }
283     }
284   };  
285       
286   /**
287    * Initializes this capability.
288    
289    @throws SoapFault when the initialization fails..
290    */
291   @Override
292   public void initialize() throws SoapFault 
293   {
294     super.initialize();
295     
296     registerByteArraySerializer();
297     
298     createLifeCycleTopics();
299     
300     initializeWorkManager();
301     
302     createQManResourceURI();
303 
304     _mxServer = ManagementFactory.getPlatformMBeanServer();
305     _artifactsFactory = new WsArtifactsFactory(getEnvironment(),_mxServer);
306     
307     registerQManLifecycleListeners();  
308   }
309 
310   /**
311    * Connects QMan with a broker with the given connection data.
312    
313    @param host the host where the broker is running.
314    @param port the port number where the broker is running.
315    @param username username for estabilshing connection.
316    @param password password for estabilshing connection.
317    @param virtualHost the virtualHost name.
318    @param initialPoolCapacity the initial size of broker connection pool. 
319    @param maxPoolCapacity the max allowed size of broker connection pool.
320    @param maxWaitTimeout the max wait timeout for retrieving connections.
321    @throws SoapFault when the connection with broker cannot be estabilished.
322    */
323   @SuppressWarnings("unchecked")
324   public void connect(
325       String host, 
326       int port, 
327       String username, 
328       String password, 
329       String virtualHost,
330       int initialPoolCapacity,
331       int maxPoolCapacity, 
332       long maxWaitTimeoutthrows SoapFault 
333   {
334     try 
335     {
336       _mxServer.invoke(
337           Names.QMAN_OBJECT_NAME, 
338           "addBroker"
339           new Object[]{host,port,username,password,virtualHost,initialPoolCapacity,maxPoolCapacity,maxWaitTimeout}
340           new String[]{
341               String.class.getName(),
342               int.class.getName(),
343               String.class.getName(),
344               String.class.getName(),
345               String.class.getName(),
346               int.class.getName(),
347               int.class.getName(),
348               long.class.getName()});
349     catch(Exception exception)
350     {      
351       LOGGER.error(Messages.QMAN_100017_UNABLE_TO_CONNECT,host,port);
352       throw new UnableToConnectWithBrokerFault(
353           getResource().getEndpointReference(),
354           host,
355           port,
356           username,
357           virtualHost,
358           exception.getMessage());
359     }
360   }
361     
362   /**
363    * Creates the message handlers for the given capability.
364    
365    @param capability the QMan capability.
366    @return a collection with message handlers for the given capability.
367    */
368   protected Collection<MessageHandler> createMessageHandlers(MBeanCapability capability)
369   {
370         Collection<MessageHandler> handlers = new ArrayList<MessageHandler>();
371         
372         for (Method method :  capability.getClass().getDeclaredMethods())
373         {
374           String name = method.getName();
375           
376           QName requestName = new QName(
377               Names.NAMESPACE_URI,
378               name,
379               Names.PREFIX);
380           
381           QName returnValueName = new QName(
382               Names.NAMESPACE_URI,
383               name+"Response",
384               Names.PREFIX);
385           
386           String actionURI = Names.NAMESPACE_URI+"/"+name;
387             
388           MessageHandler handler = new QManMessageHandler(
389                 actionURI, 
390                 requestName, 
391                 returnValueName);
392             
393           handler.setMethod(method);
394             handlers.add(handler);
395         }
396         return handlers;  
397     }
398 
399   /**
400    * Returns the publisher capability associated with the owner resource.
401    
402    @return the publisher capability associated with the owner resource.
403    */
404   NotificationProducer getPublisherCapability()
405   {
406     return (NotificationProducergetResource().getCapability(WsnConstants.PRODUCER_URI);
407   }
408   
409   /**
410    * Creates events & objects lifecycle topic that will be used to publish lifecycle event
411    * messages..
412    */
413   void createLifeCycleTopics() 
414   {
415     try 
416     {
417       _publisherCapability = getPublisherCapability();
418       
419       _publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME);
420       _lifeCycleTopics.put(Names.EVENT,Names.EVENTS_LIFECYLE_TOPIC_NAME);
421 
422       LOGGER.info(
423           Messages.QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, 
424           Names.OBJECTS_LIFECYLE_TOPIC_NAME);
425       
426       _publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME);    
427       _lifeCycleTopics.put(Names.CLASS,Names.OBJECTS_LIFECYLE_TOPIC_NAME);
428 
429       LOGGER.info(
430           Messages.QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, 
431           Names.OBJECTS_LIFECYLE_TOPIC_NAME);
432       
433       _publisherCapability.addTopic(Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME);          
434       LOGGER.info(
435           Messages.QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, 
436           Names.OBJECTS_LIFECYLE_TOPIC_NAME);
437     catch(Exception exception
438     {
439       LOGGER.error(exception, Messages.QMAN_100036_TOPIC_DECLARATION_FAILURE);
440     }
441   }
442   
443   /**
444    * Starting from an object type (i.e. event or class) returns the name of the
445    * corresponding topic where the lifecycle message must be published.
446    * Note that if the given object type is unknown then the "Unclassified Object Types" topic 
447    * will be returned (and therefore the message will be published there).
448    
449    @param objectType the type of the object.
450    @return the name of the topic associated with the given object type.
451    */
452   QName getTopicName(String objectType
453   {
454     QName topicName = _lifeCycleTopics.get(objectType);
455     return (topicName != null
456       ? topicName 
457       : Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME;
458   }
459   
460   /** 
461    * Workaround : it seems that is not possibile to declare a serializer 
462    * for a byte array using muse descriptor...
463   *  What is the stringified name of the class? 
464   *  byte[].getClass().getName() is [B but is not working (ClassNotFound).
465   *   So, at the end, this is hard-coded here!
466   */
467   private void registerByteArraySerializer()
468   {
469     SerializerRegistry.getInstance().registerSerializer(
470         byte[].class, 
471         new ByteArraySerializer());    
472   }
473   
474   /**
475    * Creates the URI that will be later used to identify a QMan WS-Resource.
476    * Note that the resources that will be created are identified also with their resource id.
477    * Briefly we could say that this is the soap:address of the WS-Resource definition.
478    
479    @throws SoapFault when the URI cannot be built (probably it is malformed).
480    */
481   private void createQManResourceURI() throws SoapFault
482   {
483     WSDMAdapterEnvironment environment = (WSDMAdapterEnvironmentgetEnvironment();
484     String resourceURI = environment.getDefaultURIPrefix()+Names.QMAN_RESOURCE_NAME;
485     try 
486     {
487       _resourceURI = URI.create(resourceURI);
488       
489     catch(IllegalArgumentException exception)
490     {
491       LOGGER.info(
492           exception,
493           Messages.QMAN_100029_MALFORMED_RESOURCE_URI_FAILURE,
494           resourceURI);      
495       throw new SoapFault(exception);
496     }
497   }  
498   
499   /**
500    * Initializes the work manager used for asynchronous notifications.
501    */
502   private void initializeWorkManager()
503   {
504     Configuration configuration = Configuration.getInstance();
505     _workManager = new ThreadPoolExecutor(
506         configuration.getWorkerManagerPoolSize(),
507         configuration.getWorkerManagerMaxPoolSize(),
508         configuration.getWorkerManagerKeepAliveTime(),
509         TimeUnit.MILLISECONDS,
510         new ArrayBlockingQueue<Runnable>(30));
511   }
512 
513   /**
514    * This adapter capability needs to be an event listener of QMan JMX core 
515    * in order to detect relevant lifecycle events and therefore create WS artifacts & notification(s).
516    
517    @throws SoapFault when it's not possible to register event listener : is QMan running?
518    */
519   @SuppressWarnings("serial")
520   private void registerQManLifecycleListeners() throws SoapFault
521   {
522     try 
523     {      
524       _mxServer.addNotificationListener(
525           Names.QMAN_OBJECT_NAME, 
526           _listenerForNewInstances, 
527           _filterForNewInstances, 
528           null);
529       
530       _mxServer.addNotificationListener(
531           Names.QMAN_OBJECT_NAME, 
532           _listenerForRemovedInstances, 
533           _filterForRemovedInstances, 
534           null);
535 
536       try 
537       {
538         _mxServer.addNotificationListener(
539             Names.QPID_EMULATOR_OBJECT_NAME, 
540             _listenerForNewInstances, 
541             _filterForNewInstances, null);
542 
543         _mxServer.addNotificationListener(
544             Names.QPID_EMULATOR_OBJECT_NAME, 
545             _listenerForRemovedInstances, 
546             _filterForRemovedInstances, null);
547 
548       catch (Exception exception
549       {
550         LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND);
551       
552     }  catch(InstanceNotFoundException exception
553     {
554       throw new SoapFault(exception);  
555     }
556   }  
557 }