001 package org.apache.qpid;
002 /*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 *
021 */
022
023
024 import java.util.ArrayList;
025 import java.util.HashMap;
026 import java.util.LinkedList;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.concurrent.LinkedBlockingQueue;
030 import java.util.regex.Matcher;
031 import java.util.regex.Pattern;
032
033 import org.apache.qpid.transport.MessageTransfer;
034
035
036 public class ToyExchange
037 {
038 final static String DIRECT = "amq.direct";
039 final static String TOPIC = "amq.topic";
040
041 private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
042 private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
043 private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>();
044
045 public void createQueue(String name)
046 {
047 queues.put(name, new LinkedBlockingQueue<MessageTransfer>());
048 }
049
050 public LinkedBlockingQueue<MessageTransfer> getQueue(String name)
051 {
052 return queues.get(name);
053 }
054
055 public void bindQueue(String type,String binding,String queueName)
056 {
057 LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName);
058 binding = normalizeKey(binding);
059 if(DIRECT.equals(type))
060 {
061
062 if (directEx.containsKey(binding))
063 {
064 List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding);
065 list.add(queue);
066 }
067 else
068 {
069 List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
070 list.add(queue);
071 directEx.put(binding,list);
072 }
073 }
074 else
075 {
076 if (topicEx.containsKey(binding))
077 {
078 List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding);
079 list.add(queue);
080 }
081 else
082 {
083 List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
084 list.add(queue);
085 topicEx.put(binding,list);
086 }
087 }
088 }
089
090 public boolean route(String dest, String routingKey, MessageTransfer msg)
091 {
092 List<LinkedBlockingQueue<MessageTransfer>> queues;
093 if(DIRECT.equals(dest))
094 {
095 queues = directEx.get(routingKey);
096 }
097 else
098 {
099 queues = matchWildCard(routingKey);
100 }
101 if(queues != null && queues.size()>0)
102 {
103 System.out.println("Message stored in " + queues.size() + " queues");
104 storeMessage(msg,queues);
105 return true;
106 }
107 else
108 {
109 System.out.println("Message unroutable " + msg);
110 return false;
111 }
112 }
113
114 private String normalizeKey(String routingKey)
115 {
116 if(routingKey.indexOf(".*")>1)
117 {
118 return routingKey.substring(0,routingKey.indexOf(".*"));
119 }
120 else
121 {
122 return routingKey;
123 }
124 }
125
126 private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey)
127 {
128 List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>();
129
130 for(String key: topicEx.keySet())
131 {
132 Pattern p = Pattern.compile(key);
133 Matcher m = p.matcher(routingKey);
134 if (m.find())
135 {
136 for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key))
137 {
138 selected.add(queue);
139 }
140 }
141 }
142
143 return selected;
144 }
145
146 private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected)
147 {
148 for(LinkedBlockingQueue<MessageTransfer> queue : selected)
149 {
150 queue.offer(msg);
151 }
152 }
153
154 }
|