001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022
023 import org.apache.qpid.AMQException;
024 import org.apache.qpid.framing.ContentHeaderBody;
025 import org.apache.qpid.server.store.StoreContext;
026 import org.apache.qpid.server.subscription.Subscription;
027 import org.apache.log4j.Logger;
028
029 import java.util.Set;
030 import java.util.HashSet;
031 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
032 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
033 import java.util.concurrent.CopyOnWriteArraySet;
034
035
036 public class QueueEntryImpl implements QueueEntry
037 {
038
039 /**
040 * Used for debugging purposes.
041 */
042 private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
043
044 private final SimpleQueueEntryList _queueEntryList;
045
046 private AMQMessage _message;
047
048 private boolean _redelivered;
049
050 private Set<Subscription> _rejectedBy = null;
051
052 private volatile EntryState _state = AVAILABLE_STATE;
053
054 private static final
055 AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
056 _stateUpdater =
057 AtomicReferenceFieldUpdater.newUpdater
058 (QueueEntryImpl.class, EntryState.class, "_state");
059
060
061 private volatile Set<StateChangeListener> _stateChangeListeners;
062
063 private static final
064 AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
065 _listenersUpdater =
066 AtomicReferenceFieldUpdater.newUpdater
067 (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
068
069
070 private static final
071 AtomicLongFieldUpdater<QueueEntryImpl>
072 _entryIdUpdater =
073 AtomicLongFieldUpdater.newUpdater
074 (QueueEntryImpl.class, "_entryId");
075
076
077 private volatile long _entryId;
078
079 volatile QueueEntryImpl _next;
080
081
082 QueueEntryImpl(SimpleQueueEntryList queueEntryList)
083 {
084 this(queueEntryList,null,Long.MIN_VALUE);
085 _state = DELETED_STATE;
086 }
087
088
089 public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
090 {
091 _queueEntryList = queueEntryList;
092 _message = message;
093
094 _entryIdUpdater.set(this, entryId);
095 }
096
097 public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
098 {
099 _queueEntryList = queueEntryList;
100 _message = message;
101 }
102
103 protected void setEntryId(long entryId)
104 {
105 _entryIdUpdater.set(this, entryId);
106 }
107
108 protected long getEntryId()
109 {
110 return _entryId;
111 }
112
113 public AMQQueue getQueue()
114 {
115 return _queueEntryList.getQueue();
116 }
117
118 public AMQMessage getMessage()
119 {
120 return _message;
121 }
122
123 public long getSize()
124 {
125 return getMessage().getSize();
126 }
127
128 public boolean getDeliveredToConsumer()
129 {
130 return getMessage().getDeliveredToConsumer();
131 }
132
133 public boolean expired() throws AMQException
134 {
135 return getMessage().expired();
136 }
137
138 public boolean isAcquired()
139 {
140 return _state.getState() == State.ACQUIRED;
141 }
142
143 public boolean acquire()
144 {
145 return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
146 }
147
148 private boolean acquire(final EntryState state)
149 {
150 boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
151 if(acquired && _stateChangeListeners != null)
152 {
153 notifyStateChange(State.AVAILABLE, State.ACQUIRED);
154 }
155
156 return acquired;
157 }
158
159 public boolean acquire(Subscription sub)
160 {
161 return acquire(sub.getOwningState());
162 }
163
164 public boolean acquiredBySubscription()
165 {
166
167 return (_state instanceof SubscriptionAcquiredState);
168 }
169
170 public void setDeliveredToSubscription()
171 {
172 getMessage().setDeliveredToConsumer();
173 }
174
175 public void release()
176 {
177 _stateUpdater.set(this,AVAILABLE_STATE);
178 }
179
180 public String debugIdentity()
181 {
182 return getMessage().debugIdentity();
183 }
184
185
186 public boolean immediateAndNotDelivered()
187 {
188 return _message.immediateAndNotDelivered();
189 }
190
191 public ContentHeaderBody getContentHeaderBody() throws AMQException
192 {
193 return _message.getContentHeaderBody();
194 }
195
196 public boolean isPersistent() throws AMQException
197 {
198 return _message.isPersistent();
199 }
200
201 public boolean isRedelivered()
202 {
203 return _redelivered;
204 }
205
206 public void setRedelivered(boolean redelivered)
207 {
208 _redelivered = redelivered;
209 // todo - here we could mark this message as redelivered so we don't have to mark
210 // all messages on recover as redelivered.
211 }
212
213 public Subscription getDeliveredSubscription()
214 {
215 EntryState state = _state;
216 if (state instanceof SubscriptionAcquiredState)
217 {
218 return ((SubscriptionAcquiredState) state).getSubscription();
219 }
220 else
221 {
222 return null;
223 }
224
225 }
226
227 public void reject()
228 {
229 reject(getDeliveredSubscription());
230 }
231
232 public void reject(Subscription subscription)
233 {
234 if (subscription != null)
235 {
236 if (_rejectedBy == null)
237 {
238 _rejectedBy = new HashSet<Subscription>();
239 }
240
241 _rejectedBy.add(subscription);
242 }
243 else
244 {
245 _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
246 }
247 }
248
249 public boolean isRejectedBy(Subscription subscription)
250 {
251
252 if (_rejectedBy != null) // We have subscriptions that rejected this message
253 {
254 return _rejectedBy.contains(subscription);
255 }
256 else // This messasge hasn't been rejected yet.
257 {
258 return false;
259 }
260 }
261
262
263 public void requeue(final StoreContext storeContext) throws AMQException
264 {
265 getQueue().requeue(storeContext, this);
266 if(_stateChangeListeners != null)
267 {
268 notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
269 }
270 }
271
272 public void dequeue(final StoreContext storeContext) throws FailedDequeueException
273 {
274 EntryState state = _state;
275
276 if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
277 {
278 if (state instanceof SubscriptionAcquiredState)
279 {
280 Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
281 s.restoreCredit(this);
282 }
283
284 getQueue().dequeue(storeContext, this);
285
286 if (_stateChangeListeners != null)
287 {
288 notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED);
289 }
290 }
291 }
292
293 private void notifyStateChange(final State oldState, final State newState)
294 {
295 for(StateChangeListener l : _stateChangeListeners)
296 {
297 l.stateChanged(this, oldState, newState);
298 }
299 }
300
301 public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
302 {
303 //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d;
304 if (getQueue() != null)
305 {
306 dequeue(storeContext);
307 }
308
309 delete();
310 }
311
312 public boolean isQueueDeleted()
313 {
314 return getQueue().isDeleted();
315 }
316
317 public void addStateChangeListener(StateChangeListener listener)
318 {
319 Set<StateChangeListener> listeners = _stateChangeListeners;
320 if(listeners == null)
321 {
322 _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
323 listeners = _stateChangeListeners;
324 }
325
326 listeners.add(listener);
327 }
328
329 public boolean removeStateChangeListener(StateChangeListener listener)
330 {
331 Set<StateChangeListener> listeners = _stateChangeListeners;
332 if(listeners != null)
333 {
334 return listeners.remove(listener);
335 }
336
337 return false;
338 }
339
340
341 public int compareTo(final QueueEntry o)
342 {
343 QueueEntryImpl other = (QueueEntryImpl)o;
344 return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
345 }
346
347 public QueueEntryImpl getNext()
348 {
349
350 QueueEntryImpl next = nextNode();
351 while(next != null && next.isDeleted())
352 {
353
354 final QueueEntryImpl newNext = next.nextNode();
355 if(newNext != null)
356 {
357 SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
358 next = nextNode();
359 }
360 else
361 {
362 next = null;
363 }
364
365 }
366 return next;
367 }
368
369 QueueEntryImpl nextNode()
370 {
371 return _next;
372 }
373
374 public boolean isDeleted()
375 {
376 return _state == DELETED_STATE;
377 }
378
379 public boolean delete()
380 {
381 EntryState state = _state;
382
383 if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
384 {
385 _queueEntryList.advanceHead();
386 return true;
387 }
388 else
389 {
390 return false;
391 }
392 }
393
394 public QueueEntryList getQueueEntryList()
395 {
396 return _queueEntryList;
397 }
398 }
|