001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.framing;
022
023 import org.apache.mina.common.ByteBuffer;
024 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
025 import org.apache.qpid.AMQException;
026
027 public class ContentBody implements AMQBody
028 {
029 public static final byte TYPE = 3;
030
031 public ByteBuffer payload;
032
033 public ContentBody()
034 {
035 }
036
037 public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException
038 {
039 if (size > 0)
040 {
041 payload = buffer.slice();
042 payload.limit((int) size);
043 buffer.skip((int) size);
044 }
045
046 }
047
048
049 public ContentBody(ByteBuffer payload)
050 {
051 this.payload = payload;
052 }
053
054 public byte getFrameType()
055 {
056 return TYPE;
057 }
058
059 public int getSize()
060 {
061 return (payload == null ? 0 : payload.limit());
062 }
063
064 public void writePayload(ByteBuffer buffer)
065 {
066 if (payload != null)
067 {
068 if(payload.isDirect() || payload.isReadOnly())
069 {
070 ByteBuffer copy = payload.duplicate();
071 buffer.put(copy.rewind());
072 }
073 else
074 {
075 buffer.put(payload.array(),payload.arrayOffset(),payload.limit());
076 }
077 }
078 }
079
080 public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
081 throws AMQException
082 {
083 session.contentBodyReceived(channelId, this);
084 }
085
086 protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
087 {
088 if (size > 0)
089 {
090 payload = buffer.slice();
091 payload.limit((int) size);
092 buffer.skip((int) size);
093 }
094
095 }
096
097 public void reduceBufferToFit()
098 {
099 if (payload != null && (payload.remaining() < payload.capacity() / 2))
100 {
101 int size = payload.limit();
102 ByteBuffer newPayload = ByteBuffer.allocate(size);
103
104 newPayload.put(payload);
105 newPayload.flip();
106
107 //reduce reference count on payload
108 payload.release();
109
110 payload = newPayload;
111 }
112 }
113
114
115
116 public static AMQFrame createAMQFrame(int channelId, ContentBody body)
117 {
118 final AMQFrame frame = new AMQFrame(channelId, body);
119 return frame;
120 }
121 }
|