IncomingMessage.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.queue;
022 
023 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
024 import org.apache.qpid.framing.abstraction.ContentChunk;
025 import org.apache.qpid.framing.ContentHeaderBody;
026 import org.apache.qpid.framing.AMQShortString;
027 import org.apache.qpid.framing.BasicContentHeaderProperties;
028 import org.apache.qpid.server.txn.TransactionalContext;
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.registry.ApplicationRegistry;
031 import org.apache.qpid.server.exchange.NoRouteException;
032 import org.apache.qpid.server.exchange.Exchange;
033 import org.apache.qpid.server.transactionlog.TransactionLog;
034 import org.apache.qpid.AMQException;
035 import org.apache.log4j.Logger;
036 
037 import java.util.ArrayList;
038 
039 public class IncomingMessage implements Filterable<RuntimeException>
040 {
041 
042     /** Used for debugging purposes. */
043     private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
044 
045     private static final boolean SYNCHED_CLOCKS =
046             ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks();
047 
048     private final MessagePublishInfo _messagePublishInfo;
049     private ContentHeaderBody _contentHeaderBody;
050     private AMQMessage _message;
051     private final TransactionalContext _txnContext;
052 
053     private static final boolean MSG_AUTH = 
054         ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
055 
056 
057     /**
058      * Keeps a track of how many bytes we have received in body frames
059      */
060     private long _bodyLengthReceived = 0;
061 
062     /**
063      * This is stored during routing, to know the queues to which this message should immediately be
064      * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
065      * by the message handle.
066      */
067     private ArrayList<AMQQueue> _destinationQueues;
068 
069     private AMQProtocolSession _publisher;
070     private TransactionLog _messageStore;
071     private long _expiration;
072     
073     private Exchange _exchange;
074     private static MessageFactory MESSAGE_FACTORY = MessageFactory.getInstance();
075 
076     public IncomingMessage(final MessagePublishInfo info,
077                            final TransactionalContext txnContext,
078                            final AMQProtocolSession publisher,
079                            TransactionLog messasgeStore)
080     {
081         if (publisher == null)
082         {
083             throw new NullPointerException("Message Publisher cannot be null");
084         }
085         _messagePublishInfo = info;
086         _txnContext = txnContext;
087         _publisher = publisher;
088         _messageStore = messasgeStore;
089     }
090 
091     public void setContentHeaderBody(final ContentHeaderBody contentHeaderBodythrows AMQException
092     {
093         _contentHeaderBody = contentHeaderBody;
094         _message = MESSAGE_FACTORY.createMessage(_messageStore, isPersistent());
095     }
096 
097     public void setExpiration()
098     {
099             long expiration =
100                     ((BasicContentHeaderProperties_contentHeaderBody.properties).getExpiration();
101             long timestamp =
102                     ((BasicContentHeaderProperties_contentHeaderBody.properties).getTimestamp();
103 
104             if (SYNCHED_CLOCKS)
105             {
106                 _expiration = expiration;
107             }
108             else
109             {
110                 // Update TTL to be in broker time.
111                 if (expiration != 0L)
112                 {
113                     if (timestamp != 0L)
114                     {
115                         // todo perhaps use arrival time
116                         long diff = (System.currentTimeMillis() - timestamp);
117 
118                         if ((diff > 1000L|| (diff < 1000L))
119                         {
120                             _expiration = expiration + diff;
121                         }
122                     }
123                 }
124             }
125 
126     }
127 
128     public void routingComplete(final TransactionLog transactionLogthrows AMQException
129     {
130 
131         if (isPersistent())
132         {
133             _txnContext.beginTranIfNecessary();
134              // enqueuing the messages ensure that if required the destinations are recorded to a
135             // persistent store
136 
137             if(_destinationQueues != null)
138             {
139                 for (int i = 0; i < _destinationQueues.size(); i++)
140                 {
141                     transactionLog.enqueueMessage(_txnContext.getStoreContext(),
142                             _destinationQueues.get(i), getMessageId());
143                 }
144             }
145         }
146     }
147 
148     public AMQMessage deliverToQueues()
149             throws AMQException
150     {
151 
152         // we get a reference to the destination queues now so that we can clear the
153         // transient message data as quickly as possible
154         if (_logger.isDebugEnabled())
155         {
156             _logger.debug("Delivering message " + getMessageId() " to " + _destinationQueues);
157         }
158 
159 
160             // first we allow the handle to know that the message has been fully received. This is useful if it is
161             // maintaining any calculated values based on content chunks
162             _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
163 
164 
165 
166             _message.setExpiration(_expiration);
167             _message.setClientIdentifier(_publisher.getSessionIdentifier());
168 
169             // we then allow the transactional context to do something with the message content
170             // now that it has all been received, before we attempt delivery
171             _txnContext.messageFullyReceived(isPersistent());
172             
173             AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
174                      ((BasicContentHeaderPropertiesgetContentHeaderBody().properties).getUserId() null
175             
176             if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null"" : userID.toString()))
177             {
178                 throw new UnauthorizedAccessException("Acccess Refused", _message);
179             }
180             
181             if ((_destinationQueues == null|| _destinationQueues.size() == 0)
182             {
183 
184                 if (isMandatory() || isImmediate())
185                 {
186                     throw new NoRouteException("No Route for message", _message);
187 
188                 }
189                 else
190                 {
191                     _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message);
192                 }
193             }
194             else
195             {
196                 int offset;
197                 final int queueCount = _destinationQueues.size();
198                 if(queueCount == 1)
199                 {
200                     offset = 0;
201                 }
202                 else
203                 {
204                     offset = ((int)(_message.getMessageId().longValue())) % queueCount;
205                     if(offset < 0)
206                     {
207                         offset = -offset;
208                     }
209                 }
210                 for (int i = offset; i < queueCount; i++)
211                 {
212                     // normal deliver so add this message at the end.
213                     _txnContext.deliver(_destinationQueues.get(i), _message);
214                 }
215                 for (int i = 0; i < offset; i++)
216                 {
217                     // normal deliver so add this message at the end.
218                     _txnContext.deliver(_destinationQueues.get(i), _message);
219                 }
220             }
221 
222             return _message;
223       
224 
225 
226     }
227 
228     public void addContentBodyFrame(final ContentChunk contentChunk)
229             throws AMQException
230     {
231 
232         _bodyLengthReceived += contentChunk.getSize();
233 
234         _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
235 
236     }
237 
238     public boolean allContentReceived()
239     {
240         return (_bodyLengthReceived == getContentHeaderBody().bodySize);
241     }
242 
243     public AMQShortString getExchange() throws AMQException
244     {
245         return _messagePublishInfo.getExchange();
246     }
247 
248     public AMQShortString getRoutingKey() throws AMQException
249     {
250         return _messagePublishInfo.getRoutingKey();
251     }
252 
253     public boolean isMandatory() throws AMQException
254     {
255         return _messagePublishInfo.isMandatory();
256     }
257 
258 
259     public boolean isImmediate() throws AMQException
260     {
261         return _messagePublishInfo.isImmediate();
262     }
263 
264     public ContentHeaderBody getContentHeaderBody()
265     {
266         return _contentHeaderBody;
267     }
268 
269 
270     public boolean isPersistent()
271     {
272         //todo remove literal values to a constant file such as AMQConstants in common
273         return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
274              ((BasicContentHeaderPropertiesgetContentHeaderBody().properties).getDeliveryMode() == 2;
275     }
276     
277     public boolean isRedelivered()
278     {
279         return false;
280     }
281 
282     /**
283      * The message ID will not be assigned until the ContentHeaderBody has arrived.
284      @return
285      */
286     public Long getMessageId()
287     {
288         return _message.getMessageId();
289     }
290 
291     public void setExchange(final Exchange e)
292     {
293         _exchange = e;
294     }
295 
296     public void route() throws AMQException
297     {
298         _exchange.route(this);
299     }
300 
301     public void enqueue(final ArrayList<AMQQueue> queues)
302     {
303         _destinationQueues = queues;
304     }
305 }