001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.codec;
022
023 import org.apache.mina.common.ByteBuffer;
024 import org.apache.mina.common.IoSession;
025 import org.apache.mina.common.SimpleByteBufferAllocator;
026 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
027 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
028
029 import org.apache.qpid.framing.AMQDataBlockDecoder;
030 import org.apache.qpid.framing.ProtocolInitiation;
031
032 /**
033 * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
034 * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the
035 * buffer until there is enough data to decode.
036 *
037 * <p/>One instance of this class is created per session, so any changes or configuration done at run time to the
038 * decoder will only affect decoding of the protocol session data to which is it bound.
039 *
040 * <p/><table id="crc"><caption>CRC Card</caption>
041 * <tr><th> Responsibilities <th> Collaborations
042 * <tr><td> Delegate protocol initiation to its decoder. <td> {@link ProtocolInitiation.Decoder}
043 * <tr><td> Delegate AMQP data to its decoder. <td> {@link AMQDataBlockDecoder}
044 * <tr><td> Accept notification that protocol initiation has completed.
045 * </table>
046 *
047 * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
048 * per-session overhead.
049 */
050 public class AMQDecoder extends CumulativeProtocolDecoder
051 {
052
053 private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
054
055 /** Holds the 'normal' AMQP data decoder. */
056 private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
057
058 /** Holds the protocol initiation decoder. */
059 private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
060
061 /** Flag to indicate whether this decoder needs to handle protocol initiation. */
062 private boolean _expectProtocolInitiation;
063 private boolean firstDecode = true;
064
065 /**
066 * Creates a new AMQP decoder.
067 *
068 * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
069 */
070 public AMQDecoder(boolean expectProtocolInitiation)
071 {
072 _expectProtocolInitiation = expectProtocolInitiation;
073 }
074
075 /**
076 * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
077 * intiation decoders.
078 *
079 * @param session The Mina session.
080 * @param in The raw byte buffer.
081 * @param out The Mina object output gatherer to write decoded objects to.
082 *
083 * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
084 *
085 * @throws Exception If the data cannot be decoded for any reason.
086 */
087 protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
088 {
089
090 boolean decoded;
091 if (_expectProtocolInitiation
092 || (firstDecode
093 && (in.remaining() > 0)
094 && (in.get(in.position()) == (byte)'A')))
095 {
096 decoded = doDecodePI(session, in, out);
097 }
098 else
099 {
100 decoded = doDecodeDataBlock(session, in, out);
101 }
102 if(firstDecode && decoded)
103 {
104 firstDecode = false;
105 }
106 return decoded;
107 }
108
109 /**
110 * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
111 *
112 * @param session The Mina session.
113 * @param in The raw byte buffer.
114 * @param out The Mina object output gatherer to write decoded objects to.
115 *
116 * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
117 *
118 * @throws Exception If the data cannot be decoded for any reason.
119 */
120 protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
121 {
122 int pos = in.position();
123 boolean enoughData = _dataBlockDecoder.decodable(session, in);
124 in.position(pos);
125 if (!enoughData)
126 {
127 // returning false means it will leave the contents in the buffer and
128 // call us again when more data has been read
129 return false;
130 }
131 else
132 {
133 _dataBlockDecoder.decode(session, in, out);
134
135 return true;
136 }
137 }
138
139 /**
140 * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
141 *
142 * @param session The Mina session.
143 * @param in The raw byte buffer.
144 * @param out The Mina object output gatherer to write decoded objects to.
145 *
146 * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
147 *
148 * @throws Exception If the data cannot be decoded for any reason.
149 */
150 private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
151 {
152 boolean enoughData = _piDecoder.decodable(session, in);
153 if (!enoughData)
154 {
155 // returning false means it will leave the contents in the buffer and
156 // call us again when more data has been read
157 return false;
158 }
159 else
160 {
161 _piDecoder.decode(session, in, out);
162
163 return true;
164 }
165 }
166
167 /**
168 * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
169 * initation decoder. This method is expected to be called with <tt>false</tt> once protocol initation completes.
170 *
171 * @param expectProtocolInitiation <tt>true</tt> to use the protocol initiation decoder, <tt>false</tt> to use the
172 * data decoder.
173 */
174 public void setExpectProtocolInitiation(boolean expectProtocolInitiation)
175 {
176 _expectProtocolInitiation = expectProtocolInitiation;
177 }
178
179
180 /**
181 * Cumulates content of <tt>in</tt> into internal buffer and forwards
182 * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
183 * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
184 * and the cumulative buffer is compacted after decoding ends.
185 *
186 * @throws IllegalStateException if your <tt>doDecode()</tt> returned
187 * <tt>true</tt> not consuming the cumulative buffer.
188 */
189 public void decode( IoSession session, ByteBuffer in,
190 ProtocolDecoderOutput out ) throws Exception
191 {
192 ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
193 // if we have a session buffer, append data to that otherwise
194 // use the buffer read from the network directly
195 if( buf != null )
196 {
197 buf.put( in );
198 buf.flip();
199 }
200 else
201 {
202 buf = in;
203 }
204
205 for( ;; )
206 {
207 int oldPos = buf.position();
208 boolean decoded = doDecode( session, buf, out );
209 if( decoded )
210 {
211 if( buf.position() == oldPos )
212 {
213 throw new IllegalStateException(
214 "doDecode() can't return true when buffer is not consumed." );
215 }
216
217 if( !buf.hasRemaining() )
218 {
219 break;
220 }
221 }
222 else
223 {
224 break;
225 }
226 }
227
228 // if there is any data left that cannot be decoded, we store
229 // it in a buffer in the session and next time this decoder is
230 // invoked the session buffer gets appended to
231 if ( buf.hasRemaining() )
232 {
233 storeRemainingInSession( buf, session );
234 }
235 else
236 {
237 removeSessionBuffer( session );
238 }
239 }
240
241 /**
242 * Releases the cumulative buffer used by the specified <tt>session</tt>.
243 * Please don't forget to call <tt>super.dispose( session )</tt> when
244 * you override this method.
245 */
246 public void dispose( IoSession session ) throws Exception
247 {
248 removeSessionBuffer( session );
249 }
250
251 private void removeSessionBuffer(IoSession session)
252 {
253 ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
254 if( buf != null )
255 {
256 buf.release();
257 session.removeAttribute( BUFFER );
258 }
259 }
260
261 private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
262
263 private void storeRemainingInSession(ByteBuffer buf, IoSession session)
264 {
265 ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
266 remainingBuf.setAutoExpand( true );
267 remainingBuf.put( buf );
268 session.setAttribute( BUFFER, remainingBuf );
269 }
270
271 }
|