MinaHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport.network.mina;
022 
023 import java.io.IOException;
024 import java.net.InetSocketAddress;
025 import java.net.SocketAddress;
026 
027 import org.apache.mina.common.*;
028 
029 import org.apache.mina.transport.socket.nio.SocketAcceptor;
030 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
031 import org.apache.mina.transport.socket.nio.SocketConnector;
032 import org.apache.mina.filter.ReadThrottleFilterBuilder;
033 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
034 import org.apache.mina.filter.executor.ExecutorFilter;
035 
036 import org.apache.qpid.transport.Binding;
037 import org.apache.qpid.transport.Connection;
038 import org.apache.qpid.transport.ConnectionDelegate;
039 import org.apache.qpid.transport.Receiver;
040 import org.apache.qpid.transport.Sender;
041 import org.apache.qpid.transport.network.ConnectionBinding;
042 
043 import org.apache.qpid.transport.util.Logger;
044 
045 import org.apache.qpid.transport.network.Assembler;
046 import org.apache.qpid.transport.network.Disassembler;
047 import org.apache.qpid.transport.network.InputHandler;
048 
049 import static org.apache.qpid.transport.util.Functions.*;
050 
051 /**
052  * MinaHandler
053  *
054  @author Rafael H. Schloming
055  */
056 //RA making this public until we sort out the package issues
057 public class MinaHandler<E> implements IoHandler
058 {
059     /** Default buffer size for pending messages reads */
060     private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
061     /** Default buffer size for pending messages writes */
062     private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144";
063     private static final int MAX_RCVBUF = 64*1024;
064 
065     private static final Logger log = Logger.get(MinaHandler.class);
066 
067     static
068     {
069         ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
070         ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
071     }
072 
073     private final Binding<E,java.nio.ByteBuffer> binding;
074 
075     private MinaHandler(Binding<E,java.nio.ByteBuffer> binding)
076     {
077         this.binding = binding;
078     }
079 
080     public void messageReceived(IoSession ssn, Object obj)
081     {
082         Attachment<E> attachment = (Attachment<E>ssn.getAttachment();
083         ByteBuffer buf = (ByteBufferobj;
084         try
085         {
086             attachment.receiver.received(buf.buf());
087         }
088         catch (Throwable t)
089         {
090             log.error(t, "exception handling buffer %s", str(buf.buf()));
091             throw new RuntimeException(t);
092         }
093     }
094 
095     public void messageSent(IoSession ssn, Object obj)
096     {
097         // do nothing
098     }
099 
100     public void exceptionCaught(IoSession ssn, Throwable e)
101     {
102         Attachment<E> attachment = (Attachment<E>ssn.getAttachment();
103         attachment.receiver.exception(e);
104     }
105 
106     /**
107      * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
108      * session, which filters the events handled by this handler. The filter chain consists of, handing off events
109      * to an optional protectio
110      *
111      @param session The MINA session.
112      @throws Exception Any underlying exceptions are allowed to fall through to MINA.
113      */
114     public void sessionCreated(IoSession sessionthrows Exception
115     {
116         log.debug("Protocol session created for session " + System.identityHashCode(session));
117 
118         if (Boolean.getBoolean("protectio"))
119         {
120             try
121             {
122                 //Add IO Protection Filters
123                 IoFilterChain chain = session.getFilterChain();
124 
125                 session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder"new ExecutorFilter());
126 
127                 ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
128                 readfilter.setMaximumConnectionBufferSize(
129                         Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
130                 readfilter.attach(chain);
131 
132                 WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
133                 writefilter.setMaximumConnectionBufferSize(
134                         Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT)));
135                 writefilter.attach(chain);
136                 session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
137 
138                 log.info("Using IO Read/Write Filter Protection");
139             }
140             catch (Exception e)
141             {
142                 log.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
143             }
144         }
145     }
146 
147     public void sessionOpened(final IoSession ssn)
148     {
149         log.debug("opened: %s"this);
150         E endpoint = binding.endpoint(new MinaSender(ssn));
151         Attachment<E>  attachment =
152             new Attachment<E>(endpoint, binding.receiver(endpoint));
153 
154         // We need to synchronize and notify here because the MINA
155         // connect future returns the session prior to the attachment
156         // being set. This is arguably a bug in MINA.
157         synchronized (ssn)
158         {
159             ssn.setAttachment(attachment);
160             ssn.notifyAll();
161         }
162     }
163 
164     public void sessionClosed(IoSession ssn)
165     {
166         log.debug("closed: %s", ssn);
167         Attachment<E> attachment = (Attachment<E>ssn.getAttachment();
168         attachment.receiver.closed();
169         ssn.setAttachment(null);
170     }
171 
172     public void sessionIdle(IoSession ssn, IdleStatus status)
173     {
174         // do nothing
175     }
176 
177     private static class Attachment<E>
178     {
179 
180         E endpoint;
181         Receiver<java.nio.ByteBuffer> receiver;
182 
183         Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver)
184         {
185             this.endpoint = endpoint;
186             this.receiver = receiver;
187         }
188     }
189 
190     public static final void accept(String host, int port,
191                                     Binding<?,java.nio.ByteBuffer> binding)
192         throws IOException
193     {
194         accept(new InetSocketAddress(host, port), binding);
195     }
196 
197     public static final <E> void accept(SocketAddress address,
198                                         Binding<E,java.nio.ByteBuffer> binding)
199         throws IOException
200     {
201         IoAcceptor acceptor = new SocketAcceptor();
202         acceptor.bind(address, new MinaHandler<E>(binding));
203     }
204 
205     public static final <E> E connect(String host, int port,
206                                       Binding<E,java.nio.ByteBuffer> binding)
207     {
208         return connect(new InetSocketAddress(host, port), binding);
209     }
210 
211     public static final <E> E connect(SocketAddress address,
212                                       Binding<E,java.nio.ByteBuffer> binding)
213     {
214         MinaHandler<E> handler = new MinaHandler<E>(binding);
215         SocketConnector connector = new SocketConnector();
216         IoServiceConfig acceptorConfig = connector.getDefaultConfig();
217         acceptorConfig.setThreadModel(ThreadModel.MANUAL);
218         SocketSessionConfig scfg = (SocketSessionConfigacceptorConfig.getSessionConfig();
219         scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
220         Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize");
221         if (sendBufferSize != null && sendBufferSize > 0)
222         {
223             scfg.setSendBufferSize(sendBufferSize);
224         }
225         Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize");
226         if (receiveBufferSize != null && receiveBufferSize > 0)
227         {
228             scfg.setReceiveBufferSize(receiveBufferSize);
229         }
230         else if (scfg.getReceiveBufferSize() > MAX_RCVBUF)
231         {
232             scfg.setReceiveBufferSize(MAX_RCVBUF);
233         }
234         connector.setWorkerTimeout(0);
235         ConnectFuture cf = connector.connect(address, handler);
236         cf.join();
237         IoSession ssn = cf.getSession();
238 
239         // We need to synchronize and wait here because the MINA
240         // connect future returns the session prior to the attachment
241         // being set. This is arguably a bug in MINA.
242         synchronized (ssn)
243         {
244             while (ssn.getAttachment() == null)
245             {
246                 try
247                 {
248                     ssn.wait();
249                 }
250                 catch (InterruptedException e)
251                 {
252                     throw new RuntimeException(e);
253                 }
254             }
255         }
256 
257         Attachment<E> attachment = (Attachment<E>ssn.getAttachment();
258         return attachment.endpoint;
259     }
260 
261     public static final void accept(String host, int port,
262                                     ConnectionDelegate delegate)
263         throws IOException
264     {
265         accept(host, port, ConnectionBinding.get(delegate));
266     }
267 
268     public static final Connection connect(String host, int port,
269                                            ConnectionDelegate delegate)
270     {
271         return connect(host, port, ConnectionBinding.get(delegate));
272     }
273 
274 }