IoTransport.java
001 /*
002  * Licensed to the Apache Software Foundation (ASF) under one
003  * or more contributor license agreements.  See the NOTICE file
004  * distributed with this work for additional information
005  * regarding copyright ownership.  The ASF licenses this file
006  * to you under the Apache License, Version 2.0 (the
007  * "License"); you may not use this file except in compliance
008  * with the License.  You may obtain a copy of the License at
009  *
010  *   http://www.apache.org/licenses/LICENSE-2.0
011  *
012  * Unless required by applicable law or agreed to in writing,
013  * software distributed under the License is distributed on an
014  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  * KIND, either express or implied.  See the License for the
016  * specific language governing permissions and limitations
017  * under the License.
018  */
019 
020 package org.apache.qpid.transport.network.io;
021 
022 import java.io.IOException;
023 import java.net.InetAddress;
024 import java.net.InetSocketAddress;
025 import java.net.Socket;
026 import java.net.SocketException;
027 import java.nio.ByteBuffer;
028 
029 import javax.net.ssl.SSLContext;
030 import javax.net.ssl.SSLEngine;
031 
032 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
033 import org.apache.qpid.ssl.SSLContextFactory;
034 import org.apache.qpid.transport.Binding;
035 import org.apache.qpid.transport.Connection;
036 import org.apache.qpid.transport.ConnectionDelegate;
037 import org.apache.qpid.transport.Receiver;
038 import org.apache.qpid.transport.Sender;
039 import org.apache.qpid.transport.TransportException;
040 import org.apache.qpid.transport.network.ConnectionBinding;
041 import org.apache.qpid.transport.network.ssl.SSLReceiver;
042 import org.apache.qpid.transport.network.ssl.SSLSender;
043 import org.apache.qpid.transport.util.Logger;
044 
045 /**
046  * This class provides a socket based transport using the java.io
047  * classes.
048  *
049  * The following params are configurable via JVM arguments
050  * TCP_NO_DELAY - amqj.tcpNoDelay
051  * SO_RCVBUF    - amqj.receiveBufferSize
052  * SO_SNDBUF    - amqj.sendBufferSize
053  */
054 public final class IoTransport<E>
055 {
056 
057     static
058     {
059         org.apache.mina.common.ByteBuffer.setAllocator
060             (new org.apache.mina.common.SimpleByteBufferAllocator());
061         org.apache.mina.common.ByteBuffer.setUseDirectBuffers
062             (Boolean.getBoolean("amqj.enableDirectBuffers"));
063     }
064 
065     private static final Logger log = Logger.get(IoTransport.class);
066 
067     private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 1024;
068     private static int readBufferSize = Integer.getInteger
069         ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
070     private static int writeBufferSize = Integer.getInteger
071         ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
072 
073     private Socket socket;
074     private Sender<ByteBuffer> sender;
075     private E endpoint;
076     private IoReceiver receiver;
077     private long timeout = 60000;
078 
079     IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
080     {
081         this.socket = socket;
082         
083         if (ssl)
084         {
085             SSLEngine engine = null;
086             SSLContext sslCtx;
087             try
088             {
089                 sslCtx = createSSLContext();
090             }
091             catch (Exception e)
092             {
093                 throw new TransportException("Error creating SSL Context", e);
094             }
095             
096             try
097             {
098                 engine = sslCtx.createSSLEngine();
099                 engine.setUseClientMode(true);
100             }
101             catch(Exception e)
102             {
103                 throw new TransportException("Error creating SSL Engine", e);
104             }
105             
106             this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout));
107             this.endpoint = binding.endpoint(sender);
108             this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
109                                            2*readBufferSize, timeout);
110         }
111         else
112         {
113             this.sender = new IoSender(this, 2*writeBufferSize, timeout);
114             this.endpoint = binding.endpoint(sender);
115             this.receiver = new IoReceiver(this, binding.receiver(endpoint),
116                                            2*readBufferSize, timeout);
117         }
118     }
119 
120     Sender<ByteBuffer> getSender()
121     {
122         return sender;
123     }
124 
125     IoReceiver getReceiver()
126     {
127         return receiver;
128     }
129 
130     Socket getSocket()
131     {
132         return socket;
133     }
134 
135     public static final <E> E connect(String host, int port,
136                                       Binding<E,ByteBuffer> binding,
137                                       boolean ssl)
138     {
139         Socket socket = createSocket(host, port);
140         IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl);
141         return transport.endpoint;
142     }
143 
144     public static final Connection connect(String host, int port,
145                                            ConnectionDelegate delegate,
146                                            boolean ssl)
147     {
148         return connect(host, port, ConnectionBinding.get(delegate),ssl);
149     }
150 
151     public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port, boolean ssl)
152     {
153         connect(host, port, new Binding_0_9(session),ssl);
154     }
155 
156     private static class Binding_0_9
157         implements Binding<AMQVersionAwareProtocolSession,ByteBuffer>
158     {
159 
160         private AMQVersionAwareProtocolSession session;
161 
162         Binding_0_9(AMQVersionAwareProtocolSession session)
163         {
164             this.session = session;
165         }
166 
167         public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender)
168         {
169             session.setSender(sender);
170             return session;
171         }
172 
173         public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn)
174         {
175             return new InputHandler_0_9(ssn);
176         }
177 
178     }
179 
180     private static Socket createSocket(String host, int port)
181     {
182         try
183         {
184             InetAddress address = InetAddress.getByName(host);
185             Socket socket = new Socket();
186             socket.setReuseAddress(true);
187             socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
188 
189             log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
190             log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
191 
192             socket.setSendBufferSize(writeBufferSize);
193             socket.setReceiveBufferSize(readBufferSize);
194 
195             log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
196             log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
197 
198             socket.connect(new InetSocketAddress(address, port));
199             return socket;
200         }
201         catch (SocketException e)
202         {
203             throw new TransportException("Error connecting to broker", e);
204         }
205         catch (IOException e)
206         {
207             throw new TransportException("Error connecting to broker", e);
208         }
209     }
210     
211     private SSLContext createSSLContext() throws Exception
212     {
213         String trustStorePath = System.getProperty("javax.net.ssl.trustStore");
214         String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
215         String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");
216                 
217         String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath);
218         String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword);
219         String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");
220         
221         SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword,
222                                                                     trustStoreCertType,keyStorePath,
223                                                                     keyStorePassword,keyStoreCertType);
224         
225         return sslContextFactory.buildServerContext();
226         
227     }
228 
229 }