001 package org.apache.qpid.nclient.util;
002 /*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 *
021 */
022
023
024 import java.io.IOException;
025 import java.nio.ByteBuffer;
026 import java.util.*;
027
028 import org.apache.qpid.transport.DeliveryProperties;
029 import org.apache.qpid.transport.MessageProperties;
030 import org.apache.qpid.transport.Header;
031 import org.apache.qpid.api.Message;
032
033 /**
034 * <p>A Simple implementation of the message interface
035 * for small messages. When the readData methods are called
036 * we assume the message is complete. i.e there want be any
037 * appendData operations after that.</p>
038 *
039 * <p>If you need large message support please see
040 * <code>FileMessage</code> and <code>StreamingMessage</code>
041 * </p>
042 */
043 public class ByteBufferMessage implements Message
044 {
045 private List<ByteBuffer> _data;// = new ArrayList<ByteBuffer>();
046 private ByteBuffer _readBuffer;
047 private int _dataSize;
048 private DeliveryProperties _currentDeliveryProps;
049 private MessageProperties _currentMessageProps;
050 private int _transferId;
051 private Header _header;
052
053 public ByteBufferMessage(MessageProperties messageProperties, DeliveryProperties deliveryProperties)
054 {
055 _currentMessageProps = messageProperties;
056 _currentDeliveryProps = deliveryProperties;
057 }
058
059 public void setHeader(Header header) {
060 _header = header;
061 }
062
063 public Header getHeader() {
064 return _header;
065 }
066
067 public ByteBufferMessage()
068 {
069 _currentDeliveryProps = new DeliveryProperties();
070 _currentMessageProps = new MessageProperties();
071 }
072
073 public ByteBufferMessage(int transferId)
074 {
075 _transferId = transferId;
076 }
077
078 public int getMessageTransferId()
079 {
080 return _transferId;
081 }
082
083 public void clearData()
084 {
085 _data = new LinkedList<ByteBuffer>();
086 _readBuffer = null;
087 }
088
089 public void appendData(byte[] src) throws IOException
090 {
091 appendData(ByteBuffer.wrap(src));
092 }
093
094 /**
095 * write the data from the current position up to the buffer limit
096 */
097 public void appendData(ByteBuffer src) throws IOException
098 {
099 if(_data == null)
100 {
101 _data = Collections.singletonList(src);
102 }
103 else
104 {
105 if(_data.size() == 1)
106 {
107 _data = new ArrayList<ByteBuffer>(_data);
108 }
109 _data.add(src);
110 }
111 _dataSize += src.remaining();
112 }
113
114 public DeliveryProperties getDeliveryProperties()
115 {
116 return _currentDeliveryProps;
117 }
118
119 public MessageProperties getMessageProperties()
120 {
121 return _currentMessageProps;
122 }
123
124 public void setDeliveryProperties(DeliveryProperties props)
125 {
126 _currentDeliveryProps = props;
127 }
128
129 public void setMessageProperties(MessageProperties props)
130 {
131 _currentMessageProps = props;
132 }
133
134 public void readData(byte[] target)
135 {
136 getReadBuffer().get(target);
137 }
138
139 public ByteBuffer readData()
140 {
141 return getReadBuffer();
142 }
143
144 private void buildReadBuffer()
145 {
146 //optimize for the simple cases
147 if(_data.size() == 1)
148 {
149 _readBuffer = _data.get(0).duplicate();
150 }
151 else
152 {
153 _readBuffer = ByteBuffer.allocate(_dataSize);
154 for(ByteBuffer buf:_data)
155 {
156 _readBuffer.put(buf);
157 }
158 _readBuffer.flip();
159 }
160 }
161
162 private ByteBuffer getReadBuffer()
163 {
164 if (_readBuffer != null )
165 {
166 return _readBuffer.slice();
167 }
168 else
169 {
170 if (_data.size() >0)
171 {
172 buildReadBuffer();
173 return _readBuffer.slice();
174 }
175 else
176 {
177 return ByteBuffer.allocate(0);
178 }
179 }
180 }
181
182 //hack for testing
183 @Override public String toString()
184 {
185 ByteBuffer temp = getReadBuffer();
186 byte[] b = new byte[temp.remaining()];
187 temp.get(b);
188 return new String(b);
189 }
190 }
|