SubscriptionList.java
001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.subscription;
022 
023 import org.apache.qpid.server.queue.AMQQueue;
024 import org.apache.qpid.server.subscription.Subscription;
025 
026 import java.util.concurrent.atomic.AtomicReference;
027 import java.util.concurrent.atomic.AtomicBoolean;
028 import java.util.concurrent.atomic.AtomicInteger;
029 import java.nio.ByteBuffer;
030 
031 public class SubscriptionList
032 {
033 
034     private final SubscriptionNode _head = new SubscriptionNode();
035 
036     private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
037     private final AMQQueue _queue;
038     private AtomicInteger _size = new AtomicInteger();
039 
040 
041     public final class SubscriptionNode
042     {
043         private final AtomicBoolean _deleted = new AtomicBoolean();
044         private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>();
045         private final Subscription _sub;
046 
047 
048         public SubscriptionNode()
049         {
050 
051             _sub = null;
052             _deleted.set(true);
053         }
054 
055         public SubscriptionNode(final Subscription sub)
056         {
057             _sub = sub;
058         }
059 
060 
061         public SubscriptionNode getNext()
062         {
063 
064             SubscriptionNode next = nextNode();
065             while(next != null && next.isDeleted())
066             {
067 
068                 final SubscriptionNode newNext = next.nextNode();
069                 if(newNext != null)
070                 {
071                     _next.compareAndSet(next, newNext);
072                     next = nextNode();
073                 }
074                 else
075                 {
076                     next = null;
077                 }
078 
079             }
080             return next;
081         }
082 
083         private SubscriptionNode nextNode()
084         {
085             return _next.get();
086         }
087 
088         public boolean isDeleted()
089         {
090             return _deleted.get();
091         }
092 
093 
094         public boolean delete()
095         {
096             if(_deleted.compareAndSet(false,true))
097             {
098                 _size.decrementAndGet();
099                 advanceHead();
100                 return true;
101             }
102             else
103             {
104                 return false;
105             }
106         }
107 
108 
109         public Subscription getSubscription()
110         {
111             return _sub;
112         }
113     }
114 
115 
116     public SubscriptionList(AMQQueue queue)
117     {
118         _queue = queue;
119     }
120 
121     private void advanceHead()
122     {
123         SubscriptionNode head = _head.nextNode();
124         while(head._next.get() != null && head.isDeleted())
125         {
126 
127             final SubscriptionNode newhead = head.nextNode();
128             if(newhead != null)
129             {
130                 _head._next.compareAndSet(head, newhead);
131             }
132             head = _head.nextNode();
133         }
134     }
135 
136 
137     public SubscriptionNode add(Subscription sub)
138     {
139         SubscriptionNode node = new SubscriptionNode(sub);
140         for (;;)
141         {
142             SubscriptionNode tail = _tail.get();
143             SubscriptionNode next = tail.nextNode();
144             if (tail == _tail.get())
145             {
146                 if (next == null)
147                 {
148                     if (tail._next.compareAndSet(null, node))
149                     {
150                         _tail.compareAndSet(tail, node);
151                         _size.incrementAndGet();
152                         return node;
153                     }
154                 }
155                 else
156                 {
157                     _tail.compareAndSet(tail, next);
158                 }
159             }
160         }
161 
162     }
163 
164     public boolean remove(Subscription sub)
165     {
166         SubscriptionNode node = _head.getNext();
167         while(node != null)
168         {
169             if(sub.equals(node._sub&& node.delete())
170             {
171                 return true;
172             }
173             node = node.getNext();
174         }
175         return false;
176     }
177 
178 
179     public class SubscriptionNodeIterator
180     {
181 
182         private SubscriptionNode _lastNode;
183 
184         SubscriptionNodeIterator(SubscriptionNode startNode)
185         {
186             _lastNode = startNode;
187         }
188 
189 
190         public boolean atTail()
191         {
192             return _lastNode.nextNode() == null;
193         }
194 
195         public SubscriptionNode getNode()
196         {
197 
198             return _lastNode;
199 
200         }
201 
202         public boolean advance()
203         {
204 
205             if(!atTail())
206             {
207                 SubscriptionNode nextNode = _lastNode.nextNode();
208                 while(nextNode.isDeleted() && nextNode.nextNode() != null)
209                 {
210                     nextNode = nextNode.nextNode();
211                 }
212                 _lastNode = nextNode;
213                 return true;
214 
215             }
216             else
217             {
218                 return false;
219             }
220 
221         }
222 
223     }
224 
225 
226     public SubscriptionNodeIterator iterator()
227     {
228         return new SubscriptionNodeIterator(_head);
229     }
230 
231 
232     public SubscriptionNode getHead()
233     {
234         return _head;
235     }
236 
237     public int size()
238     {
239         return _size.get();
240     }
241 
242 
243 
244 }
245 
246