001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.management.domain.services;
022
023 import java.util.UUID;
024
025 import org.apache.qpid.QpidException;
026 import org.apache.qpid.management.Messages;
027 import org.apache.qpid.management.Names;
028 import org.apache.qpid.management.configuration.BrokerConnectionData;
029 import org.apache.qpid.management.configuration.Configuration;
030 import org.apache.qpid.management.domain.model.DomainModel;
031 import org.apache.qpid.transport.util.Logger;
032
033 /**
034 * This is the Object representation of a management client.
035 * According to specification : "A software component that is separate from the messaging broker, connected to the
036 * management broker via an AMQP connection, which allows any software component to be managed remotely by QPID."
037 *
038 * @author Andrea Gazzarini
039 */
040 public final class ManagementClient
041 {
042 private final static Logger LOGGER = Logger.get(ManagementClient.class);
043
044 private final String _managementQueueName;
045 private final String _methodReplyQueueName;
046
047 private DomainModel _domainModel;
048 private QpidService _service;
049
050 private final BrokerConnectionData _connectionData;
051
052 /**
053 * Builds a new <code>ManagementClient</code> with the given identifier and connection data.
054 *
055 * @param brokerId the broker identifier.
056 * @param connectionData the broker connection data (host, port, etc...)
057 */
058 ManagementClient(UUID brokerId,BrokerConnectionData connectionData)
059 {
060 _connectionData = connectionData;
061 _service = new QpidService(brokerId);
062 _domainModel = new DomainModel(brokerId);
063 _managementQueueName = Configuration.getInstance().getManagementQueueName();
064 _methodReplyQueueName = Configuration.getInstance().getMethodReplyQueueName();
065 }
066
067 @Override
068 public String toString()
069 {
070 return _connectionData.toString();
071 }
072
073 /**
074 * Returns the connection data associated with this management client.
075 *
076 * @return the connection data associated with this management client.
077 */
078 public BrokerConnectionData getBrokerConnectionData()
079 {
080 return _connectionData;
081 }
082
083 /**
084 * Establishing initial communication Between Client and Broker.
085 * According to specification :
086 * "Communication is established between the management client and management agent using normal AMQP procedures.
087 * The client creates a connection to the broker and then establishes a session with its corresponding channel.
088 * Two private queues are then declared.
089 * A management queue is declared and bound to the qpid.management exchange with "mgmt.#" as routing key; in that
090 * way all management-related messages sent to the exchange will be received by this client.
091 * When a client successfully binds to the qpid.management exchange, the management agent schedules a schema
092 * broadcast to be sent to the exchange.
093 * The agent will publish, via the exchange, a description of the schema for all manageable objects in its control. That
094 * schema is therefore received by this service and it will be part of service's domain model."
095 *
096 * @throws StartupFailureException when this management client cannot perform startup operations due to an error.
097 */
098 void estabilishFirstConnectionWithBroker() throws StartupFailureException{
099 try {
100 connectWithBroker();
101
102 createAndBindMethodReplyQueue();
103 createAndBindManagementQueue();
104
105 registerConsumerOnManagementQueue();
106 registerConsumerOnMethodReplyQueue();
107
108 synchronize();
109 } catch(Exception exception)
110 {
111 try {
112 _service.close();
113 } catch(Exception ignore)
114 {
115 }
116 throw new StartupFailureException(exception);
117 }
118 }
119
120 /**
121 * Shutdown procedure for this management client.
122 */
123 void shutdown ()
124 {
125 LOGGER.info(Messages.QMAN_000011_SHUTDOWN_INITIATED,_domainModel.getBrokerId());
126
127 removeMethodReplyConsumer();
128 destroyAndUnbingMethodReplyQueue();
129
130 removeManagementConsumer();
131 destroyAndUnbingManagementQueue();
132
133 _domainModel.releaseResources();
134
135 _service.close();
136
137 LOGGER.info(Messages.QMAN_000012_MANAGEMENT_CLIENT_SHUT_DOWN,_domainModel.getBrokerId());
138 }
139
140 /**
141 * Registers a consumer (that is, a listener) on the method-reply queue.
142 */
143 private void registerConsumerOnMethodReplyQueue ()
144 {
145 BrokerMessageListener methodReplyChannelListener = new BrokerMessageListener(_domainModel);
146 methodReplyChannelListener.setHandlers(Configuration.getInstance().getMethodReplyQueueHandlers());
147 _service.createSubscription(_methodReplyQueueName, _methodReplyQueueName, methodReplyChannelListener);
148
149 LOGGER.info(Messages.QMAN_000013_METHOD_REPLY_CONSUMER_INSTALLED, _domainModel.getBrokerId());
150 }
151
152 /**
153 * Registers a consumer (listener) on the management queue.
154 */
155 private void registerConsumerOnManagementQueue () throws QpidException
156 {
157 BrokerMessageListener managementChannelListener = new BrokerMessageListener(_domainModel);
158 managementChannelListener.setHandlers(Configuration.getInstance().getManagementQueueHandlers());
159 _service.createSubscription(_managementQueueName, _managementQueueName, managementChannelListener);
160
161 LOGGER.info(Messages.QMAN_000014_MANAGEMENT_CONSUMER_INSTALLED, _domainModel.getBrokerId());
162 }
163
164 /**
165 * Declares a management queue and bound it to the "qpid.management" exchange with "mgmt.#" as routing key;
166 */
167 private void createAndBindManagementQueue ()
168 {
169 _service.declareQueue(_managementQueueName);
170 _service.declareBinding(
171 _managementQueueName,
172 Names.MANAGEMENT_EXCHANGE,
173 Names.MANAGEMENT_ROUTING_KEY);
174
175 LOGGER.info(Messages.QMAN_000015_MANAGEMENT_QUEUE_DECLARED,_managementQueueName,_domainModel.getBrokerId());
176 }
177
178 /**
179 * Declares a private queue for receiving method replies (after method invocations).
180 * This queue is bound to the amq.direct exchange using a routing key equal to the name of the queue.
181 */
182 private void createAndBindMethodReplyQueue ()
183 {
184 _service.declareQueue(_methodReplyQueueName);
185 _service.declareBinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName);
186
187 LOGGER.info(Messages.QMAN_000016_METHOD_REPLY_QUEUE_DECLARED,_methodReplyQueueName, _domainModel.getBrokerId());
188 }
189
190 /**
191 * Removes the method-reply queue consumer.
192 */
193 private void removeMethodReplyConsumer()
194 {
195 _service.removeSubscription(_methodReplyQueueName);
196
197 LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_methodReplyQueueName,_domainModel.getBrokerId());
198 }
199
200 /**
201 * Unbind the method reply queue and after that destroy it from remote broker.
202 */
203 private void destroyAndUnbingMethodReplyQueue()
204 {
205 _service.declareUnbinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName);
206 _service.deleteQueue(_methodReplyQueueName);
207
208 LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED,_methodReplyQueueName,_domainModel.getBrokerId());
209 }
210
211 /**
212 * Removes the management queue consumer.
213 */
214 private void removeManagementConsumer()
215 {
216 _service.removeSubscription(_managementQueueName);
217
218 LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_managementQueueName,_domainModel.getBrokerId());
219 }
220
221 /**
222 * Unbind the management queue and after that destroy it from remote broker.
223 */
224 private void destroyAndUnbingManagementQueue()
225 {
226 _service.declareUnbinding(_managementQueueName, Names.MANAGEMENT_EXCHANGE, Names.MANAGEMENT_ROUTING_KEY);
227 _service.deleteQueue(_managementQueueName);
228
229 LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED, _managementQueueName,_domainModel.getBrokerId());
230 }
231
232 /**
233 * Connects this client with the broker.
234 *
235 * @throws QpidException when it's not possibile to connect with the broker.
236 */
237 private void connectWithBroker() throws Exception
238 {
239 _service.connect();
240 }
241
242 /**
243 * All the Management client commands are asynchronous.
244 * Synchronous behavior is achieved through invoking the sync method.
245 */
246 private void synchronize()
247 {
248 _service.sync();
249 }
250 }
|