ToyExchange.java
001 package org.apache.qpid;
002 /*
003  
004  * Licensed to the Apache Software Foundation (ASF) under one
005  * or more contributor license agreements.  See the NOTICE file
006  * distributed with this work for additional information
007  * regarding copyright ownership.  The ASF licenses this file
008  * to you under the Apache License, Version 2.0 (the
009  * "License"); you may not use this file except in compliance
010  * with the License.  You may obtain a copy of the License at
011  
012  *   http://www.apache.org/licenses/LICENSE-2.0
013  
014  * Unless required by applicable law or agreed to in writing,
015  * software distributed under the License is distributed on an
016  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017  * KIND, either express or implied.  See the License for the
018  * specific language governing permissions and limitations
019  * under the License.
020  
021  */
022 
023 
024 import java.util.ArrayList;
025 import java.util.HashMap;
026 import java.util.LinkedList;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.concurrent.LinkedBlockingQueue;
030 import java.util.regex.Matcher;
031 import java.util.regex.Pattern;
032 
033 import org.apache.qpid.transport.MessageTransfer;
034 
035 
036 public class ToyExchange
037 {
038   final static String DIRECT = "amq.direct";
039   final static String TOPIC = "amq.topic";
040   
041   private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
042   private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>()
043   private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>()
044   
045   public void createQueue(String name)
046   {
047       queues.put(name, new LinkedBlockingQueue<MessageTransfer>());
048   }
049   
050   public LinkedBlockingQueue<MessageTransfer> getQueue(String name)
051   {
052       return queues.get(name);
053   }
054   
055   public void bindQueue(String type,String binding,String queueName)
056   {
057       LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName);
058       binding = normalizeKey(binding);
059       if(DIRECT.equals(type))
060       {
061           
062           if (directEx.containsKey(binding))
063           {
064               List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding);
065               list.add(queue);
066           }
067           else
068           {
069               List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
070               list.add(queue);
071               directEx.put(binding,list);
072           }
073       }
074       else
075       {
076           if (topicEx.containsKey(binding))
077           {
078               List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding);
079               list.add(queue);
080           }
081           else
082           {
083               List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
084               list.add(queue);
085               topicEx.put(binding,list);
086           }
087       }
088   }
089   
090   public boolean route(String dest, String routingKey, MessageTransfer msg)
091   {
092       List<LinkedBlockingQueue<MessageTransfer>> queues;
093       if(DIRECT.equals(dest))
094       {
095           queues = directEx.get(routingKey);          
096       }
097       else
098       {
099           queues = matchWildCard(routingKey);
100       }
101       if(queues != null && queues.size()>0)
102       {
103           System.out.println("Message stored in " + queues.size() " queues");
104           storeMessage(msg,queues);
105           return true;
106       }
107       else
108       {
109           System.out.println("Message unroutable " + msg);
110           return false;
111       }
112   }
113   
114   private String normalizeKey(String routingKey)
115   {
116       if(routingKey.indexOf(".*")>1)
117       {
118           return routingKey.substring(0,routingKey.indexOf(".*"));
119       }
120       else
121       {
122           return routingKey;
123       }
124   }
125   
126   private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey)
127   {        
128       List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>();
129       
130       for(String key: topicEx.keySet())
131       {
132           Pattern p = Pattern.compile(key);
133           Matcher m = p.matcher(routingKey);
134           if (m.find())
135           {
136               for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key))
137               {
138                     selected.add(queue);
139               }
140           }
141       }
142       
143       return selected;      
144   }
145 
146   private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected)
147   {
148       for(LinkedBlockingQueue<MessageTransfer> queue : selected)
149       {
150           queue.offer(msg);
151       }
152   }
153   
154 }