TransportConnection.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.transport;
022 
023 import org.apache.mina.common.IoConnector;
024 import org.apache.mina.common.IoHandlerAdapter;
025 import org.apache.mina.common.IoServiceConfig;
026 import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
027 import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
028 import org.apache.mina.transport.socket.nio.SocketConnector;
029 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
030 import org.apache.mina.transport.vmpipe.VmPipeAddress;
031 import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
032 import org.apache.qpid.jms.BrokerDetails;
033 import org.apache.qpid.pool.ReadWriteThreadModel;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036 
037 import java.io.IOException;
038 import java.util.HashMap;
039 import java.util.Map;
040 import java.util.concurrent.ConcurrentHashMap;
041 import java.net.Socket;
042 
043 /**
044  * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
045  * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
046  * protocol events. <p/> Could be extended in future to support different transport types by turning this into concrete
047  * class/interface combo.
048  */
049 public class TransportConnection
050 {
051     private static ITransportConnection _instance;
052 
053     private static Map _inVmPipeAddress = new HashMap();
054     private static VmPipeAcceptor _acceptor;
055     private static int _currentInstance = -1;
056     private static int _currentVMPort = -1;
057 
058     private static final int TCP = 0;
059     private static final int VM = 1;
060     private static final int SOCKET = 2;
061 
062     private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
063 
064     private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
065 
066     private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
067 
068     public static void registerOpenSocket(String socketID, Socket openSocket)
069     {
070         _openSocketRegister.put(socketID, openSocket);
071     }
072 
073     public static Socket removeOpenSocket(String socketID)
074     {
075         return _openSocketRegister.remove(socketID);
076     }
077 
078     public static synchronized ITransportConnection getInstance(BrokerDetails detailsthrows AMQTransportConnectionException
079     {
080         int transport = getTransport(details.getTransport());
081 
082         if (transport == -1)
083         {
084             throw new AMQNoTransportForProtocolException(details, null, null);
085         }
086 
087         switch (transport)
088         {
089             case SOCKET:
090                 return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
091                 {
092                     public IoConnector newSocketConnector()
093                     {
094                         return new ExistingSocketConnector(1,new QpidThreadExecutor());
095                     }
096                 });
097             case TCP:
098                 return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
099                 {
100                     public IoConnector newSocketConnector()
101                     {
102                         SocketConnector result;
103                         // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
104                         if (Boolean.getBoolean("qpidnio"))
105                         {
106                             _logger.warn("Using Qpid MultiThreaded NIO - " (System.getProperties().containsKey("qpidnio")
107                                                                               "Qpid NIO is new default"
108                                                                               "Sysproperty 'qpidnio' is set"));
109                             result = new MultiThreadSocketConnector(1new QpidThreadExecutor());
110                         }
111                         else
112                         {
113                             _logger.info("Using Mina NIO");
114                             result = new SocketConnector(1new QpidThreadExecutor())// non-blocking connector
115                         }
116                         // Don't have the connector's worker thread wait around for other connections (we only use
117                         // one SocketConnector per connection at the moment anyway). This allows short-running
118                         // clients (like unit tests) to complete quickly.
119                         result.setWorkerTimeout(0);
120                         return result;
121                     }
122                 });
123             case VM:
124             {
125                 return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
126             }
127             default:
128                 throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null);
129         }
130     }
131 
132     private static int getTransport(String transport)
133     {
134         if (transport.equals(BrokerDetails.SOCKET))
135         {
136             return SOCKET;
137         }
138 
139         if (transport.equals(BrokerDetails.TCP))
140         {
141             return TCP;
142         }
143 
144         if (transport.equals(BrokerDetails.VM))
145         {
146             return VM;
147         }
148 
149         return -1;
150     }
151 
152     private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
153             throws AMQVMBrokerCreationException
154     {
155         int port = details.getPort();
156 
157         synchronized (_inVmPipeAddress)
158         {
159             if (!_inVmPipeAddress.containsKey(port))
160             {
161                 if (AutoCreate)
162                 {
163                     if (AutoCreate)
164                     {
165                         _logger.warn("Auto Creating InVM Broker on port:" + port);
166                         createVMBroker(port);
167                     }
168                     else
169                     {
170                         throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
171                                                                            " does not exist. Auto create disabled."null);
172                     }
173                 }
174                 else
175                 {
176                     throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
177                                                                        " does not exist. Auto create disabled."null);
178                 }
179             }
180         }
181 
182         return new VmPipeTransportConnection(port);
183     }
184 
185     public static void createVMBroker(int portthrows AMQVMBrokerCreationException
186     {
187         if (_acceptor == null)
188         {
189             _acceptor = new VmPipeAcceptor();
190 
191             IoServiceConfig config = _acceptor.getDefaultConfig();
192 
193             config.setThreadModel(ReadWriteThreadModel.getInstance());
194         }
195         synchronized (_inVmPipeAddress)
196         {
197 
198             if (!_inVmPipeAddress.containsKey(port))
199             {
200                 _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
201                 IoHandlerAdapter provider = null;
202                 try
203                 {
204                     VmPipeAddress pipe = new VmPipeAddress(port);
205 
206                     provider = createBrokerInstance(port);
207 
208                     _acceptor.bind(pipe, provider);
209 
210                     _inVmPipeAddress.put(port, pipe);
211                     _logger.info("Created InVM Qpid.AMQP listening on port " + port);
212                 }
213                 catch (IOException e)
214                 {
215                     _logger.error("Got IOException.", e);
216 
217                     // Try and unbind provider
218                     try
219                     {
220                         VmPipeAddress pipe = new VmPipeAddress(port);
221 
222                         try
223                         {
224                             _acceptor.unbind(pipe);
225                         }
226                         catch (Exception ignore)
227                         {
228                             // ignore
229                         }
230 
231                         if (provider == null)
232                         {
233                             provider = createBrokerInstance(port);
234                         }
235 
236                         _acceptor.bind(pipe, provider);
237                         _inVmPipeAddress.put(port, pipe);
238                         _logger.info("Created InVM Qpid.AMQP listening on port " + port);
239                     }
240                     catch (IOException justUseFirstException)
241                     {
242                         String because;
243                         if (e.getCause() == null)
244                         {
245                             because = e.toString();
246                         }
247                         else
248                         {
249                             because = e.getCause().toString();
250                         }
251 
252                         throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
253                     }
254                 }
255 
256             }
257             else
258             {
259                 _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
260             }
261         }
262     }
263 
264     private static IoHandlerAdapter createBrokerInstance(int portthrows AMQVMBrokerCreationException
265     {
266         String protocolProviderClass = System.getProperty("amqj.protocolprovider.class", DEFAULT_QPID_SERVER);
267         _logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
268 
269         // can't use introspection to get Provider as it is a server class.
270         // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
271 
272         // get right constructor and pass in instancec ID - "port"
273         IoHandlerAdapter provider;
274         try
275         {
276             Class[] cnstr = {Integer.class};
277             Object[] params = {port};
278             provider = (IoHandlerAdapterClass.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
279             // Give the broker a second to create
280             _logger.info("Created VMBroker Instance:" + port);
281         }
282         catch (Exception e)
283         {
284             _logger.info("Unable to create InVM Qpid.AMQP on port " + port + ". Because: " + e.getCause());
285             String because;
286             if (e.getCause() == null)
287             {
288                 because = e.toString();
289             }
290             else
291             {
292                 because = e.getCause().toString();
293             }
294 
295             AMQVMBrokerCreationException amqbce =
296                     new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e);
297             throw amqbce;
298         }
299 
300         return provider;
301     }
302 
303     public static void killAllVMBrokers()
304     {
305         _logger.info("Killing all VM Brokers");
306         if (_acceptor != null)
307         {
308             _acceptor.unbindAll();
309         }
310         synchronized (_inVmPipeAddress)
311         {
312             _inVmPipeAddress.clear();
313         }
314         _acceptor = null;
315         _currentInstance = -1;
316         _currentVMPort = -1;
317     }
318 
319     public static void killVMBroker(int port)
320     {
321         synchronized (_inVmPipeAddress)
322         {
323             VmPipeAddress pipe = (VmPipeAddress_inVmPipeAddress.get(port);
324             if (pipe != null)
325             {
326                 _logger.info("Killing VM Broker:" + port);
327                 _inVmPipeAddress.remove(port);
328                 // This does need to be sychronized as otherwise mina can hang
329                 // if a new connection is made
330                 _acceptor.unbind(pipe);
331             }
332         }
333     }
334 
335 }