AMQDecoder.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.codec;
022 
023 import org.apache.mina.common.ByteBuffer;
024 import org.apache.mina.common.IoSession;
025 import org.apache.mina.common.SimpleByteBufferAllocator;
026 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
027 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
028 
029 import org.apache.qpid.framing.AMQDataBlockDecoder;
030 import org.apache.qpid.framing.ProtocolInitiation;
031 
032 /**
033  * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
034  * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the
035  * buffer until there is enough data to decode.
036  *
037  <p/>One instance of this class is created per session, so any changes or configuration done at run time to the
038  * decoder will only affect decoding of the protocol session data to which is it bound.
039  *
040  <p/><table id="crc"><caption>CRC Card</caption>
041  <tr><th> Responsibilities <th> Collaborations
042  <tr><td> Delegate protocol initiation to its decoder. <td> {@link ProtocolInitiation.Decoder}
043  <tr><td> Delegate AMQP data to its decoder. <td> {@link AMQDataBlockDecoder}
044  <tr><td> Accept notification that protocol initiation has completed.
045  </table>
046  *
047  * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
048  *       per-session overhead.
049  */
050 public class AMQDecoder extends CumulativeProtocolDecoder
051 {
052 
053     private static final String BUFFER = AMQDecoder.class.getName() ".Buffer";
054 
055     /** Holds the 'normal' AMQP data decoder. */
056     private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
057 
058     /** Holds the protocol initiation decoder. */
059     private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
060 
061     /** Flag to indicate whether this decoder needs to handle protocol initiation. */
062     private boolean _expectProtocolInitiation;
063     private boolean firstDecode = true;
064 
065     /**
066      * Creates a new AMQP decoder.
067      *
068      @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
069      */
070     public AMQDecoder(boolean expectProtocolInitiation)
071     {
072         _expectProtocolInitiation = expectProtocolInitiation;
073     }
074 
075     /**
076      * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
077      * intiation decoders.
078      *
079      @param session The Mina session.
080      @param in      The raw byte buffer.
081      @param out     The Mina object output gatherer to write decoded objects to.
082      *
083      @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
084      *
085      @throws Exception If the data cannot be decoded for any reason.
086      */
087     protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput outthrows Exception
088     {
089 
090         boolean decoded;
091         if (_expectProtocolInitiation  
092             || (firstDecode
093                 && (in.remaining() 0)
094                 && (in.get(in.position()) == (byte)'A')))
095         {
096             decoded = doDecodePI(session, in, out);
097         }
098         else
099         {
100             decoded = doDecodeDataBlock(session, in, out);
101         }
102         if(firstDecode && decoded)
103         {
104             firstDecode = false;
105         }
106         return decoded;
107     }
108 
109     /**
110      * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
111      *
112      @param session The Mina session.
113      @param in      The raw byte buffer.
114      @param out     The Mina object output gatherer to write decoded objects to.
115      *
116      @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
117      *
118      @throws Exception If the data cannot be decoded for any reason.
119      */
120     protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput outthrows Exception
121     {
122         int pos = in.position();
123         boolean enoughData = _dataBlockDecoder.decodable(session, in);
124         in.position(pos);
125         if (!enoughData)
126         {
127             // returning false means it will leave the contents in the buffer and
128             // call us again when more data has been read
129             return false;
130         }
131         else
132         {
133             _dataBlockDecoder.decode(session, in, out);
134 
135             return true;
136         }
137     }
138 
139     /**
140      * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
141      *
142      @param session The Mina session.
143      @param in      The raw byte buffer.
144      @param out     The Mina object output gatherer to write decoded objects to.
145      *
146      @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
147      *
148      @throws Exception If the data cannot be decoded for any reason.
149      */
150     private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput outthrows Exception
151     {
152         boolean enoughData = _piDecoder.decodable(session, in);
153         if (!enoughData)
154         {
155             // returning false means it will leave the contents in the buffer and
156             // call us again when more data has been read
157             return false;
158         }
159         else
160         {
161             _piDecoder.decode(session, in, out);
162 
163             return true;
164         }
165     }
166 
167     /**
168      * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
169      * initation decoder. This method is expected to be called with <tt>false</tt> once protocol initation completes.
170      *
171      @param expectProtocolInitiation <tt>true</tt> to use the protocol initiation decoder, <tt>false</tt> to use the
172      *                                data decoder.
173      */
174     public void setExpectProtocolInitiation(boolean expectProtocolInitiation)
175     {
176         _expectProtocolInitiation = expectProtocolInitiation;
177     }
178 
179 
180  /**
181      * Cumulates content of <tt>in</tt> into internal buffer and forwards
182      * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
183      <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
184      * and the cumulative buffer is compacted after decoding ends.
185      *
186      @throws IllegalStateException if your <tt>doDecode()</tt> returned
187      *                               <tt>true</tt> not consuming the cumulative buffer.
188      */
189     public void decodeIoSession session, ByteBuffer in,
190                         ProtocolDecoderOutput out throws Exception
191     {
192         ByteBuffer buf = ByteBuffer session.getAttributeBUFFER );
193         // if we have a session buffer, append data to that otherwise
194         // use the buffer read from the network directly
195         ifbuf != null )
196         {
197             buf.putin );
198             buf.flip();
199         }
200         else
201         {
202             buf = in;
203         }
204 
205         for;; )
206         {
207             int oldPos = buf.position();
208             boolean decoded = doDecodesession, buf, out );
209             ifdecoded )
210             {
211                 ifbuf.position() == oldPos )
212                 {
213                     throw new IllegalStateException(
214                             "doDecode() can't return true when buffer is not consumed." );
215                 }
216 
217                 if!buf.hasRemaining() )
218                 {
219                     break;
220                 }
221             }
222             else
223             {
224                 break;
225             }
226         }
227 
228         // if there is any data left that cannot be decoded, we store
229         // it in a buffer in the session and next time this decoder is
230         // invoked the session buffer gets appended to
231         if buf.hasRemaining() )
232         {
233             storeRemainingInSessionbuf, session );
234         }
235         else
236         {
237             removeSessionBuffersession );
238         }
239     }
240 
241     /**
242      * Releases the cumulative buffer used by the specified <tt>session</tt>.
243      * Please don't forget to call <tt>super.dispose( session )</tt> when
244      * you override this method.
245      */
246     public void disposeIoSession session throws Exception
247     {
248         removeSessionBuffersession );
249     }
250 
251     private void removeSessionBuffer(IoSession session)
252     {
253         ByteBuffer buf = ByteBuffer session.getAttributeBUFFER );
254         ifbuf != null )
255         {
256             buf.release();
257             session.removeAttributeBUFFER );
258         }
259     }
260 
261     private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
262 
263     private void storeRemainingInSession(ByteBuffer buf, IoSession session)
264     {
265         ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocatebuf.remaining()false );
266         remainingBuf.setAutoExpandtrue );
267         remainingBuf.putbuf );
268         session.setAttributeBUFFER, remainingBuf );
269     }
270 
271 }