ManagementClient.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.management.domain.services;
022 
023 import java.util.UUID;
024 
025 import org.apache.qpid.QpidException;
026 import org.apache.qpid.management.Messages;
027 import org.apache.qpid.management.Names;
028 import org.apache.qpid.management.configuration.BrokerConnectionData;
029 import org.apache.qpid.management.configuration.Configuration;
030 import org.apache.qpid.management.domain.model.DomainModel;
031 import org.apache.qpid.transport.util.Logger;
032 
033 /**
034  * This is the Object representation of a management client.
035  * According to specification : "A software component that is separate from the messaging broker, connected to the 
036  * management broker via an AMQP connection, which allows any software component to be managed remotely by QPID."
037  *
038  @author Andrea Gazzarini
039  */
040 public final class ManagementClient
041 {      
042     private final static Logger LOGGER = Logger.get(ManagementClient.class);
043     
044     private final String _managementQueueName;
045     private final String _methodReplyQueueName;
046     
047     private DomainModel _domainModel;
048     private QpidService _service;
049    
050     private final BrokerConnectionData _connectionData;
051     
052     /**
053      * Builds a new <code>ManagementClient</code> with the given identifier and connection data.
054      
055      @param brokerId the broker identifier.
056      @param connectionData the broker connection data (host, port, etc...)
057      */
058     ManagementClient(UUID brokerId,BrokerConnectionData connectionData)
059     {
060       _connectionData = connectionData;
061         _service = new QpidService(brokerId);
062         _domainModel = new DomainModel(brokerId);
063         _managementQueueName = Configuration.getInstance().getManagementQueueName();
064         _methodReplyQueueName = Configuration.getInstance().getMethodReplyQueueName();
065     }
066 
067     @Override
068     public String toString()
069     {
070       return _connectionData.toString();
071     }
072     
073     /**
074      * Returns the connection data associated with this management client.
075      
076      @return the connection data associated with this management client.
077      */
078     public BrokerConnectionData getBrokerConnectionData()
079     {
080       return _connectionData;
081     }
082     
083     /**
084      * Establishing initial communication Between Client and Broker.
085      * According to specification :
086      * "Communication is established between the management client and management agent using normal AMQP procedures. 
087      * The client creates a connection to the broker and then establishes a session with its corresponding channel.
088      * Two private queues are then declared. 
089      * A management queue is declared and bound to the qpid.management exchange with "mgmt.#" as routing key; in that 
090      * way all management-related messages sent to the exchange will be received by this client. 
091      * When a client successfully binds to the qpid.management exchange, the management agent schedules a schema 
092      * broadcast to be sent to the exchange. 
093      * The agent will publish, via the exchange, a description of the schema for all manageable objects in its control. That 
094      * schema is therefore received by this service and it will be part of service's domain model."
095      
096      @throws StartupFailureException when this management client cannot perform startup operations due to an error.
097      */
098     void estabilishFirstConnectionWithBroker() throws StartupFailureException{
099         try {
100             connectWithBroker();
101             
102             createAndBindMethodReplyQueue();
103             createAndBindManagementQueue();
104                     
105             registerConsumerOnManagementQueue();
106             registerConsumerOnMethodReplyQueue();
107             
108             synchronize();
109         catch(Exception exception
110         {
111             try {
112                 _service.close();
113             catch(Exception ignore
114             {                
115             }            
116             throw new StartupFailureException(exception);
117         }
118     }
119 
120     /**
121      * Shutdown procedure for this management client.
122      */
123     void shutdown ()
124     {        
125         LOGGER.info(Messages.QMAN_000011_SHUTDOWN_INITIATED,_domainModel.getBrokerId());
126         
127         removeMethodReplyConsumer();
128         destroyAndUnbingMethodReplyQueue();
129         
130         removeManagementConsumer();
131         destroyAndUnbingManagementQueue();
132         
133         _domainModel.releaseResources();
134         
135         _service.close();
136 
137         LOGGER.info(Messages.QMAN_000012_MANAGEMENT_CLIENT_SHUT_DOWN,_domainModel.getBrokerId());
138     }
139 
140     /**
141      * Registers a consumer (that is, a listener) on the method-reply queue.
142      */
143     private void registerConsumerOnMethodReplyQueue ()
144     {
145         BrokerMessageListener methodReplyChannelListener = new BrokerMessageListener(_domainModel);
146         methodReplyChannelListener.setHandlers(Configuration.getInstance().getMethodReplyQueueHandlers());
147         _service.createSubscription(_methodReplyQueueName, _methodReplyQueueName, methodReplyChannelListener);
148         
149         LOGGER.info(Messages.QMAN_000013_METHOD_REPLY_CONSUMER_INSTALLED, _domainModel.getBrokerId());             
150     }
151 
152     /**
153      * Registers a consumer (listener) on the management queue.
154      */
155     private void registerConsumerOnManagementQueue () throws QpidException
156     {  
157         BrokerMessageListener managementChannelListener = new BrokerMessageListener(_domainModel);
158         managementChannelListener.setHandlers(Configuration.getInstance().getManagementQueueHandlers());        
159         _service.createSubscription(_managementQueueName, _managementQueueName, managementChannelListener);
160         
161         LOGGER.info(Messages.QMAN_000014_MANAGEMENT_CONSUMER_INSTALLED, _domainModel.getBrokerId());               
162     }
163 
164     /**
165      * Declares a management queue and bound it to the "qpid.management" exchange with "mgmt.#" as routing key; 
166      */
167     private void createAndBindManagementQueue ()
168     {
169         _service.declareQueue(_managementQueueName);
170         _service.declareBinding(
171                 _managementQueueName, 
172                 Names.MANAGEMENT_EXCHANGE, 
173                 Names.MANAGEMENT_ROUTING_KEY);
174 
175         LOGGER.info(Messages.QMAN_000015_MANAGEMENT_QUEUE_DECLARED,_managementQueueName,_domainModel.getBrokerId());       
176     }
177     
178     /**
179      * Declares a private queue for receiving method replies (after method invocations). 
180      * This queue is bound to the amq.direct exchange using a routing key equal to the name of the queue.
181      */
182     private void createAndBindMethodReplyQueue ()
183     {
184         _service.declareQueue(_methodReplyQueueName);
185         _service.declareBinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName);
186         
187         LOGGER.info(Messages.QMAN_000016_METHOD_REPLY_QUEUE_DECLARED,_methodReplyQueueName, _domainModel.getBrokerId());       
188     }    
189     
190     /**
191      * Removes the method-reply queue consumer. 
192      */
193     private void removeMethodReplyConsumer()
194     {
195         _service.removeSubscription(_methodReplyQueueName)
196         
197         LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_methodReplyQueueName,_domainModel.getBrokerId());
198     }
199     
200     /**
201      * Unbind the method reply queue and after that destroy it from remote broker.
202      */
203     private void destroyAndUnbingMethodReplyQueue()
204     {
205         _service.declareUnbinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName);
206         _service.deleteQueue(_methodReplyQueueName);        
207         
208         LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED,_methodReplyQueueName,_domainModel.getBrokerId());
209     }
210     
211     /**
212      * Removes the management queue consumer. 
213      */
214     private void removeManagementConsumer()
215     {
216         _service.removeSubscription(_managementQueueName);  
217 
218         LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_managementQueueName,_domainModel.getBrokerId());
219     }
220     
221     /**
222      * Unbind the management queue and after that destroy it from remote broker.
223      */
224     private void destroyAndUnbingManagementQueue()
225     {
226         _service.declareUnbinding(_managementQueueName, Names.MANAGEMENT_EXCHANGE, Names.MANAGEMENT_ROUTING_KEY);
227         _service.deleteQueue(_managementQueueName);        
228 
229         LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED, _managementQueueName,_domainModel.getBrokerId());
230     }    
231     
232     /**
233      * Connects this client with the broker.
234      
235      @throws QpidException when it's not possibile to connect with the broker.
236      */
237     private void connectWithBroker() throws Exception 
238     {
239         _service.connect();
240     }
241 
242     /**
243      * All the Management client commands are asynchronous. 
244      * Synchronous behavior is achieved through invoking the sync method.
245      */
246     private void synchronize() 
247     {
248         _service.sync();
249     }
250 }