SimpleQueueEntryList.java
001 package org.apache.qpid.server.queue;
002 
003 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
004 
005 /*
006 *
007 * Licensed to the Apache Software Foundation (ASF) under one
008 * or more contributor license agreements.  See the NOTICE file
009 * distributed with this work for additional information
010 * regarding copyright ownership.  The ASF licenses this file
011 * to you under the Apache License, Version 2.0 (the
012 * "License"); you may not use this file except in compliance
013 * with the License.  You may obtain a copy of the License at
014 *
015 *   http://www.apache.org/licenses/LICENSE-2.0
016 *
017 * Unless required by applicable law or agreed to in writing,
018 * software distributed under the License is distributed on an
019 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020 * KIND, either express or implied.  See the License for the
021 * specific language governing permissions and limitations
022 * under the License.
023 *
024 */
025 public class SimpleQueueEntryList implements QueueEntryList
026 {
027     private final QueueEntryImpl _head;
028 
029     private volatile QueueEntryImpl _tail;
030 
031     static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl>
032             _tailUpdater =
033         AtomicReferenceFieldUpdater.newUpdater
034         (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail");
035 
036 
037     private final AMQQueue _queue;
038 
039     static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl>
040                 _nextUpdater =
041             AtomicReferenceFieldUpdater.newUpdater
042             (QueueEntryImpl.class, QueueEntryImpl.class, "_next");
043 
044 
045 
046 
047 
048     public SimpleQueueEntryList(AMQQueue queue)
049     {
050         _queue = queue;
051         _head = new QueueEntryImpl(this);
052         _tail = _head;
053     }
054 
055     void advanceHead()
056     {
057         QueueEntryImpl head = _head.nextNode();
058         while(head._next != null && head.isDeleted())
059         {
060 
061             final QueueEntryImpl newhead = head.nextNode();
062             if(newhead != null)
063             {
064                 _nextUpdater.compareAndSet(_head,head, newhead);
065             }
066             head = _head.nextNode();
067         }
068     }
069 
070 
071     public AMQQueue getQueue()
072     {
073         return _queue;
074     }
075 
076 
077     public QueueEntry add(AMQMessage message)
078     {
079         QueueEntryImpl node = new QueueEntryImpl(this, message);
080         for (;;)
081         {
082             QueueEntryImpl tail = _tail;
083             QueueEntryImpl next = tail.nextNode();
084             if (tail == _tail)
085             {
086                 if (next == null)
087                 {
088                     node.setEntryId(tail.getEntryId()+1);
089                     if (_nextUpdater.compareAndSet(tail, null, node))
090                     {
091                         _tailUpdater.compareAndSet(this, tail, node);
092 
093                         return node;
094                     }
095                 }
096                 else
097                 {
098                     _tailUpdater.compareAndSet(this,tail, next);
099                 }
100             }
101         }
102     }
103 
104     public QueueEntry next(QueueEntry node)
105     {
106         return ((QueueEntryImpl)node).getNext();
107     }
108 
109 
110     public class QueueEntryIteratorImpl implements QueueEntryIterator
111     {
112 
113         private QueueEntryImpl _lastNode;
114 
115         QueueEntryIteratorImpl(QueueEntryImpl startNode)
116         {
117             _lastNode = startNode;
118         }
119 
120 
121         public boolean atTail()
122         {
123             return _lastNode.nextNode() == null;
124         }
125 
126         public QueueEntry getNode()
127         {
128 
129             return _lastNode;
130 
131         }
132 
133         public boolean advance()
134         {
135 
136             if(!atTail())
137             {
138                 QueueEntryImpl nextNode = _lastNode.nextNode();
139                 while(nextNode.isDeleted() && nextNode.nextNode() != null)
140                 {
141                     nextNode = nextNode.nextNode();
142                 }
143                 _lastNode = nextNode;
144                 return true;
145 
146             }
147             else
148             {
149                 return false;
150             }
151 
152         }
153 
154     }
155 
156 
157     public QueueEntryIterator iterator()
158     {
159         return new QueueEntryIteratorImpl(_head);
160     }
161 
162 
163     public QueueEntry getHead()
164     {
165         return _head;
166     }
167 
168     static class Factory implements QueueEntryListFactory
169     {
170 
171         public QueueEntryList createQueueEntryList(AMQQueue queue)
172         {
173             return new SimpleQueueEntryList(queue);
174         }
175     }
176     
177 
178 }