001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022
023 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
024 import org.apache.qpid.framing.abstraction.ContentChunk;
025 import org.apache.qpid.framing.ContentHeaderBody;
026 import org.apache.qpid.framing.AMQShortString;
027 import org.apache.qpid.framing.BasicContentHeaderProperties;
028 import org.apache.qpid.server.txn.TransactionalContext;
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.registry.ApplicationRegistry;
031 import org.apache.qpid.server.exchange.NoRouteException;
032 import org.apache.qpid.server.exchange.Exchange;
033 import org.apache.qpid.server.transactionlog.TransactionLog;
034 import org.apache.qpid.AMQException;
035 import org.apache.log4j.Logger;
036
037 import java.util.ArrayList;
038
039 public class IncomingMessage implements Filterable<RuntimeException>
040 {
041
042 /** Used for debugging purposes. */
043 private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
044
045 private static final boolean SYNCHED_CLOCKS =
046 ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks();
047
048 private final MessagePublishInfo _messagePublishInfo;
049 private ContentHeaderBody _contentHeaderBody;
050 private AMQMessage _message;
051 private final TransactionalContext _txnContext;
052
053 private static final boolean MSG_AUTH =
054 ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
055
056
057 /**
058 * Keeps a track of how many bytes we have received in body frames
059 */
060 private long _bodyLengthReceived = 0;
061
062 /**
063 * This is stored during routing, to know the queues to which this message should immediately be
064 * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
065 * by the message handle.
066 */
067 private ArrayList<AMQQueue> _destinationQueues;
068
069 private AMQProtocolSession _publisher;
070 private TransactionLog _messageStore;
071 private long _expiration;
072
073 private Exchange _exchange;
074 private static MessageFactory MESSAGE_FACTORY = MessageFactory.getInstance();
075
076 public IncomingMessage(final MessagePublishInfo info,
077 final TransactionalContext txnContext,
078 final AMQProtocolSession publisher,
079 TransactionLog messasgeStore)
080 {
081 if (publisher == null)
082 {
083 throw new NullPointerException("Message Publisher cannot be null");
084 }
085 _messagePublishInfo = info;
086 _txnContext = txnContext;
087 _publisher = publisher;
088 _messageStore = messasgeStore;
089 }
090
091 public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
092 {
093 _contentHeaderBody = contentHeaderBody;
094 _message = MESSAGE_FACTORY.createMessage(_messageStore, isPersistent());
095 }
096
097 public void setExpiration()
098 {
099 long expiration =
100 ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
101 long timestamp =
102 ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
103
104 if (SYNCHED_CLOCKS)
105 {
106 _expiration = expiration;
107 }
108 else
109 {
110 // Update TTL to be in broker time.
111 if (expiration != 0L)
112 {
113 if (timestamp != 0L)
114 {
115 // todo perhaps use arrival time
116 long diff = (System.currentTimeMillis() - timestamp);
117
118 if ((diff > 1000L) || (diff < 1000L))
119 {
120 _expiration = expiration + diff;
121 }
122 }
123 }
124 }
125
126 }
127
128 public void routingComplete(final TransactionLog transactionLog) throws AMQException
129 {
130
131 if (isPersistent())
132 {
133 _txnContext.beginTranIfNecessary();
134 // enqueuing the messages ensure that if required the destinations are recorded to a
135 // persistent store
136
137 if(_destinationQueues != null)
138 {
139 for (int i = 0; i < _destinationQueues.size(); i++)
140 {
141 transactionLog.enqueueMessage(_txnContext.getStoreContext(),
142 _destinationQueues.get(i), getMessageId());
143 }
144 }
145 }
146 }
147
148 public AMQMessage deliverToQueues()
149 throws AMQException
150 {
151
152 // we get a reference to the destination queues now so that we can clear the
153 // transient message data as quickly as possible
154 if (_logger.isDebugEnabled())
155 {
156 _logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues);
157 }
158
159
160 // first we allow the handle to know that the message has been fully received. This is useful if it is
161 // maintaining any calculated values based on content chunks
162 _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
163
164
165
166 _message.setExpiration(_expiration);
167 _message.setClientIdentifier(_publisher.getSessionIdentifier());
168
169 // we then allow the transactional context to do something with the message content
170 // now that it has all been received, before we attempt delivery
171 _txnContext.messageFullyReceived(isPersistent());
172
173 AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
174 ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null;
175
176 if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
177 {
178 throw new UnauthorizedAccessException("Acccess Refused", _message);
179 }
180
181 if ((_destinationQueues == null) || _destinationQueues.size() == 0)
182 {
183
184 if (isMandatory() || isImmediate())
185 {
186 throw new NoRouteException("No Route for message", _message);
187
188 }
189 else
190 {
191 _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message);
192 }
193 }
194 else
195 {
196 int offset;
197 final int queueCount = _destinationQueues.size();
198 if(queueCount == 1)
199 {
200 offset = 0;
201 }
202 else
203 {
204 offset = ((int)(_message.getMessageId().longValue())) % queueCount;
205 if(offset < 0)
206 {
207 offset = -offset;
208 }
209 }
210 for (int i = offset; i < queueCount; i++)
211 {
212 // normal deliver so add this message at the end.
213 _txnContext.deliver(_destinationQueues.get(i), _message);
214 }
215 for (int i = 0; i < offset; i++)
216 {
217 // normal deliver so add this message at the end.
218 _txnContext.deliver(_destinationQueues.get(i), _message);
219 }
220 }
221
222 return _message;
223
224
225
226 }
227
228 public void addContentBodyFrame(final ContentChunk contentChunk)
229 throws AMQException
230 {
231
232 _bodyLengthReceived += contentChunk.getSize();
233
234 _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
235
236 }
237
238 public boolean allContentReceived()
239 {
240 return (_bodyLengthReceived == getContentHeaderBody().bodySize);
241 }
242
243 public AMQShortString getExchange() throws AMQException
244 {
245 return _messagePublishInfo.getExchange();
246 }
247
248 public AMQShortString getRoutingKey() throws AMQException
249 {
250 return _messagePublishInfo.getRoutingKey();
251 }
252
253 public boolean isMandatory() throws AMQException
254 {
255 return _messagePublishInfo.isMandatory();
256 }
257
258
259 public boolean isImmediate() throws AMQException
260 {
261 return _messagePublishInfo.isImmediate();
262 }
263
264 public ContentHeaderBody getContentHeaderBody()
265 {
266 return _contentHeaderBody;
267 }
268
269
270 public boolean isPersistent()
271 {
272 //todo remove literal values to a constant file such as AMQConstants in common
273 return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
274 ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == 2;
275 }
276
277 public boolean isRedelivered()
278 {
279 return false;
280 }
281
282 /**
283 * The message ID will not be assigned until the ContentHeaderBody has arrived.
284 * @return
285 */
286 public Long getMessageId()
287 {
288 return _message.getMessageId();
289 }
290
291 public void setExchange(final Exchange e)
292 {
293 _exchange = e;
294 }
295
296 public void route() throws AMQException
297 {
298 _exchange.route(this);
299 }
300
301 public void enqueue(final ArrayList<AMQQueue> queues)
302 {
303 _destinationQueues = queues;
304 }
305 }
|