PriorityQueueList.java
001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022 
023 import org.apache.qpid.framing.CommonContentHeaderProperties;
024 import org.apache.qpid.AMQException;
025 
026 public class PriorityQueueList implements QueueEntryList
027 {
028     private final AMQQueue _queue;
029     private final QueueEntryList[] _priorityLists;
030     private final int _priorities;
031     private final int _priorityOffset;
032 
033     public PriorityQueueList(AMQQueue queue, int priorities)
034     {
035         _queue = queue;
036         _priorityLists = new QueueEntryList[priorities];
037         _priorities = priorities;
038         _priorityOffset = 5-((priorities + 1)/2);
039         for(int i = 0; i < priorities; i++)
040         {
041             _priorityLists[inew SimpleQueueEntryList(queue);
042         }
043     }
044 
045     public int getPriorities()
046     {
047         return _priorities;
048     }
049 
050     public AMQQueue getQueue()
051     {
052         return _queue;
053     }
054 
055     public QueueEntry add(AMQMessage message)
056     {
057         int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
058         if(index >= _priorities)
059         {
060             index = _priorities-1;
061         }
062         else if(index < 0)
063         {
064             index = 0;
065         }
066         return _priorityLists[index].add(message);
067     }
068 
069     public QueueEntry next(QueueEntry node)
070     {
071         QueueEntryImpl nodeImpl = (QueueEntryImpl)node;
072         QueueEntry next = nodeImpl.getNext();
073 
074         if(next == null)
075         {
076             QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList();
077             int index;
078             for(index = _priorityLists.length-1; _priorityLists[index!= nodeEntryList; index--);
079 
080             while(next == null && index != 0)
081             {
082                 index--;
083                 next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext();
084             }
085 
086         }
087         return next;
088     }
089 
090     private final class PriorityQueueEntryListIterator implements QueueEntryIterator
091     {
092         private final QueueEntryIterator[] _iterators = new QueueEntryIterator_priorityLists.length ];
093         private QueueEntry _lastNode;
094 
095         PriorityQueueEntryListIterator()
096         {
097             for(int i = 0; i < _priorityLists.length; i++)
098             {
099                 _iterators[i= _priorityLists[i].iterator();
100             }
101             _lastNode = _iterators[_iterators.length - 1].getNode();
102         }
103 
104 
105         public boolean atTail()
106         {
107             for(int i = 0; i < _iterators.length; i++)
108             {
109                 if(!_iterators[i].atTail())
110                 {
111                     return false;
112                 }
113             }
114             return true;
115         }
116 
117         public QueueEntry getNode()
118         {
119             return _lastNode;
120         }
121 
122         public boolean advance()
123         {
124             for(int i = _iterators.length-1; i >= 0; i--)
125             {
126                 if(_iterators[i].advance())
127                 {
128                     _lastNode = _iterators[i].getNode();
129                     return true;
130                 }
131             }
132             return false;
133         }
134     }
135 
136     public QueueEntryIterator iterator()
137     {
138         return new PriorityQueueEntryListIterator();
139     }
140 
141     public QueueEntry getHead()
142     {
143         return _priorityLists[_priorities-1].getHead();
144     }
145 
146     static class Factory implements QueueEntryListFactory
147     {
148         private final int _priorities;
149 
150         Factory(int priorities)
151         {
152             _priorities = priorities;
153         }
154 
155         public QueueEntryList createQueueEntryList(AMQQueue queue)
156         {
157             return new PriorityQueueList(queue, _priorities);
158         }
159     }
160 }