AbstractBytesMessage.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.message;
022 
023 import java.io.IOException;
024 import java.nio.charset.Charset;
025 
026 import javax.jms.JMSException;
027 import javax.jms.MessageEOFException;
028 
029 import org.apache.mina.common.ByteBuffer;
030 
031 import org.apache.qpid.AMQException;
032 import org.apache.qpid.framing.AMQShortString;
033 import org.apache.qpid.framing.BasicContentHeaderProperties;
034 
035 /**
036  @author Apache Software Foundation
037  */
038 public abstract class AbstractBytesMessage extends AbstractJMSMessage
039 {
040 
041     /**
042      * The default initial size of the buffer. The buffer expands automatically.
043      */
044     private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
045 
046     AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory)
047     {
048         this(delegateFactory, null);
049     }
050 
051     /**
052      * Construct a bytes message with existing data.
053      *
054      @param delegateFactory
055      @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
056      */
057     AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
058     {
059         super(delegateFactory, data)// this instanties a content header
060         setContentType(getMimeType());
061 
062         if (_data == null)
063         {
064             allocateInitialBuffer();
065         }
066     }
067 
068     protected void allocateInitialBuffer()
069     {
070         _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
071         _data.setAutoExpand(true);
072     }
073 
074     AbstractBytesMessage(AMQMessageDelegate delegate, ByteBuffer datathrows AMQException
075      {
076          super(delegate, data);
077          setContentType(getMimeType());
078      }
079 
080 
081     public void clearBodyImpl() throws JMSException
082     {
083         allocateInitialBuffer();
084     }
085 
086     public String toBodyString() throws JMSException
087     {
088         checkReadable();
089         try
090         {
091             return getText();
092         }
093         catch (IOException e)
094         {
095             JMSException jmse = new JMSException(e.toString());
096             jmse.setLinkedException(e);
097             throw jmse;
098         }
099     }
100 
101     /**
102      * We reset the stream before and after reading the data. This means that toString() will always output
103      * the entire message and also that the caller can then immediately start reading as if toString() had
104      * never been called.
105      *
106      @return
107      @throws IOException
108      */
109     private String getText() throws IOException
110     {
111         // this will use the default platform encoding
112         if (_data == null)
113         {
114             return null;
115         }
116 
117         int pos = _data.position();
118         _data.rewind();
119         // one byte left is for the end of frame marker
120         if (_data.remaining() == 0)
121         {
122             // this is really redundant since pos must be zero
123             _data.position(pos);
124 
125             return null;
126         }
127         else
128         {
129             String data = _data.getString(Charset.forName("UTF8").newDecoder());
130             _data.position(pos);
131 
132             return data;
133         }
134     }
135 
136     /**
137      * Check that there is at least a certain number of bytes available to read
138      *
139      @param len the number of bytes
140      @throws javax.jms.MessageEOFException if there are less than len bytes available to read
141      */
142     protected void checkAvailable(int lenthrows MessageEOFException
143     {
144         if (_data.remaining() < len)
145         {
146             throw new MessageEOFException("Unable to read " + len + " bytes");
147         }
148     }
149 }