DirectExchange.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.exchange;
022 
023 import java.util.ArrayList;
024 import java.util.List;
025 import java.util.Map;
026 
027 import javax.management.JMException;
028 import javax.management.MBeanException;
029 import javax.management.openmbean.CompositeData;
030 import javax.management.openmbean.CompositeDataSupport;
031 import javax.management.openmbean.OpenDataException;
032 import javax.management.openmbean.TabularData;
033 import javax.management.openmbean.TabularDataSupport;
034 
035 import org.apache.log4j.Logger;
036 import org.apache.qpid.AMQException;
037 import org.apache.qpid.protocol.AMQConstant;
038 import org.apache.qpid.exchange.ExchangeDefaults;
039 import org.apache.qpid.framing.AMQShortString;
040 import org.apache.qpid.framing.FieldTable;
041 import org.apache.qpid.server.management.MBeanConstructor;
042 import org.apache.qpid.server.management.MBeanDescription;
043 import org.apache.qpid.server.queue.IncomingMessage;
044 import org.apache.qpid.server.queue.AMQQueue;
045 import org.apache.qpid.server.virtualhost.VirtualHost;
046 
047 public class DirectExchange extends AbstractExchange
048 {
049     private static final Logger _logger = Logger.getLogger(DirectExchange.class);
050 
051     /**
052      * Maps from queue name to queue instances
053      */
054     private final Index _index = new Index();
055 
056     public static final ExchangeType<DirectExchange> TYPE = new ExchangeType<DirectExchange>()
057     {
058 
059         public AMQShortString getName()
060         {
061             return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
062         }
063 
064         public Class<DirectExchange> getExchangeClass()
065         {
066             return DirectExchange.class;
067         }
068 
069         public DirectExchange newInstance(VirtualHost host,
070                                             AMQShortString name,
071                                             boolean durable,
072                                             int ticket,
073                                             boolean autoDeletethrows AMQException
074         {
075             DirectExchange exch = new DirectExchange();
076             exch.initialise(host,name,durable,ticket,autoDelete);
077             return exch;
078         }
079 
080         public AMQShortString getDefaultExchangeName()
081         {
082             return ExchangeDefaults.DIRECT_EXCHANGE_NAME;
083         }
084     };
085 
086     /**
087      * MBean class implementing the management interfaces.
088      */
089     @MBeanDescription("Management Bean for Direct Exchange")
090     private final class DirectExchangeMBean extends ExchangeMBean
091     {
092         @MBeanConstructor("Creates an MBean for AMQ direct exchange")
093         public DirectExchangeMBean()  throws JMException
094         {
095             super();
096             _exchangeType = "direct";
097             init();
098         }
099 
100         public TabularData bindings() throws OpenDataException
101         {
102             Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap();
103             _bindingList = new TabularDataSupport(_bindinglistDataType);
104 
105             for (Map.Entry<AMQShortString, List<AMQQueue>> entry : bindings.entrySet())
106             {
107                 AMQShortString key = entry.getKey();
108                 List<String> queueList = new ArrayList<String>();
109 
110                 List<AMQQueue> queues = entry.getValue();
111                 for (AMQQueue q : queues)
112                 {
113                     queueList.add(q.getName().toString());
114                 }
115 
116                 Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
117                 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
118                 _bindingList.put(bindingData);
119             }
120 
121             return _bindingList;
122         }
123 
124         public void createNewBinding(String queueName, String bindingthrows JMException
125         {
126             AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
127             if (queue == null)
128             {
129                 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
130             }
131 
132             try
133             {
134                 queue.bind(DirectExchange.this, new AMQShortString(binding)null);
135             }
136             catch (AMQException ex)
137             {
138                 throw new MBeanException(ex);
139             }
140         }
141 
142     }// End of MBean class
143 
144 
145     protected ExchangeMBean createMBean() throws AMQException
146     {
147         try
148         {
149             return new DirectExchangeMBean();
150         }
151         catch (JMException ex)
152         {
153             _logger.error("Exception occured in creating the direct exchange mbean", ex);
154             throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
155         }
156     }
157 
158     public AMQShortString getType()
159     {
160         return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
161     }
162 
163     public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
164     {
165         assert queue != null;
166         assert routingKey != null;
167         if (!_index.add(routingKey, queue))
168         {
169             if (_logger.isDebugEnabled())
170             {
171                 _logger.debug("Queue (" + queue.getName() ")" + queue + " is already registered with routing key " + routingKey);
172             }
173         }
174         else
175         {
176             if (_logger.isDebugEnabled())
177             {
178                 _logger.debug("Binding queue(" + queue.getName() ") " + queue + " with routing key " + routingKey
179                               (args == null "" " and arguments " + args.toString())
180                               " to exchange " this);
181             }
182         }
183     }
184 
185     public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
186     {
187         assert queue != null;
188         assert routingKey != null;
189 
190         if (!_index.remove(routingKey, queue))
191         {
192             throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " this.getName() +
193                                    " with routing key " + routingKey + ". No queue was registered with that _routing key");
194         }
195     }
196 
197     public void route(IncomingMessage payloadthrows AMQException
198     {
199 
200         final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey();
201 
202         final ArrayList<AMQQueue> queues = (routingKey == nullnull : _index.get(routingKey);
203 
204         if (_logger.isDebugEnabled())
205         {
206             _logger.debug("Publishing message to queue " + queues);
207         }
208 
209         payload.enqueue(queues);
210 
211 
212     }
213 
214     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
215     {
216         return isBound(routingKey,queue);
217     }
218 
219     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
220     {
221         final List<AMQQueue> queues = _index.get(routingKey);
222         return queues != null && queues.contains(queue);
223     }
224 
225     public boolean isBound(AMQShortString routingKey)
226     {
227         final List<AMQQueue> queues = _index.get(routingKey);
228         return queues != null && !queues.isEmpty();
229     }
230 
231     public boolean isBound(AMQQueue queue)
232     {
233         Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap();
234         for (List<AMQQueue> queues : bindings.values())
235         {
236             if (queues.contains(queue))
237             {
238                 return true;
239             }
240         }
241         return false;
242     }
243 
244     public boolean hasBindings()
245     {
246         return !_index.getBindingsMap().isEmpty();
247     }
248 
249     public Map<AMQShortString, List<AMQQueue>> getBindings()
250     {
251         return _index.getBindingsMap();
252     }
253 }