001 package org.apache.qpid.server.exchange.topic;
002
003 import org.apache.qpid.framing.AMQShortString;
004 import org.apache.qpid.framing.AMQShortStringTokenizer;
005
006 import java.util.*;
007 import java.util.concurrent.atomic.AtomicReference;
008 import java.io.IOException;
009
010 /*
011 *
012 * Licensed to the Apache Software Foundation (ASF) under one
013 * or more contributor license agreements. See the NOTICE file
014 * distributed with this work for additional information
015 * regarding copyright ownership. The ASF licenses this file
016 * to you under the Apache License, Version 2.0 (the
017 * "License"); you may not use this file except in compliance
018 * with the License. You may obtain a copy of the License at
019 *
020 * http://www.apache.org/licenses/LICENSE-2.0
021 *
022 * Unless required by applicable law or agreed to in writing,
023 * software distributed under the License is distributed on an
024 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
025 * KIND, either express or implied. See the License for the
026 * specific language governing permissions and limitations
027 * under the License.
028 *
029 */
030 public class TopicParser
031 {
032 private static final byte TOPIC_DELIMITER = (byte)'.';
033
034 private final TopicWordDictionary _dictionary = new TopicWordDictionary();
035 private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
036
037 private static class Position
038 {
039 private final TopicWord _word;
040 private final boolean _selfTransition;
041 private final int _position;
042 private final boolean _endState;
043 private boolean _followedByAnyLoop;
044
045
046 public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
047 {
048 _position = position;
049 _word = word;
050 _selfTransition = selfTransition;
051 _endState = endState;
052 }
053
054
055 }
056
057 private static final Position ERROR_POSITION = new Position(Integer.MAX_VALUE,null, true, false);
058
059 private static class SimpleState
060 {
061 Set<Position> _positions;
062 Map<TopicWord, SimpleState> _nextState;
063 }
064
065
066 public void addBinding(AMQShortString bindingKey, TopicMatcherResult result)
067 {
068
069 TopicMatcherDFAState startingStateMachine;
070 TopicMatcherDFAState newStateMachine;
071
072 do
073 {
074 startingStateMachine = _stateMachine.get();
075 if(startingStateMachine == null)
076 {
077 newStateMachine = createStateMachine(bindingKey, result);
078 }
079 else
080 {
081 newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result));
082 }
083
084 }
085 while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine));
086
087 }
088
089 public Collection<TopicMatcherResult> parse(AMQShortString routingKey)
090 {
091 TopicMatcherDFAState stateMachine = _stateMachine.get();
092 if(stateMachine == null)
093 {
094 return Collections.EMPTY_SET;
095 }
096 else
097 {
098 return stateMachine.parse(_dictionary,routingKey);
099 }
100 }
101
102
103 TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result)
104 {
105 List<TopicWord> wordList = createTopicWordList(bindingKey);
106 int wildCards = 0;
107 for(TopicWord word : wordList)
108 {
109 if(word == TopicWord.WILDCARD_WORD)
110 {
111 wildCards++;
112 }
113 }
114 if(wildCards == 0)
115 {
116 TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size()+1];
117 states[states.length-1] = new TopicMatcherDFAState(Collections.EMPTY_MAP, Collections.singleton(result));
118 for(int i = states.length-2; i >= 0; i--)
119 {
120 states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.EMPTY_SET);
121
122 }
123 return states[0];
124 }
125 else if(wildCards == wordList.size())
126 {
127 Map<TopicWord,TopicMatcherDFAState> stateMap = new HashMap<TopicWord,TopicMatcherDFAState>();
128 TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result));
129 stateMap.put(TopicWord.ANY_WORD, state);
130 return state;
131 }
132
133
134 int positionCount = wordList.size() - wildCards;
135
136 Position[] positions = new Position[positionCount+1];
137
138 int lastWord;
139
140 if(wordList.get(wordList.size()-1)== TopicWord.WILDCARD_WORD)
141 {
142 lastWord = wordList.size()-1;
143 positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, true, true);
144 }
145 else
146 {
147 lastWord = wordList.size();
148 positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, false, true);
149 }
150
151
152 int pos = 0;
153 int wordPos = 0;
154
155
156 while(wordPos < lastWord)
157 {
158 TopicWord word = wordList.get(wordPos++);
159
160 if(word == TopicWord.WILDCARD_WORD)
161 {
162 int nextWordPos = wordPos++;
163 word = wordList.get(nextWordPos);
164
165 positions[pos] = new Position(pos++,word,true,false);
166 }
167 else
168 {
169 positions[pos] = new Position(pos++,word,false,false);
170 }
171
172 }
173
174
175 for(int p = 0; p<positionCount; p++)
176 {
177 boolean followedByWildcards = true;
178
179 int n = p;
180 while(followedByWildcards && n<(positionCount+1))
181 {
182
183 if(positions[n]._selfTransition)
184 {
185 break;
186 }
187 else if(positions[n]._word!=TopicWord.ANY_WORD)
188 {
189 followedByWildcards = false;
190 }
191 n++;
192 }
193
194
195 positions[p]._followedByAnyLoop = followedByWildcards && (n!= positionCount+1);
196 }
197
198
199 // from each position you transition to a set of other positions.
200 // we approach this by examining steps of increasing length - so we
201 // look how far we can go from the start position in 1 word, 2 words, etc...
202
203 Map<Set<Position>,SimpleState> stateMap = new HashMap<Set<Position>,SimpleState>();
204
205
206 SimpleState state = new SimpleState();
207 state._positions = Collections.singleton( positions[0] );
208 stateMap.put(state._positions, state);
209
210 calculateNextStates(state, stateMap, positions);
211
212 SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]);
213 HashMap<TopicWord, TopicMatcherDFAState>[] dfaStateMaps = new HashMap[simpleStates.length];
214 Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<SimpleState, TopicMatcherDFAState>();
215
216 for(int i = 0; i < simpleStates.length; i++)
217 {
218
219 Collection<TopicMatcherResult> results;
220 boolean endState = false;
221
222 for(Position p : simpleStates[i]._positions)
223 {
224 if(p._endState)
225 {
226 endState = true;
227 break;
228 }
229 }
230
231 if(endState)
232 {
233 results = Collections.singleton(result);
234 }
235 else
236 {
237 results = Collections.EMPTY_SET;
238 }
239
240 dfaStateMaps[i] = new HashMap<TopicWord, TopicMatcherDFAState>();
241 simple2DFAMap.put(simpleStates[i], new TopicMatcherDFAState(dfaStateMaps[i],results));
242
243 }
244 for(int i = 0; i < simpleStates.length; i++)
245 {
246 SimpleState simpleState = simpleStates[i];
247
248 Map<TopicWord, SimpleState> nextSimpleStateMap = simpleState._nextState;
249 for(Map.Entry<TopicWord, SimpleState> stateMapEntry : nextSimpleStateMap.entrySet())
250 {
251 dfaStateMaps[i].put(stateMapEntry.getKey(), simple2DFAMap.get(stateMapEntry.getValue()));
252 }
253
254 }
255
256 return simple2DFAMap.get(state);
257
258 }
259
260
261
262 private void calculateNextStates(final SimpleState state,
263 final Map<Set<Position>, SimpleState> stateMap,
264 final Position[] positions)
265 {
266 Map<TopicWord, Set<Position>> transitions = new HashMap<TopicWord,Set<Position>>();
267
268 for(Position pos : state._positions)
269 {
270 if(pos._selfTransition)
271 {
272 Set<Position> dest = transitions.get(TopicWord.ANY_WORD);
273 if(dest == null)
274 {
275 dest = new HashSet<Position>();
276 transitions.put(TopicWord.ANY_WORD,dest);
277 }
278 dest.add(pos);
279 }
280
281 final int nextPos = pos._position + 1;
282 Position nextPosition = nextPos == positions.length ? ERROR_POSITION : positions[nextPos];
283
284 Set<Position> dest = transitions.get(pos._word);
285 if(dest == null)
286 {
287 dest = new HashSet<Position>();
288 transitions.put(pos._word,dest);
289 }
290 dest.add(nextPosition);
291
292 }
293
294 Set<Position> anyWordTransitions = transitions.get(TopicWord.ANY_WORD);
295 if(anyWordTransitions != null)
296 {
297 for(Set<Position> dest : transitions.values())
298 {
299 dest.addAll(anyWordTransitions);
300 }
301 }
302
303 state._nextState = new HashMap<TopicWord, SimpleState>();
304
305 for(Map.Entry<TopicWord,Set<Position>> dest : transitions.entrySet())
306 {
307
308 if(dest.getValue().size()>1)
309 {
310 dest.getValue().remove(ERROR_POSITION);
311 }
312 Position loopingTerminal = null;
313 for(Position destPos : dest.getValue())
314 {
315 if(destPos._selfTransition && destPos._endState)
316 {
317 loopingTerminal = destPos;
318 break;
319 }
320 }
321
322 if(loopingTerminal!=null)
323 {
324 dest.setValue(Collections.singleton(loopingTerminal));
325 }
326 else
327 {
328 Position anyLoop = null;
329 for(Position destPos : dest.getValue())
330 {
331 if(destPos._followedByAnyLoop)
332 {
333 if(anyLoop == null || anyLoop._position<destPos._position)
334 {
335 anyLoop = destPos;
336 }
337 }
338 }
339 if(anyLoop != null)
340 {
341 Collection<Position> removals = new ArrayList<Position>();
342 for(Position destPos : dest.getValue())
343 {
344 if(destPos._position < anyLoop._position)
345 {
346 removals.add(destPos);
347 }
348 }
349 dest.getValue().removeAll(removals);
350 }
351 }
352
353 SimpleState stateForEntry = stateMap.get(dest.getValue());
354 if(stateForEntry == null)
355 {
356 stateForEntry = new SimpleState();
357 stateForEntry._positions = dest.getValue();
358 stateMap.put(dest.getValue(),stateForEntry);
359 calculateNextStates(stateForEntry,
360 stateMap,
361 positions);
362 }
363 state._nextState.put(dest.getKey(),stateForEntry);
364
365
366
367 }
368
369 // remove redundant transitions
370 SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD);
371 if(anyWordState != null)
372 {
373 List<TopicWord> removeList = new ArrayList<TopicWord>();
374 for(Map.Entry<TopicWord,SimpleState> entry : state._nextState.entrySet())
375 {
376 if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD)
377 {
378 removeList.add(entry.getKey());
379 }
380 }
381 for(TopicWord removeKey : removeList)
382 {
383 state._nextState.remove(removeKey);
384 }
385 }
386
387
388 }
389
390 private List<TopicWord> createTopicWordList(final AMQShortString bindingKey)
391 {
392 AMQShortStringTokenizer tokens = bindingKey.tokenize(TOPIC_DELIMITER);
393 TopicWord previousWord = null;
394
395 List<TopicWord> wordList = new ArrayList<TopicWord>();
396
397 while(tokens.hasMoreTokens())
398 {
399 TopicWord nextWord = _dictionary.getOrCreateWord(tokens.nextToken());
400 if(previousWord == TopicWord.WILDCARD_WORD)
401 {
402
403 if(nextWord == TopicWord.WILDCARD_WORD)
404 {
405 // consecutive wildcards can be merged
406 // i.e. subsequent wildcards can be discarded
407 continue;
408 }
409 else if(nextWord == TopicWord.ANY_WORD)
410 {
411 // wildcard and anyword can be reordered to always put anyword first
412 wordList.set(wordList.size()-1,TopicWord.ANY_WORD);
413 nextWord = TopicWord.WILDCARD_WORD;
414 }
415 }
416 wordList.add(nextWord);
417 previousWord = nextWord;
418
419 }
420 return wordList;
421 }
422
423
424 public static void main(String[] args)
425 {
426
427 printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
428 printMatches(new String[]{
429 "#.a.#",
430 "#.b.#",
431 "#.c.#",
432 "#.d.#",
433 "#.e.#",
434 "#.f.#",
435 "#.g.#",
436 "#.h.#",
437 "#.i.#",
438 "#.j.#",
439 "#.k.#",
440 "#.l.#",
441 "#.m.#",
442 "#.n.#",
443 "#.o.#",
444 "#.p.#",
445 "#.q.#"
446
447 }, "a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
448 /*
449 printMatches(new String[]{
450 "#.a.#",
451 "#.b.#",
452 "#.c.#",
453 "#.d.#",
454 "#.e.#",
455 "#.f.#",
456 "#.g.#",
457 "#.h.#",
458 "#.i.#",
459 "#.j.#",
460 "#.k.#",
461 "#.l.#",
462 "#.m.#",
463 "#.n.#",
464 "#.o.#",
465 "#.p.#",
466 "#.q.#",
467 "#.r.#",
468 "#.s.#",
469 "#.t.#",
470 "#.u.#",
471 "#.v.#",
472 "#.w.#",
473 "#.x.#",
474 "#.y.#",
475 "#.z.#"
476
477
478 },"a.b");
479
480 printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
481 printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
482 printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c");
483
484 */
485
486 printMatches("","");
487 printMatches("a","a");
488 printMatches("a","");
489 printMatches("","a");
490 printMatches("a.b","a.b");
491 printMatches("a","a.b");
492 printMatches("a.b","a");
493 printMatches("*","a");
494 printMatches("*.b","a.b");
495 printMatches("*.*","a.b");
496 printMatches("a.*","a.b");
497 printMatches("a.*.#","a.b");
498 printMatches("a.#.b","a.b");
499
500 printMatches("#.b","a");
501 printMatches("#.b","a.b");
502 printMatches("#.a.b","a.b");
503
504
505 printMatches("#","");
506 printMatches("#","a");
507 printMatches("#","a.b");
508 printMatches("#.#","a.b");
509 printMatches("#.*","a.b");
510
511 printMatches("#.a.b","a.b");
512 printMatches("a.b.#","a.b");
513 printMatches("a.#","a.b");
514 printMatches("#.*.#","a.b");
515 printMatches("#.*.b.#","a.b");
516 printMatches("#.a.*.#","a.b");
517 printMatches("#.a.#.b.#","a.b");
518 printMatches("#.*.#.*.#","a.b");
519 printMatches("*.#.*.#","a.b");
520 printMatches("#.*.#.*","a.b");
521
522
523 printMatches(new String[]{"a.#.b.#","a.*.#.b.#"},"a.b.b.b.b.b.b.b.c");
524
525
526 printMatches(new String[]{"a.b", "a.c"},"a.b");
527 printMatches(new String[]{"a.#", "a.c", "#.b"},"a.b");
528 printMatches(new String[]{"a.#", "a.c", "#.b", "#", "*.*"},"a.b");
529
530 printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.e");
531 printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.f.g");
532
533
534
535
536 }
537
538 private static void printMatches(final String[] bindingKeys, final String routingKey)
539 {
540 TopicMatcherDFAState sm = null;
541 Map<TopicMatcherResult, String> resultMap = new HashMap<TopicMatcherResult, String>();
542
543 TopicParser parser = new TopicParser();
544
545 long start = System.currentTimeMillis();
546 for(int i = 0; i < bindingKeys.length; i++)
547 {
548 System.out.println((System.currentTimeMillis() - start) + ":\t" + bindingKeys[i]);
549 TopicMatcherResult r = new TopicMatcherResult(){};
550 resultMap.put(r, bindingKeys[i]);
551 AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]);
552
553 System.err.println("=====================================================");
554 System.err.println("Adding binding key: " + bindingKeyShortString);
555 System.err.println("-----------------------------------------------------");
556
557
558 if(i==0)
559 {
560 sm = parser.createStateMachine(bindingKeyShortString, r);
561 }
562 else
563 {
564 sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r));
565 }
566 System.err.println(sm.reachableStates());
567 System.err.println("=====================================================");
568 try
569 {
570 System.in.read();
571 }
572 catch (IOException e)
573 {
574 e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
575 }
576 }
577 AMQShortString routingKeyShortString = new AMQShortString(routingKey);
578
579 Collection<TopicMatcherResult> results = sm.parse(parser._dictionary, routingKeyShortString);
580 Collection<String> resultStrings = new ArrayList<String>();
581
582 for(TopicMatcherResult result : results)
583 {
584 resultStrings.add(resultMap.get(result));
585 }
586
587 final ArrayList<String> nonMatches = new ArrayList<String>(Arrays.asList(bindingKeys));
588 nonMatches.removeAll(resultStrings);
589 System.out.println("\""+routingKeyShortString+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches);
590
591
592 }
593
594 private static void printMatches(String bindingKey, String routingKey)
595 {
596 printMatches(new String[] { bindingKey }, routingKey);
597 }
598
599
600 private static boolean matches(String bindingKey, String routingKey)
601 {
602 AMQShortString bindingKeyShortString = new AMQShortString(bindingKey);
603 AMQShortString routingKeyShortString = new AMQShortString(routingKey);
604 TopicParser parser = new TopicParser();
605
606 final TopicMatcherResult result = new TopicMatcherResult(){};
607
608 TopicMatcherDFAState sm = parser.createStateMachine(bindingKeyShortString, result);
609 return !sm.parse(parser._dictionary,routingKeyShortString).isEmpty();
610
611 }
612
613 }
|