001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.exchange;
022
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.protocol.AMQConstant;
026 import org.apache.qpid.exchange.ExchangeDefaults;
027 import org.apache.qpid.framing.AMQShortString;
028 import org.apache.qpid.framing.AMQTypedValue;
029 import org.apache.qpid.framing.BasicContentHeaderProperties;
030 import org.apache.qpid.framing.ContentHeaderBody;
031 import org.apache.qpid.framing.FieldTable;
032 import org.apache.qpid.server.management.MBeanConstructor;
033 import org.apache.qpid.server.management.MBeanDescription;
034 import org.apache.qpid.server.queue.IncomingMessage;
035 import org.apache.qpid.server.queue.AMQQueue;
036 import org.apache.qpid.server.virtualhost.VirtualHost;
037
038 import javax.management.JMException;
039 import javax.management.openmbean.ArrayType;
040 import javax.management.openmbean.CompositeData;
041 import javax.management.openmbean.CompositeDataSupport;
042 import javax.management.openmbean.CompositeType;
043 import javax.management.openmbean.OpenDataException;
044 import javax.management.openmbean.OpenType;
045 import javax.management.openmbean.SimpleType;
046 import javax.management.openmbean.TabularData;
047 import javax.management.openmbean.TabularDataSupport;
048 import javax.management.openmbean.TabularType;
049 import java.util.ArrayList;
050 import java.util.Iterator;
051 import java.util.List;
052 import java.util.Map;
053 import java.util.Collection;
054 import java.util.concurrent.CopyOnWriteArrayList;
055
056 /**
057 * An exchange that binds queues based on a set of required headers and header values
058 * and routes messages to these queues by matching the headers of the message against
059 * those with which the queues were bound.
060 * <p/>
061 * <pre>
062 * The Headers Exchange
063 *
064 * Routes messages according to the value/presence of fields in the message header table.
065 * (Basic and JMS content has a content header field called "headers" that is a table of
066 * message header fields).
067 *
068 * class = "headers"
069 * routing key is not used
070 *
071 * Has the following binding arguments:
072 *
073 * the X-match field - if "all", does an AND match (used for GRM), if "any", does an OR match.
074 * other fields prefixed with "X-" are ignored (and generate a console warning message).
075 * a field with no value or empty value indicates a match on presence only.
076 * a field with a value indicates match on field presence and specific value.
077 *
078 * Standard instances:
079 *
080 * amq.match - pub/sub on field content/value
081 * </pre>
082 */
083 public class HeadersExchange extends AbstractExchange
084 {
085 private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
086
087
088
089 public static final ExchangeType<HeadersExchange> TYPE = new ExchangeType<HeadersExchange>()
090 {
091
092 public AMQShortString getName()
093 {
094 return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
095 }
096
097 public Class<HeadersExchange> getExchangeClass()
098 {
099 return HeadersExchange.class;
100 }
101
102 public HeadersExchange newInstance(VirtualHost host, AMQShortString name, boolean durable, int ticket,
103 boolean autoDelete) throws AMQException
104 {
105 HeadersExchange exch = new HeadersExchange();
106 exch.initialise(host, name, durable, ticket, autoDelete);
107 return exch;
108 }
109
110 public AMQShortString getDefaultExchangeName()
111 {
112
113 return ExchangeDefaults.HEADERS_EXCHANGE_NAME;
114 }
115 };
116
117
118 private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>();
119
120 /**
121 * HeadersExchangeMBean class implements the management interface for the
122 * Header Exchanges.
123 */
124 @MBeanDescription("Management Bean for Headers Exchange")
125 private final class HeadersExchangeMBean extends ExchangeMBean
126 {
127 @MBeanConstructor("Creates an MBean for AMQ Headers exchange")
128 public HeadersExchangeMBean() throws JMException
129 {
130 super();
131 _exchangeType = "headers";
132 init();
133 }
134
135 /**
136 * initialises the OpenType objects.
137 */
138 protected void init() throws OpenDataException
139 {
140 _bindingItemNames = new String[]{"Binding No", "Queue Name", "Queue Bindings"};
141 _bindingItemIndexNames = new String[]{_bindingItemNames[0]};
142
143 _bindingItemTypes = new OpenType[3];
144 _bindingItemTypes[0] = SimpleType.INTEGER;
145 _bindingItemTypes[1] = SimpleType.STRING;
146 _bindingItemTypes[2] = new ArrayType(1, SimpleType.STRING);
147 _bindingDataType = new CompositeType("Exchange Binding", "Queue name and header bindings",
148 _bindingItemNames, _bindingItemNames, _bindingItemTypes);
149 _bindinglistDataType = new TabularType("Exchange Bindings", "List of exchange bindings for " + getName(),
150 _bindingDataType, _bindingItemIndexNames);
151 }
152
153 public TabularData bindings() throws OpenDataException
154 {
155 _bindingList = new TabularDataSupport(_bindinglistDataType);
156 int count = 1;
157 for (Iterator<Registration> itr = _bindings.iterator(); itr.hasNext();)
158 {
159 Registration registration = itr.next();
160 String queueName = registration.queue.getName().toString();
161
162 HeadersBinding headers = registration.binding;
163 FieldTable headerMappings = headers.getMappings();
164 final List<String> mappingList = new ArrayList<String>();
165
166 headerMappings.processOverElements(new FieldTable.FieldTableElementProcessor()
167 {
168
169 public boolean processElement(String propertyName, AMQTypedValue value)
170 {
171 mappingList.add(propertyName + "=" + value.getValue());
172 return true;
173 }
174
175 public Object getResult()
176 {
177 return mappingList;
178 }
179 });
180
181
182 Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])};
183 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
184 _bindingList.put(bindingData);
185 }
186
187 return _bindingList;
188 }
189
190 /**
191 * Creates bindings. Binding pattern is as follows-
192 * <attributename>=<value>,<attributename>=<value>,...
193 * @param queueName
194 * @param binding
195 * @throws javax.management.JMException
196 */
197 public void createNewBinding(String queueName, String binding) throws JMException
198 {
199 AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
200
201 if (queue == null)
202 {
203 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
204 }
205
206 String[] bindings = binding.split(",");
207 FieldTable bindingMap = new FieldTable();
208 for (int i = 0; i < bindings.length; i++)
209 {
210 String[] keyAndValue = bindings[i].split("=");
211 if (keyAndValue == null || keyAndValue.length < 2)
212 {
213 throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
214 }
215 bindingMap.setString(keyAndValue[0], keyAndValue[1]);
216 }
217
218 _bindings.add(new Registration(new HeadersBinding(bindingMap), queue));
219 }
220
221 } // End of MBean class
222
223 public AMQShortString getType()
224 {
225 return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
226 }
227
228 public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
229 {
230 _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() + " with " + args);
231 _bindings.add(new Registration(new HeadersBinding(args), queue));
232 }
233
234 public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
235 {
236 _logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName());
237 if(!_bindings.remove(new Registration(new HeadersBinding(args), queue)))
238 {
239 throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
240 + " with headers args " + args);
241 }
242 }
243
244 public void route(IncomingMessage payload) throws AMQException
245 {
246 FieldTable headers = getHeaders(payload.getContentHeaderBody());
247 if (_logger.isDebugEnabled())
248 {
249 _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
250 }
251 boolean routed = false;
252 ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
253 for (Registration e : _bindings)
254 {
255
256 if (e.binding.matches(headers))
257 {
258 if (_logger.isDebugEnabled())
259 {
260 _logger.debug("Exchange " + getName() + ": delivering message with headers " +
261 headers + " to " + e.queue.getName());
262 }
263 queues.add(e.queue);
264
265 routed = true;
266 }
267 }
268 payload.enqueue(queues);
269 }
270
271 public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
272 {
273 //fixme isBound here should take the arguements in to consideration.
274 return isBound(routingKey, queue);
275 }
276
277 public boolean isBound(AMQShortString routingKey, AMQQueue queue)
278 {
279 return isBound(queue);
280 }
281
282 public boolean isBound(AMQShortString routingKey)
283 {
284 return hasBindings();
285 }
286
287 public boolean isBound(AMQQueue queue)
288 {
289 for (Registration r : _bindings)
290 {
291 if (r.queue.equals(queue))
292 {
293 return true;
294 }
295 }
296 return false;
297 }
298
299 public boolean hasBindings()
300 {
301 return !_bindings.isEmpty();
302 }
303
304 protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
305 {
306 //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
307 //but these are not yet implemented.
308 return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
309 }
310
311 protected ExchangeMBean createMBean() throws AMQException
312 {
313 try
314 {
315 return new HeadersExchangeMBean();
316 }
317 catch (JMException ex)
318 {
319 _logger.error("Exception occured in creating the HeadersExchangeMBean", ex);
320 throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex);
321 }
322 }
323
324 public Map<AMQShortString, List<AMQQueue>> getBindings()
325 {
326 return null;
327 }
328
329 private static class Registration
330 {
331 private final HeadersBinding binding;
332 private final AMQQueue queue;
333
334 Registration(HeadersBinding binding, AMQQueue queue)
335 {
336 this.binding = binding;
337 this.queue = queue;
338 }
339
340 public int hashCode()
341 {
342 return queue.hashCode();
343 }
344
345 public boolean equals(Object o)
346 {
347 return o instanceof Registration && ((Registration) o).queue.equals(queue);
348 }
349 }
350 }
|