001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.subscription;
022
023 import org.apache.qpid.server.queue.AMQQueue;
024 import org.apache.qpid.server.subscription.Subscription;
025
026 import java.util.concurrent.atomic.AtomicReference;
027 import java.util.concurrent.atomic.AtomicBoolean;
028 import java.util.concurrent.atomic.AtomicInteger;
029 import java.nio.ByteBuffer;
030
031 public class SubscriptionList
032 {
033
034 private final SubscriptionNode _head = new SubscriptionNode();
035
036 private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
037 private final AMQQueue _queue;
038 private AtomicInteger _size = new AtomicInteger();
039
040
041 public final class SubscriptionNode
042 {
043 private final AtomicBoolean _deleted = new AtomicBoolean();
044 private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>();
045 private final Subscription _sub;
046
047
048 public SubscriptionNode()
049 {
050
051 _sub = null;
052 _deleted.set(true);
053 }
054
055 public SubscriptionNode(final Subscription sub)
056 {
057 _sub = sub;
058 }
059
060
061 public SubscriptionNode getNext()
062 {
063
064 SubscriptionNode next = nextNode();
065 while(next != null && next.isDeleted())
066 {
067
068 final SubscriptionNode newNext = next.nextNode();
069 if(newNext != null)
070 {
071 _next.compareAndSet(next, newNext);
072 next = nextNode();
073 }
074 else
075 {
076 next = null;
077 }
078
079 }
080 return next;
081 }
082
083 private SubscriptionNode nextNode()
084 {
085 return _next.get();
086 }
087
088 public boolean isDeleted()
089 {
090 return _deleted.get();
091 }
092
093
094 public boolean delete()
095 {
096 if(_deleted.compareAndSet(false,true))
097 {
098 _size.decrementAndGet();
099 advanceHead();
100 return true;
101 }
102 else
103 {
104 return false;
105 }
106 }
107
108
109 public Subscription getSubscription()
110 {
111 return _sub;
112 }
113 }
114
115
116 public SubscriptionList(AMQQueue queue)
117 {
118 _queue = queue;
119 }
120
121 private void advanceHead()
122 {
123 SubscriptionNode head = _head.nextNode();
124 while(head._next.get() != null && head.isDeleted())
125 {
126
127 final SubscriptionNode newhead = head.nextNode();
128 if(newhead != null)
129 {
130 _head._next.compareAndSet(head, newhead);
131 }
132 head = _head.nextNode();
133 }
134 }
135
136
137 public SubscriptionNode add(Subscription sub)
138 {
139 SubscriptionNode node = new SubscriptionNode(sub);
140 for (;;)
141 {
142 SubscriptionNode tail = _tail.get();
143 SubscriptionNode next = tail.nextNode();
144 if (tail == _tail.get())
145 {
146 if (next == null)
147 {
148 if (tail._next.compareAndSet(null, node))
149 {
150 _tail.compareAndSet(tail, node);
151 _size.incrementAndGet();
152 return node;
153 }
154 }
155 else
156 {
157 _tail.compareAndSet(tail, next);
158 }
159 }
160 }
161
162 }
163
164 public boolean remove(Subscription sub)
165 {
166 SubscriptionNode node = _head.getNext();
167 while(node != null)
168 {
169 if(sub.equals(node._sub) && node.delete())
170 {
171 return true;
172 }
173 node = node.getNext();
174 }
175 return false;
176 }
177
178
179 public class SubscriptionNodeIterator
180 {
181
182 private SubscriptionNode _lastNode;
183
184 SubscriptionNodeIterator(SubscriptionNode startNode)
185 {
186 _lastNode = startNode;
187 }
188
189
190 public boolean atTail()
191 {
192 return _lastNode.nextNode() == null;
193 }
194
195 public SubscriptionNode getNode()
196 {
197
198 return _lastNode;
199
200 }
201
202 public boolean advance()
203 {
204
205 if(!atTail())
206 {
207 SubscriptionNode nextNode = _lastNode.nextNode();
208 while(nextNode.isDeleted() && nextNode.nextNode() != null)
209 {
210 nextNode = nextNode.nextNode();
211 }
212 _lastNode = nextNode;
213 return true;
214
215 }
216 else
217 {
218 return false;
219 }
220
221 }
222
223 }
224
225
226 public SubscriptionNodeIterator iterator()
227 {
228 return new SubscriptionNodeIterator(_head);
229 }
230
231
232 public SubscriptionNode getHead()
233 {
234 return _head;
235 }
236
237 public int size()
238 {
239 return _size.get();
240 }
241
242
243
244 }
245
246
|