FanoutExchange.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.exchange;
022 
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.protocol.AMQConstant;
026 import org.apache.qpid.exchange.ExchangeDefaults;
027 import org.apache.qpid.framing.AMQShortString;
028 import org.apache.qpid.framing.FieldTable;
029 import org.apache.qpid.server.management.MBeanConstructor;
030 import org.apache.qpid.server.management.MBeanDescription;
031 import org.apache.qpid.server.queue.IncomingMessage;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.server.virtualhost.VirtualHost;
034 
035 import javax.management.JMException;
036 import javax.management.MBeanException;
037 import javax.management.openmbean.CompositeData;
038 import javax.management.openmbean.CompositeDataSupport;
039 import javax.management.openmbean.OpenDataException;
040 import javax.management.openmbean.TabularData;
041 import javax.management.openmbean.TabularDataSupport;
042 import java.util.List;
043 import java.util.Map;
044 import java.util.ArrayList;
045 import java.util.concurrent.CopyOnWriteArraySet;
046 
047 public class FanoutExchange extends AbstractExchange
048 {
049     private static final Logger _logger = Logger.getLogger(FanoutExchange.class);
050 
051     /**
052      * Maps from queue name to queue instances
053      */
054     private final CopyOnWriteArraySet<AMQQueue> _queues = new CopyOnWriteArraySet<AMQQueue>();
055 
056     /**
057      * MBean class implementing the management interfaces.
058      */
059     @MBeanDescription("Management Bean for Fanout Exchange")
060     private final class FanoutExchangeMBean extends ExchangeMBean
061     {
062         @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
063         public FanoutExchangeMBean() throws JMException
064         {
065             super();
066             _exchangeType = "fanout";
067             init();
068         }
069 
070         public TabularData bindings() throws OpenDataException
071         {
072 
073             _bindingList = new TabularDataSupport(_bindinglistDataType);
074 
075             for (AMQQueue queue : _queues)
076             {
077                 String queueName = queue.getName().toString();
078 
079                 Object[] bindingItemValues = {queueName, new String[]{queueName}};
080                 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
081                 _bindingList.put(bindingData);
082             }
083 
084             return _bindingList;
085         }
086 
087         public void createNewBinding(String queueName, String bindingthrows JMException
088         {
089             AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
090             if (queue == null)
091             {
092                 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
093             }
094 
095             try
096             {
097                 queue.bind(FanoutExchange.this, new AMQShortString(binding)null);
098             }
099             catch (AMQException ex)
100             {
101                 throw new MBeanException(ex);
102             }
103         }
104 
105     // End of MBean class
106 
107     protected ExchangeMBean createMBean() throws AMQException
108     {
109         try
110         {
111             return new FanoutExchange.FanoutExchangeMBean();
112         }
113         catch (JMException ex)
114         {
115             _logger.error("Exception occured in creating the direct exchange mbean", ex);
116             throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
117         }
118     }
119 
120     public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>()
121     {
122 
123       public AMQShortString getName()
124       {
125         return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
126       }
127 
128       public Class<FanoutExchange> getExchangeClass()
129       {
130         return FanoutExchange.class;
131       }
132 
133       public FanoutExchange newInstance(VirtualHost host,
134                         AMQShortString name,
135                         boolean durable,
136                         int ticket,
137                         boolean autoDeletethrows AMQException
138       {
139         FanoutExchange exch = new FanoutExchange();
140         exch.initialise(host, name, durable, ticket, autoDelete);
141         return exch;
142       }
143 
144       public AMQShortString getDefaultExchangeName()
145       {
146         return ExchangeDefaults.FANOUT_EXCHANGE_NAME;
147       }
148     };
149 
150     public Map<AMQShortString, List<AMQQueue>> getBindings()
151     {
152         return null;
153     }
154 
155     public AMQShortString getType()
156     {
157         return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
158     }
159 
160     public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
161     {
162         assert queue != null;
163 
164         if (_queues.contains(queue))
165         {
166             _logger.debug("Queue " + queue + " is already registered");
167         }
168         else
169         {
170             _queues.add(queue);
171             _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " this);
172         }
173     }
174 
175     public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
176     {
177         assert queue != null;
178 
179         if (!_queues.remove(queue))
180         {
181             throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " this.getName() ". ");
182         }
183     }
184 
185     public void route(IncomingMessage payloadthrows AMQException
186     {
187 
188     
189         if (_logger.isDebugEnabled())
190         {
191             _logger.debug("Publishing message to queue " + _queues);
192         }
193 
194         payload.enqueue(new ArrayList(_queues));
195 
196     }
197 
198     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
199     {
200         return isBound(routingKey, queue);
201     }
202 
203     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
204     {
205         return _queues.contains(queue);
206     }
207 
208     public boolean isBound(AMQShortString routingKey)
209     {
210 
211         return (_queues != null&& !_queues.isEmpty();
212     }
213 
214     public boolean isBound(AMQQueue queue)
215     {
216 
217         return _queues.contains(queue);
218     }
219 
220     public boolean hasBindings()
221     {
222         return !_queues.isEmpty();
223     }
224 }