AMQDataBlockDecoder.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.framing;
022 
023 import org.apache.mina.common.ByteBuffer;
024 import org.apache.mina.common.IoSession;
025 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
026 
027 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
028 
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031 
032 public class AMQDataBlockDecoder
033 {
034     private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
035 
036     private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
037 
038     static
039     {
040         _bodiesSupported[ContentHeaderBody.TYPE= ContentHeaderBodyFactory.getInstance();
041         _bodiesSupported[ContentBody.TYPE= ContentBodyFactory.getInstance();
042         _bodiesSupported[HeartbeatBody.TYPEnew HeartbeatBodyFactory();
043     }
044 
045     Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class);
046 
047     public AMQDataBlockDecoder()
048     { }
049 
050     public boolean decodable(IoSession session, ByteBuffer inthrows AMQFrameDecodingException
051     {
052         final int remainingAfterAttributes = in.remaining() (1);
053         // type, channel, body length and end byte
054         if (remainingAfterAttributes < 0)
055         {
056             return false;
057         }
058 
059         in.skip(2);
060         final long bodySize = in.getUnsignedInt();
061 
062         return (remainingAfterAttributes >= bodySize);
063 
064     }
065 
066     protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
067         throws AMQFrameDecodingException, AMQProtocolVersionException
068     {
069         final byte type = in.get();
070 
071         BodyFactory bodyFactory;
072         if (type == AMQMethodBody.TYPE)
073         {
074             bodyFactory = (BodyFactorysession.getAttribute(SESSION_METHOD_BODY_FACTORY);
075             if (bodyFactory == null)
076             {
077                 AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSessionsession.getAttachment();
078                 bodyFactory = new AMQMethodBodyFactory(protocolSession);
079                 session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
080 
081             }
082 
083         }
084         else
085         {
086             bodyFactory = _bodiesSupported[type];
087         }
088 
089         if (bodyFactory == null)
090         {
091             throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
092         }
093 
094         final int channel = in.getUnsignedShort();
095         final long bodySize = in.getUnsignedInt();
096 
097         // bodySize can be zero
098         if ((channel < 0|| (bodySize < 0))
099         {
100             throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
101                 " bodySize = " + bodySize, null);
102         }
103 
104         AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
105 
106         byte marker = in.get();
107         if ((marker & 0xFF!= 0xCE)
108         {
109             throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
110                 " type=" + type, null);
111         }
112 
113         return frame;
114     }
115 
116     public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput outthrows Exception
117     {
118         out.write(createAndPopulateFrame(session, in));
119     }
120 }