SocketTransportConnection.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.transport;
022 
023 import org.apache.mina.common.ByteBuffer;
024 import org.apache.mina.common.ConnectFuture;
025 import org.apache.mina.common.IoConnector;
026 import org.apache.mina.common.SimpleByteBufferAllocator;
027 import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
028 import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
029 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
030 
031 import org.apache.qpid.client.protocol.AMQProtocolHandler;
032 import org.apache.qpid.jms.BrokerDetails;
033 import org.apache.qpid.pool.ReadWriteThreadModel;
034 
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037 
038 import java.io.IOException;
039 import java.net.InetSocketAddress;
040 import java.net.Socket;
041 import java.util.Map;
042 import java.util.concurrent.ConcurrentHashMap;
043 
044 public class SocketTransportConnection implements ITransportConnection
045 {
046     private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
047     private static final int DEFAULT_BUFFER_SIZE = 32 1024;
048 
049     private SocketConnectorFactory _socketConnectorFactory;
050 
051     static interface SocketConnectorFactory
052     {
053         IoConnector newSocketConnector();
054     }
055 
056     public SocketTransportConnection(SocketConnectorFactory socketConnectorFactory)
057     {
058         _socketConnectorFactory = socketConnectorFactory;
059     }
060 
061     public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetailthrows IOException
062     {
063         ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
064 
065         // the MINA default is currently to use the pooled allocator although this may change in future
066         // once more testing of the performance of the simple allocator has been done
067         if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
068         {
069             _logger.info("Using SimpleByteBufferAllocator");
070             ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
071         }
072 
073         final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
074         SocketConnectorConfig cfg = (SocketConnectorConfigioConnector.getDefaultConfig();
075 
076         // if we do not use our own thread model we get the MINA default which is to use
077         // its own leader-follower model
078         boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
079         if (readWriteThreading)
080         {
081             cfg.setThreadModel(ReadWriteThreadModel.getInstance());
082         }
083 
084         SocketSessionConfig scfg = (SocketSessionConfigcfg.getSessionConfig();
085         scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay""true")));
086         scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
087         _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
088         scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
089         _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
090 
091         final InetSocketAddress address;
092 
093         if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
094         {
095             address = null;
096 
097             Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
098 
099             if (socket != null)
100             {
101                 _logger.info("Using existing Socket:" + socket);
102 
103                 ((ExistingSocketConnectorioConnector).setOpenSocket(socket);
104             }
105             else
106             {
107                 throw new IllegalArgumentException("Active Socket must be provided for broker " +
108                                                    "with 'socket://<SocketID>' transport:" + brokerDetail);
109             }
110         }
111         else
112         {
113             address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
114             _logger.info("Attempting connection to " + address);
115         }
116 
117 
118         ConnectFuture future = ioConnector.connect(address, protocolHandler);
119 
120         // wait for connection to complete
121         if (future.join(brokerDetail.getTimeout()))
122         {
123             // we call getSession which throws an IOException if there has been an error connecting
124             future.getSession();
125         }
126         else
127         {
128             throw new IOException("Timeout waiting for connection.");
129         }
130     }
131 }