001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.handler;
022
023 import org.apache.qpid.AMQException;
024 import org.apache.qpid.framing.QueueDeleteBody;
025 import org.apache.qpid.framing.QueueDeleteOkBody;
026 import org.apache.qpid.framing.MethodRegistry;
027 import org.apache.qpid.protocol.AMQConstant;
028 import org.apache.qpid.server.protocol.AMQProtocolSession;
029 import org.apache.qpid.server.queue.QueueRegistry;
030 import org.apache.qpid.server.queue.AMQQueue;
031 import org.apache.qpid.server.state.AMQStateManager;
032 import org.apache.qpid.server.state.StateAwareMethodListener;
033 import org.apache.qpid.server.virtualhost.VirtualHost;
034 import org.apache.qpid.server.AMQChannel;
035 import org.apache.qpid.server.routing.RoutingTable;
036
/**
 * Handles an incoming queue delete request ({@link QueueDeleteBody}).
 *
 * The target queue is the one named in the request or, when the request carries no
 * queue name, the default queue of the channel the request arrived on. Once the
 * if-empty / if-unused conditions and the delete ACL have been checked, the queue is
 * deleted, removed from the virtual host's routing table if it is durable, and a
 * {@link QueueDeleteOkBody} frame is written back on the same channel.
 */
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
    private static final QueueDeleteHandler _instance = new QueueDeleteHandler();

    /**
     * @return the shared handler instance, which fails with NOT_FOUND when the queue is missing
     */
    public static QueueDeleteHandler getInstance()
    {
        return _instance;
    }

    /** Whether a request for a queue that cannot be found is answered with a NOT_FOUND channel exception. */
    private final boolean _failIfNotFound;

    /**
     * Creates a handler that raises NOT_FOUND when the requested queue cannot be found.
     */
    public QueueDeleteHandler()
    {
        this(true);
    }

    /**
     * @param failIfNotFound <code>true</code> to raise a NOT_FOUND channel exception when the
     *                       queue cannot be found, <code>false</code> to ignore such requests
     */
    public QueueDeleteHandler(boolean failIfNotFound)
    {
        _failIfNotFound = failIfNotFound;
    }

    /**
     * Processes a queue delete request received on the given channel.
     *
     * @param stateManager supplies the protocol session the request arrived on
     * @param body         the queue delete request
     * @param channelId    id of the channel the request arrived on; also used for the reply frame
     * @throws AMQException if the channel is unknown (only checked when no queue name is given),
     *                      the queue cannot be found and this handler is configured to fail,
     *                      an if-empty / if-unused condition is violated, or the session is
     *                      not authorised to delete the queue
     */
    public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
    {
        AMQProtocolSession session = stateManager.getProtocolSession();
        VirtualHost virtualHost = session.getVirtualHost();

        AMQQueue queue = resolveQueue(session, virtualHost, body, channelId);

        if (queue == null)
        {
            if (_failIfNotFound)
            {
                throw body.getChannelException(AMQConstant.NOT_FOUND, describeMissingQueue(body, channelId));
            }

            // NOTE(review): with failIfNotFound == false nothing is written back to the client
            // for a missing queue. Kept as in the original; confirm callers do not wait for a reply.
            return;
        }

        if (body.getIfEmpty() && !queue.isEmpty())
        {
            throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.");
        }

        if (body.getIfUnused() && !queue.isUnused())
        {
            // TODO - Error code
            throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.");
        }

        // Perform ACLs: a refusal is raised as a connection-level (not channel-level) exception.
        if (!virtualHost.getAccessManager().authoriseDelete(session, queue))
        {
            throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
        }

        // The value returned by delete() is reported back to the client in the DeleteOk reply.
        int purged = queue.delete();

        // Only durable queues are removed from the routing table.
        if (queue.isDurable())
        {
            virtualHost.getRoutingTable().removeQueue(queue);
        }

        MethodRegistry methodRegistry = session.getMethodRegistry();
        QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
        session.writeFrame(responseBody.generateFrame(channelId));
    }

    /**
     * Finds the queue a delete request refers to: the named queue from the virtual host's
     * queue registry, or the channel's default queue when the request carries no name.
     *
     * @return the queue, or <code>null</code> if no matching queue was found
     * @throws AMQException if no queue name is given and the channel does not exist
     */
    private static AMQQueue resolveQueue(AMQProtocolSession session, VirtualHost virtualHost,
                                         QueueDeleteBody body, int channelId) throws AMQException
    {
        if (body.getQueue() != null)
        {
            QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
            return queueRegistry.getQueue(body.getQueue());
        }

        AMQChannel channel = session.getChannel(channelId);
        if (channel == null)
        {
            throw body.getChannelNotFoundException(channelId);
        }

        // No name supplied: fall back to the default queue on the channel.
        return channel.getDefaultQueue();
    }

    /**
     * Builds the NOT_FOUND message. A request without a queue name gets its own wording so the
     * client is not told that a queue called "null" does not exist.
     */
    private static String describeMissingQueue(QueueDeleteBody body, int channelId)
    {
        if (body.getQueue() == null)
        {
            return "No queue name was given and channel " + channelId + " has no default queue.";
        }
        return "Queue " + body.getQueue() + " does not exist.";
    }
}
|