ToyBroker.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid;
022 
023 import org.apache.qpid.transport.*;
024 import org.apache.qpid.transport.network.mina.MinaHandler;
025 
026 import static org.apache.qpid.transport.util.Functions.str;
027 
028 import java.io.IOException;
029 import java.nio.ByteBuffer;
030 import java.util.ArrayList;
031 import java.util.HashMap;
032 import java.util.List;
033 import java.util.Map;
034 import java.util.concurrent.ConcurrentHashMap;
035 import java.util.concurrent.LinkedBlockingQueue;
036 
037 
038 /**
039  * ToyBroker
040  *
041  @author Rafael H. Schloming
042  */
043 
044 class ToyBroker extends SessionDelegate
045 {
046 
047     private ToyExchange exchange;
048     private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
049 
050     public ToyBroker(ToyExchange exchange)
051     {
052         this.exchange = exchange;
053     }
054 
055     public void messageAcquire(Session context, MessageAcquire struct)
056     {
057         System.out.println("\n==================> messageAcquire " );
058         context.executionResult((intstruct.getId()new Acquired(struct.getTransfers()));
059     }
060 
061     @Override public void queueDeclare(Session ssn, QueueDeclare qd)
062     {
063         exchange.createQueue(qd.getQueue());
064         System.out.println("\n==================> declared queue: " + qd.getQueue() "\n");
065     }
066 
067     @Override public void exchangeBind(Session ssn, ExchangeBind qb)
068     {
069         exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue());
070         System.out.println("\n==================> bound queue: " + qb.getQueue() " with binding key " + qb.getBindingKey() "\n");
071     }
072 
073     @Override public void queueQuery(Session ssn, QueueQuery qq)
074     {
075         QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
076         ssn.executionResult((intqq.getId(), result);
077     }
078 
079     @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
080     {
081         Consumer c = new Consumer();
082         c._queueName = ms.getQueue();
083         consumers.put(ms.getDestination(),c);
084         System.out.println("\n==================> message subscribe : " + ms.getDestination() " queue: " + ms.getQueue()  "\n");
085     }
086 
087     @Override public void messageFlow(Session ssn,MessageFlow struct)
088     {
089         Consumer c = consumers.get(struct.getDestination());
090         c._credit = struct.getValue();
091         System.out.println("\n==================> message flow : " + struct.getDestination() " credit: " + struct.getValue()  "\n");
092     }
093 
094     @Override public void messageFlush(Session ssn,MessageFlush struct)
095     {
096         System.out.println("\n==================> message flush for consumer : " + struct.getDestination() "\n");
097         checkAndSendMessagesToConsumer(ssn,struct.getDestination());
098     }
099 
100     @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
101     {
102         String dest = xfr.getDestination();
103         System.out.println("received transfer " + dest);
104         Header header = xfr.getHeader();
105         DeliveryProperties props = header.get(DeliveryProperties.class);
106         if (props != null)
107         {
108             System.out.println("received headers routing_key " + props.getRoutingKey());
109         }
110         MessageProperties mp = header.get(MessageProperties.class);
111         System.out.println("MP: " + mp);
112         if (mp != null)
113         {
114             System.out.println(mp.getApplicationHeaders());
115         }
116 
117         if (exchange.route(dest,props.getRoutingKey(),xfr))
118         {
119             System.out.println("queued " + xfr);
120             dispatchMessages(ssn);
121         }
122         else
123         {
124 
125             if (props == null || !props.getDiscardUnroutable())
126             {
127                 RangeSet ranges = new RangeSet();
128                 ranges.add(xfr.getId());
129                 ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
130                                   "no such destination");
131             }
132         }
133         ssn.processed(xfr);
134     }
135 
136     private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m)
137     {
138         System.out.println("\n==================> Transfering message to: " +dest + "\n");
139         ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT,
140                             MessageAcquireMode.PRE_ACQUIRED,
141                             m.getHeader(), m.getBody());
142     }
143 
144     private void dispatchMessages(Session ssn)
145     {
146         for (String dest: consumers.keySet())
147         {
148             checkAndSendMessagesToConsumer(ssn,dest);
149         }
150     }
151 
152     private void checkAndSendMessagesToConsumer(Session ssn,String dest)
153     {
154         Consumer c = consumers.get(dest);
155         LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName);
156         MessageTransfer m = queue.poll();
157         while (m != null && c._credit>0)
158         {
159             transferMessageToPeer(ssn,dest,m);
160             c._credit--;
161             m = queue.poll();
162         }
163     }
164 
165     // ugly, but who cares :)
166     // assumes unit is always no of messages, not bytes
167     // assumes it's credit mode and not window
168     private class Consumer
169     {
170         long _credit;
171         String _queueName;
172     }
173 
174     public static final void main(String[] argsthrows IOException
175     {
176         final ToyExchange exchange = new ToyExchange();
177         ConnectionDelegate delegate = new ServerDelegate()
178         {
179             public SessionDelegate getSessionDelegate()
180             {
181                 return new ToyBroker(exchange);
182             }
183         };
184 
185         MinaHandler.accept("0.0.0.0"5672, delegate);
186     }
187 
188 }