001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.exchange;
022
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.common.AMQPFilterTypes;
026 import org.apache.qpid.protocol.AMQConstant;
027 import org.apache.qpid.exchange.ExchangeDefaults;
028 import org.apache.qpid.framing.AMQShortString;
029 import org.apache.qpid.framing.FieldTable;
030 import org.apache.qpid.framing.AMQShortStringTokenizer;
031 import org.apache.qpid.server.management.MBeanConstructor;
032 import org.apache.qpid.server.management.MBeanDescription;
033 import org.apache.qpid.server.queue.IncomingMessage;
034 import org.apache.qpid.server.queue.AMQQueue;
035 import org.apache.qpid.server.virtualhost.VirtualHost;
036 import org.apache.qpid.server.exchange.topic.TopicParser;
037 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
038 import org.apache.qpid.server.filter.MessageFilter;
039 import org.apache.qpid.server.filter.JMSSelectorFilter;
040
041 import javax.management.JMException;
042 import javax.management.MBeanException;
043 import javax.management.openmbean.CompositeData;
044 import javax.management.openmbean.CompositeDataSupport;
045 import javax.management.openmbean.OpenDataException;
046 import javax.management.openmbean.TabularData;
047 import javax.management.openmbean.TabularDataSupport;
048 import java.util.*;
049 import java.util.concurrent.ConcurrentHashMap;
050 import java.lang.ref.WeakReference;
051
052 public class TopicExchange extends AbstractExchange
053 {
054
055 public static final ExchangeType<TopicExchange> TYPE = new ExchangeType<TopicExchange>()
056 {
057
058 public AMQShortString getName()
059 {
060 return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
061 }
062
063 public Class<TopicExchange> getExchangeClass()
064 {
065 return TopicExchange.class;
066 }
067
068 public TopicExchange newInstance(VirtualHost host,
069 AMQShortString name,
070 boolean durable,
071 int ticket,
072 boolean autoDelete) throws AMQException
073 {
074 TopicExchange exch = new TopicExchange();
075 exch.initialise(host, name, durable, ticket, autoDelete);
076 return exch;
077 }
078
079 public AMQShortString getDefaultExchangeName()
080 {
081 return ExchangeDefaults.TOPIC_EXCHANGE_NAME;
082 }
083 };
084
085
/** Logger shared by all instances of this exchange. */
private static final Logger _logger = Logger.getLogger(TopicExchange.class);

// Earlier per-key queue maps, kept commented out; nothing in this class references them.
/*
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
*/
// private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
/** Separator byte passed to the tokenizer when a binding key is normalised. */
private static final byte TOPIC_SEPARATOR = (byte)'.';
/** Separator used to re-join the tokens of a normalised binding key. */
private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
/** The "*" token, compared against key tokens during normalisation. */
private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
/** The "#" token, compared against key tokens during normalisation. */
private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");

/** Byte form of '#', used by registerQueue to detect keys that need normalising. */
private static final byte HASH_BYTE = (byte)'#';
/** Byte form of '*', used by registerQueue to detect keys that need normalising. */
private static final byte STAR_BYTE = (byte)'*';

/** Parser that receives each new binding key and resolves routing keys to matcher results. */
private final TopicParser _parser = new TopicParser();

/** Routing result registered for each binding key (normalised when the key has wildcards). */
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();

/** Known bindings mapped to the arguments they were bound with (the value may be null). */
private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();

/**
 * Cache of selector filters keyed by selector string. Both the keys (WeakHashMap) and the
 * filters (WeakReference) are weakly held, so entries may vanish once unreferenced elsewhere.
 */
private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
113
114 public static class Binding
115 {
116 private final AMQShortString _bindingKey;
117 private final AMQQueue _queue;
118 private final FieldTable _args;
119
120 public Binding(AMQShortString bindingKey, AMQQueue queue, FieldTable args)
121 {
122 _bindingKey = bindingKey;
123 _queue = queue;
124 _args = args;
125 }
126
127 public AMQShortString getBindingKey()
128 {
129 return _bindingKey;
130 }
131
132 public AMQQueue getQueue()
133 {
134 return _queue;
135 }
136
137 public int hashCode()
138 {
139 return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 +_queue.hashCode();
140 }
141
142 public boolean equals(Object o)
143 {
144 if(this == o)
145 {
146 return true;
147 }
148 if(o instanceof Binding)
149 {
150 Binding other = (Binding) o;
151 return (_queue == other._queue)
152 && ((_bindingKey == null) ? other._bindingKey == null : _bindingKey.equals(other._bindingKey));
153 }
154 return false;
155 }
156 }
157
158
159
160 private final class TopicExchangeResult implements TopicMatcherResult
161 {
162 private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
163 private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
164
165 public void addUnfilteredQueue(AMQQueue queue)
166 {
167 Integer instances = _unfilteredQueues.get(queue);
168 if(instances == null)
169 {
170 _unfilteredQueues.put(queue, 1);
171 }
172 else
173 {
174 _unfilteredQueues.put(queue, instances + 1);
175 }
176 }
177
178 public void removeUnfilteredQueue(AMQQueue queue)
179 {
180 Integer instances = _unfilteredQueues.get(queue);
181 if(instances == 1)
182 {
183 _unfilteredQueues.remove(queue);
184 }
185 else
186 {
187 _unfilteredQueues.put(queue,instances - 1);
188 }
189
190 }
191
192
193 public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
194 {
195 Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
196 if(filters == null)
197 {
198 filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
199 _filteredQueues.put(queue, filters);
200 }
201 Integer instances = filters.get(filter);
202 if(instances == null)
203 {
204 filters.put(filter,1);
205 }
206 else
207 {
208 filters.put(filter, instances + 1);
209 }
210
211 }
212
213 public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
214 {
215 Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
216 if(filters != null)
217 {
218 Integer instances = filters.get(filter);
219 if(instances != null)
220 {
221 if(instances == 1)
222 {
223 filters.remove(filter);
224 if(filters.isEmpty())
225 {
226 _filteredQueues.remove(queue);
227 }
228 }
229 else
230 {
231 filters.put(filter, instances - 1);
232 }
233 }
234
235 }
236
237 }
238
239 public void replaceQueueFilter(AMQQueue queue,
240 MessageFilter<RuntimeException> oldFilter,
241 MessageFilter<RuntimeException> newFilter)
242 {
243 Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
244 Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
245 Integer oldFilterInstances = filters.get(oldFilter);
246 if(oldFilterInstances == 1)
247 {
248 newFilters.remove(oldFilter);
249 }
250 else
251 {
252 newFilters.put(oldFilter, oldFilterInstances-1);
253 }
254 Integer newFilterInstances = filters.get(newFilter);
255 if(newFilterInstances == null)
256 {
257 newFilters.put(newFilter, 1);
258 }
259 else
260 {
261 newFilters.put(newFilter, newFilterInstances+1);
262 }
263 _filteredQueues.put(queue,newFilters);
264 }
265
266 public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
267 {
268 if(queues == null)
269 {
270 if(_filteredQueues.isEmpty())
271 {
272 return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
273 }
274 else
275 {
276 queues = new HashSet<AMQQueue>();
277 }
278 }
279 else if(!(queues instanceof Set))
280 {
281 queues = new HashSet<AMQQueue>(queues);
282 }
283
284 queues.addAll(_unfilteredQueues.keySet());
285 if(!_filteredQueues.isEmpty())
286 {
287 for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
288 {
289 if(!queues.contains(entry.getKey()))
290 {
291 for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
292 {
293 if(filter.matches(msg))
294 {
295 queues.add(entry.getKey());
296 }
297 }
298 }
299 }
300 }
301 return queues;
302 }
303
304 }
305
306
307 /** TopicExchangeMBean class implements the management interface for the Topic exchanges. */
308 @MBeanDescription("Management Bean for Topic Exchange")
309 private final class TopicExchangeMBean extends ExchangeMBean
310 {
311 @MBeanConstructor("Creates an MBean for AMQ topic exchange")
312 public TopicExchangeMBean() throws JMException
313 {
314 super();
315 _exchangeType = "topic";
316 init();
317 }
318
319 /** returns exchange bindings in tabular form */
320 public TabularData bindings() throws OpenDataException
321 {
322 _bindingList = new TabularDataSupport(_bindinglistDataType);
323 Map<String, List<String>> bindingData = new HashMap<String, List<String>>();
324 for (Binding binding : _bindings.keySet())
325 {
326 String key = binding.getBindingKey().toString();
327 List<String> queueNames = bindingData.get(key);
328 if(queueNames == null)
329 {
330 queueNames = new ArrayList<String>();
331 bindingData.put(key, queueNames);
332 }
333 queueNames.add(binding.getQueue().getName().toString());
334
335 }
336 for(Map.Entry<String, List<String>> entry : bindingData.entrySet())
337 {
338 Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]) };
339 CompositeData bindingCompositeData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
340 _bindingList.put(bindingCompositeData);
341 }
342
343 return _bindingList;
344 }
345
346 public void createNewBinding(String queueName, String binding) throws JMException
347 {
348 AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
349 if (queue == null)
350 {
351 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
352 }
353
354 try
355 {
356 queue.bind(TopicExchange.this, new AMQShortString(binding), null);
357 }
358 catch (AMQException ex)
359 {
360 throw new MBeanException(ex);
361 }
362 }
363
364 } // End of MBean class
365
366 public AMQShortString getType()
367 {
368 return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
369 }
370
371 public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
372 {
373 assert queue != null;
374 assert rKey != null;
375
376 _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey);
377
378
379 AMQShortString routingKey;
380
381 if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
382 {
383 routingKey = normalize(rKey);
384 }
385 else
386 {
387 routingKey = rKey;
388 }
389
390 Binding binding = new Binding(rKey, queue, args);
391
392 if(_bindings.containsKey(binding))
393 {
394 FieldTable oldArgs = _bindings.get(binding);
395 TopicExchangeResult result = _topicExchangeResults.get(routingKey);
396
397 if(argumentsContainSelector(args))
398 {
399 if(argumentsContainSelector(oldArgs))
400 {
401 result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args));
402 }
403 else
404 {
405 result.addFilteredQueue(queue,createSelectorFilter(args));
406 result.removeUnfilteredQueue(queue);
407 }
408 }
409 else
410 {
411 if(argumentsContainSelector(oldArgs))
412 {
413 result.addUnfilteredQueue(queue);
414 result.removeFilteredQueue(queue, createSelectorFilter(oldArgs));
415 }
416 else
417 {
418 // TODO - fix control flow
419 return;
420 }
421 }
422
423 }
424 else
425 {
426
427 TopicExchangeResult result = _topicExchangeResults.get(routingKey);
428 if(result == null)
429 {
430 result = new TopicExchangeResult();
431 if(argumentsContainSelector(args))
432 {
433 result.addFilteredQueue(queue, createSelectorFilter(args));
434 }
435 else
436 {
437 result.addUnfilteredQueue(queue);
438 }
439 _parser.addBinding(routingKey, result);
440 _topicExchangeResults.put(routingKey,result);
441 }
442 else
443 {
444 if(argumentsContainSelector(args))
445 {
446 result.addFilteredQueue(queue, createSelectorFilter(args));
447 }
448 else
449 {
450 result.addUnfilteredQueue(queue);
451 }
452 }
453 _bindings.put(binding, args);
454 }
455
456
457 }
458
459 private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
460 throws AMQException
461 {
462
463 final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
464 WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
465 JMSSelectorFilter selector = null;
466
467 if(selectorRef == null || (selector = selectorRef.get())==null)
468 {
469 selector = new JMSSelectorFilter<RuntimeException>(selectorString);
470 _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
471 }
472 return selector;
473 }
474
475 private static boolean argumentsContainSelector(final FieldTable args)
476 {
477 return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0;
478 }
479
480 private AMQShortString normalize(AMQShortString routingKey)
481 {
482 if(routingKey == null)
483 {
484 routingKey = AMQShortString.EMPTY_STRING;
485 }
486
487 AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
488
489 List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
490
491 while (routingTokens.hasMoreTokens())
492 {
493 subscriptionList.add(routingTokens.nextToken());
494 }
495
496 int size = subscriptionList.size();
497
498 for (int index = 0; index < size; index++)
499 {
500 // if there are more levels
501 if ((index + 1) < size)
502 {
503 if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN))
504 {
505 if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN))
506 {
507 // we don't need #.# delete this one
508 subscriptionList.remove(index);
509 size--;
510 // redo this normalisation
511 index--;
512 }
513
514 if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN))
515 {
516 // we don't want #.* swap to *.#
517 // remove it and put it in at index + 1
518 subscriptionList.add(index + 1, subscriptionList.remove(index));
519 }
520 }
521 } // if we have more levels
522 }
523
524
525
526 AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
527
528 return normalizedString;
529 }
530
531 public void route(IncomingMessage payload) throws AMQException
532 {
533
534 final AMQShortString routingKey = payload.getRoutingKey();
535
536 // The copy here is unfortunate, but not too bad relevant to the amount of
537 // things created and copied in getMatchedQueues
538 ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
539 queues.addAll(getMatchedQueues(payload, routingKey));
540
541 if(queues == null || queues.isEmpty())
542 {
543 _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
544 }
545
546 payload.enqueue(queues);
547
548 }
549
550 public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
551 {
552 Binding binding = new Binding(routingKey, queue, arguments);
553 if (arguments == null)
554 {
555 return _bindings.containsKey(binding);
556 }
557 else
558 {
559 FieldTable o = _bindings.get(binding);
560 if (o != null)
561 {
562 return o.equals(arguments);
563 }
564 else
565 {
566 return false;
567 }
568
569 }
570 }
571
572 public boolean isBound(AMQShortString routingKey, AMQQueue queue)
573 {
574 return isBound(routingKey, null, queue);
575 }
576
577 public boolean isBound(AMQShortString routingKey)
578 {
579 for(Binding b : _bindings.keySet())
580 {
581 if(b.getBindingKey().equals(routingKey))
582 {
583 return true;
584 }
585 }
586
587 return false;
588 }
589
590 public boolean isBound(AMQQueue queue)
591 {
592 for(Binding b : _bindings.keySet())
593 {
594 if(b.getQueue().equals(queue))
595 {
596 return true;
597 }
598 }
599
600 return false;
601 }
602
603 public boolean hasBindings()
604 {
605 return !_bindings.isEmpty();
606 }
607
608 public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
609 {
610 assert queue != null;
611 assert rKey != null;
612
613 Binding binding = new Binding(rKey, queue, args);
614
615
616 if (!_bindings.containsKey(binding))
617 {
618 throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue.getName() + " was not registered with exchange " + this.getName()
619 + " with routing key " + rKey + ".");
620 }
621
622 FieldTable bindingArgs = _bindings.remove(binding);
623 AMQShortString bindingKey = normalize(rKey);
624 TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
625 if(argumentsContainSelector(bindingArgs))
626 {
627 result.removeFilteredQueue(queue, createSelectorFilter(bindingArgs));
628 }
629 else
630 {
631 result.removeUnfilteredQueue(queue);
632 }
633
634 }
635
636 protected ExchangeMBean createMBean() throws AMQException
637 {
638 try
639 {
640 return new TopicExchangeMBean();
641 }
642 catch (JMException ex)
643 {
644 _logger.error("Exception occured in creating the topic exchenge mbean", ex);
645 throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
646 }
647 }
648
649 private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
650 {
651
652 Collection<TopicMatcherResult> results = _parser.parse(routingKey);
653 if(results.isEmpty())
654 {
655 return Collections.EMPTY_SET;
656 }
657 else
658 {
659 Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
660 for(TopicMatcherResult result : results)
661 {
662
663 queues = ((TopicExchangeResult)result).processMessage(message, queues);
664 }
665 return queues;
666 }
667
668
669 }
670 }
|