01 /*
02 *
03 * Licensed to the Apache Software Foundation (ASF) under one
04 * or more contributor license agreements. See the NOTICE file
05 * distributed with this work for additional information
06 * regarding copyright ownership. The ASF licenses this file
07 * to you under the Apache License, Version 2.0 (the
08 * "License"); you may not use this file except in compliance
09 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 */
21 package org.apache.qpid.server.exchange;
22
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.ArrayList;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.concurrent.CopyOnWriteArrayList;
30
31 import org.apache.qpid.framing.AMQShortString;
32 import org.apache.qpid.server.queue.AMQQueue;
33
34 /**
35 * An index of queues against routing key. Allows multiple queues to be stored
36 * against the same key. Used in the DirectExchange.
37 */
38 class Index
39 {
40 private ConcurrentMap<AMQShortString, ArrayList<AMQQueue>> _index
41 = new ConcurrentHashMap<AMQShortString, ArrayList<AMQQueue>>();
42
43 synchronized boolean add(AMQShortString key, AMQQueue queue)
44 {
45 ArrayList<AMQQueue> queues = _index.get(key);
46 if(queues == null)
47 {
48 queues = new ArrayList<AMQQueue>();
49 }
50 else
51 {
52 queues = new ArrayList<AMQQueue>(queues);
53 }
54 //next call is atomic, so there is no race to create the list
55 _index.put(key, queues);
56
57 if(queues.contains(queue))
58 {
59 return false;
60 }
61 else
62 {
63 return queues.add(queue);
64 }
65 }
66
67 synchronized boolean remove(AMQShortString key, AMQQueue queue)
68 {
69 ArrayList<AMQQueue> queues = _index.get(key);
70 if (queues != null)
71 {
72 queues = new ArrayList<AMQQueue>(queues);
73 boolean removed = queues.remove(queue);
74 if(removed)
75 {
76 if (queues.size() == 0)
77 {
78 _index.remove(key);
79 }
80 else
81 {
82 _index.put(key, queues);
83 }
84 }
85 return removed;
86 }
87 return false;
88 }
89
90 ArrayList<AMQQueue> get(AMQShortString key)
91 {
92 return _index.get(key);
93 }
94
95 Map<AMQShortString, List<AMQQueue>> getBindingsMap()
96 {
97 return new HashMap<AMQShortString, List<AMQQueue>>(_index);
98 }
99 }
|