001 package org.apache.qpid.server.handler;
002 /*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 *
021 */
022
023
024 import org.apache.log4j.Logger;
025
026 import org.apache.qpid.framing.*;
027 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
028 import org.apache.qpid.server.state.StateAwareMethodListener;
029 import org.apache.qpid.server.state.AMQStateManager;
030 import org.apache.qpid.server.protocol.AMQProtocolSession;
031 import org.apache.qpid.server.virtualhost.VirtualHost;
032 import org.apache.qpid.server.exchange.ExchangeRegistry;
033 import org.apache.qpid.server.exchange.Exchange;
034 import org.apache.qpid.server.queue.QueueRegistry;
035 import org.apache.qpid.server.queue.AMQQueue;
036 import org.apache.qpid.server.AMQChannel;
037 import org.apache.qpid.server.security.access.Permission;
038 import org.apache.qpid.AMQException;
039 import org.apache.qpid.AMQInvalidRoutingKeyException;
040 import org.apache.qpid.protocol.AMQConstant;
041
042 public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
043 {
044 private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
045
046 private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
047
048 public static QueueUnbindHandler getInstance()
049 {
050 return _instance;
051 }
052
053 private QueueUnbindHandler()
054 {
055 }
056
057 public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
058 {
059 AMQProtocolSession session = stateManager.getProtocolSession();
060 VirtualHost virtualHost = session.getVirtualHost();
061 ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
062 QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
063
064
065 final AMQQueue queue;
066 final AMQShortString routingKey;
067
068 if (body.getQueue() == null)
069 {
070 AMQChannel channel = session.getChannel(channelId);
071
072 if (channel == null)
073 {
074 throw body.getChannelNotFoundException(channelId);
075 }
076
077 queue = channel.getDefaultQueue();
078
079 if (queue == null)
080 {
081 throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
082 }
083
084 routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
085
086 }
087 else
088 {
089 queue = queueRegistry.getQueue(body.getQueue());
090 routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
091 }
092
093 if (queue == null)
094 {
095 throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
096 }
097 final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
098 if (exch == null)
099 {
100 throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
101 }
102
103 //Perform ACLs
104 if (!virtualHost.getAccessManager().authoriseUnbind(session, exch, routingKey, queue))
105 {
106 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
107 }
108
109 try
110 {
111 queue.unBind(exch, routingKey, body.getArguments());
112 }
113 catch (AMQInvalidRoutingKeyException rke)
114 {
115 throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
116 }
117 catch (AMQException e)
118 {
119 if(e.getErrorCode() == AMQConstant.NOT_FOUND)
120 {
121 throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
122 }
123 throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
124 }
125
126 if (_log.isInfoEnabled())
127 {
128 _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
129 }
130
131 MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
132 AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
133 session.writeFrame(responseBody.generateFrame(channelId));
134
135
136 }
137 }
|