001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022
023 import org.apache.qpid.framing.CommonContentHeaderProperties;
024 import org.apache.qpid.AMQException;
025
026 public class PriorityQueueList implements QueueEntryList
027 {
028 private final AMQQueue _queue;
029 private final QueueEntryList[] _priorityLists;
030 private final int _priorities;
031 private final int _priorityOffset;
032
033 public PriorityQueueList(AMQQueue queue, int priorities)
034 {
035 _queue = queue;
036 _priorityLists = new QueueEntryList[priorities];
037 _priorities = priorities;
038 _priorityOffset = 5-((priorities + 1)/2);
039 for(int i = 0; i < priorities; i++)
040 {
041 _priorityLists[i] = new SimpleQueueEntryList(queue);
042 }
043 }
044
045 public int getPriorities()
046 {
047 return _priorities;
048 }
049
050 public AMQQueue getQueue()
051 {
052 return _queue;
053 }
054
055 public QueueEntry add(AMQMessage message)
056 {
057 int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
058 if(index >= _priorities)
059 {
060 index = _priorities-1;
061 }
062 else if(index < 0)
063 {
064 index = 0;
065 }
066 return _priorityLists[index].add(message);
067 }
068
069 public QueueEntry next(QueueEntry node)
070 {
071 QueueEntryImpl nodeImpl = (QueueEntryImpl)node;
072 QueueEntry next = nodeImpl.getNext();
073
074 if(next == null)
075 {
076 QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList();
077 int index;
078 for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--);
079
080 while(next == null && index != 0)
081 {
082 index--;
083 next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext();
084 }
085
086 }
087 return next;
088 }
089
090 private final class PriorityQueueEntryListIterator implements QueueEntryIterator
091 {
092 private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
093 private QueueEntry _lastNode;
094
095 PriorityQueueEntryListIterator()
096 {
097 for(int i = 0; i < _priorityLists.length; i++)
098 {
099 _iterators[i] = _priorityLists[i].iterator();
100 }
101 _lastNode = _iterators[_iterators.length - 1].getNode();
102 }
103
104
105 public boolean atTail()
106 {
107 for(int i = 0; i < _iterators.length; i++)
108 {
109 if(!_iterators[i].atTail())
110 {
111 return false;
112 }
113 }
114 return true;
115 }
116
117 public QueueEntry getNode()
118 {
119 return _lastNode;
120 }
121
122 public boolean advance()
123 {
124 for(int i = _iterators.length-1; i >= 0; i--)
125 {
126 if(_iterators[i].advance())
127 {
128 _lastNode = _iterators[i].getNode();
129 return true;
130 }
131 }
132 return false;
133 }
134 }
135
136 public QueueEntryIterator iterator()
137 {
138 return new PriorityQueueEntryListIterator();
139 }
140
141 public QueueEntry getHead()
142 {
143 return _priorityLists[_priorities-1].getHead();
144 }
145
146 static class Factory implements QueueEntryListFactory
147 {
148 private final int _priorities;
149
150 Factory(int priorities)
151 {
152 _priorities = priorities;
153 }
154
155 public QueueEntryList createQueueEntryList(AMQQueue queue)
156 {
157 return new PriorityQueueList(queue, _priorities);
158 }
159 }
160 }
|