001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.management.domain.services;
022
023 import java.io.IOException;
024 import java.util.Map;
025 import java.util.UUID;
026 import java.util.concurrent.ConcurrentHashMap;
027
028 import org.apache.qpid.QpidException;
029 import org.apache.qpid.api.Message;
030 import org.apache.qpid.management.Messages;
031 import org.apache.qpid.management.Names;
032 import org.apache.qpid.management.configuration.QpidDatasource;
033 import org.apache.qpid.management.domain.model.QpidMethod;
034 import org.apache.qpid.management.domain.model.type.Binary;
035 import org.apache.qpid.management.messages.MethodInvocationRequestMessage;
036 import org.apache.qpid.management.messages.SchemaRequestMessage;
037 import org.apache.qpid.nclient.util.MessageListener;
038 import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
039 import org.apache.qpid.transport.Connection;
040 import org.apache.qpid.transport.MessageAcceptMode;
041 import org.apache.qpid.transport.MessageAcquireMode;
042 import org.apache.qpid.transport.MessageCreditUnit;
043 import org.apache.qpid.transport.MessageTransfer;
044 import org.apache.qpid.transport.Option;
045 import org.apache.qpid.transport.Session;
046 import org.apache.qpid.transport.SessionException;
047 import org.apache.qpid.transport.SessionListener;
048 import org.apache.qpid.transport.util.Logger;
049
050 /**
051 * Qpid Broker facade.
052 *
053 * @author Andrea Gazzarini
054 */
055 public class QpidService implements SessionListener
056 {
057 private final static Logger LOGGER = Logger.get(QpidService.class);
058
059 private UUID _brokerId;
060 private Connection _connection;
061 private Session _session;
062 private Map<String,MessagePartListenerAdapter> _listeners;
063
064 /**
065 * Builds a new service with the given connection data.
066 *
067 * @param connectionData the connection data of the broker.
068 */
069 public QpidService(UUID brokerId)
070 {
071 this._brokerId = brokerId;
072 }
073
074 /**
075 * Estabilishes a connection with the broker.
076 *
077 * @throws QpidException in case of connection failure.
078 */
079 public void connect() throws Exception
080 {
081 _connection = QpidDatasource.getInstance().getConnection(_brokerId);
082 _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>();
083 _session = _connection.createSession(0);
084 _session.setSessionListener(this);
085 }
086
087 public void opened(Session ssn) {}
088
089 public void message(Session ssn, MessageTransfer xfr)
090 {
091 MessagePartListenerAdapter l = _listeners.get(xfr.getDestination());
092 if (l == null)
093 {
094 LOGGER.error("unhandled message: %s", xfr);
095 }
096 else
097 {
098 l.messageTransfer(xfr);
099 }
100 }
101
102
103 public void exception(Session ssn, SessionException exc)
104 {
105
106 }
107
108 public void closed(Session ssn) {}
109
110 /**
111 * All the previously entered outstanding commands are asynchronous.
112 * Synchronous behavior is achieved through invoking this method.
113 */
114 public void sync()
115 {
116 _session.sync();
117 }
118
119 /**
120 * Closes communication with broker.
121 */
122 public void close()
123 {
124 try
125 {
126 _session.close();
127 _session = null;
128 _listeners = null;
129 } catch (Exception e)
130 {
131 }
132 try
133 {
134 _connection.close();
135 _connection = null;
136 } catch (Exception e)
137 {
138 }
139 }
140
141 /**
142 * Associate a message listener with a destination therefore creating a new subscription.
143 *
144 * @param queueName The name of the queue that the subscriber is receiving messages from.
145 * @param destinationName the name of the destination, or delivery tag, for the subscriber.
146 * @param listener the listener for this destination.
147 *
148 * @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...)
149 */
150 public void createSubscription(String queueName, String destinationName, MessageListener listener)
151 {
152 _listeners.put(destinationName, new MessagePartListenerAdapter(listener));
153 _session.messageSubscribe
154 (queueName,
155 destinationName,
156 MessageAcceptMode.NONE,
157 MessageAcquireMode.PRE_ACQUIRED,
158 null, 0, null);
159
160 _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
161 _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
162
163 LOGGER.debug(Messages.QMAN_200025_SUBSCRIPTION_DECLARED,queueName,destinationName);
164 }
165
166 /**
167 * Removes a previously declared consumer from the broker.
168 *
169 * @param destinationName the name of the destination, or delivery tag, for the subscriber.
170 * @see Session#messageCancel(String, Option...)
171 */
172 public void removeSubscription(String destinationName)
173 {
174 _session.messageCancel(destinationName);
175 LOGGER.debug(Messages.QMAN_200026_SUBSCRIPTION_REMOVED,destinationName);
176 }
177
178 /**
179 * Declares a queue on the broker with the given name.
180 *
181 * @param queueName the name of the declared queue.
182 * @see Session#queueDeclare(String, String, java.util.Map, Option...)
183 */
184 public void declareQueue(String queueName)
185 {
186 _session.queueDeclare(queueName, null, null);
187 LOGGER.debug(Messages.QMAN_200027_QUEUE_DECLARED,queueName);
188 }
189
190 /**
191 * Removes the queue with the given name from the broker.
192 *
193 * @param queueName the name of the queue.
194 * @see Session#queueDelete(String, Option...)
195 */
196 public void deleteQueue(String queueName)
197 {
198 _session.queueDelete(queueName);
199 LOGGER.debug(Messages.QMAN_200028_QUEUE_REMOVED,queueName);
200 }
201
202 /**
203 * Binds (on the broker) a queue with an exchange.
204 *
205 * @param queueName the name of the queue to bind.
206 * @param exchangeName the exchange name.
207 * @param routingKey the routing key used for the binding.
208 * @see Session#exchangeBind(String, String, String, java.util.Map, Option...)
209 */
210 public void declareBinding(String queueName, String exchangeName, String routingKey)
211 {
212 _session.exchangeBind(queueName, exchangeName, routingKey, null);
213 LOGGER.debug(Messages.QMAN_200029_BINDING_DECLARED,routingKey,queueName,exchangeName);
214 }
215
216 /**
217 * Removes a previously declare binding between an exchange and a queue.
218 *
219 * @param queueName the name of the queue.
220 * @param exchangeName the name of the exchange.
221 * @param routingKey the routing key used for binding.
222 */
223 public void declareUnbinding(String queueName, String exchangeName, String routingKey)
224 {
225 _session.exchangeUnbind(queueName, exchangeName, routingKey);
226 LOGGER.debug(Messages.QMAN_200030_BINDING_REMOVED,routingKey,queueName,exchangeName);
227 }
228
229 /**
230 * Sends a command message with the given data on the management queue.
231 *
232 * @param messageData the command message content.
233 */
234
235 /**
236 * Requests a schema for the given package.class.hash.
237 *
238 * @param packageName the package name.
239 * @param className the class name.
240 * @param schemaHash the schema hash.
241 * @throws IOException when the schema request cannot be sent.
242 */
243 public void requestSchema(final String packageName, final String className, final Binary schemaHash) throws IOException
244 {
245 Message message = new SchemaRequestMessage()
246 {
247 @Override
248 protected String className ()
249 {
250 return className;
251 }
252
253 @Override
254 protected String packageName ()
255 {
256 return packageName;
257 }
258
259 @Override
260 protected Binary schemaHash ()
261 {
262 return schemaHash;
263 }
264 };
265
266 sendMessage(message);
267 }
268
269 /**
270 * Invokes an operation on a broker object instance.
271 *
272 * @param packageName the package name.
273 * @param className the class name.
274 * @param schemaHash the schema hash of the corresponding class.
275 * @param objectId the object instance identifier.
276 * @param parameters the parameters for this invocation.
277 * @param method the method (definition) invoked.
278 * @param bankId the object bank identifier.
279 * @param brokerId the broker identifier.
280 * @return the sequence number used for this message.
281 * @throws MethodInvocationException when the invoked method returns an error code.
282 * @throws UnableToComplyException when it wasn't possibile to invoke the requested operation.
283 */
284 public void invoke(
285 final String packageName,
286 final String className,
287 final Binary schemaHash,
288 final Binary objectId,
289 final Object[] parameters,
290 final QpidMethod method,
291 final int sequenceNumber,
292 final long bankId,
293 final long brokerId) throws MethodInvocationException, UnableToComplyException
294 {
295 Message message = new MethodInvocationRequestMessage(bankId, brokerId)
296 {
297
298 @Override
299 protected int sequenceNumber ()
300 {
301 return sequenceNumber;
302 }
303
304 protected Binary objectId() {
305 return objectId;
306 }
307
308 protected String packageName()
309 {
310 return packageName;
311 }
312
313 protected String className()
314 {
315 return className;
316 }
317
318 @Override
319 protected QpidMethod method ()
320 {
321 return method;
322 }
323
324 @Override
325 protected Object[] parameters ()
326 {
327 return parameters;
328 }
329
330 @Override
331 protected Binary schemaHash ()
332 {
333 return schemaHash;
334 }
335 };
336
337 try {
338 sendMessage(message);
339 sync();
340 } catch(Exception exception) {
341 throw new UnableToComplyException(exception);
342 }
343 }
344
345 /**
346 * Sends a command message.
347 *
348 * @param message the command message.
349 * @throws IOException when the message cannot be sent.
350 */
351 public void sendMessage(Message message) throws IOException
352 {
353 _session.messageTransfer(
354 Names.MANAGEMENT_EXCHANGE,
355 MessageAcceptMode.EXPLICIT,
356 MessageAcquireMode.PRE_ACQUIRED,
357 message.getHeader(),
358 message.readData());
359 }
360 }
|