001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.exchange;
022
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.protocol.AMQConstant;
026 import org.apache.qpid.exchange.ExchangeDefaults;
027 import org.apache.qpid.framing.AMQShortString;
028 import org.apache.qpid.framing.FieldTable;
029 import org.apache.qpid.server.management.MBeanConstructor;
030 import org.apache.qpid.server.management.MBeanDescription;
031 import org.apache.qpid.server.queue.IncomingMessage;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.server.virtualhost.VirtualHost;
034
035 import javax.management.JMException;
036 import javax.management.MBeanException;
037 import javax.management.openmbean.CompositeData;
038 import javax.management.openmbean.CompositeDataSupport;
039 import javax.management.openmbean.OpenDataException;
040 import javax.management.openmbean.TabularData;
041 import javax.management.openmbean.TabularDataSupport;
042 import java.util.List;
043 import java.util.Map;
044 import java.util.ArrayList;
045 import java.util.concurrent.CopyOnWriteArraySet;
046
047 public class FanoutExchange extends AbstractExchange
048 {
049 private static final Logger _logger = Logger.getLogger(FanoutExchange.class);
050
051 /**
052 * Maps from queue name to queue instances
053 */
054 private final CopyOnWriteArraySet<AMQQueue> _queues = new CopyOnWriteArraySet<AMQQueue>();
055
056 /**
057 * MBean class implementing the management interfaces.
058 */
059 @MBeanDescription("Management Bean for Fanout Exchange")
060 private final class FanoutExchangeMBean extends ExchangeMBean
061 {
062 @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
063 public FanoutExchangeMBean() throws JMException
064 {
065 super();
066 _exchangeType = "fanout";
067 init();
068 }
069
070 public TabularData bindings() throws OpenDataException
071 {
072
073 _bindingList = new TabularDataSupport(_bindinglistDataType);
074
075 for (AMQQueue queue : _queues)
076 {
077 String queueName = queue.getName().toString();
078
079 Object[] bindingItemValues = {queueName, new String[]{queueName}};
080 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
081 _bindingList.put(bindingData);
082 }
083
084 return _bindingList;
085 }
086
087 public void createNewBinding(String queueName, String binding) throws JMException
088 {
089 AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
090 if (queue == null)
091 {
092 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
093 }
094
095 try
096 {
097 queue.bind(FanoutExchange.this, new AMQShortString(binding), null);
098 }
099 catch (AMQException ex)
100 {
101 throw new MBeanException(ex);
102 }
103 }
104
105 } // End of MBean class
106
107 protected ExchangeMBean createMBean() throws AMQException
108 {
109 try
110 {
111 return new FanoutExchange.FanoutExchangeMBean();
112 }
113 catch (JMException ex)
114 {
115 _logger.error("Exception occured in creating the direct exchange mbean", ex);
116 throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
117 }
118 }
119
120 public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>()
121 {
122
123 public AMQShortString getName()
124 {
125 return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
126 }
127
128 public Class<FanoutExchange> getExchangeClass()
129 {
130 return FanoutExchange.class;
131 }
132
133 public FanoutExchange newInstance(VirtualHost host,
134 AMQShortString name,
135 boolean durable,
136 int ticket,
137 boolean autoDelete) throws AMQException
138 {
139 FanoutExchange exch = new FanoutExchange();
140 exch.initialise(host, name, durable, ticket, autoDelete);
141 return exch;
142 }
143
144 public AMQShortString getDefaultExchangeName()
145 {
146 return ExchangeDefaults.FANOUT_EXCHANGE_NAME;
147 }
148 };
149
150 public Map<AMQShortString, List<AMQQueue>> getBindings()
151 {
152 return null;
153 }
154
155 public AMQShortString getType()
156 {
157 return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
158 }
159
160 public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
161 {
162 assert queue != null;
163
164 if (_queues.contains(queue))
165 {
166 _logger.debug("Queue " + queue + " is already registered");
167 }
168 else
169 {
170 _queues.add(queue);
171 _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this);
172 }
173 }
174
175 public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
176 {
177 assert queue != null;
178
179 if (!_queues.remove(queue))
180 {
181 throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
182 }
183 }
184
185 public void route(IncomingMessage payload) throws AMQException
186 {
187
188
189 if (_logger.isDebugEnabled())
190 {
191 _logger.debug("Publishing message to queue " + _queues);
192 }
193
194 payload.enqueue(new ArrayList(_queues));
195
196 }
197
198 public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
199 {
200 return isBound(routingKey, queue);
201 }
202
203 public boolean isBound(AMQShortString routingKey, AMQQueue queue)
204 {
205 return _queues.contains(queue);
206 }
207
208 public boolean isBound(AMQShortString routingKey)
209 {
210
211 return (_queues != null) && !_queues.isEmpty();
212 }
213
214 public boolean isBound(AMQQueue queue)
215 {
216
217 return _queues.contains(queue);
218 }
219
220 public boolean hasBindings()
221 {
222 return !_queues.isEmpty();
223 }
224 }
|