QueueUnbindHandler.java
001 package org.apache.qpid.server.handler;
002 /*
003  
004  * Licensed to the Apache Software Foundation (ASF) under one
005  * or more contributor license agreements.  See the NOTICE file
006  * distributed with this work for additional information
007  * regarding copyright ownership.  The ASF licenses this file
008  * to you under the Apache License, Version 2.0 (the
009  * "License"); you may not use this file except in compliance
010  * with the License.  You may obtain a copy of the License at
011  
012  *   http://www.apache.org/licenses/LICENSE-2.0
013  
014  * Unless required by applicable law or agreed to in writing,
015  * software distributed under the License is distributed on an
016  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017  * KIND, either express or implied.  See the License for the
018  * specific language governing permissions and limitations
019  * under the License.
020  
021  */
022 
023 
024 import org.apache.log4j.Logger;
025 
026 import org.apache.qpid.framing.*;
027 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
028 import org.apache.qpid.server.state.StateAwareMethodListener;
029 import org.apache.qpid.server.state.AMQStateManager;
030 import org.apache.qpid.server.protocol.AMQProtocolSession;
031 import org.apache.qpid.server.virtualhost.VirtualHost;
032 import org.apache.qpid.server.exchange.ExchangeRegistry;
033 import org.apache.qpid.server.exchange.Exchange;
034 import org.apache.qpid.server.queue.QueueRegistry;
035 import org.apache.qpid.server.queue.AMQQueue;
036 import org.apache.qpid.server.AMQChannel;
037 import org.apache.qpid.server.security.access.Permission;
038 import org.apache.qpid.AMQException;
039 import org.apache.qpid.AMQInvalidRoutingKeyException;
040 import org.apache.qpid.protocol.AMQConstant;
041 
042 public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
043 {
044     private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
045 
046     private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
047 
048     public static QueueUnbindHandler getInstance()
049     {
050         return _instance;
051     }
052 
053     private QueueUnbindHandler()
054     {
055     }
056 
057     public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelIdthrows AMQException
058     {
059         AMQProtocolSession session = stateManager.getProtocolSession();
060         VirtualHost virtualHost = session.getVirtualHost();
061         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
062         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
063 
064 
065         final AMQQueue queue;
066         final AMQShortString routingKey;               
067 
068         if (body.getQueue() == null)
069         {
070             AMQChannel channel = session.getChannel(channelId);
071 
072             if (channel == null)
073             {
074                 throw body.getChannelNotFoundException(channelId);
075             }
076 
077             queue = channel.getDefaultQueue();
078 
079             if (queue == null)
080             {
081                 throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
082             }
083 
084             routingKey = body.getRoutingKey() == null null : body.getRoutingKey().intern();
085 
086         }
087         else
088         {
089             queue = queueRegistry.getQueue(body.getQueue());
090             routingKey = body.getRoutingKey() == null null : body.getRoutingKey().intern();
091         }
092 
093         if (queue == null)
094         {
095             throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() " does not exist.");
096         }
097         final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
098         if (exch == null)
099         {
100             throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() " does not exist.");
101         }
102 
103         //Perform ACLs
104         if (!virtualHost.getAccessManager().authoriseUnbind(session, exch, routingKey, queue))
105         {
106             throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
107         }
108 
109         try
110         {
111             queue.unBind(exch, routingKey, body.getArguments());
112         }
113         catch (AMQInvalidRoutingKeyException rke)
114         {
115             throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
116         }
117         catch (AMQException e)
118         {
119             if(e.getErrorCode() == AMQConstant.NOT_FOUND)
120             {
121                 throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
122             }
123             throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
124         }
125 
126         if (_log.isInfoEnabled())
127         {
128             _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
129         }
130 
131         MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9session.getMethodRegistry();
132         AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
133         session.writeFrame(responseBody.generateFrame(channelId));
134 
135 
136     }
137 }