TopicParser.java
001 package org.apache.qpid.server.exchange.topic;
002 
003 import org.apache.qpid.framing.AMQShortString;
004 import org.apache.qpid.framing.AMQShortStringTokenizer;
005 
006 import java.util.*;
007 import java.util.concurrent.atomic.AtomicReference;
008 import java.io.IOException;
009 
010 /*
011 *
012 * Licensed to the Apache Software Foundation (ASF) under one
013 * or more contributor license agreements.  See the NOTICE file
014 * distributed with this work for additional information
015 * regarding copyright ownership.  The ASF licenses this file
016 * to you under the Apache License, Version 2.0 (the
017 * "License"); you may not use this file except in compliance
018 * with the License.  You may obtain a copy of the License at
019 *
020 *   http://www.apache.org/licenses/LICENSE-2.0
021 *
022 * Unless required by applicable law or agreed to in writing,
023 * software distributed under the License is distributed on an
024 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
025 * KIND, either express or implied.  See the License for the
026 * specific language governing permissions and limitations
027 * under the License.
028 *
029 */
030 public class TopicParser
031 {
032     private static final byte TOPIC_DELIMITER = (byte)'.';
033 
034     private final TopicWordDictionary _dictionary = new TopicWordDictionary();
035     private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
036 
037     private static class Position
038     {
039         private final TopicWord _word;
040         private final boolean _selfTransition;
041         private final int _position;
042         private final boolean _endState;
043         private boolean _followedByAnyLoop;
044 
045 
046         public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
047         {
048             _position = position;
049             _word = word;
050             _selfTransition = selfTransition;
051             _endState = endState;
052         }
053 
054 
055     }
056 
057     private static final Position ERROR_POSITION = new Position(Integer.MAX_VALUE,null, true, false);
058 
059     private static class SimpleState
060     {
061         Set<Position> _positions;
062         Map<TopicWord, SimpleState> _nextState;
063     }
064 
065 
066     public void addBinding(AMQShortString bindingKey, TopicMatcherResult result)
067     {
068 
069         TopicMatcherDFAState startingStateMachine;
070         TopicMatcherDFAState newStateMachine;
071 
072         do
073         {
074             startingStateMachine = _stateMachine.get();
075             if(startingStateMachine == null)
076             {
077                 newStateMachine = createStateMachine(bindingKey, result);
078             }
079             else
080             {
081                 newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result));
082             }
083 
084         }
085         while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine));
086 
087     }
088 
089     public Collection<TopicMatcherResult> parse(AMQShortString routingKey)
090     {
091         TopicMatcherDFAState stateMachine = _stateMachine.get();
092         if(stateMachine == null)
093         {
094             return Collections.EMPTY_SET;
095         }
096         else
097         {
098             return stateMachine.parse(_dictionary,routingKey);
099         }
100     }
101 
102 
103     TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result)
104     {
105         List<TopicWord> wordList = createTopicWordList(bindingKey);
106         int wildCards = 0;
107         for(TopicWord word : wordList)
108         {
109             if(word == TopicWord.WILDCARD_WORD)
110             {
111                 wildCards++;
112             }
113         }
114         if(wildCards == 0)
115         {
116             TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size()+1];
117             states[states.length-1new TopicMatcherDFAState(Collections.EMPTY_MAP, Collections.singleton(result));
118             for(int i = states.length-2; i >= 0; i--)
119             {
120                 states[inew TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.EMPTY_SET);
121 
122             }
123             return states[0];
124         }
125         else if(wildCards == wordList.size())
126         {
127             Map<TopicWord,TopicMatcherDFAState> stateMap = new HashMap<TopicWord,TopicMatcherDFAState>();
128             TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result));
129             stateMap.put(TopicWord.ANY_WORD, state);
130             return state;
131         }
132 
133 
134         int positionCount = wordList.size() - wildCards;
135 
136         Position[] positions = new Position[positionCount+1];
137 
138         int lastWord;
139 
140         if(wordList.get(wordList.size()-1)== TopicWord.WILDCARD_WORD)
141         {
142             lastWord = wordList.size()-1;
143             positions[positionCountnew Position(positionCount, TopicWord.ANY_WORD, true, true);
144         }
145         else
146         {
147             lastWord = wordList.size();
148             positions[positionCountnew Position(positionCount, TopicWord.ANY_WORD, false, true);
149         }
150 
151 
152         int pos = 0;
153         int wordPos = 0;
154 
155 
156         while(wordPos < lastWord)
157         {
158             TopicWord word = wordList.get(wordPos++);
159 
160             if(word == TopicWord.WILDCARD_WORD)
161             {
162                 int nextWordPos = wordPos++;
163                 word = wordList.get(nextWordPos);
164 
165                 positions[posnew Position(pos++,word,true,false);
166             }
167             else
168             {
169                 positions[posnew Position(pos++,word,false,false);
170             }
171 
172         }
173 
174 
175         for(int p = 0; p<positionCount; p++)
176         {
177             boolean followedByWildcards = true;
178 
179             int n = p;
180             while(followedByWildcards && n<(positionCount+1))
181             {
182 
183                 if(positions[n]._selfTransition)
184                 {
185                     break;
186                 }
187                 else if(positions[n]._word!=TopicWord.ANY_WORD)
188                 {
189                     followedByWildcards = false;
190                 }
191                 n++;
192             }
193 
194 
195             positions[p]._followedByAnyLoop = followedByWildcards && (n!= positionCount+1);
196         }
197 
198 
199         // from each position you transition to a set of other positions.
200         // we approach this by examining steps of increasing length - so we
201         // look how far we can go from the start position in 1 word, 2 words, etc...
202 
203         Map<Set<Position>,SimpleState> stateMap = new HashMap<Set<Position>,SimpleState>();
204 
205 
206         SimpleState state = new SimpleState();
207         state._positions = Collections.singletonpositions[0] );
208         stateMap.put(state._positions, state);
209 
210         calculateNextStates(state, stateMap, positions);
211 
212         SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]);
213         HashMap<TopicWord, TopicMatcherDFAState>[] dfaStateMaps = new HashMap[simpleStates.length];
214         Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<SimpleState, TopicMatcherDFAState>();
215 
216         for(int i = 0; i < simpleStates.length; i++)
217         {
218 
219             Collection<TopicMatcherResult> results;
220             boolean endState = false;
221 
222             for(Position p : simpleStates[i]._positions)
223             {
224                 if(p._endState)
225                 {
226                     endState = true;
227                     break;
228                 }
229             }
230 
231             if(endState)
232             {
233                 results = Collections.singleton(result);
234             }
235             else
236             {
237                 results = Collections.EMPTY_SET;
238             }
239 
240             dfaStateMaps[inew HashMap<TopicWord, TopicMatcherDFAState>();
241             simple2DFAMap.put(simpleStates[i]new TopicMatcherDFAState(dfaStateMaps[i],results));
242 
243         }
244         for(int i = 0; i < simpleStates.length; i++)
245         {
246             SimpleState simpleState = simpleStates[i];
247 
248             Map<TopicWord, SimpleState> nextSimpleStateMap = simpleState._nextState;
249             for(Map.Entry<TopicWord, SimpleState> stateMapEntry : nextSimpleStateMap.entrySet())
250             {
251                 dfaStateMaps[i].put(stateMapEntry.getKey(), simple2DFAMap.get(stateMapEntry.getValue()));
252             }
253 
254         }
255 
256         return simple2DFAMap.get(state);
257 
258     }
259 
260 
261 
262     private void calculateNextStates(final SimpleState state,
263                                      final Map<Set<Position>, SimpleState> stateMap,
264                                      final Position[] positions)
265     {
266         Map<TopicWord, Set<Position>> transitions = new HashMap<TopicWord,Set<Position>>();
267 
268         for(Position pos : state._positions)
269         {
270             if(pos._selfTransition)
271             {
272                 Set<Position> dest = transitions.get(TopicWord.ANY_WORD);
273                 if(dest == null)
274                 {
275                     dest = new HashSet<Position>();
276                     transitions.put(TopicWord.ANY_WORD,dest);
277                 }
278                 dest.add(pos);
279             }
280 
281             final int nextPos = pos._position + 1;
282             Position nextPosition = nextPos == positions.length ? ERROR_POSITION : positions[nextPos];
283 
284             Set<Position> dest = transitions.get(pos._word);
285             if(dest == null)
286             {
287                 dest = new HashSet<Position>();
288                 transitions.put(pos._word,dest);
289             }
290             dest.add(nextPosition);
291 
292         }
293 
294         Set<Position> anyWordTransitions = transitions.get(TopicWord.ANY_WORD);
295         if(anyWordTransitions != null)
296         {
297             for(Set<Position> dest : transitions.values())
298             {
299                 dest.addAll(anyWordTransitions);
300             }
301         }
302 
303         state._nextState = new HashMap<TopicWord, SimpleState>();
304 
305         for(Map.Entry<TopicWord,Set<Position>> dest : transitions.entrySet())
306         {
307 
308             if(dest.getValue().size()>1)
309             {
310                 dest.getValue().remove(ERROR_POSITION);
311             }
312             Position loopingTerminal = null;
313             for(Position destPos : dest.getValue())
314             {
315                 if(destPos._selfTransition && destPos._endState)
316                 {
317                     loopingTerminal = destPos;
318                     break;
319                 }
320             }
321 
322             if(loopingTerminal!=null)
323             {
324                 dest.setValue(Collections.singleton(loopingTerminal));
325             }
326             else
327             {
328                 Position anyLoop = null;
329                 for(Position destPos : dest.getValue())
330                 {
331                     if(destPos._followedByAnyLoop)
332                     {
333                         if(anyLoop == null || anyLoop._position<destPos._position)
334                         {
335                             anyLoop = destPos;
336                         }
337                     }
338                 }
339                 if(anyLoop != null)
340                 {
341                     Collection<Position> removals = new ArrayList<Position>();
342                     for(Position destPos : dest.getValue())
343                     {
344                         if(destPos._position < anyLoop._position)
345                         {
346                             removals.add(destPos);
347                         }
348                     }
349                     dest.getValue().removeAll(removals);
350                 }
351             }
352 
353             SimpleState stateForEntry = stateMap.get(dest.getValue());
354             if(stateForEntry == null)
355             {
356                 stateForEntry = new SimpleState();
357                 stateForEntry._positions = dest.getValue();
358                 stateMap.put(dest.getValue(),stateForEntry);
359                 calculateNextStates(stateForEntry,
360                                     stateMap,
361                                     positions);
362             }
363             state._nextState.put(dest.getKey(),stateForEntry);
364 
365 
366 
367         }
368 
369         // remove redundant transitions
370         SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD);
371         if(anyWordState != null)
372         {
373             List<TopicWord> removeList = new ArrayList<TopicWord>();
374             for(Map.Entry<TopicWord,SimpleState> entry : state._nextState.entrySet())
375             {
376                 if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD)
377                 {
378                     removeList.add(entry.getKey());
379                 }
380             }
381             for(TopicWord removeKey : removeList)
382             {
383                 state._nextState.remove(removeKey);
384             }
385         }
386 
387 
388     }
389 
390     private List<TopicWord> createTopicWordList(final AMQShortString bindingKey)
391     {
392         AMQShortStringTokenizer tokens = bindingKey.tokenize(TOPIC_DELIMITER);
393         TopicWord previousWord = null;
394 
395         List<TopicWord> wordList = new ArrayList<TopicWord>();
396 
397         while(tokens.hasMoreTokens())
398         {
399             TopicWord nextWord = _dictionary.getOrCreateWord(tokens.nextToken());
400             if(previousWord == TopicWord.WILDCARD_WORD)
401             {
402 
403                 if(nextWord == TopicWord.WILDCARD_WORD)
404                 {
405                     // consecutive wildcards can be merged
406                     // i.e. subsequent wildcards can be discarded
407                     continue;
408                 }
409                 else if(nextWord == TopicWord.ANY_WORD)
410                 {
411                     // wildcard and anyword can be reordered to always put anyword first
412                     wordList.set(wordList.size()-1,TopicWord.ANY_WORD);
413                     nextWord = TopicWord.WILDCARD_WORD;
414                 }
415             }
416             wordList.add(nextWord);
417             previousWord = nextWord;
418 
419         }
420         return wordList;
421     }
422 
423 
424     public static void main(String[] args)
425     {
426 
427         printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
428         printMatches(new String[]{
429                         "#.a.#",
430                         "#.b.#",
431                         "#.c.#",
432                         "#.d.#",
433                         "#.e.#",
434                         "#.f.#",
435                         "#.g.#",
436                         "#.h.#",
437                         "#.i.#",
438                         "#.j.#",
439                         "#.k.#",
440                         "#.l.#",
441                         "#.m.#",
442                         "#.n.#",
443                         "#.o.#",
444                         "#.p.#",
445                         "#.q.#"
446 
447         }"a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");        
448 /*
449         printMatches(new String[]{
450                 "#.a.#",
451                 "#.b.#",
452                 "#.c.#",
453                 "#.d.#",
454                 "#.e.#",
455                 "#.f.#",
456                 "#.g.#",
457                 "#.h.#",
458                 "#.i.#",
459                 "#.j.#",
460                 "#.k.#",
461                 "#.l.#",
462                 "#.m.#",
463                 "#.n.#",
464                 "#.o.#",
465                 "#.p.#",
466                 "#.q.#",
467                 "#.r.#",
468                 "#.s.#",
469                 "#.t.#",
470                 "#.u.#",
471                 "#.v.#",
472                 "#.w.#",
473                 "#.x.#",
474                 "#.y.#",
475                 "#.z.#"
476 
477 
478         },"a.b");
479 
480         printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
481         printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
482         printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c");
483 
484 */
485 
486         printMatches("","");
487         printMatches("a","a");
488         printMatches("a","");
489         printMatches("","a");
490         printMatches("a.b","a.b");
491         printMatches("a","a.b");
492         printMatches("a.b","a");
493         printMatches("*","a");
494         printMatches("*.b","a.b");
495         printMatches("*.*","a.b");
496         printMatches("a.*","a.b");
497         printMatches("a.*.#","a.b");
498         printMatches("a.#.b","a.b");
499 
500         printMatches("#.b","a");
501         printMatches("#.b","a.b");
502         printMatches("#.a.b","a.b");
503 
504 
505         printMatches("#","");
506         printMatches("#","a");
507         printMatches("#","a.b");
508         printMatches("#.#","a.b");
509         printMatches("#.*","a.b");
510 
511         printMatches("#.a.b","a.b");
512         printMatches("a.b.#","a.b");
513         printMatches("a.#","a.b");
514         printMatches("#.*.#","a.b");
515         printMatches("#.*.b.#","a.b");
516         printMatches("#.a.*.#","a.b");
517         printMatches("#.a.#.b.#","a.b");
518         printMatches("#.*.#.*.#","a.b");
519         printMatches("*.#.*.#","a.b");
520         printMatches("#.*.#.*","a.b");
521 
522 
523         printMatches(new String[]{"a.#.b.#","a.*.#.b.#"},"a.b.b.b.b.b.b.b.c");
524 
525 
526         printMatches(new String[]{"a.b""a.c"},"a.b");
527         printMatches(new String[]{"a.#""a.c""#.b"},"a.b");
528         printMatches(new String[]{"a.#""a.c""#.b""#""*.*"},"a.b");
529 
530         printMatches(new String[]{"a.b.c.d.e.#""a.b.c.d.#""a.b.c.d.*""a.b.c.#""#.e""a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.e");
531         printMatches(new String[]{"a.b.c.d.e.#""a.b.c.d.#""a.b.c.d.*""a.b.c.#""#.e""a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.f.g");
532 
533 
534 
535 
536     }
537 
538     private static void printMatches(final String[] bindingKeys, final String routingKey)
539     {
540         TopicMatcherDFAState sm = null;
541         Map<TopicMatcherResult, String> resultMap = new HashMap<TopicMatcherResult, String>();
542 
543         TopicParser parser = new TopicParser();
544 
545         long start = System.currentTimeMillis();
546         for(int i = 0; i < bindingKeys.length; i++)
547         {
548             System.out.println((System.currentTimeMillis() - start":\t" + bindingKeys[i]);
549             TopicMatcherResult r = new TopicMatcherResult(){};
550             resultMap.put(r, bindingKeys[i]);
551             AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]);
552 
553             System.err.println("=====================================================");
554             System.err.println("Adding binding key: " + bindingKeyShortString);
555             System.err.println("-----------------------------------------------------");
556 
557 
558             if(i==0)
559             {
560                 sm = parser.createStateMachine(bindingKeyShortString, r);
561             }
562             else
563             {
564                 sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r));
565             }
566             System.err.println(sm.reachableStates());
567             System.err.println("=====================================================");
568             try
569             {
570                 System.in.read();
571             }
572             catch (IOException e)
573             {
574                 e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
575             }
576         }
577         AMQShortString routingKeyShortString = new AMQShortString(routingKey);
578 
579         Collection<TopicMatcherResult> results = sm.parse(parser._dictionary, routingKeyShortString);
580         Collection<String> resultStrings = new ArrayList<String>();
581 
582         for(TopicMatcherResult result : results)
583         {
584             resultStrings.add(resultMap.get(result));
585         }
586 
587         final ArrayList<String> nonMatches = new ArrayList<String>(Arrays.asList(bindingKeys));
588         nonMatches.removeAll(resultStrings);
589         System.out.println("\""+routingKeyShortString+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches);
590 
591 
592     }
593 
594     private static void printMatches(String bindingKey, String routingKey)
595     {
596         printMatches(new String[] { bindingKey }, routingKey);
597     }
598 
599 
600     private static boolean matches(String bindingKey, String routingKey)
601     {
602         AMQShortString bindingKeyShortString = new AMQShortString(bindingKey);
603         AMQShortString routingKeyShortString = new AMQShortString(routingKey);
604         TopicParser parser = new TopicParser();
605 
606         final TopicMatcherResult result = new TopicMatcherResult(){};
607 
608         TopicMatcherDFAState sm = parser.createStateMachine(bindingKeyShortString, result);
609         return !sm.parse(parser._dictionary,routingKeyShortString).isEmpty();
610 
611     }
612 
613 }