QueueEntryImpl.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.queue;
022 
023 import org.apache.qpid.AMQException;
024 import org.apache.qpid.framing.ContentHeaderBody;
025 import org.apache.qpid.server.store.StoreContext;
026 import org.apache.qpid.server.subscription.Subscription;
027 import org.apache.log4j.Logger;
028 
029 import java.util.Set;
030 import java.util.HashSet;
031 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
032 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
033 import java.util.concurrent.CopyOnWriteArraySet;
034 
035 
036 public class QueueEntryImpl implements QueueEntry
037 {
038 
039     /**
040      * Used for debugging purposes.
041      */
042     private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
043 
044     private final SimpleQueueEntryList _queueEntryList;
045 
046     private AMQMessage _message;
047 
048     private boolean _redelivered;
049 
050     private Set<Subscription> _rejectedBy = null;
051 
052     private volatile EntryState _state = AVAILABLE_STATE;
053 
054     private static final
055         AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
056             _stateUpdater =
057         AtomicReferenceFieldUpdater.newUpdater
058         (QueueEntryImpl.class, EntryState.class, "_state");
059 
060 
061     private volatile Set<StateChangeListener> _stateChangeListeners;
062 
063     private static final
064         AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
065                 _listenersUpdater =
066         AtomicReferenceFieldUpdater.newUpdater
067         (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
068 
069 
070     private static final
071         AtomicLongFieldUpdater<QueueEntryImpl>
072             _entryIdUpdater =
073         AtomicLongFieldUpdater.newUpdater
074         (QueueEntryImpl.class, "_entryId");
075 
076 
077     private volatile long _entryId;
078 
079     volatile QueueEntryImpl _next;
080 
081 
082     QueueEntryImpl(SimpleQueueEntryList queueEntryList)
083     {
084         this(queueEntryList,null,Long.MIN_VALUE);
085         _state = DELETED_STATE;
086     }
087 
088 
089     public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
090     {
091         _queueEntryList = queueEntryList;
092         _message = message;
093 
094         _entryIdUpdater.set(this, entryId);
095     }
096 
097     public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
098     {
099         _queueEntryList = queueEntryList;
100         _message = message;
101     }
102 
103     protected void setEntryId(long entryId)
104     {
105         _entryIdUpdater.set(this, entryId);
106     }
107 
108     protected long getEntryId()
109     {
110         return _entryId;
111     }
112 
113     public AMQQueue getQueue()
114     {
115         return _queueEntryList.getQueue();
116     }
117 
118     public AMQMessage getMessage()
119     {
120         return _message;
121     }
122 
123     public long getSize()
124     {
125         return getMessage().getSize();
126     }
127 
128     public boolean getDeliveredToConsumer()
129     {
130         return getMessage().getDeliveredToConsumer();
131     }
132 
133     public boolean expired() throws AMQException
134     {
135         return getMessage().expired();
136     }
137 
138     public boolean isAcquired()
139     {
140         return _state.getState() == State.ACQUIRED;
141     }
142 
143     public boolean acquire()
144     {
145         return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
146     }
147 
148     private boolean acquire(final EntryState state)
149     {
150         boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
151         if(acquired && _stateChangeListeners != null)
152         {
153             notifyStateChange(State.AVAILABLE, State.ACQUIRED);
154         }
155 
156         return acquired;
157     }
158 
159     public boolean acquire(Subscription sub)
160     {
161         return acquire(sub.getOwningState());
162     }
163 
164     public boolean acquiredBySubscription()
165     {
166 
167         return (_state instanceof SubscriptionAcquiredState);
168     }
169 
170     public void setDeliveredToSubscription()
171     {
172         getMessage().setDeliveredToConsumer();
173     }
174 
175     public void release()
176     {
177         _stateUpdater.set(this,AVAILABLE_STATE);
178     }
179 
180     public String debugIdentity()
181     {
182         return getMessage().debugIdentity();
183     }
184 
185 
186     public boolean immediateAndNotDelivered() 
187     {
188         return _message.immediateAndNotDelivered();
189     }
190 
191     public ContentHeaderBody getContentHeaderBody() throws AMQException
192     {
193         return _message.getContentHeaderBody();
194     }
195 
196     public boolean isPersistent() throws AMQException
197     {
198         return _message.isPersistent();
199     }
200 
201     public boolean isRedelivered()
202     {
203         return _redelivered;
204     }
205 
206     public void setRedelivered(boolean redelivered)
207     {
208         _redelivered = redelivered;
209         // todo - here we could mark this message as redelivered so we don't have to mark
210         // all messages on recover as redelivered.       
211     }
212 
213     public Subscription getDeliveredSubscription()
214     {
215             EntryState state = _state;
216             if (state instanceof SubscriptionAcquiredState)
217             {
218                 return ((SubscriptionAcquiredStatestate).getSubscription();
219             }
220             else
221             {
222                 return null;
223             }
224 
225     }
226 
227     public void reject()
228     {
229         reject(getDeliveredSubscription());
230     }
231 
232     public void reject(Subscription subscription)
233     {
234         if (subscription != null)
235         {
236             if (_rejectedBy == null)
237             {
238                 _rejectedBy = new HashSet<Subscription>();
239             }
240 
241             _rejectedBy.add(subscription);
242         }
243         else
244         {
245             _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
246         }
247     }
248 
249     public boolean isRejectedBy(Subscription subscription)
250     {        
251 
252         if (_rejectedBy != null// We have subscriptions that rejected this message
253         {
254             return _rejectedBy.contains(subscription);
255         }
256         else // This messasge hasn't been rejected yet.
257         {
258             return false;
259         }
260     }
261 
262 
263     public void requeue(final StoreContext storeContextthrows AMQException
264     {
265         getQueue().requeue(storeContext, this);
266         if(_stateChangeListeners != null)
267         {
268             notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
269         }
270     }
271 
272     public void dequeue(final StoreContext storeContextthrows FailedDequeueException
273     {
274         EntryState state = _state;
275 
276         if((state.getState() == State.ACQUIRED&&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
277         {
278             if (state instanceof SubscriptionAcquiredState)
279             {
280                 Subscription s = ((SubscriptionAcquiredStatestate).getSubscription();
281                 s.restoreCredit(this);
282             }
283 
284             getQueue().dequeue(storeContext, this);
285 
286             if (_stateChangeListeners != null)
287             {
288                 notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED);
289             }
290         }
291     }
292 
293     private void notifyStateChange(final State oldState, final State newState)
294     {
295         for(StateChangeListener l : _stateChangeListeners)
296         {
297             l.stateChanged(this, oldState, newState);
298         }
299     }
300 
301     public void dequeueAndDelete(StoreContext storeContextthrows FailedDequeueException
302     {
303         //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d;
304         if (getQueue() != null)
305         {
306             dequeue(storeContext);
307         }
308 
309         delete();
310     }
311 
312     public boolean isQueueDeleted()
313     {
314         return getQueue().isDeleted();
315     }
316 
317     public void addStateChangeListener(StateChangeListener listener)
318     {
319         Set<StateChangeListener> listeners = _stateChangeListeners;
320         if(listeners == null)
321         {
322             _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
323             listeners = _stateChangeListeners;
324         }
325 
326         listeners.add(listener);
327     }
328 
329     public boolean removeStateChangeListener(StateChangeListener listener)
330     {
331         Set<StateChangeListener> listeners = _stateChangeListeners;
332         if(listeners != null)
333         {
334             return listeners.remove(listener);
335         }
336 
337         return false;
338     }
339 
340 
341     public int compareTo(final QueueEntry o)
342     {
343         QueueEntryImpl other = (QueueEntryImpl)o;
344         return getEntryId() > other.getEntryId() : getEntryId() < other.getEntryId() ? -0;
345     }
346 
347     public QueueEntryImpl getNext()
348     {
349 
350         QueueEntryImpl next = nextNode();
351         while(next != null && next.isDeleted())
352         {
353 
354             final QueueEntryImpl newNext = next.nextNode();
355             if(newNext != null)
356             {
357                 SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
358                 next = nextNode();
359             }
360             else
361             {
362                 next = null;
363             }
364 
365         }
366         return next;
367     }
368 
369     QueueEntryImpl nextNode()
370     {
371         return _next;
372     }
373 
374     public boolean isDeleted()
375     {
376         return _state == DELETED_STATE;
377     }
378 
379     public boolean delete()
380     {
381         EntryState state = _state;
382 
383         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
384         {
385             _queueEntryList.advanceHead();            
386             return true;
387         }
388         else
389         {
390             return false;
391         }
392     }
393 
394     public QueueEntryList getQueueEntryList()
395     {
396         return _queueEntryList;
397     }
398 }