AMQPFastProtocolHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.protocol;
022 
023 import java.io.IOException;
024 import java.net.InetSocketAddress;
025 
026 import org.apache.log4j.Logger;
027 import org.apache.mina.common.ByteBuffer;
028 import org.apache.mina.common.IdleStatus;
029 import org.apache.mina.common.IoFilterChain;
030 import org.apache.mina.common.IoHandlerAdapter;
031 import org.apache.mina.common.IoSession;
032 import org.apache.mina.filter.ReadThrottleFilterBuilder;
033 import org.apache.mina.filter.SSLFilter;
034 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
035 import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
036 import org.apache.mina.filter.executor.ExecutorFilter;
037 import org.apache.mina.util.SessionUtil;
038 import org.apache.qpid.AMQException;
039 import org.apache.qpid.codec.AMQCodecFactory;
040 import org.apache.qpid.framing.AMQDataBlock;
041 import org.apache.qpid.framing.AMQProtocolHeaderException;
042 import org.apache.qpid.framing.AMQShortString;
043 import org.apache.qpid.framing.ConnectionCloseBody;
044 import org.apache.qpid.framing.HeartbeatBody;
045 import org.apache.qpid.framing.MethodRegistry;
046 import org.apache.qpid.framing.ProtocolInitiation;
047 import org.apache.qpid.framing.ProtocolVersion;
048 import org.apache.qpid.server.configuration.ServerConfiguration;
049 import org.apache.qpid.server.registry.ApplicationRegistry;
050 import org.apache.qpid.server.registry.IApplicationRegistry;
051 import org.apache.qpid.ssl.SSLContextFactory;
052 
053 /**
054  * The protocol handler handles "protocol events" for all connections. The state
055  * associated with an individual connection is accessed through the protocol session.
056  *
057  * We delegate all frame (message) processing to the AMQProtocolSession which wraps
058  * the state for the connection.
059  */
060 public class AMQPFastProtocolHandler extends IoHandlerAdapter
061 {
062     private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
063 
064     private final IApplicationRegistry _applicationRegistry;
065 
066     private final int BUFFER_READ_LIMIT_SIZE;
067     private final int BUFFER_WRITE_LIMIT_SIZE;
068 
069     public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
070     {
071         this(ApplicationRegistry.getInstance(applicationRegistryInstance));
072     }
073 
074     public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
075     {
076         _applicationRegistry = applicationRegistry;
077 
078         // Read the configuration from the application registry
079         BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit();
080         BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit();
081 
082         _logger.debug("AMQPFastProtocolHandler created");
083     }
084 
085     protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
086     {
087         this(handler._applicationRegistry);
088     }
089 
090     public void sessionCreated(IoSession protocolSessionthrows Exception
091     {
092         SessionUtil.initialize(protocolSession);
093         final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
094 
095         createSession(protocolSession, _applicationRegistry, codecFactory);
096         _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
097 
098         final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
099         final ServerConfiguration config = _applicationRegistry.getConfiguration();
100         
101         String keystorePath = config.getKeystorePath();
102         String keystorePassword = config.getKeystorePassword();
103         String certType = config.getCertType();
104         SSLContextFactory sslContextFactory = null;
105         boolean isSsl = false
106         if (config.getEnableSSL() && isSSLClient(config, protocolSession))
107         {
108             sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
109             isSsl = true;
110         }
111         if (config.getEnableExecutorPool())
112         {
113             if (isSsl)
114             {
115                 protocolSession.getFilterChain().addAfter("AsynchronousReadFilter""sslFilter",
116                                                           new SSLFilter(sslContextFactory.buildServerContext()));
117             }
118             protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter""protocolFilter", pcf);
119         }
120         else
121         {
122             protocolSession.getFilterChain().addLast("protocolFilter", pcf);
123             if (isSsl)
124             {
125                 protocolSession.getFilterChain().addBefore("protocolFilter""sslFilter",
126                                                            new SSLFilter(sslContextFactory.buildServerContext()));
127             }
128         }
129 
130         if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled())
131         {
132             try
133             {
134 //        //Add IO Protection Filters
135                 IoFilterChain chain = protocolSession.getFilterChain();
136 
137 
138                 protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder"new ExecutorFilter());
139 
140                 ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
141                 readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE);
142                 readfilter.attach(chain);
143 
144                 WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
145                 writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE);
146                 writefilter.attach(chain);
147 
148                 protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
149                 _logger.info("Using IO Read/Write Filter Protection");
150             }
151             catch (Exception e)
152             {
153                 _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
154             }
155         }
156     }
157 
158     /** Separated into its own, protected, method to allow easier reuse */
159     protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codecthrows AMQException
160     {
161         new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
162     }
163 
164     public void sessionOpened(IoSession protocolSessionthrows Exception
165     {
166         _logger.info("Session opened for:" + protocolSession.getRemoteAddress());
167     }
168 
169     public void sessionClosed(IoSession protocolSessionthrows Exception
170     {
171         _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress());
172         final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
173         //fixme  -- this can be null
174         if (amqProtocolSession != null)
175         {
176             try
177             {
178                 amqProtocolSession.closeSession();
179             }
180             catch (AMQException e)
181             {
182                 _logger.error("Caught AMQException whilst closingSession:" + e);
183             }
184         }
185     }
186 
187     public void sessionIdle(IoSession session, IdleStatus statusthrows Exception
188     {
189         _logger.debug("Protocol Session [" this "] idle: " + status + " :for:" + session.getRemoteAddress());
190         if (IdleStatus.WRITER_IDLE.equals(status))
191         {
192             //write heartbeat frame:
193             session.write(HeartbeatBody.FRAME);
194         }
195         else if (IdleStatus.READER_IDLE.equals(status))
196         {
197             //failover:
198             throw new IOException("Timed out while waiting for heartbeat from peer.");
199         }
200 
201     }
202 
203     public void exceptionCaught(IoSession protocolSession, Throwable throwablethrows Exception
204     {
205         AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
206         if (throwable instanceof AMQProtocolHeaderException)
207         {
208 
209             protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
210 
211             protocolSession.close();
212 
213             _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() " :" + throwable.getMessage(), throwable);
214         }
215         else if (throwable instanceof IOException)
216         {
217             _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable);
218         }
219         else
220         {
221             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
222 
223 
224             MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(session.getProtocolVersion());
225             ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
226                         
227             protocolSession.write(closeBody.generateFrame(0));
228 
229             protocolSession.close();
230         }
231     }
232 
233     /**
234      * Invoked when a message is received on a particular protocol session. Note that a
235      * protocol session is directly tied to a particular physical connection.
236      *
237      @param protocolSession the protocol session that received the message
238      @param message         the message itself (i.e. a decoded frame)
239      *
240      @throws Exception if the message cannot be processed
241      */
242     public void messageReceived(IoSession protocolSession, Object messagethrows Exception
243     {
244         final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
245 
246         if (message instanceof AMQDataBlock)
247         {
248             amqProtocolSession.dataBlockReceived((AMQDataBlockmessage);
249 
250         }
251         else if (message instanceof ByteBuffer)
252         {
253             throw new IllegalStateException("Handed undecoded ByteBuffer buf = " + message);
254         }
255         else
256         {
257             throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() " message = " + message);
258         }
259     }
260 
261     /**
262      * Called after a message has been sent out on a particular protocol session
263      *
264      @param protocolSession the protocol session (i.e. connection) on which this
265      *                        message was sent
266      @param object          the message (frame) that was encoded and sent
267      *
268      @throws Exception if we want to indicate an error
269      */
270     public void messageSent(IoSession protocolSession, Object objectthrows Exception
271     {
272         if (_logger.isDebugEnabled())
273         {
274             _logger.debug("Message sent: " + object);
275         }
276     }
277 
278     protected boolean isSSLClient(ServerConfiguration connectionConfig,
279                                   IoSession protocolSession)
280     {
281         InetSocketAddress addr = (InetSocketAddressprotocolSession.getLocalAddress();
282         return addr.getPort() == connectionConfig.getSSLPort();
283     }
284 }