001 package org.apache.qpid.server.queue;
002
003 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
004
005 /*
006 *
007 * Licensed to the Apache Software Foundation (ASF) under one
008 * or more contributor license agreements. See the NOTICE file
009 * distributed with this work for additional information
010 * regarding copyright ownership. The ASF licenses this file
011 * to you under the Apache License, Version 2.0 (the
012 * "License"); you may not use this file except in compliance
013 * with the License. You may obtain a copy of the License at
014 *
015 * http://www.apache.org/licenses/LICENSE-2.0
016 *
017 * Unless required by applicable law or agreed to in writing,
018 * software distributed under the License is distributed on an
019 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020 * KIND, either express or implied. See the License for the
021 * specific language governing permissions and limitations
022 * under the License.
023 *
024 */
025 public class SimpleQueueEntryList implements QueueEntryList
026 {
027 private final QueueEntryImpl _head;
028
029 private volatile QueueEntryImpl _tail;
030
031 static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl>
032 _tailUpdater =
033 AtomicReferenceFieldUpdater.newUpdater
034 (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail");
035
036
037 private final AMQQueue _queue;
038
039 static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl>
040 _nextUpdater =
041 AtomicReferenceFieldUpdater.newUpdater
042 (QueueEntryImpl.class, QueueEntryImpl.class, "_next");
043
044
045
046
047
048 public SimpleQueueEntryList(AMQQueue queue)
049 {
050 _queue = queue;
051 _head = new QueueEntryImpl(this);
052 _tail = _head;
053 }
054
055 void advanceHead()
056 {
057 QueueEntryImpl head = _head.nextNode();
058 while(head._next != null && head.isDeleted())
059 {
060
061 final QueueEntryImpl newhead = head.nextNode();
062 if(newhead != null)
063 {
064 _nextUpdater.compareAndSet(_head,head, newhead);
065 }
066 head = _head.nextNode();
067 }
068 }
069
070
071 public AMQQueue getQueue()
072 {
073 return _queue;
074 }
075
076
077 public QueueEntry add(AMQMessage message)
078 {
079 QueueEntryImpl node = new QueueEntryImpl(this, message);
080 for (;;)
081 {
082 QueueEntryImpl tail = _tail;
083 QueueEntryImpl next = tail.nextNode();
084 if (tail == _tail)
085 {
086 if (next == null)
087 {
088 node.setEntryId(tail.getEntryId()+1);
089 if (_nextUpdater.compareAndSet(tail, null, node))
090 {
091 _tailUpdater.compareAndSet(this, tail, node);
092
093 return node;
094 }
095 }
096 else
097 {
098 _tailUpdater.compareAndSet(this,tail, next);
099 }
100 }
101 }
102 }
103
104 public QueueEntry next(QueueEntry node)
105 {
106 return ((QueueEntryImpl)node).getNext();
107 }
108
109
110 public class QueueEntryIteratorImpl implements QueueEntryIterator
111 {
112
113 private QueueEntryImpl _lastNode;
114
115 QueueEntryIteratorImpl(QueueEntryImpl startNode)
116 {
117 _lastNode = startNode;
118 }
119
120
121 public boolean atTail()
122 {
123 return _lastNode.nextNode() == null;
124 }
125
126 public QueueEntry getNode()
127 {
128
129 return _lastNode;
130
131 }
132
133 public boolean advance()
134 {
135
136 if(!atTail())
137 {
138 QueueEntryImpl nextNode = _lastNode.nextNode();
139 while(nextNode.isDeleted() && nextNode.nextNode() != null)
140 {
141 nextNode = nextNode.nextNode();
142 }
143 _lastNode = nextNode;
144 return true;
145
146 }
147 else
148 {
149 return false;
150 }
151
152 }
153
154 }
155
156
157 public QueueEntryIterator iterator()
158 {
159 return new QueueEntryIteratorImpl(_head);
160 }
161
162
163 public QueueEntry getHead()
164 {
165 return _head;
166 }
167
168 static class Factory implements QueueEntryListFactory
169 {
170
171 public QueueEntryList createQueueEntryList(AMQQueue queue)
172 {
173 return new SimpleQueueEntryList(queue);
174 }
175 }
176
177
178 }
|