001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport.network.mina;
022
023 import java.io.IOException;
024 import java.net.InetSocketAddress;
025 import java.net.SocketAddress;
026
027 import org.apache.mina.common.*;
028
029 import org.apache.mina.transport.socket.nio.SocketAcceptor;
030 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
031 import org.apache.mina.transport.socket.nio.SocketConnector;
032 import org.apache.mina.filter.ReadThrottleFilterBuilder;
033 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
034 import org.apache.mina.filter.executor.ExecutorFilter;
035
036 import org.apache.qpid.transport.Binding;
037 import org.apache.qpid.transport.Connection;
038 import org.apache.qpid.transport.ConnectionDelegate;
039 import org.apache.qpid.transport.Receiver;
040 import org.apache.qpid.transport.Sender;
041 import org.apache.qpid.transport.network.ConnectionBinding;
042
043 import org.apache.qpid.transport.util.Logger;
044
045 import org.apache.qpid.transport.network.Assembler;
046 import org.apache.qpid.transport.network.Disassembler;
047 import org.apache.qpid.transport.network.InputHandler;
048
049 import static org.apache.qpid.transport.util.Functions.*;
050
051 /**
052 * MinaHandler
053 *
054 * @author Rafael H. Schloming
055 */
056 //RA making this public until we sort out the package issues
057 public class MinaHandler<E> implements IoHandler
058 {
059 /** Default buffer size for pending messages reads */
060 private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
061 /** Default buffer size for pending messages writes */
062 private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144";
063 private static final int MAX_RCVBUF = 64*1024;
064
065 private static final Logger log = Logger.get(MinaHandler.class);
066
067 static
068 {
069 ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
070 ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
071 }
072
073 private final Binding<E,java.nio.ByteBuffer> binding;
074
075 private MinaHandler(Binding<E,java.nio.ByteBuffer> binding)
076 {
077 this.binding = binding;
078 }
079
080 public void messageReceived(IoSession ssn, Object obj)
081 {
082 Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
083 ByteBuffer buf = (ByteBuffer) obj;
084 try
085 {
086 attachment.receiver.received(buf.buf());
087 }
088 catch (Throwable t)
089 {
090 log.error(t, "exception handling buffer %s", str(buf.buf()));
091 throw new RuntimeException(t);
092 }
093 }
094
095 public void messageSent(IoSession ssn, Object obj)
096 {
097 // do nothing
098 }
099
100 public void exceptionCaught(IoSession ssn, Throwable e)
101 {
102 Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
103 attachment.receiver.exception(e);
104 }
105
106 /**
107 * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
108 * session, which filters the events handled by this handler. The filter chain consists of, handing off events
109 * to an optional protectio
110 *
111 * @param session The MINA session.
112 * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
113 */
114 public void sessionCreated(IoSession session) throws Exception
115 {
116 log.debug("Protocol session created for session " + System.identityHashCode(session));
117
118 if (Boolean.getBoolean("protectio"))
119 {
120 try
121 {
122 //Add IO Protection Filters
123 IoFilterChain chain = session.getFilterChain();
124
125 session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
126
127 ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
128 readfilter.setMaximumConnectionBufferSize(
129 Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
130 readfilter.attach(chain);
131
132 WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
133 writefilter.setMaximumConnectionBufferSize(
134 Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT)));
135 writefilter.attach(chain);
136 session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
137
138 log.info("Using IO Read/Write Filter Protection");
139 }
140 catch (Exception e)
141 {
142 log.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
143 }
144 }
145 }
146
147 public void sessionOpened(final IoSession ssn)
148 {
149 log.debug("opened: %s", this);
150 E endpoint = binding.endpoint(new MinaSender(ssn));
151 Attachment<E> attachment =
152 new Attachment<E>(endpoint, binding.receiver(endpoint));
153
154 // We need to synchronize and notify here because the MINA
155 // connect future returns the session prior to the attachment
156 // being set. This is arguably a bug in MINA.
157 synchronized (ssn)
158 {
159 ssn.setAttachment(attachment);
160 ssn.notifyAll();
161 }
162 }
163
164 public void sessionClosed(IoSession ssn)
165 {
166 log.debug("closed: %s", ssn);
167 Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
168 attachment.receiver.closed();
169 ssn.setAttachment(null);
170 }
171
172 public void sessionIdle(IoSession ssn, IdleStatus status)
173 {
174 // do nothing
175 }
176
177 private static class Attachment<E>
178 {
179
180 E endpoint;
181 Receiver<java.nio.ByteBuffer> receiver;
182
183 Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver)
184 {
185 this.endpoint = endpoint;
186 this.receiver = receiver;
187 }
188 }
189
190 public static final void accept(String host, int port,
191 Binding<?,java.nio.ByteBuffer> binding)
192 throws IOException
193 {
194 accept(new InetSocketAddress(host, port), binding);
195 }
196
197 public static final <E> void accept(SocketAddress address,
198 Binding<E,java.nio.ByteBuffer> binding)
199 throws IOException
200 {
201 IoAcceptor acceptor = new SocketAcceptor();
202 acceptor.bind(address, new MinaHandler<E>(binding));
203 }
204
205 public static final <E> E connect(String host, int port,
206 Binding<E,java.nio.ByteBuffer> binding)
207 {
208 return connect(new InetSocketAddress(host, port), binding);
209 }
210
211 public static final <E> E connect(SocketAddress address,
212 Binding<E,java.nio.ByteBuffer> binding)
213 {
214 MinaHandler<E> handler = new MinaHandler<E>(binding);
215 SocketConnector connector = new SocketConnector();
216 IoServiceConfig acceptorConfig = connector.getDefaultConfig();
217 acceptorConfig.setThreadModel(ThreadModel.MANUAL);
218 SocketSessionConfig scfg = (SocketSessionConfig) acceptorConfig.getSessionConfig();
219 scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
220 Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize");
221 if (sendBufferSize != null && sendBufferSize > 0)
222 {
223 scfg.setSendBufferSize(sendBufferSize);
224 }
225 Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize");
226 if (receiveBufferSize != null && receiveBufferSize > 0)
227 {
228 scfg.setReceiveBufferSize(receiveBufferSize);
229 }
230 else if (scfg.getReceiveBufferSize() > MAX_RCVBUF)
231 {
232 scfg.setReceiveBufferSize(MAX_RCVBUF);
233 }
234 connector.setWorkerTimeout(0);
235 ConnectFuture cf = connector.connect(address, handler);
236 cf.join();
237 IoSession ssn = cf.getSession();
238
239 // We need to synchronize and wait here because the MINA
240 // connect future returns the session prior to the attachment
241 // being set. This is arguably a bug in MINA.
242 synchronized (ssn)
243 {
244 while (ssn.getAttachment() == null)
245 {
246 try
247 {
248 ssn.wait();
249 }
250 catch (InterruptedException e)
251 {
252 throw new RuntimeException(e);
253 }
254 }
255 }
256
257 Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
258 return attachment.endpoint;
259 }
260
261 public static final void accept(String host, int port,
262 ConnectionDelegate delegate)
263 throws IOException
264 {
265 accept(host, port, ConnectionBinding.get(delegate));
266 }
267
268 public static final Connection connect(String host, int port,
269 ConnectionDelegate delegate)
270 {
271 return connect(host, port, ConnectionBinding.get(delegate));
272 }
273
274 }
|