001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.protocol;
022
023 import java.io.IOException;
024 import java.net.InetSocketAddress;
025
026 import org.apache.log4j.Logger;
027 import org.apache.mina.common.ByteBuffer;
028 import org.apache.mina.common.IdleStatus;
029 import org.apache.mina.common.IoFilterChain;
030 import org.apache.mina.common.IoHandlerAdapter;
031 import org.apache.mina.common.IoSession;
032 import org.apache.mina.filter.ReadThrottleFilterBuilder;
033 import org.apache.mina.filter.SSLFilter;
034 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
035 import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
036 import org.apache.mina.filter.executor.ExecutorFilter;
037 import org.apache.mina.util.SessionUtil;
038 import org.apache.qpid.AMQException;
039 import org.apache.qpid.codec.AMQCodecFactory;
040 import org.apache.qpid.framing.AMQDataBlock;
041 import org.apache.qpid.framing.AMQProtocolHeaderException;
042 import org.apache.qpid.framing.AMQShortString;
043 import org.apache.qpid.framing.ConnectionCloseBody;
044 import org.apache.qpid.framing.HeartbeatBody;
045 import org.apache.qpid.framing.MethodRegistry;
046 import org.apache.qpid.framing.ProtocolInitiation;
047 import org.apache.qpid.framing.ProtocolVersion;
048 import org.apache.qpid.server.configuration.ServerConfiguration;
049 import org.apache.qpid.server.registry.ApplicationRegistry;
050 import org.apache.qpid.server.registry.IApplicationRegistry;
051 import org.apache.qpid.ssl.SSLContextFactory;
052
053 /**
054 * The protocol handler handles "protocol events" for all connections. The state
055 * associated with an individual connection is accessed through the protocol session.
056 *
057 * We delegate all frame (message) processing to the AMQProtocolSession which wraps
058 * the state for the connection.
059 */
060 public class AMQPFastProtocolHandler extends IoHandlerAdapter
061 {
062 private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
063
064 private final IApplicationRegistry _applicationRegistry;
065
066 private final int BUFFER_READ_LIMIT_SIZE;
067 private final int BUFFER_WRITE_LIMIT_SIZE;
068
069 public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
070 {
071 this(ApplicationRegistry.getInstance(applicationRegistryInstance));
072 }
073
074 public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
075 {
076 _applicationRegistry = applicationRegistry;
077
078 // Read the configuration from the application registry
079 BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit();
080 BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit();
081
082 _logger.debug("AMQPFastProtocolHandler created");
083 }
084
085 protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
086 {
087 this(handler._applicationRegistry);
088 }
089
090 public void sessionCreated(IoSession protocolSession) throws Exception
091 {
092 SessionUtil.initialize(protocolSession);
093 final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
094
095 createSession(protocolSession, _applicationRegistry, codecFactory);
096 _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
097
098 final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
099 final ServerConfiguration config = _applicationRegistry.getConfiguration();
100
101 String keystorePath = config.getKeystorePath();
102 String keystorePassword = config.getKeystorePassword();
103 String certType = config.getCertType();
104 SSLContextFactory sslContextFactory = null;
105 boolean isSsl = false;
106 if (config.getEnableSSL() && isSSLClient(config, protocolSession))
107 {
108 sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
109 isSsl = true;
110 }
111 if (config.getEnableExecutorPool())
112 {
113 if (isSsl)
114 {
115 protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
116 new SSLFilter(sslContextFactory.buildServerContext()));
117 }
118 protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
119 }
120 else
121 {
122 protocolSession.getFilterChain().addLast("protocolFilter", pcf);
123 if (isSsl)
124 {
125 protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
126 new SSLFilter(sslContextFactory.buildServerContext()));
127 }
128 }
129
130 if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled())
131 {
132 try
133 {
134 // //Add IO Protection Filters
135 IoFilterChain chain = protocolSession.getFilterChain();
136
137
138 protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
139
140 ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
141 readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE);
142 readfilter.attach(chain);
143
144 WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
145 writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE);
146 writefilter.attach(chain);
147
148 protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
149 _logger.info("Using IO Read/Write Filter Protection");
150 }
151 catch (Exception e)
152 {
153 _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
154 }
155 }
156 }
157
158 /** Separated into its own, protected, method to allow easier reuse */
159 protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
160 {
161 new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
162 }
163
164 public void sessionOpened(IoSession protocolSession) throws Exception
165 {
166 _logger.info("Session opened for:" + protocolSession.getRemoteAddress());
167 }
168
169 public void sessionClosed(IoSession protocolSession) throws Exception
170 {
171 _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress());
172 final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
173 //fixme -- this can be null
174 if (amqProtocolSession != null)
175 {
176 try
177 {
178 amqProtocolSession.closeSession();
179 }
180 catch (AMQException e)
181 {
182 _logger.error("Caught AMQException whilst closingSession:" + e);
183 }
184 }
185 }
186
187 public void sessionIdle(IoSession session, IdleStatus status) throws Exception
188 {
189 _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress());
190 if (IdleStatus.WRITER_IDLE.equals(status))
191 {
192 //write heartbeat frame:
193 session.write(HeartbeatBody.FRAME);
194 }
195 else if (IdleStatus.READER_IDLE.equals(status))
196 {
197 //failover:
198 throw new IOException("Timed out while waiting for heartbeat from peer.");
199 }
200
201 }
202
203 public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
204 {
205 AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
206 if (throwable instanceof AMQProtocolHeaderException)
207 {
208
209 protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
210
211 protocolSession.close();
212
213 _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable);
214 }
215 else if (throwable instanceof IOException)
216 {
217 _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable);
218 }
219 else
220 {
221 _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
222
223
224 MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(session.getProtocolVersion());
225 ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
226
227 protocolSession.write(closeBody.generateFrame(0));
228
229 protocolSession.close();
230 }
231 }
232
233 /**
234 * Invoked when a message is received on a particular protocol session. Note that a
235 * protocol session is directly tied to a particular physical connection.
236 *
237 * @param protocolSession the protocol session that received the message
238 * @param message the message itself (i.e. a decoded frame)
239 *
240 * @throws Exception if the message cannot be processed
241 */
242 public void messageReceived(IoSession protocolSession, Object message) throws Exception
243 {
244 final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
245
246 if (message instanceof AMQDataBlock)
247 {
248 amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
249
250 }
251 else if (message instanceof ByteBuffer)
252 {
253 throw new IllegalStateException("Handed undecoded ByteBuffer buf = " + message);
254 }
255 else
256 {
257 throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
258 }
259 }
260
261 /**
262 * Called after a message has been sent out on a particular protocol session
263 *
264 * @param protocolSession the protocol session (i.e. connection) on which this
265 * message was sent
266 * @param object the message (frame) that was encoded and sent
267 *
268 * @throws Exception if we want to indicate an error
269 */
270 public void messageSent(IoSession protocolSession, Object object) throws Exception
271 {
272 if (_logger.isDebugEnabled())
273 {
274 _logger.debug("Message sent: " + object);
275 }
276 }
277
278 protected boolean isSSLClient(ServerConfiguration connectionConfig,
279 IoSession protocolSession)
280 {
281 InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress();
282 return addr.getPort() == connectionConfig.getSSLPort();
283 }
284 }
|