001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid;
022
023 import org.apache.qpid.transport.*;
024 import org.apache.qpid.transport.network.mina.MinaHandler;
025
026 import static org.apache.qpid.transport.util.Functions.str;
027
028 import java.io.IOException;
029 import java.nio.ByteBuffer;
030 import java.util.ArrayList;
031 import java.util.HashMap;
032 import java.util.List;
033 import java.util.Map;
034 import java.util.concurrent.ConcurrentHashMap;
035 import java.util.concurrent.LinkedBlockingQueue;
036
037
038 /**
039 * ToyBroker
040 *
041 * @author Rafael H. Schloming
042 */
043
044 class ToyBroker extends SessionDelegate
045 {
046
047 private ToyExchange exchange;
048 private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
049
050 public ToyBroker(ToyExchange exchange)
051 {
052 this.exchange = exchange;
053 }
054
055 public void messageAcquire(Session context, MessageAcquire struct)
056 {
057 System.out.println("\n==================> messageAcquire " );
058 context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers()));
059 }
060
061 @Override public void queueDeclare(Session ssn, QueueDeclare qd)
062 {
063 exchange.createQueue(qd.getQueue());
064 System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
065 }
066
067 @Override public void exchangeBind(Session ssn, ExchangeBind qb)
068 {
069 exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue());
070 System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n");
071 }
072
073 @Override public void queueQuery(Session ssn, QueueQuery qq)
074 {
075 QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
076 ssn.executionResult((int) qq.getId(), result);
077 }
078
079 @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
080 {
081 Consumer c = new Consumer();
082 c._queueName = ms.getQueue();
083 consumers.put(ms.getDestination(),c);
084 System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
085 }
086
087 @Override public void messageFlow(Session ssn,MessageFlow struct)
088 {
089 Consumer c = consumers.get(struct.getDestination());
090 c._credit = struct.getValue();
091 System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n");
092 }
093
094 @Override public void messageFlush(Session ssn,MessageFlush struct)
095 {
096 System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n");
097 checkAndSendMessagesToConsumer(ssn,struct.getDestination());
098 }
099
100 @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
101 {
102 String dest = xfr.getDestination();
103 System.out.println("received transfer " + dest);
104 Header header = xfr.getHeader();
105 DeliveryProperties props = header.get(DeliveryProperties.class);
106 if (props != null)
107 {
108 System.out.println("received headers routing_key " + props.getRoutingKey());
109 }
110 MessageProperties mp = header.get(MessageProperties.class);
111 System.out.println("MP: " + mp);
112 if (mp != null)
113 {
114 System.out.println(mp.getApplicationHeaders());
115 }
116
117 if (exchange.route(dest,props.getRoutingKey(),xfr))
118 {
119 System.out.println("queued " + xfr);
120 dispatchMessages(ssn);
121 }
122 else
123 {
124
125 if (props == null || !props.getDiscardUnroutable())
126 {
127 RangeSet ranges = new RangeSet();
128 ranges.add(xfr.getId());
129 ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
130 "no such destination");
131 }
132 }
133 ssn.processed(xfr);
134 }
135
136 private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m)
137 {
138 System.out.println("\n==================> Transfering message to: " +dest + "\n");
139 ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT,
140 MessageAcquireMode.PRE_ACQUIRED,
141 m.getHeader(), m.getBody());
142 }
143
144 private void dispatchMessages(Session ssn)
145 {
146 for (String dest: consumers.keySet())
147 {
148 checkAndSendMessagesToConsumer(ssn,dest);
149 }
150 }
151
152 private void checkAndSendMessagesToConsumer(Session ssn,String dest)
153 {
154 Consumer c = consumers.get(dest);
155 LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName);
156 MessageTransfer m = queue.poll();
157 while (m != null && c._credit>0)
158 {
159 transferMessageToPeer(ssn,dest,m);
160 c._credit--;
161 m = queue.poll();
162 }
163 }
164
165 // ugly, but who cares :)
166 // assumes unit is always no of messages, not bytes
167 // assumes it's credit mode and not window
168 private class Consumer
169 {
170 long _credit;
171 String _queueName;
172 }
173
174 public static final void main(String[] args) throws IOException
175 {
176 final ToyExchange exchange = new ToyExchange();
177 ConnectionDelegate delegate = new ServerDelegate()
178 {
179 public SessionDelegate getSessionDelegate()
180 {
181 return new ToyBroker(exchange);
182 }
183 };
184
185 MinaHandler.accept("0.0.0.0", 5672, delegate);
186 }
187
188 }
|