001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019
020 package org.apache.qpid.transport.network.io;
021
022 import java.io.IOException;
023 import java.net.InetAddress;
024 import java.net.InetSocketAddress;
025 import java.net.Socket;
026 import java.net.SocketException;
027 import java.nio.ByteBuffer;
028
029 import javax.net.ssl.SSLContext;
030 import javax.net.ssl.SSLEngine;
031
032 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
033 import org.apache.qpid.ssl.SSLContextFactory;
034 import org.apache.qpid.transport.Binding;
035 import org.apache.qpid.transport.Connection;
036 import org.apache.qpid.transport.ConnectionDelegate;
037 import org.apache.qpid.transport.Receiver;
038 import org.apache.qpid.transport.Sender;
039 import org.apache.qpid.transport.TransportException;
040 import org.apache.qpid.transport.network.ConnectionBinding;
041 import org.apache.qpid.transport.network.ssl.SSLReceiver;
042 import org.apache.qpid.transport.network.ssl.SSLSender;
043 import org.apache.qpid.transport.util.Logger;
044
045 /**
046 * This class provides a socket based transport using the java.io
047 * classes.
048 *
049 * The following params are configurable via JVM arguments
050 * TCP_NO_DELAY - amqj.tcpNoDelay
051 * SO_RCVBUF - amqj.receiveBufferSize
052 * SO_SNDBUF - amqj.sendBufferSize
053 */
054 public final class IoTransport<E>
055 {
056
057 static
058 {
059 org.apache.mina.common.ByteBuffer.setAllocator
060 (new org.apache.mina.common.SimpleByteBufferAllocator());
061 org.apache.mina.common.ByteBuffer.setUseDirectBuffers
062 (Boolean.getBoolean("amqj.enableDirectBuffers"));
063 }
064
065 private static final Logger log = Logger.get(IoTransport.class);
066
067 private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
068 private static int readBufferSize = Integer.getInteger
069 ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
070 private static int writeBufferSize = Integer.getInteger
071 ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
072
073 private Socket socket;
074 private Sender<ByteBuffer> sender;
075 private E endpoint;
076 private IoReceiver receiver;
077 private long timeout = 60000;
078
079 IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
080 {
081 this.socket = socket;
082
083 if (ssl)
084 {
085 SSLEngine engine = null;
086 SSLContext sslCtx;
087 try
088 {
089 sslCtx = createSSLContext();
090 }
091 catch (Exception e)
092 {
093 throw new TransportException("Error creating SSL Context", e);
094 }
095
096 try
097 {
098 engine = sslCtx.createSSLEngine();
099 engine.setUseClientMode(true);
100 }
101 catch(Exception e)
102 {
103 throw new TransportException("Error creating SSL Engine", e);
104 }
105
106 this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout));
107 this.endpoint = binding.endpoint(sender);
108 this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
109 2*readBufferSize, timeout);
110 }
111 else
112 {
113 this.sender = new IoSender(this, 2*writeBufferSize, timeout);
114 this.endpoint = binding.endpoint(sender);
115 this.receiver = new IoReceiver(this, binding.receiver(endpoint),
116 2*readBufferSize, timeout);
117 }
118 }
119
120 Sender<ByteBuffer> getSender()
121 {
122 return sender;
123 }
124
125 IoReceiver getReceiver()
126 {
127 return receiver;
128 }
129
130 Socket getSocket()
131 {
132 return socket;
133 }
134
135 public static final <E> E connect(String host, int port,
136 Binding<E,ByteBuffer> binding,
137 boolean ssl)
138 {
139 Socket socket = createSocket(host, port);
140 IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl);
141 return transport.endpoint;
142 }
143
144 public static final Connection connect(String host, int port,
145 ConnectionDelegate delegate,
146 boolean ssl)
147 {
148 return connect(host, port, ConnectionBinding.get(delegate),ssl);
149 }
150
151 public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port, boolean ssl)
152 {
153 connect(host, port, new Binding_0_9(session),ssl);
154 }
155
156 private static class Binding_0_9
157 implements Binding<AMQVersionAwareProtocolSession,ByteBuffer>
158 {
159
160 private AMQVersionAwareProtocolSession session;
161
162 Binding_0_9(AMQVersionAwareProtocolSession session)
163 {
164 this.session = session;
165 }
166
167 public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender)
168 {
169 session.setSender(sender);
170 return session;
171 }
172
173 public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn)
174 {
175 return new InputHandler_0_9(ssn);
176 }
177
178 }
179
180 private static Socket createSocket(String host, int port)
181 {
182 try
183 {
184 InetAddress address = InetAddress.getByName(host);
185 Socket socket = new Socket();
186 socket.setReuseAddress(true);
187 socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
188
189 log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
190 log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
191
192 socket.setSendBufferSize(writeBufferSize);
193 socket.setReceiveBufferSize(readBufferSize);
194
195 log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
196 log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
197
198 socket.connect(new InetSocketAddress(address, port));
199 return socket;
200 }
201 catch (SocketException e)
202 {
203 throw new TransportException("Error connecting to broker", e);
204 }
205 catch (IOException e)
206 {
207 throw new TransportException("Error connecting to broker", e);
208 }
209 }
210
211 private SSLContext createSSLContext() throws Exception
212 {
213 String trustStorePath = System.getProperty("javax.net.ssl.trustStore");
214 String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
215 String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");
216
217 String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath);
218 String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword);
219 String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");
220
221 SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword,
222 trustStoreCertType,keyStorePath,
223 keyStorePassword,keyStoreCertType);
224
225 return sslContextFactory.buildServerContext();
226
227 }
228
229 }
|