TransientAMQMessage.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.queue;
022 
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.framing.AMQBody;
026 import org.apache.qpid.framing.AMQDataBlock;
027 import org.apache.qpid.framing.AMQFrame;
028 import org.apache.qpid.framing.ContentHeaderBody;
029 import org.apache.qpid.framing.abstraction.ContentChunk;
030 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
031 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
032 import org.apache.qpid.server.protocol.AMQProtocolSession;
033 import org.apache.qpid.server.store.StoreContext;
034 
035 import java.util.ArrayList;
036 import java.util.Collections;
037 import java.util.Iterator;
038 import java.util.List;
039 import java.util.concurrent.atomic.AtomicInteger;
040 
041 /** A deliverable message. */
042 public class TransientAMQMessage implements AMQMessage
043 {
044     /** Used for debugging purposes. */
045     protected static final Logger _log = Logger.getLogger(AMQMessage.class);
046 
047     private final AtomicInteger _referenceCount = new AtomicInteger(1);
048 
049     protected ContentHeaderBody _contentHeaderBody;
050 
051     protected MessagePublishInfo _messagePublishInfo;
052 
053     protected List<ContentChunk> _contentBodies;
054 
055     protected long _arrivalTime;
056 
057     protected final Long _messageId;
058 
059     /** Flag to indicate that this message requires 'immediate' delivery. */
060 
061     private static final byte IMMEDIATE = 0x01;
062 
063     /**
064      * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
065      * for messages published with the 'immediate' flag.
066      */
067 
068     private static final byte DELIVERED_TO_CONSUMER = 0x02;
069 
070     private byte _flags = 0;
071 
072     private long _expiration;
073 
074     private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
075     private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
076 
077     /**
078      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
079      * therefore is memory-efficient.
080      */
081     private class BodyFrameIterator implements Iterator<AMQDataBlock>
082     {
083         private int _channel;
084 
085         private int _index = -1;
086         private AMQProtocolSession _protocolSession;
087 
088         private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
089         {
090             _channel = channel;
091             _protocolSession = protocolSession;
092         }
093 
094         public boolean hasNext()
095         {
096             return _index < (getBodyCount() 1);
097         }
098 
099         public AMQDataBlock next()
100         {
101             AMQBody cb =
102                     getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index));
103 
104             return new AMQFrame(_channel, cb);
105         }
106 
107         private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
108         {
109             return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
110         }
111 
112         public void remove()
113         {
114             throw new UnsupportedOperationException();
115         }
116     }
117 
118     private class BodyContentIterator implements Iterator<ContentChunk>
119     {
120 
121         private int _index = -1;
122 
123         public boolean hasNext()
124         {
125             return _index < (getBodyCount() 1);
126         }
127 
128         public ContentChunk next()
129         {
130             return getContentChunk(++_index);
131         }
132 
133         public void remove()
134         {
135             throw new UnsupportedOperationException();
136         }
137     }
138 
139     /**
140      * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
141      * These all need refactoring to some sort of MockAMQMessageFactory.
142      */
143     @Deprecated
144     protected TransientAMQMessage(AMQMessage messagethrows AMQException
145     {
146         _messageId = message.getMessageId();
147         _flags = ((TransientAMQMessagemessage)._flags;
148         _contentHeaderBody = message.getContentHeaderBody();
149         _messagePublishInfo = message.getMessagePublishInfo();
150     }
151 
152     /**
153      * Normal message creation via the MessageFactory uses this constructor
154      * Package scope limited as MessageFactory should be used
155      *
156      @param messageId
157      *
158      @see MessageFactory
159      */
160     TransientAMQMessage(Long messageId)
161     {
162         _messageId = messageId;
163     }
164 
165     public String debugIdentity()
166     {
167         return "(HC:" + System.identityHashCode(this" ID:" + getMessageId() " Ref:" + _referenceCount.get() ")";
168     }
169 
170     public void setExpiration(final long expiration)
171     {
172         _expiration = expiration;
173     }
174 
175     public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
176     {
177         return new BodyFrameIterator(protocolSession, channel);
178     }
179 
180     public Iterator<ContentChunk> getContentBodyIterator()
181     {
182         return new BodyContentIterator();
183     }
184 
185     public ContentHeaderBody getContentHeaderBody()
186     {
187         return _contentHeaderBody;
188     }
189 
190     public Long getMessageId()
191     {
192         return _messageId;
193     }
194 
195     /**
196      * Called selectors to determin if the message has already been sent
197      *
198      @return _deliveredToConsumer
199      */
200     public boolean getDeliveredToConsumer()
201     {
202         return (_flags & DELIVERED_TO_CONSUMER!= 0;
203     }
204 
205     /**
206      * Called to enforce the 'immediate' flag.
207      *
208      * @returns true if the message is marked for immediate delivery but has not been marked as delivered
209      * to a consumer
210      */
211     public boolean immediateAndNotDelivered()
212     {
213 
214         return (_flags & IMMEDIATE_AND_DELIVERED== IMMEDIATE;
215 
216     }
217 
218     /**
219      * Checks to see if the message has expired. If it has the message is dequeued.
220      *
221      @return true if the message has expire
222      *
223      @throws AMQException
224      */
225     public boolean expired() throws AMQException
226     {
227 
228         if (_expiration != 0L)
229         {
230             long now = System.currentTimeMillis();
231 
232             return (now > _expiration);
233         }
234 
235         return false;
236     }
237 
238     /**
239      * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
240      * And for selector efficiency.
241      */
242     public void setDeliveredToConsumer()
243     {
244         _flags |= DELIVERED_TO_CONSUMER;
245     }
246 
247     public long getSize()
248     {
249         return _contentHeaderBody.bodySize;
250     }
251 
252     public Object getPublisherClientInstance()
253     {
254         return _sessionIdentifier.getSessionInstance();
255     }
256 
257     public Object getPublisherIdentifier()
258     {
259         return _sessionIdentifier.getSessionIdentifier();
260     }
261 
262     public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
263     {
264         _sessionIdentifier = sessionIdentifier;
265     }
266 
267     /** From AMQMessageHandle * */
268 
269     public int getBodyCount()
270     {
271         return _contentBodies.size();
272     }
273 
274     public ContentChunk getContentChunk(int index)
275     {
276         if (_contentBodies == null)
277         {
278             throw new RuntimeException("No ContentBody has been set");
279         }
280 
281         if (index > _contentBodies.size() || index < 0)
282         {
283             throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
284                                                (_contentBodies.size() 1));
285         }
286         return _contentBodies.get(index);
287     }
288 
289     public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
290             throws AMQException
291     {
292         if (_contentBodies == null)
293         {
294             if (isLastContentBody)
295             {
296                 _contentBodies = Collections.singletonList(contentChunk);
297             }
298             else
299             {
300                 _contentBodies = new ArrayList<ContentChunk>();
301                 _contentBodies.add(contentChunk);
302             }
303         }
304         else
305         {
306             _contentBodies.add(contentChunk);
307         }
308     }
309 
310     public MessagePublishInfo getMessagePublishInfo()
311     {
312         return _messagePublishInfo;
313     }
314 
315     public boolean isPersistent()
316     {
317         return false;
318     }
319 
320     /**
321      * This is called when all the content has been received.
322      *
323      @param storeContext
324      @param messagePublishInfo
325      @param contentHeaderBody  @throws AMQException
326      */
327     public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
328                                                ContentHeaderBody contentHeaderBody)
329             throws AMQException
330     {
331 
332         if (contentHeaderBody == null)
333         {
334             throw new NullPointerException("HeaderBody cannot be null");
335         }
336 
337         if (messagePublishInfo == null)
338         {
339             throw new NullPointerException("PublishInfo cannot be null");
340         }
341 
342         _messagePublishInfo = messagePublishInfo;
343         _contentHeaderBody = contentHeaderBody;
344 
345         if (contentHeaderBody.bodySize == 0)
346         {
347             _contentBodies = Collections.EMPTY_LIST;
348         }
349 
350         _arrivalTime = System.currentTimeMillis();
351 
352         if (messagePublishInfo.isImmediate())
353         {
354             _flags |= IMMEDIATE;
355         }
356     }
357 
358     public long getArrivalTime()
359     {
360         return _arrivalTime;
361     }
362 
363     public String toString()
364     {
365         // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
366         // _taken + " by :" + _takenBySubcription;
367 
368         return "Message[" + debugIdentity() "]: " + getMessageId() "; ref count: " + _referenceCount;
369     }
370 
371 }