ContentBody.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.framing;
022 
023 import org.apache.mina.common.ByteBuffer;
024 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
025 import org.apache.qpid.AMQException;
026 
027 public class ContentBody implements AMQBody
028 {
029     public static final byte TYPE = 3;
030 
031     public ByteBuffer payload;
032 
033     public ContentBody()
034     {
035     }
036 
037     public ContentBody(ByteBuffer buffer, long sizethrows AMQFrameDecodingException
038     {
039         if (size > 0)
040         {
041             payload = buffer.slice();
042             payload.limit((intsize);
043             buffer.skip((intsize);
044         }
045 
046     }
047 
048 
049     public ContentBody(ByteBuffer payload)
050     {
051         this.payload = payload;
052     }
053 
054     public byte getFrameType()
055     {
056         return TYPE;
057     }
058 
059     public int getSize()
060     {
061         return (payload == null : payload.limit());
062     }
063 
064     public void writePayload(ByteBuffer buffer)
065     {
066         if (payload != null)
067         {
068             if(payload.isDirect() || payload.isReadOnly())
069             {            
070                 ByteBuffer copy = payload.duplicate();
071                 buffer.put(copy.rewind());
072             }
073             else
074             {
075                 buffer.put(payload.array(),payload.arrayOffset(),payload.limit());
076             }
077         }
078     }
079 
080     public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
081             throws AMQException
082     {
083         session.contentBodyReceived(channelId, this);
084     }
085 
086     protected void populateFromBuffer(ByteBuffer buffer, long sizethrows AMQFrameDecodingException
087     {
088         if (size > 0)
089         {
090             payload = buffer.slice();
091             payload.limit((intsize);
092             buffer.skip((intsize);
093         }
094 
095     }
096 
097     public void reduceBufferToFit()
098     {
099         if (payload != null && (payload.remaining() < payload.capacity() 2))
100         {
101             int size = payload.limit();
102             ByteBuffer newPayload = ByteBuffer.allocate(size);
103 
104             newPayload.put(payload);
105             newPayload.flip();
106 
107             //reduce reference count on payload
108             payload.release();
109 
110             payload = newPayload;
111         }
112     }
113 
114 
115 
116     public static AMQFrame createAMQFrame(int channelId, ContentBody body)
117     {
118         final AMQFrame frame = new AMQFrame(channelId, body);
119         return frame;
120     }
121 }