TopicExchange.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.exchange;
022 
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.common.AMQPFilterTypes;
026 import org.apache.qpid.protocol.AMQConstant;
027 import org.apache.qpid.exchange.ExchangeDefaults;
028 import org.apache.qpid.framing.AMQShortString;
029 import org.apache.qpid.framing.FieldTable;
030 import org.apache.qpid.framing.AMQShortStringTokenizer;
031 import org.apache.qpid.server.management.MBeanConstructor;
032 import org.apache.qpid.server.management.MBeanDescription;
033 import org.apache.qpid.server.queue.IncomingMessage;
034 import org.apache.qpid.server.queue.AMQQueue;
035 import org.apache.qpid.server.virtualhost.VirtualHost;
036 import org.apache.qpid.server.exchange.topic.TopicParser;
037 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
038 import org.apache.qpid.server.filter.MessageFilter;
039 import org.apache.qpid.server.filter.JMSSelectorFilter;
040 
041 import javax.management.JMException;
042 import javax.management.MBeanException;
043 import javax.management.openmbean.CompositeData;
044 import javax.management.openmbean.CompositeDataSupport;
045 import javax.management.openmbean.OpenDataException;
046 import javax.management.openmbean.TabularData;
047 import javax.management.openmbean.TabularDataSupport;
048 import java.util.*;
049 import java.util.concurrent.ConcurrentHashMap;
050 import java.lang.ref.WeakReference;
051 
052 public class TopicExchange extends AbstractExchange
053 {
054 
055     public static final ExchangeType<TopicExchange> TYPE = new ExchangeType<TopicExchange>()
056     {
057 
058         public AMQShortString getName()
059         {
060             return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
061         }
062 
063         public Class<TopicExchange> getExchangeClass()
064         {
065             return TopicExchange.class;
066         }
067 
068         public TopicExchange newInstance(VirtualHost host,
069                                             AMQShortString name,
070                                             boolean durable,
071                                             int ticket,
072                                             boolean autoDeletethrows AMQException
073         {
074             TopicExchange exch = new TopicExchange();
075             exch.initialise(host, name, durable, ticket, autoDelete);
076             return exch;
077         }
078 
079         public AMQShortString getDefaultExchangeName()
080         {
081             return ExchangeDefaults.TOPIC_EXCHANGE_NAME;
082         }
083     };
084 
085 
086     private static final Logger _logger = Logger.getLogger(TopicExchange.class);
087 
088 /*
089     private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
090             new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
091     private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
092             new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
093     private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
094             new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
095 */
096     // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
097     private static final byte TOPIC_SEPARATOR = (byte)'.';
098     private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
099     private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
100     private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
101 
102     private static final byte HASH_BYTE = (byte)'#';
103     private static final byte STAR_BYTE = (byte)'*';
104 
105     private final TopicParser _parser = new TopicParser();
106 
107     private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
108             new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
109 
110     private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
111 
112     private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
113 
114     public static class Binding
115     {
116         private final AMQShortString _bindingKey;
117         private final AMQQueue _queue;
118         private final FieldTable _args;
119 
120         public Binding(AMQShortString bindingKey, AMQQueue queue, FieldTable args)
121         {
122             _bindingKey = bindingKey;
123             _queue = queue;
124             _args = args;
125         }
126 
127         public AMQShortString getBindingKey()
128         {
129             return _bindingKey;
130         }
131 
132         public AMQQueue getQueue()
133         {
134             return _queue;
135         }
136 
137         public int hashCode()
138         {
139             return (_bindingKey == null : _bindingKey.hashCode())*31 +_queue.hashCode();
140         }
141 
142         public boolean equals(Object o)
143         {
144             if(this == o)
145             {
146                 return true;
147             }
148             if(instanceof Binding)
149             {
150                 Binding other = (Bindingo;
151                 return (_queue == other._queue)
152                         && ((_bindingKey == null? other._bindingKey == null : _bindingKey.equals(other._bindingKey));
153             }
154             return false;
155         }
156     }
157 
158 
159 
160     private final class TopicExchangeResult implements TopicMatcherResult
161     {
162         private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
163         private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
164 
165         public void addUnfilteredQueue(AMQQueue queue)
166         {
167             Integer instances = _unfilteredQueues.get(queue);
168             if(instances == null)
169             {
170                 _unfilteredQueues.put(queue, 1);
171             }
172             else
173             {
174                 _unfilteredQueues.put(queue, instances + 1);
175             }
176         }
177 
178         public void removeUnfilteredQueue(AMQQueue queue)
179         {
180             Integer instances = _unfilteredQueues.get(queue);
181             if(instances == 1)
182             {
183                 _unfilteredQueues.remove(queue);
184             }
185             else
186             {
187                 _unfilteredQueues.put(queue,instances - 1);
188             }
189 
190         }
191 
192 
193         public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
194         {
195             Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
196             if(filters == null)
197             {
198                 filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
199                 _filteredQueues.put(queue, filters);
200             }
201             Integer instances = filters.get(filter);
202             if(instances == null)
203             {
204                 filters.put(filter,1);
205             }
206             else
207             {
208                 filters.put(filter, instances + 1);
209             }
210 
211         }
212 
213         public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
214         {
215             Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
216             if(filters != null)
217             {
218                 Integer instances = filters.get(filter);
219                 if(instances != null)
220                 {
221                     if(instances == 1)
222                     {
223                         filters.remove(filter);
224                         if(filters.isEmpty())
225                         {
226                             _filteredQueues.remove(queue);
227                         }
228                     }
229                     else 
230                     {
231                         filters.put(filter, instances - 1);
232                     }
233                 }
234 
235             }
236 
237         }
238 
239         public void replaceQueueFilter(AMQQueue queue,
240                                        MessageFilter<RuntimeException> oldFilter,
241                                        MessageFilter<RuntimeException> newFilter)
242         {
243             Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
244             Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
245             Integer oldFilterInstances = filters.get(oldFilter);
246             if(oldFilterInstances == 1)
247             {
248                 newFilters.remove(oldFilter);
249             }
250             else
251             {
252                 newFilters.put(oldFilter, oldFilterInstances-1);
253             }
254             Integer newFilterInstances = filters.get(newFilter);
255             if(newFilterInstances == null)
256             {
257                 newFilters.put(newFilter, 1);
258             }
259             else
260             {
261                 newFilters.put(newFilter, newFilterInstances+1);
262             }
263             _filteredQueues.put(queue,newFilters);
264         }
265 
266         public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
267         {
268             if(queues == null)
269             {
270                 if(_filteredQueues.isEmpty())
271                 {
272                     return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
273                 }
274                 else
275                 {
276                     queues = new HashSet<AMQQueue>();
277                 }
278             }
279             else if(!(queues instanceof Set))
280             {
281                 queues = new HashSet<AMQQueue>(queues);
282             }
283 
284             queues.addAll(_unfilteredQueues.keySet());
285             if(!_filteredQueues.isEmpty())
286             {
287                 for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
288                 {
289                     if(!queues.contains(entry.getKey()))
290                     {
291                         for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
292                         {
293                             if(filter.matches(msg))
294                             {
295                                 queues.add(entry.getKey());
296                             }
297                         }
298                     }
299                 }
300             }
301             return queues;
302         }
303 
304     }
305 
306 
307     /** TopicExchangeMBean class implements the management interface for the Topic exchanges. */
308     @MBeanDescription("Management Bean for Topic Exchange")
309     private final class TopicExchangeMBean extends ExchangeMBean
310     {
311         @MBeanConstructor("Creates an MBean for AMQ topic exchange")
312         public TopicExchangeMBean() throws JMException
313         {
314             super();
315             _exchangeType = "topic";
316             init();
317         }
318 
319         /** returns exchange bindings in tabular form */
320         public TabularData bindings() throws OpenDataException
321         {
322             _bindingList = new TabularDataSupport(_bindinglistDataType);
323             Map<String, List<String>> bindingData = new HashMap<String, List<String>>();
324             for (Binding binding : _bindings.keySet())
325             {
326                 String key = binding.getBindingKey().toString();
327                 List<String> queueNames = bindingData.get(key);
328                 if(queueNames == null)
329                 {
330                     queueNames = new ArrayList<String>();
331                     bindingData.put(key, queueNames);
332                 }
333                 queueNames.add(binding.getQueue().getName().toString());
334 
335             }
336             for(Map.Entry<String, List<String>> entry : bindingData.entrySet())
337             {
338                 Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]) };
339                 CompositeData bindingCompositeData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
340                 _bindingList.put(bindingCompositeData);
341             }
342 
343             return _bindingList;
344         }
345 
346         public void createNewBinding(String queueName, String bindingthrows JMException
347         {
348             AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
349             if (queue == null)
350             {
351                 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
352             }
353 
354             try
355             {
356                 queue.bind(TopicExchange.this, new AMQShortString(binding)null);
357             }
358             catch (AMQException ex)
359             {
360                 throw new MBeanException(ex);
361             }
362         }
363 
364     // End of MBean class
365 
366     public AMQShortString getType()
367     {
368         return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
369     }
370 
371     public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable argsthrows AMQException
372     {
373         assert queue != null;
374         assert rKey != null;
375 
376         _logger.debug("Registering queue " + queue.getName() " with routing key " + rKey);
377 
378 
379         AMQShortString routingKey;
380 
381         if(rKey.contains(HASH_BYTE|| rKey.contains(STAR_BYTE))
382         {
383             routingKey = normalize(rKey);
384         }
385         else
386         {
387             routingKey = rKey;
388         }
389 
390         Binding binding = new Binding(rKey, queue, args);
391 
392         if(_bindings.containsKey(binding))
393         {
394             FieldTable oldArgs = _bindings.get(binding);
395             TopicExchangeResult result = _topicExchangeResults.get(routingKey);
396 
397             if(argumentsContainSelector(args))
398             {
399                 if(argumentsContainSelector(oldArgs))
400                 {
401                     result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args));
402                 }
403                 else
404                 {
405                     result.addFilteredQueue(queue,createSelectorFilter(args));
406                     result.removeUnfilteredQueue(queue);
407                 }
408             }
409             else
410             {
411                 if(argumentsContainSelector(oldArgs))
412                 {
413                     result.addUnfilteredQueue(queue);
414                     result.removeFilteredQueue(queue, createSelectorFilter(oldArgs));
415                 }
416                 else
417                 {
418                     // TODO - fix control flow
419                     return;
420                 }
421             }
422 
423         }
424         else
425         {
426 
427             TopicExchangeResult result = _topicExchangeResults.get(routingKey);
428             if(result == null)
429             {
430                 result = new TopicExchangeResult();
431                 if(argumentsContainSelector(args))
432                 {
433                     result.addFilteredQueue(queue, createSelectorFilter(args));
434                 }
435                 else
436                 {
437                     result.addUnfilteredQueue(queue);
438                 }
439                 _parser.addBinding(routingKey, result);    
440                 _topicExchangeResults.put(routingKey,result);
441             }
442             else                        
443             {
444                 if(argumentsContainSelector(args))
445                 {
446                     result.addFilteredQueue(queue, createSelectorFilter(args));
447                 }
448                 else
449                 {
450                     result.addUnfilteredQueue(queue);
451                 }
452             }
453             _bindings.put(binding, args);
454         }
455 
456 
457     }
458 
459     private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
460             throws AMQException
461     {
462 
463         final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
464         WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
465         JMSSelectorFilter selector = null;
466 
467         if(selectorRef == null || (selector = selectorRef.get())==null)
468         {
469             selector = new JMSSelectorFilter<RuntimeException>(selectorString);
470             _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
471         }
472         return selector;
473     }
474 
475     private static boolean argumentsContainSelector(final FieldTable args)
476     {
477         return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0;
478     }
479 
480     private AMQShortString normalize(AMQShortString routingKey)
481     {
482         if(routingKey == null)
483         {
484             routingKey = AMQShortString.EMPTY_STRING;
485         }
486         
487         AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
488 
489         List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
490 
491         while (routingTokens.hasMoreTokens())
492         {
493             subscriptionList.add(routingTokens.nextToken());
494         }
495 
496         int size = subscriptionList.size();
497 
498         for (int index = 0; index < size; index++)
499         {
500             // if there are more levels
501             if ((index + 1< size)
502             {
503                 if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN))
504                 {
505                     if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN))
506                     {
507                         // we don't need #.# delete this one
508                         subscriptionList.remove(index);
509                         size--;
510                         // redo this normalisation
511                         index--;
512                     }
513 
514                     if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN))
515                     {
516                         // we don't want #.* swap to *.#
517                         // remove it and put it in at index + 1
518                         subscriptionList.add(index + 1, subscriptionList.remove(index));
519                     }
520                 }
521             // if we have more levels
522         }
523 
524 
525 
526         AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
527 
528         return normalizedString;
529     }
530 
531     public void route(IncomingMessage payloadthrows AMQException
532     {
533 
534         final AMQShortString routingKey = payload.getRoutingKey();
535 
536         // The copy here is unfortunate, but not too bad relevant to the amount of
537         // things created and copied in getMatchedQueues
538         ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
539         queues.addAll(getMatchedQueues(payload, routingKey));
540 
541         if(queues == null || queues.isEmpty())
542         {
543             _logger.info("Message routing key: " + payload.getRoutingKey() " No routes.");
544         }
545 
546         payload.enqueue(queues);
547 
548     }
549 
550     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
551     {
552         Binding binding = new Binding(routingKey, queue, arguments);
553         if (arguments == null)
554         {
555             return _bindings.containsKey(binding);
556         }
557         else
558         {
559             FieldTable o = _bindings.get(binding);
560             if (o != null)
561             {
562                 return o.equals(arguments);
563             }
564             else
565             {
566                 return false;
567             }
568             
569         }
570     }
571 
572     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
573     {
574         return isBound(routingKey, null, queue);
575     }
576 
577     public boolean isBound(AMQShortString routingKey)
578     {
579         for(Binding b : _bindings.keySet())
580         {
581             if(b.getBindingKey().equals(routingKey))
582             {
583                 return true;
584             }
585         }
586 
587         return false;
588     }
589 
590     public boolean isBound(AMQQueue queue)
591     {
592         for(Binding b : _bindings.keySet())
593         {
594             if(b.getQueue().equals(queue))
595             {
596                 return true;
597             }
598         }
599 
600         return false;
601     }
602 
603     public boolean hasBindings()
604     {
605         return !_bindings.isEmpty();
606     }
607 
608     public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable argsthrows AMQException
609     {
610         assert queue != null;
611         assert rKey != null;
612 
613         Binding binding = new Binding(rKey, queue, args);
614 
615 
616         if (!_bindings.containsKey(binding))
617         {
618             throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue.getName() " was not registered with exchange " this.getName()
619                                    " with routing key " + rKey + ".");
620         }
621 
622         FieldTable bindingArgs = _bindings.remove(binding);
623         AMQShortString bindingKey = normalize(rKey);
624         TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
625         if(argumentsContainSelector(bindingArgs))
626         {
627             result.removeFilteredQueue(queue, createSelectorFilter(bindingArgs));
628         }
629         else
630         {
631             result.removeUnfilteredQueue(queue);
632         }
633 
634     }
635 
636     protected ExchangeMBean createMBean() throws AMQException
637     {
638         try
639         {
640             return new TopicExchangeMBean();
641         }
642         catch (JMException ex)
643         {
644             _logger.error("Exception occured in creating the topic exchenge mbean", ex);
645             throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
646         }
647     }
648 
649     private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
650     {
651 
652         Collection<TopicMatcherResult> results = _parser.parse(routingKey);
653         if(results.isEmpty())
654         {
655             return Collections.EMPTY_SET;
656         }
657         else
658         {
659             Collection<AMQQueue> queues = results.size() == null new HashSet<AMQQueue>();
660             for(TopicMatcherResult result : results)
661             {
662 
663                 queues = ((TopicExchangeResult)result).processMessage(message, queues);
664             }
665             return queues;
666         }
667 
668 
669     }
670 }