001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.transport;
022
023 import org.apache.mina.common.ByteBuffer;
024 import org.apache.mina.common.ConnectFuture;
025 import org.apache.mina.common.IoConnector;
026 import org.apache.mina.common.SimpleByteBufferAllocator;
027 import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
028 import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
029 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
030
031 import org.apache.qpid.client.protocol.AMQProtocolHandler;
032 import org.apache.qpid.jms.BrokerDetails;
033 import org.apache.qpid.pool.ReadWriteThreadModel;
034
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 import java.io.IOException;
039 import java.net.InetSocketAddress;
040 import java.net.Socket;
041 import java.util.Map;
042 import java.util.concurrent.ConcurrentHashMap;
043
044 public class SocketTransportConnection implements ITransportConnection
045 {
046 private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
047 private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
048
049 private SocketConnectorFactory _socketConnectorFactory;
050
051 static interface SocketConnectorFactory
052 {
053 IoConnector newSocketConnector();
054 }
055
056 public SocketTransportConnection(SocketConnectorFactory socketConnectorFactory)
057 {
058 _socketConnectorFactory = socketConnectorFactory;
059 }
060
061 public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
062 {
063 ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
064
065 // the MINA default is currently to use the pooled allocator although this may change in future
066 // once more testing of the performance of the simple allocator has been done
067 if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
068 {
069 _logger.info("Using SimpleByteBufferAllocator");
070 ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
071 }
072
073 final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
074 SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
075
076 // if we do not use our own thread model we get the MINA default which is to use
077 // its own leader-follower model
078 boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
079 if (readWriteThreading)
080 {
081 cfg.setThreadModel(ReadWriteThreadModel.getInstance());
082 }
083
084 SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
085 scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
086 scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
087 _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
088 scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
089 _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
090
091 final InetSocketAddress address;
092
093 if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
094 {
095 address = null;
096
097 Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
098
099 if (socket != null)
100 {
101 _logger.info("Using existing Socket:" + socket);
102
103 ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
104 }
105 else
106 {
107 throw new IllegalArgumentException("Active Socket must be provided for broker " +
108 "with 'socket://<SocketID>' transport:" + brokerDetail);
109 }
110 }
111 else
112 {
113 address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
114 _logger.info("Attempting connection to " + address);
115 }
116
117
118 ConnectFuture future = ioConnector.connect(address, protocolHandler);
119
120 // wait for connection to complete
121 if (future.join(brokerDetail.getTimeout()))
122 {
123 // we call getSession which throws an IOException if there has been an error connecting
124 future.getSession();
125 }
126 else
127 {
128 throw new IOException("Timeout waiting for connection.");
129 }
130 }
131 }
|