001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.message;
022
023 import java.io.IOException;
024 import java.nio.charset.Charset;
025
026 import javax.jms.JMSException;
027 import javax.jms.MessageEOFException;
028
029 import org.apache.mina.common.ByteBuffer;
030
031 import org.apache.qpid.AMQException;
032 import org.apache.qpid.framing.AMQShortString;
033 import org.apache.qpid.framing.BasicContentHeaderProperties;
034
035 /**
036 * @author Apache Software Foundation
037 */
038 public abstract class AbstractBytesMessage extends AbstractJMSMessage
039 {
040
041 /**
042 * The default initial size of the buffer. The buffer expands automatically.
043 */
044 private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
045
046 AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory)
047 {
048 this(delegateFactory, null);
049 }
050
051 /**
052 * Construct a bytes message with existing data.
053 *
054 * @param delegateFactory
055 * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
056 */
057 AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
058 {
059 super(delegateFactory, data); // this instanties a content header
060 setContentType(getMimeType());
061
062 if (_data == null)
063 {
064 allocateInitialBuffer();
065 }
066 }
067
068 protected void allocateInitialBuffer()
069 {
070 _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
071 _data.setAutoExpand(true);
072 }
073
074 AbstractBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
075 {
076 super(delegate, data);
077 setContentType(getMimeType());
078 }
079
080
081 public void clearBodyImpl() throws JMSException
082 {
083 allocateInitialBuffer();
084 }
085
086 public String toBodyString() throws JMSException
087 {
088 checkReadable();
089 try
090 {
091 return getText();
092 }
093 catch (IOException e)
094 {
095 JMSException jmse = new JMSException(e.toString());
096 jmse.setLinkedException(e);
097 throw jmse;
098 }
099 }
100
101 /**
102 * We reset the stream before and after reading the data. This means that toString() will always output
103 * the entire message and also that the caller can then immediately start reading as if toString() had
104 * never been called.
105 *
106 * @return
107 * @throws IOException
108 */
109 private String getText() throws IOException
110 {
111 // this will use the default platform encoding
112 if (_data == null)
113 {
114 return null;
115 }
116
117 int pos = _data.position();
118 _data.rewind();
119 // one byte left is for the end of frame marker
120 if (_data.remaining() == 0)
121 {
122 // this is really redundant since pos must be zero
123 _data.position(pos);
124
125 return null;
126 }
127 else
128 {
129 String data = _data.getString(Charset.forName("UTF8").newDecoder());
130 _data.position(pos);
131
132 return data;
133 }
134 }
135
136 /**
137 * Check that there is at least a certain number of bytes available to read
138 *
139 * @param len the number of bytes
140 * @throws javax.jms.MessageEOFException if there are less than len bytes available to read
141 */
142 protected void checkAvailable(int len) throws MessageEOFException
143 {
144 if (_data.remaining() < len)
145 {
146 throw new MessageEOFException("Unable to read " + len + " bytes");
147 }
148 }
149 }
|