ExchangeBoundHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.handler;
022 
023 import org.apache.qpid.AMQException;
024 import org.apache.qpid.framing.*;
025 import org.apache.qpid.server.exchange.Exchange;
026 import org.apache.qpid.server.protocol.AMQProtocolSession;
027 import org.apache.qpid.server.queue.QueueRegistry;
028 import org.apache.qpid.server.queue.AMQQueue;
029 import org.apache.qpid.server.state.AMQStateManager;
030 import org.apache.qpid.server.state.StateAwareMethodListener;
031 import org.apache.qpid.server.virtualhost.VirtualHost;
032 
033 /**
034  @author Apache Software Foundation
035  *
036  *
037  */
038 public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
039 {
040     private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
041 
042     public static final int OK = 0;
043 
044     public static final int EXCHANGE_NOT_FOUND = 1;
045 
046     public static final int QUEUE_NOT_FOUND = 2;
047 
048     public static final int NO_BINDINGS = 3;
049 
050     public static final int QUEUE_NOT_BOUND = 4;
051 
052     public static final int NO_QUEUE_BOUND_WITH_RK = 5;
053 
054     public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
055 
056     public static ExchangeBoundHandler getInstance()
057     {
058         return _instance;
059     }
060 
061     private ExchangeBoundHandler()
062     {
063     }
064 
065     public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelIdthrows AMQException
066     {
067         AMQProtocolSession session = stateManager.getProtocolSession();
068         
069         VirtualHost virtualHost = session.getVirtualHost();
070         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
071         MethodRegistry methodRegistry = session.getMethodRegistry();
072 
073         
074 
075 
076         AMQShortString exchangeName = body.getExchange();
077         AMQShortString queueName = body.getQueue();
078         AMQShortString routingKey = body.getRoutingKey();
079         if (exchangeName == null)
080         {
081             throw new AMQException("Exchange exchange must not be null");
082         }
083         Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
084         ExchangeBoundOkBody response;
085         if (exchange == null)
086         {
087 
088 
089             response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
090                                                                 new AMQShortString("Exchange " + exchangeName + " not found"));
091         }
092         else if (routingKey == null)
093         {
094             if (queueName == null)
095             {
096                 if (exchange.hasBindings())
097                 {
098                     response = methodRegistry.createExchangeBoundOkBody(OK, null);
099                 }
100                 else
101                 {
102 
103                     response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS,  // replyCode
104                         null);  // replyText
105                 }
106             }
107             else
108             {
109 
110                 AMQQueue queue = queueRegistry.getQueue(queueName);
111                 if (queue == null)
112                 {
113 
114                     response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,  // replyCode
115                         new AMQShortString("Queue " + queueName + " not found"));  // replyText
116                 }
117                 else
118                 {
119                     if (exchange.isBound(queue))
120                     {
121 
122                         response = methodRegistry.createExchangeBoundOkBody(OK,  // replyCode
123                             null);  // replyText
124                     }
125                     else
126                     {
127 
128                         response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND,  // replyCode
129                             new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName));  // replyText
130                     }
131                 }
132             }
133         }
134         else if (queueName != null)
135         {
136             AMQQueue queue = queueRegistry.getQueue(queueName);
137             if (queue == null)
138             {
139 
140                 response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,  // replyCode
141                     new AMQShortString("Queue " + queueName + " not found"));  // replyText
142             }
143             else
144             {
145                 if (exchange.isBound(body.getRoutingKey(), queue))
146                 {
147 
148                     response = methodRegistry.createExchangeBoundOkBody(OK,  // replyCode
149                         null);  // replyText
150                 }
151                 else
152                 {
153 
154                     response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,  // replyCode
155                         new AMQShortString("Queue " + queueName + " not bound with routing key " +
156                         body.getRoutingKey() " to exchange " + exchangeName));  // replyText
157                 }
158             }
159         }
160         else
161         {
162             if (exchange.isBound(body.getRoutingKey()))
163             {
164 
165                 response = methodRegistry.createExchangeBoundOkBody(OK,  // replyCode
166                     null);  // replyText
167             }
168             else
169             {
170 
171                 response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK,  // replyCode
172                     new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
173                     " to exchange " + exchangeName));  // replyText
174             }
175         }
176         session.writeFrame(response.generateFrame(channelId));
177     }
178 }