001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.exchange;
022
023 import java.util.ArrayList;
024 import java.util.List;
025 import java.util.Map;
026
027 import javax.management.JMException;
028 import javax.management.MBeanException;
029 import javax.management.openmbean.CompositeData;
030 import javax.management.openmbean.CompositeDataSupport;
031 import javax.management.openmbean.OpenDataException;
032 import javax.management.openmbean.TabularData;
033 import javax.management.openmbean.TabularDataSupport;
034
035 import org.apache.log4j.Logger;
036 import org.apache.qpid.AMQException;
037 import org.apache.qpid.protocol.AMQConstant;
038 import org.apache.qpid.exchange.ExchangeDefaults;
039 import org.apache.qpid.framing.AMQShortString;
040 import org.apache.qpid.framing.FieldTable;
041 import org.apache.qpid.server.management.MBeanConstructor;
042 import org.apache.qpid.server.management.MBeanDescription;
043 import org.apache.qpid.server.queue.IncomingMessage;
044 import org.apache.qpid.server.queue.AMQQueue;
045 import org.apache.qpid.server.virtualhost.VirtualHost;
046
047 public class DirectExchange extends AbstractExchange
048 {
049 private static final Logger _logger = Logger.getLogger(DirectExchange.class);
050
051 /**
052 * Maps from queue name to queue instances
053 */
054 private final Index _index = new Index();
055
056 public static final ExchangeType<DirectExchange> TYPE = new ExchangeType<DirectExchange>()
057 {
058
059 public AMQShortString getName()
060 {
061 return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
062 }
063
064 public Class<DirectExchange> getExchangeClass()
065 {
066 return DirectExchange.class;
067 }
068
069 public DirectExchange newInstance(VirtualHost host,
070 AMQShortString name,
071 boolean durable,
072 int ticket,
073 boolean autoDelete) throws AMQException
074 {
075 DirectExchange exch = new DirectExchange();
076 exch.initialise(host,name,durable,ticket,autoDelete);
077 return exch;
078 }
079
080 public AMQShortString getDefaultExchangeName()
081 {
082 return ExchangeDefaults.DIRECT_EXCHANGE_NAME;
083 }
084 };
085
086 /**
087 * MBean class implementing the management interfaces.
088 */
089 @MBeanDescription("Management Bean for Direct Exchange")
090 private final class DirectExchangeMBean extends ExchangeMBean
091 {
092 @MBeanConstructor("Creates an MBean for AMQ direct exchange")
093 public DirectExchangeMBean() throws JMException
094 {
095 super();
096 _exchangeType = "direct";
097 init();
098 }
099
100 public TabularData bindings() throws OpenDataException
101 {
102 Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap();
103 _bindingList = new TabularDataSupport(_bindinglistDataType);
104
105 for (Map.Entry<AMQShortString, List<AMQQueue>> entry : bindings.entrySet())
106 {
107 AMQShortString key = entry.getKey();
108 List<String> queueList = new ArrayList<String>();
109
110 List<AMQQueue> queues = entry.getValue();
111 for (AMQQueue q : queues)
112 {
113 queueList.add(q.getName().toString());
114 }
115
116 Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
117 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
118 _bindingList.put(bindingData);
119 }
120
121 return _bindingList;
122 }
123
124 public void createNewBinding(String queueName, String binding) throws JMException
125 {
126 AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
127 if (queue == null)
128 {
129 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
130 }
131
132 try
133 {
134 queue.bind(DirectExchange.this, new AMQShortString(binding), null);
135 }
136 catch (AMQException ex)
137 {
138 throw new MBeanException(ex);
139 }
140 }
141
142 }// End of MBean class
143
144
145 protected ExchangeMBean createMBean() throws AMQException
146 {
147 try
148 {
149 return new DirectExchangeMBean();
150 }
151 catch (JMException ex)
152 {
153 _logger.error("Exception occured in creating the direct exchange mbean", ex);
154 throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
155 }
156 }
157
158 public AMQShortString getType()
159 {
160 return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
161 }
162
163 public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
164 {
165 assert queue != null;
166 assert routingKey != null;
167 if (!_index.add(routingKey, queue))
168 {
169 if (_logger.isDebugEnabled())
170 {
171 _logger.debug("Queue (" + queue.getName() + ")" + queue + " is already registered with routing key " + routingKey);
172 }
173 }
174 else
175 {
176 if (_logger.isDebugEnabled())
177 {
178 _logger.debug("Binding queue(" + queue.getName() + ") " + queue + " with routing key " + routingKey
179 + (args == null ? "" : " and arguments " + args.toString())
180 + " to exchange " + this);
181 }
182 }
183 }
184
185 public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
186 {
187 assert queue != null;
188 assert routingKey != null;
189
190 if (!_index.remove(routingKey, queue))
191 {
192 throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() +
193 " with routing key " + routingKey + ". No queue was registered with that _routing key");
194 }
195 }
196
197 public void route(IncomingMessage payload) throws AMQException
198 {
199
200 final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey();
201
202 final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
203
204 if (_logger.isDebugEnabled())
205 {
206 _logger.debug("Publishing message to queue " + queues);
207 }
208
209 payload.enqueue(queues);
210
211
212 }
213
214 public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
215 {
216 return isBound(routingKey,queue);
217 }
218
219 public boolean isBound(AMQShortString routingKey, AMQQueue queue)
220 {
221 final List<AMQQueue> queues = _index.get(routingKey);
222 return queues != null && queues.contains(queue);
223 }
224
225 public boolean isBound(AMQShortString routingKey)
226 {
227 final List<AMQQueue> queues = _index.get(routingKey);
228 return queues != null && !queues.isEmpty();
229 }
230
231 public boolean isBound(AMQQueue queue)
232 {
233 Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap();
234 for (List<AMQQueue> queues : bindings.values())
235 {
236 if (queues.contains(queue))
237 {
238 return true;
239 }
240 }
241 return false;
242 }
243
244 public boolean hasBindings()
245 {
246 return !_index.getBindingsMap().isEmpty();
247 }
248
249 public Map<AMQShortString, List<AMQQueue>> getBindings()
250 {
251 return _index.getBindingsMap();
252 }
253 }
|