001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.handler;
022
023 import org.apache.qpid.AMQException;
024 import org.apache.qpid.framing.*;
025 import org.apache.qpid.server.exchange.Exchange;
026 import org.apache.qpid.server.protocol.AMQProtocolSession;
027 import org.apache.qpid.server.queue.QueueRegistry;
028 import org.apache.qpid.server.queue.AMQQueue;
029 import org.apache.qpid.server.state.AMQStateManager;
030 import org.apache.qpid.server.state.StateAwareMethodListener;
031 import org.apache.qpid.server.virtualhost.VirtualHost;
032
033 /**
034 * @author Apache Software Foundation
035 *
036 *
037 */
038 public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
039 {
040 private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
041
042 public static final int OK = 0;
043
044 public static final int EXCHANGE_NOT_FOUND = 1;
045
046 public static final int QUEUE_NOT_FOUND = 2;
047
048 public static final int NO_BINDINGS = 3;
049
050 public static final int QUEUE_NOT_BOUND = 4;
051
052 public static final int NO_QUEUE_BOUND_WITH_RK = 5;
053
054 public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
055
056 public static ExchangeBoundHandler getInstance()
057 {
058 return _instance;
059 }
060
061 private ExchangeBoundHandler()
062 {
063 }
064
065 public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
066 {
067 AMQProtocolSession session = stateManager.getProtocolSession();
068
069 VirtualHost virtualHost = session.getVirtualHost();
070 QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
071 MethodRegistry methodRegistry = session.getMethodRegistry();
072
073
074
075
076 AMQShortString exchangeName = body.getExchange();
077 AMQShortString queueName = body.getQueue();
078 AMQShortString routingKey = body.getRoutingKey();
079 if (exchangeName == null)
080 {
081 throw new AMQException("Exchange exchange must not be null");
082 }
083 Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
084 ExchangeBoundOkBody response;
085 if (exchange == null)
086 {
087
088
089 response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
090 new AMQShortString("Exchange " + exchangeName + " not found"));
091 }
092 else if (routingKey == null)
093 {
094 if (queueName == null)
095 {
096 if (exchange.hasBindings())
097 {
098 response = methodRegistry.createExchangeBoundOkBody(OK, null);
099 }
100 else
101 {
102
103 response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
104 null); // replyText
105 }
106 }
107 else
108 {
109
110 AMQQueue queue = queueRegistry.getQueue(queueName);
111 if (queue == null)
112 {
113
114 response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
115 new AMQShortString("Queue " + queueName + " not found")); // replyText
116 }
117 else
118 {
119 if (exchange.isBound(queue))
120 {
121
122 response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
123 null); // replyText
124 }
125 else
126 {
127
128 response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
129 new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName)); // replyText
130 }
131 }
132 }
133 }
134 else if (queueName != null)
135 {
136 AMQQueue queue = queueRegistry.getQueue(queueName);
137 if (queue == null)
138 {
139
140 response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
141 new AMQShortString("Queue " + queueName + " not found")); // replyText
142 }
143 else
144 {
145 if (exchange.isBound(body.getRoutingKey(), queue))
146 {
147
148 response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
149 null); // replyText
150 }
151 else
152 {
153
154 response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
155 new AMQShortString("Queue " + queueName + " not bound with routing key " +
156 body.getRoutingKey() + " to exchange " + exchangeName)); // replyText
157 }
158 }
159 }
160 else
161 {
162 if (exchange.isBound(body.getRoutingKey()))
163 {
164
165 response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
166 null); // replyText
167 }
168 else
169 {
170
171 response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
172 new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
173 " to exchange " + exchangeName)); // replyText
174 }
175 }
176 session.writeFrame(response.generateFrame(channelId));
177 }
178 }
|