HeadersExchange.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.exchange;
022 
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.protocol.AMQConstant;
026 import org.apache.qpid.exchange.ExchangeDefaults;
027 import org.apache.qpid.framing.AMQShortString;
028 import org.apache.qpid.framing.AMQTypedValue;
029 import org.apache.qpid.framing.BasicContentHeaderProperties;
030 import org.apache.qpid.framing.ContentHeaderBody;
031 import org.apache.qpid.framing.FieldTable;
032 import org.apache.qpid.server.management.MBeanConstructor;
033 import org.apache.qpid.server.management.MBeanDescription;
034 import org.apache.qpid.server.queue.IncomingMessage;
035 import org.apache.qpid.server.queue.AMQQueue;
036 import org.apache.qpid.server.virtualhost.VirtualHost;
037 
038 import javax.management.JMException;
039 import javax.management.openmbean.ArrayType;
040 import javax.management.openmbean.CompositeData;
041 import javax.management.openmbean.CompositeDataSupport;
042 import javax.management.openmbean.CompositeType;
043 import javax.management.openmbean.OpenDataException;
044 import javax.management.openmbean.OpenType;
045 import javax.management.openmbean.SimpleType;
046 import javax.management.openmbean.TabularData;
047 import javax.management.openmbean.TabularDataSupport;
048 import javax.management.openmbean.TabularType;
049 import java.util.ArrayList;
050 import java.util.Iterator;
051 import java.util.List;
052 import java.util.Map;
053 import java.util.Collection;
054 import java.util.concurrent.CopyOnWriteArrayList;
055 
056 /**
057  * An exchange that binds queues based on a set of required headers and header values
058  * and routes messages to these queues by matching the headers of the message against
059  * those with which the queues were bound.
060  <p/>
061  <pre>
062  * The Headers Exchange
063  *
064  *  Routes messages according to the value/presence of fields in the message header table.
065  *  (Basic and JMS content has a content header field called "headers" that is a table of
066  *   message header fields).
067  *
068  *  class = "headers"
069  *  routing key is not used
070  *
071  *  Has the following binding arguments:
072  *
073  *  the X-match field - if "all", does an AND match (used for GRM), if "any", does an OR match.
074  *  other fields prefixed with "X-" are ignored (and generate a console warning message).
075  *  a field with no value or empty value indicates a match on presence only.
076  *  a field with a value indicates match on field presence and specific value.
077  *
078  *  Standard instances:
079  *
080  *  amq.match - pub/sub on field content/value
081  *  </pre>
082  */
083 public class HeadersExchange extends AbstractExchange
084 {
085     private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
086 
087 
088 
089     public static final ExchangeType<HeadersExchange> TYPE = new ExchangeType<HeadersExchange>()
090     {
091 
092         public AMQShortString getName()
093         {
094             return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
095         }
096 
097         public Class<HeadersExchange> getExchangeClass()
098         {
099             return HeadersExchange.class;
100         }
101 
102         public HeadersExchange newInstance(VirtualHost host, AMQShortString name, boolean durable, int ticket,
103                 boolean autoDeletethrows AMQException
104         {
105             HeadersExchange exch = new HeadersExchange();
106             exch.initialise(host, name, durable, ticket, autoDelete);
107             return exch;
108         }
109 
110         public AMQShortString getDefaultExchangeName()
111         {
112 
113             return ExchangeDefaults.HEADERS_EXCHANGE_NAME;
114         }
115     };
116 
117 
118     private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>();
119 
120     /**
121      * HeadersExchangeMBean class implements the management interface for the
122      * Header Exchanges.
123      */
124     @MBeanDescription("Management Bean for Headers Exchange")
125     private final class HeadersExchangeMBean extends ExchangeMBean
126     {
127         @MBeanConstructor("Creates an MBean for AMQ Headers exchange")
128         public HeadersExchangeMBean() throws JMException
129         {
130             super();
131             _exchangeType = "headers";
132             init();
133         }
134 
135         /**
136          * initialises the OpenType objects.
137          */
138         protected void init() throws OpenDataException
139         {
140             _bindingItemNames = new String[]{"Binding No""Queue  Name""Queue Bindings"};
141             _bindingItemIndexNames = new String[]{_bindingItemNames[0]};
142 
143             _bindingItemTypes = new OpenType[3];
144             _bindingItemTypes[0= SimpleType.INTEGER;
145             _bindingItemTypes[1= SimpleType.STRING;
146             _bindingItemTypes[2new ArrayType(1, SimpleType.STRING);
147             _bindingDataType = new CompositeType("Exchange Binding""Queue name and header bindings",
148                                                  _bindingItemNames, _bindingItemNames, _bindingItemTypes);
149             _bindinglistDataType = new TabularType("Exchange Bindings""List of exchange bindings for " + getName(),
150                                                    _bindingDataType, _bindingItemIndexNames);
151         }
152 
153         public TabularData bindings() throws OpenDataException
154         {
155             _bindingList = new TabularDataSupport(_bindinglistDataType);
156             int count = 1;
157             for (Iterator<Registration> itr = _bindings.iterator(); itr.hasNext();)
158             {
159                 Registration registration = itr.next();
160                 String queueName = registration.queue.getName().toString();
161 
162                 HeadersBinding headers = registration.binding;
163                 FieldTable headerMappings = headers.getMappings();
164                 final List<String> mappingList = new ArrayList<String>();
165 
166                 headerMappings.processOverElements(new FieldTable.FieldTableElementProcessor()
167                 {
168 
169                     public boolean processElement(String propertyName, AMQTypedValue value)
170                     {
171                         mappingList.add(propertyName + "=" + value.getValue());
172                         return true;
173                     }
174 
175                     public Object getResult()
176                     {
177                         return mappingList;
178                     }
179                 });
180 
181 
182                 Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])};
183                 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
184                 _bindingList.put(bindingData);
185             }
186 
187             return _bindingList;
188         }
189 
190         /**
191          * Creates bindings. Binding pattern is as follows-
192          <attributename>=<value>,<attributename>=<value>,...
193          @param queueName
194          @param binding
195          @throws javax.management.JMException
196          */
197         public void createNewBinding(String queueName, String bindingthrows JMException
198         {
199             AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
200 
201             if (queue == null)
202             {
203                 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
204             }
205 
206             String[] bindings = binding.split(",");
207             FieldTable bindingMap = new FieldTable();
208             for (int i = 0; i < bindings.length; i++)
209             {
210                 String[] keyAndValue = bindings[i].split("=");
211                 if (keyAndValue == null || keyAndValue.length < 2)
212                 {
213                     throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
214                 }
215                 bindingMap.setString(keyAndValue[0], keyAndValue[1]);
216             }
217 
218             _bindings.add(new Registration(new HeadersBinding(bindingMap), queue));
219         }
220 
221     // End of MBean class
222 
223     public AMQShortString getType()
224     {
225         return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
226     }
227 
228     public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
229     {
230         _logger.debug("Exchange " + getName() ": Binding " + queue.getName() " with " + args);
231         _bindings.add(new Registration(new HeadersBinding(args), queue));
232     }
233 
234     public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
235     {
236         _logger.debug("Exchange " + getName() ": Unbinding " + queue.getName());
237         if(!_bindings.remove(new Registration(new HeadersBinding(args), queue)))
238         {
239             throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " this.getName()
240                                    " with headers args " + args);    
241         }
242     }
243 
244     public void route(IncomingMessage payloadthrows AMQException
245     {
246         FieldTable headers = getHeaders(payload.getContentHeaderBody());
247         if (_logger.isDebugEnabled())
248         {
249             _logger.debug("Exchange " + getName() ": routing message with headers " + headers);
250         }
251         boolean routed = false;
252         ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
253         for (Registration e : _bindings)
254         {
255 
256             if (e.binding.matches(headers))
257             {
258                 if (_logger.isDebugEnabled())
259                 {
260                     _logger.debug("Exchange " + getName() ": delivering message with headers " +
261                                   headers + " to " + e.queue.getName());
262                 }
263                 queues.add(e.queue);
264 
265                 routed = true;
266             }
267         }
268         payload.enqueue(queues);
269     }
270 
271     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
272     {
273         //fixme isBound here should take the arguements in to consideration.
274         return isBound(routingKey, queue);
275     }
276 
277     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
278     {
279         return isBound(queue);
280     }
281 
282     public boolean isBound(AMQShortString routingKey)
283     {
284         return hasBindings();
285     }
286 
287     public boolean isBound(AMQQueue queue)
288     {
289         for (Registration r : _bindings)
290         {
291             if (r.queue.equals(queue))
292             {
293                 return true;
294             }
295         }
296         return false;
297     }
298 
299     public boolean hasBindings()
300     {
301         return !_bindings.isEmpty();
302     }
303 
304     protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
305     {
306         //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
307         //but these are not yet implemented.
308         return ((BasicContentHeaderPropertiescontentHeaderFrame.properties).getHeaders();
309     }
310 
311     protected ExchangeMBean createMBean() throws AMQException
312     {
313         try
314         {
315             return new HeadersExchangeMBean();
316         }
317         catch (JMException ex)
318         {
319             _logger.error("Exception occured in creating the HeadersExchangeMBean", ex);
320             throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex);
321         }
322     }
323 
324     public Map<AMQShortString, List<AMQQueue>> getBindings()
325     {
326         return null;
327     }
328 
329     private static class Registration
330     {
331         private final HeadersBinding binding;
332         private final AMQQueue queue;
333 
334         Registration(HeadersBinding binding, AMQQueue queue)
335         {
336             this.binding = binding;
337             this.queue = queue;
338         }
339 
340         public int hashCode()
341         {
342             return queue.hashCode();
343         }
344 
345         public boolean equals(Object o)
346         {
347             return instanceof Registration && ((Registrationo).queue.equals(queue);
348         }
349     }
350 }