001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.transport;
022
023 import org.apache.mina.common.IoConnector;
024 import org.apache.mina.common.IoHandlerAdapter;
025 import org.apache.mina.common.IoServiceConfig;
026 import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
027 import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
028 import org.apache.mina.transport.socket.nio.SocketConnector;
029 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
030 import org.apache.mina.transport.vmpipe.VmPipeAddress;
031 import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
032 import org.apache.qpid.jms.BrokerDetails;
033 import org.apache.qpid.pool.ReadWriteThreadModel;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036
037 import java.io.IOException;
038 import java.util.HashMap;
039 import java.util.Map;
040 import java.util.concurrent.ConcurrentHashMap;
041 import java.net.Socket;
042
043 /**
044 * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
045 * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
046 * protocol events. <p/> Could be extended in future to support different transport types by turning this into concrete
047 * class/interface combo.
048 */
049 public class TransportConnection
050 {
051 private static ITransportConnection _instance;
052
053 private static Map _inVmPipeAddress = new HashMap();
054 private static VmPipeAcceptor _acceptor;
055 private static int _currentInstance = -1;
056 private static int _currentVMPort = -1;
057
058 private static final int TCP = 0;
059 private static final int VM = 1;
060 private static final int SOCKET = 2;
061
062 private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
063
064 private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
065
066 private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
067
068 public static void registerOpenSocket(String socketID, Socket openSocket)
069 {
070 _openSocketRegister.put(socketID, openSocket);
071 }
072
073 public static Socket removeOpenSocket(String socketID)
074 {
075 return _openSocketRegister.remove(socketID);
076 }
077
078 public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
079 {
080 int transport = getTransport(details.getTransport());
081
082 if (transport == -1)
083 {
084 throw new AMQNoTransportForProtocolException(details, null, null);
085 }
086
087 switch (transport)
088 {
089 case SOCKET:
090 return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
091 {
092 public IoConnector newSocketConnector()
093 {
094 return new ExistingSocketConnector(1,new QpidThreadExecutor());
095 }
096 });
097 case TCP:
098 return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
099 {
100 public IoConnector newSocketConnector()
101 {
102 SocketConnector result;
103 // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
104 if (Boolean.getBoolean("qpidnio"))
105 {
106 _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
107 ? "Qpid NIO is new default"
108 : "Sysproperty 'qpidnio' is set"));
109 result = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
110 }
111 else
112 {
113 _logger.info("Using Mina NIO");
114 result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
115 }
116 // Don't have the connector's worker thread wait around for other connections (we only use
117 // one SocketConnector per connection at the moment anyway). This allows short-running
118 // clients (like unit tests) to complete quickly.
119 result.setWorkerTimeout(0);
120 return result;
121 }
122 });
123 case VM:
124 {
125 return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
126 }
127 default:
128 throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null);
129 }
130 }
131
132 private static int getTransport(String transport)
133 {
134 if (transport.equals(BrokerDetails.SOCKET))
135 {
136 return SOCKET;
137 }
138
139 if (transport.equals(BrokerDetails.TCP))
140 {
141 return TCP;
142 }
143
144 if (transport.equals(BrokerDetails.VM))
145 {
146 return VM;
147 }
148
149 return -1;
150 }
151
152 private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
153 throws AMQVMBrokerCreationException
154 {
155 int port = details.getPort();
156
157 synchronized (_inVmPipeAddress)
158 {
159 if (!_inVmPipeAddress.containsKey(port))
160 {
161 if (AutoCreate)
162 {
163 if (AutoCreate)
164 {
165 _logger.warn("Auto Creating InVM Broker on port:" + port);
166 createVMBroker(port);
167 }
168 else
169 {
170 throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
171 + " does not exist. Auto create disabled.", null);
172 }
173 }
174 else
175 {
176 throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
177 + " does not exist. Auto create disabled.", null);
178 }
179 }
180 }
181
182 return new VmPipeTransportConnection(port);
183 }
184
185 public static void createVMBroker(int port) throws AMQVMBrokerCreationException
186 {
187 if (_acceptor == null)
188 {
189 _acceptor = new VmPipeAcceptor();
190
191 IoServiceConfig config = _acceptor.getDefaultConfig();
192
193 config.setThreadModel(ReadWriteThreadModel.getInstance());
194 }
195 synchronized (_inVmPipeAddress)
196 {
197
198 if (!_inVmPipeAddress.containsKey(port))
199 {
200 _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
201 IoHandlerAdapter provider = null;
202 try
203 {
204 VmPipeAddress pipe = new VmPipeAddress(port);
205
206 provider = createBrokerInstance(port);
207
208 _acceptor.bind(pipe, provider);
209
210 _inVmPipeAddress.put(port, pipe);
211 _logger.info("Created InVM Qpid.AMQP listening on port " + port);
212 }
213 catch (IOException e)
214 {
215 _logger.error("Got IOException.", e);
216
217 // Try and unbind provider
218 try
219 {
220 VmPipeAddress pipe = new VmPipeAddress(port);
221
222 try
223 {
224 _acceptor.unbind(pipe);
225 }
226 catch (Exception ignore)
227 {
228 // ignore
229 }
230
231 if (provider == null)
232 {
233 provider = createBrokerInstance(port);
234 }
235
236 _acceptor.bind(pipe, provider);
237 _inVmPipeAddress.put(port, pipe);
238 _logger.info("Created InVM Qpid.AMQP listening on port " + port);
239 }
240 catch (IOException justUseFirstException)
241 {
242 String because;
243 if (e.getCause() == null)
244 {
245 because = e.toString();
246 }
247 else
248 {
249 because = e.getCause().toString();
250 }
251
252 throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
253 }
254 }
255
256 }
257 else
258 {
259 _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
260 }
261 }
262 }
263
264 private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
265 {
266 String protocolProviderClass = System.getProperty("amqj.protocolprovider.class", DEFAULT_QPID_SERVER);
267 _logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
268
269 // can't use introspection to get Provider as it is a server class.
270 // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
271
272 // get right constructor and pass in instancec ID - "port"
273 IoHandlerAdapter provider;
274 try
275 {
276 Class[] cnstr = {Integer.class};
277 Object[] params = {port};
278 provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
279 // Give the broker a second to create
280 _logger.info("Created VMBroker Instance:" + port);
281 }
282 catch (Exception e)
283 {
284 _logger.info("Unable to create InVM Qpid.AMQP on port " + port + ". Because: " + e.getCause());
285 String because;
286 if (e.getCause() == null)
287 {
288 because = e.toString();
289 }
290 else
291 {
292 because = e.getCause().toString();
293 }
294
295 AMQVMBrokerCreationException amqbce =
296 new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e);
297 throw amqbce;
298 }
299
300 return provider;
301 }
302
303 public static void killAllVMBrokers()
304 {
305 _logger.info("Killing all VM Brokers");
306 if (_acceptor != null)
307 {
308 _acceptor.unbindAll();
309 }
310 synchronized (_inVmPipeAddress)
311 {
312 _inVmPipeAddress.clear();
313 }
314 _acceptor = null;
315 _currentInstance = -1;
316 _currentVMPort = -1;
317 }
318
319 public static void killVMBroker(int port)
320 {
321 synchronized (_inVmPipeAddress)
322 {
323 VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
324 if (pipe != null)
325 {
326 _logger.info("Killing VM Broker:" + port);
327 _inVmPipeAddress.remove(port);
328 // This does need to be sychronized as otherwise mina can hang
329 // if a new connection is made
330 _acceptor.unbind(pipe);
331 }
332 }
333 }
334
335 }
|