001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.framing;
022
023 import org.apache.mina.common.ByteBuffer;
024 import org.apache.mina.common.IoSession;
025 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
026
027 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
028
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031
032 public class AMQDataBlockDecoder
033 {
034 private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
035
036 private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
037
038 static
039 {
040 _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
041 _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
042 _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
043 }
044
045 Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class);
046
047 public AMQDataBlockDecoder()
048 { }
049
050 public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
051 {
052 final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
053 // type, channel, body length and end byte
054 if (remainingAfterAttributes < 0)
055 {
056 return false;
057 }
058
059 in.skip(1 + 2);
060 final long bodySize = in.getUnsignedInt();
061
062 return (remainingAfterAttributes >= bodySize);
063
064 }
065
066 protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
067 throws AMQFrameDecodingException, AMQProtocolVersionException
068 {
069 final byte type = in.get();
070
071 BodyFactory bodyFactory;
072 if (type == AMQMethodBody.TYPE)
073 {
074 bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
075 if (bodyFactory == null)
076 {
077 AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
078 bodyFactory = new AMQMethodBodyFactory(protocolSession);
079 session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
080
081 }
082
083 }
084 else
085 {
086 bodyFactory = _bodiesSupported[type];
087 }
088
089 if (bodyFactory == null)
090 {
091 throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
092 }
093
094 final int channel = in.getUnsignedShort();
095 final long bodySize = in.getUnsignedInt();
096
097 // bodySize can be zero
098 if ((channel < 0) || (bodySize < 0))
099 {
100 throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
101 + " bodySize = " + bodySize, null);
102 }
103
104 AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
105
106 byte marker = in.get();
107 if ((marker & 0xFF) != 0xCE)
108 {
109 throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
110 + " type=" + type, null);
111 }
112
113 return frame;
114 }
115
116 public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
117 {
118 out.write(createAndPopulateFrame(session, in));
119 }
120 }
|