001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.framing.AMQBody;
026 import org.apache.qpid.framing.AMQDataBlock;
027 import org.apache.qpid.framing.AMQFrame;
028 import org.apache.qpid.framing.ContentHeaderBody;
029 import org.apache.qpid.framing.abstraction.ContentChunk;
030 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
031 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
032 import org.apache.qpid.server.protocol.AMQProtocolSession;
033 import org.apache.qpid.server.store.StoreContext;
034
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
040
041 /** A deliverable message. */
042 public class TransientAMQMessage implements AMQMessage
043 {
044 /** Used for debugging purposes. */
045 protected static final Logger _log = Logger.getLogger(AMQMessage.class);
046
047 private final AtomicInteger _referenceCount = new AtomicInteger(1);
048
049 protected ContentHeaderBody _contentHeaderBody;
050
051 protected MessagePublishInfo _messagePublishInfo;
052
053 protected List<ContentChunk> _contentBodies;
054
055 protected long _arrivalTime;
056
057 protected final Long _messageId;
058
059 /** Flag to indicate that this message requires 'immediate' delivery. */
060
061 private static final byte IMMEDIATE = 0x01;
062
063 /**
064 * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
065 * for messages published with the 'immediate' flag.
066 */
067
068 private static final byte DELIVERED_TO_CONSUMER = 0x02;
069
070 private byte _flags = 0;
071
072 private long _expiration;
073
074 private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
075 private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
076
077 /**
078 * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
079 * therefore is memory-efficient.
080 */
081 private class BodyFrameIterator implements Iterator<AMQDataBlock>
082 {
083 private int _channel;
084
085 private int _index = -1;
086 private AMQProtocolSession _protocolSession;
087
088 private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
089 {
090 _channel = channel;
091 _protocolSession = protocolSession;
092 }
093
094 public boolean hasNext()
095 {
096 return _index < (getBodyCount() - 1);
097 }
098
099 public AMQDataBlock next()
100 {
101 AMQBody cb =
102 getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index));
103
104 return new AMQFrame(_channel, cb);
105 }
106
107 private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
108 {
109 return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
110 }
111
112 public void remove()
113 {
114 throw new UnsupportedOperationException();
115 }
116 }
117
118 private class BodyContentIterator implements Iterator<ContentChunk>
119 {
120
121 private int _index = -1;
122
123 public boolean hasNext()
124 {
125 return _index < (getBodyCount() - 1);
126 }
127
128 public ContentChunk next()
129 {
130 return getContentChunk(++_index);
131 }
132
133 public void remove()
134 {
135 throw new UnsupportedOperationException();
136 }
137 }
138
139 /**
140 * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
141 * These all need refactoring to some sort of MockAMQMessageFactory.
142 */
143 @Deprecated
144 protected TransientAMQMessage(AMQMessage message) throws AMQException
145 {
146 _messageId = message.getMessageId();
147 _flags = ((TransientAMQMessage) message)._flags;
148 _contentHeaderBody = message.getContentHeaderBody();
149 _messagePublishInfo = message.getMessagePublishInfo();
150 }
151
152 /**
153 * Normal message creation via the MessageFactory uses this constructor
154 * Package scope limited as MessageFactory should be used
155 *
156 * @param messageId
157 *
158 * @see MessageFactory
159 */
160 TransientAMQMessage(Long messageId)
161 {
162 _messageId = messageId;
163 }
164
165 public String debugIdentity()
166 {
167 return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
168 }
169
170 public void setExpiration(final long expiration)
171 {
172 _expiration = expiration;
173 }
174
175 public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
176 {
177 return new BodyFrameIterator(protocolSession, channel);
178 }
179
180 public Iterator<ContentChunk> getContentBodyIterator()
181 {
182 return new BodyContentIterator();
183 }
184
185 public ContentHeaderBody getContentHeaderBody()
186 {
187 return _contentHeaderBody;
188 }
189
190 public Long getMessageId()
191 {
192 return _messageId;
193 }
194
195 /**
196 * Called selectors to determin if the message has already been sent
197 *
198 * @return _deliveredToConsumer
199 */
200 public boolean getDeliveredToConsumer()
201 {
202 return (_flags & DELIVERED_TO_CONSUMER) != 0;
203 }
204
205 /**
206 * Called to enforce the 'immediate' flag.
207 *
208 * @returns true if the message is marked for immediate delivery but has not been marked as delivered
209 * to a consumer
210 */
211 public boolean immediateAndNotDelivered()
212 {
213
214 return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
215
216 }
217
218 /**
219 * Checks to see if the message has expired. If it has the message is dequeued.
220 *
221 * @return true if the message has expire
222 *
223 * @throws AMQException
224 */
225 public boolean expired() throws AMQException
226 {
227
228 if (_expiration != 0L)
229 {
230 long now = System.currentTimeMillis();
231
232 return (now > _expiration);
233 }
234
235 return false;
236 }
237
238 /**
239 * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
240 * And for selector efficiency.
241 */
242 public void setDeliveredToConsumer()
243 {
244 _flags |= DELIVERED_TO_CONSUMER;
245 }
246
247 public long getSize()
248 {
249 return _contentHeaderBody.bodySize;
250 }
251
252 public Object getPublisherClientInstance()
253 {
254 return _sessionIdentifier.getSessionInstance();
255 }
256
257 public Object getPublisherIdentifier()
258 {
259 return _sessionIdentifier.getSessionIdentifier();
260 }
261
262 public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
263 {
264 _sessionIdentifier = sessionIdentifier;
265 }
266
267 /** From AMQMessageHandle * */
268
269 public int getBodyCount()
270 {
271 return _contentBodies.size();
272 }
273
274 public ContentChunk getContentChunk(int index)
275 {
276 if (_contentBodies == null)
277 {
278 throw new RuntimeException("No ContentBody has been set");
279 }
280
281 if (index > _contentBodies.size() - 1 || index < 0)
282 {
283 throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
284 (_contentBodies.size() - 1));
285 }
286 return _contentBodies.get(index);
287 }
288
289 public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
290 throws AMQException
291 {
292 if (_contentBodies == null)
293 {
294 if (isLastContentBody)
295 {
296 _contentBodies = Collections.singletonList(contentChunk);
297 }
298 else
299 {
300 _contentBodies = new ArrayList<ContentChunk>();
301 _contentBodies.add(contentChunk);
302 }
303 }
304 else
305 {
306 _contentBodies.add(contentChunk);
307 }
308 }
309
310 public MessagePublishInfo getMessagePublishInfo()
311 {
312 return _messagePublishInfo;
313 }
314
315 public boolean isPersistent()
316 {
317 return false;
318 }
319
320 /**
321 * This is called when all the content has been received.
322 *
323 * @param storeContext
324 * @param messagePublishInfo
325 * @param contentHeaderBody @throws AMQException
326 */
327 public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
328 ContentHeaderBody contentHeaderBody)
329 throws AMQException
330 {
331
332 if (contentHeaderBody == null)
333 {
334 throw new NullPointerException("HeaderBody cannot be null");
335 }
336
337 if (messagePublishInfo == null)
338 {
339 throw new NullPointerException("PublishInfo cannot be null");
340 }
341
342 _messagePublishInfo = messagePublishInfo;
343 _contentHeaderBody = contentHeaderBody;
344
345 if (contentHeaderBody.bodySize == 0)
346 {
347 _contentBodies = Collections.EMPTY_LIST;
348 }
349
350 _arrivalTime = System.currentTimeMillis();
351
352 if (messagePublishInfo.isImmediate())
353 {
354 _flags |= IMMEDIATE;
355 }
356 }
357
358 public long getArrivalTime()
359 {
360 return _arrivalTime;
361 }
362
363 public String toString()
364 {
365 // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
366 // _taken + " by :" + _takenBySubcription;
367
368 return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
369 }
370
371 }
|