QpidService.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.management.domain.services;
022 
023 import java.io.IOException;
024 import java.util.Map;
025 import java.util.UUID;
026 import java.util.concurrent.ConcurrentHashMap;
027 
028 import org.apache.qpid.QpidException;
029 import org.apache.qpid.api.Message;
030 import org.apache.qpid.management.Messages;
031 import org.apache.qpid.management.Names;
032 import org.apache.qpid.management.configuration.QpidDatasource;
033 import org.apache.qpid.management.domain.model.QpidMethod;
034 import org.apache.qpid.management.domain.model.type.Binary;
035 import org.apache.qpid.management.messages.MethodInvocationRequestMessage;
036 import org.apache.qpid.management.messages.SchemaRequestMessage;
037 import org.apache.qpid.nclient.util.MessageListener;
038 import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
039 import org.apache.qpid.transport.Connection;
040 import org.apache.qpid.transport.MessageAcceptMode;
041 import org.apache.qpid.transport.MessageAcquireMode;
042 import org.apache.qpid.transport.MessageCreditUnit;
043 import org.apache.qpid.transport.MessageTransfer;
044 import org.apache.qpid.transport.Option;
045 import org.apache.qpid.transport.Session;
046 import org.apache.qpid.transport.SessionException;
047 import org.apache.qpid.transport.SessionListener;
048 import org.apache.qpid.transport.util.Logger;
049 
050 /**
051  * Qpid Broker facade.
052  *
053  @author Andrea Gazzarini
054  */
055 public class QpidService implements SessionListener
056 {
057     private final static Logger LOGGER = Logger.get(QpidService.class);
058 
059     private UUID _brokerId;
060     private Connection _connection;
061     private Session _session;
062     private Map<String,MessagePartListenerAdapter> _listeners;
063 
064     /**
065      * Builds a new service with the given connection data.
066      *
067      @param connectionData the connection data of the broker.
068      */
069     public QpidService(UUID brokerId)
070     {
071         this._brokerId = brokerId;
072     }
073 
074     /**
075      * Estabilishes a connection with the broker.
076      *
077      @throws QpidException in case of connection failure.
078      */
079     public void connect() throws Exception
080     {
081         _connection = QpidDatasource.getInstance().getConnection(_brokerId);
082         _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>();
083         _session = _connection.createSession(0);
084         _session.setSessionListener(this);
085     }
086 
087     public void opened(Session ssn) {}
088 
089     public void message(Session ssn, MessageTransfer xfr)
090     {
091         MessagePartListenerAdapter l = _listeners.get(xfr.getDestination());
092         if (l == null)
093         {
094             LOGGER.error("unhandled message: %s", xfr);
095         }
096         else
097         {
098             l.messageTransfer(xfr);
099         }
100     }
101 
102     
103     public void exception(Session ssn, SessionException exc)
104     {
105         
106     }
107 
108     public void closed(Session ssn) {}
109 
110     /**
111      * All the previously entered outstanding commands are asynchronous.
112      * Synchronous behavior is achieved through invoking this  method.
113      */
114     public void sync()
115     {
116         _session.sync();
117     }
118 
119     /**
120      * Closes communication with broker.
121      */
122     public void close()
123     {
124         try
125         {
126             _session.close();
127             _session = null;
128             _listeners = null;
129         catch (Exception e)
130         {
131         }
132         try
133         {
134             _connection.close();
135             _connection = null;
136         catch (Exception e)
137         {
138         }
139     }
140 
141     /**
142      * Associate a message listener with a destination therefore creating a new subscription.
143      *
144      @param queueName The name of the queue that the subscriber is receiving messages from.
145      @param destinationName the name of the destination, or delivery tag, for the subscriber.
146      @param listener the listener for this destination.
147      *
148      @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...)
149      */
150     public void createSubscription(String queueName, String destinationName, MessageListener listener)
151     {
152         _listeners.put(destinationName, new MessagePartListenerAdapter(listener));
153         _session.messageSubscribe
154             (queueName,
155              destinationName,
156              MessageAcceptMode.NONE,
157              MessageAcquireMode.PRE_ACQUIRED,
158              null, 0null);
159 
160         _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
161         _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
162 
163         LOGGER.debug(Messages.QMAN_200025_SUBSCRIPTION_DECLARED,queueName,destinationName);
164     }
165 
166     /**
167      * Removes a previously declared consumer from the broker.
168      *
169      @param destinationName the name of the destination, or delivery tag, for the subscriber.
170      @see Session#messageCancel(String, Option...)
171      */
172     public void removeSubscription(String destinationName)
173     {
174         _session.messageCancel(destinationName);
175         LOGGER.debug(Messages.QMAN_200026_SUBSCRIPTION_REMOVED,destinationName);
176     }
177 
178     /**
179      * Declares a queue on the broker with the given name.
180      *
181      @param queueName the name of the declared queue.
182      @see Session#queueDeclare(String, String, java.util.Map, Option...)
183      */
184     public void declareQueue(String queueName)
185     {
186         _session.queueDeclare(queueName, null, null);
187         LOGGER.debug(Messages.QMAN_200027_QUEUE_DECLARED,queueName);
188     }
189 
190     /**
191      * Removes the queue with the given name from the broker.
192      *
193      @param queueName the name of the queue.
194      @see Session#queueDelete(String, Option...)
195      */
196     public void deleteQueue(String queueName)
197     {
198         _session.queueDelete(queueName);
199         LOGGER.debug(Messages.QMAN_200028_QUEUE_REMOVED,queueName);
200     }
201 
202     /**
203      * Binds (on the broker) a queue with an exchange.
204      *
205      @param queueName the name of the queue to bind.
206      @param exchangeName the exchange name.
207      @param routingKey the routing key used for the binding.
208      @see Session#exchangeBind(String, String, String, java.util.Map, Option...)
209      */
210     public void declareBinding(String queueName, String exchangeName, String routingKey)
211     {
212         _session.exchangeBind(queueName, exchangeName, routingKey, null);
213         LOGGER.debug(Messages.QMAN_200029_BINDING_DECLARED,routingKey,queueName,exchangeName);
214     }
215 
216     /**
217      * Removes a previously declare binding between an exchange and a queue.
218      *
219      @param queueName the name of the queue.
220      @param exchangeName the name of the exchange.
221      @param routingKey the routing key used for binding.
222      */
223     public void declareUnbinding(String queueName, String exchangeName, String routingKey)
224     {
225         _session.exchangeUnbind(queueName, exchangeName, routingKey);
226         LOGGER.debug(Messages.QMAN_200030_BINDING_REMOVED,routingKey,queueName,exchangeName);
227     }
228 
229     /**
230      * Sends a command message with the given data on the management queue.
231      *
232      @param messageData the command message content.
233      */
234     
235     /**
236      * Requests a schema for the given package.class.hash.
237      
238      @param packageName the package name.
239      @param className the class name.
240      @param schemaHash the schema hash.
241      @throws IOException when the schema request cannot be sent.
242      */
243     public void requestSchema(final String packageName, final String className, final Binary schemaHashthrows IOException
244     {
245         Message message = new SchemaRequestMessage()
246         {
247             @Override
248             protected String className ()
249             {
250                 return className;
251             }
252 
253             @Override
254             protected String packageName ()
255             {
256                 return packageName;
257             }
258 
259             @Override
260             protected Binary schemaHash ()
261             {
262                 return schemaHash;
263             }
264         };
265         
266         sendMessage(message);
267     }
268     
269     /**
270      * Invokes an operation on a broker object instance.
271      
272      @param packageName the package name.
273      @param className the class name.
274      @param schemaHash the schema hash of the corresponding class.
275      @param objectId the object instance identifier.
276      @param parameters the parameters for this invocation.
277      @param method the method (definition) invoked.
278      @param bankId the object bank identifier.
279      @param brokerId the broker identifier.
280      @return the sequence number used for this message.
281      @throws MethodInvocationException when the invoked method returns an error code.
282      @throws UnableToComplyException when it wasn't possibile to invoke the requested operation. 
283      */
284     public void invoke(
285             final String packageName, 
286             final String className, 
287             final Binary schemaHash, 
288             final Binary objectId, 
289             final Object[] parameters, 
290             final QpidMethod method,
291             final int sequenceNumber,
292             final long bankId,
293             final long brokerIdthrows MethodInvocationException, UnableToComplyException 
294     {
295         Message message = new MethodInvocationRequestMessage(bankId, brokerId)
296         {
297             
298             @Override
299             protected int sequenceNumber ()
300             {
301                 return sequenceNumber;
302             }
303             
304             protected Binary objectId() {
305                 return objectId;
306             }
307             
308             protected String packageName()
309             {
310                 return packageName;
311             }
312             
313             protected String className() 
314             {
315                 return className;
316             }
317             
318             @Override
319             protected QpidMethod method ()
320             {
321                 return method;
322             }
323 
324             @Override
325             protected Object[] parameters ()
326             {
327                 return parameters;
328             }
329 
330             @Override
331             protected Binary schemaHash ()
332             {
333                 return schemaHash;
334             }
335         };
336         
337         try {
338             sendMessage(message);
339             sync();
340         catch(Exception exception) {
341             throw new UnableToComplyException(exception);
342         }
343     }     
344     
345     /**
346      * Sends a command message.
347      
348      @param message the command message.
349      @throws IOException when the message cannot be sent.
350      */
351     public void sendMessage(Message messagethrows IOException
352     {
353         _session.messageTransfer(
354                 Names.MANAGEMENT_EXCHANGE,
355                 MessageAcceptMode.EXPLICIT,
356                 MessageAcquireMode.PRE_ACQUIRED,
357                 message.getHeader(),
358                 message.readData());
359     }      
360 }