001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 *
020 */
021
022 package org.apache.qpid.server.handler;
023
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.framing.QueuePurgeBody;
026 import org.apache.qpid.framing.MethodRegistry;
027 import org.apache.qpid.framing.AMQMethodBody;
028 import org.apache.qpid.protocol.AMQConstant;
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.queue.QueueRegistry;
031 import org.apache.qpid.server.queue.AMQQueue;
032 import org.apache.qpid.server.state.AMQStateManager;
033 import org.apache.qpid.server.state.StateAwareMethodListener;
034 import org.apache.qpid.server.virtualhost.VirtualHost;
035 import org.apache.qpid.server.AMQChannel;
036 import org.apache.qpid.server.security.access.Permission;
037
038 public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
039 {
040 private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
041
042 public static QueuePurgeHandler getInstance()
043 {
044 return _instance;
045 }
046
047 private final boolean _failIfNotFound;
048
049 public QueuePurgeHandler()
050 {
051 this(true);
052 }
053
054 public QueuePurgeHandler(boolean failIfNotFound)
055 {
056 _failIfNotFound = failIfNotFound;
057 }
058
059 public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
060 {
061 AMQProtocolSession session = stateManager.getProtocolSession();
062 VirtualHost virtualHost = session.getVirtualHost();
063 QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
064
065 AMQChannel channel = session.getChannel(channelId);
066
067
068 AMQQueue queue;
069 if(body.getQueue() == null)
070 {
071
072 if (channel == null)
073 {
074 throw body.getChannelNotFoundException(channelId);
075 }
076
077 //get the default queue on the channel:
078 queue = channel.getDefaultQueue();
079
080 if(queue == null)
081 {
082 if(_failIfNotFound)
083 {
084 throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
085 }
086 }
087 }
088 else
089 {
090 queue = queueRegistry.getQueue(body.getQueue());
091 }
092
093 if(queue == null)
094 {
095 if(_failIfNotFound)
096 {
097 throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
098 }
099 }
100 else
101 {
102
103 //Perform ACLs
104 if (!virtualHost.getAccessManager().authorisePurge(session, queue))
105 {
106 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
107 }
108
109 long purged = queue.clearQueue(channel.getStoreContext());
110
111
112 if(!body.getNowait())
113 {
114
115 MethodRegistry methodRegistry = session.getMethodRegistry();
116 AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
117 session.writeFrame(responseBody.generateFrame(channelId));
118
119 }
120 }
121 }
122 }
|