QueuePurgeHandler.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.
018  *
019  *
020  */
021 
022 package org.apache.qpid.server.handler;
023 
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.framing.QueuePurgeBody;
026 import org.apache.qpid.framing.MethodRegistry;
027 import org.apache.qpid.framing.AMQMethodBody;
028 import org.apache.qpid.protocol.AMQConstant;
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.queue.QueueRegistry;
031 import org.apache.qpid.server.queue.AMQQueue;
032 import org.apache.qpid.server.state.AMQStateManager;
033 import org.apache.qpid.server.state.StateAwareMethodListener;
034 import org.apache.qpid.server.virtualhost.VirtualHost;
035 import org.apache.qpid.server.AMQChannel;
036 import org.apache.qpid.server.security.access.Permission;
037 
/**
 * Handles an incoming {@link QueuePurgeBody} method.
 * <p>
 * The handler resolves the queue to purge (either the queue named in the method body or,
 * when no name is given, the default queue of the channel the method arrived on), asks the
 * virtual host's access manager whether the session may purge it, clears the queue and,
 * unless the client set the nowait flag, writes a purge-ok response carrying the value
 * returned by the clear operation.
 */
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
    /** Shared instance, built with the no-arg constructor (i.e. failIfNotFound == true). */
    private static final QueuePurgeHandler _instance = new QueuePurgeHandler();

    /**
     * @return the shared handler instance, which fails when the target queue cannot be found
     */
    public static QueuePurgeHandler getInstance()
    {
        return _instance;
    }

    /**
     * When true, a missing queue is reported to the client by throwing an exception;
     * when false, the method is ignored silently (nothing is purged, no response is written).
     */
    private final boolean _failIfNotFound;

    /**
     * Creates a handler that raises an exception when the queue to purge cannot be found.
     */
    public QueuePurgeHandler()
    {
        this(true);
    }

    /**
     * @param failIfNotFound whether a missing queue should raise an exception (true) or
     *                       be ignored silently (false)
     */
    public QueuePurgeHandler(boolean failIfNotFound)
    {
        _failIfNotFound = failIfNotFound;
    }

    /**
     * Purges the queue addressed by the given method body.
     *
     * @param stateManager source of the protocol session the method arrived on
     * @param body         the purge method; supplies the queue name (may be null, meaning
     *                     "the channel's default queue") and the nowait flag
     * @param channelId    id of the channel the method arrived on
     * @throws AMQException if the channel does not exist where it is needed (to look up the
     *                      default queue or to perform the purge), if no queue was named and
     *                      the channel has no default queue, if the named queue does not
     *                      exist (the latter two only when failIfNotFound is set), or if the
     *                      access manager refuses the purge
     */
    public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
    {
        AMQProtocolSession session = stateManager.getProtocolSession();
        VirtualHost virtualHost = session.getVirtualHost();
        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();

        AMQChannel channel = session.getChannel(channelId);

        AMQQueue queue;
        if (body.getQueue() == null)
        {
            // No queue named: fall back to the default queue of the channel,
            // which requires the channel itself to exist.
            if (channel == null)
            {
                throw body.getChannelNotFoundException(channelId);
            }

            queue = channel.getDefaultQueue();

            if (queue == null && _failIfNotFound)
            {
                throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "No queue specified.");
            }
        }
        else
        {
            queue = queueRegistry.getQueue(body.getQueue());
        }

        if (queue == null)
        {
            if (_failIfNotFound)
            {
                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
            }
            // Lenient mode: nothing to purge and nothing to report.
            return;
        }

        // Perform ACLs
        if (!virtualHost.getAccessManager().authorisePurge(session, queue))
        {
            throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
        }

        // The purge needs the channel's store context. A queue addressed by name can reach
        // this point with no channel, so guard here instead of failing with a
        // NullPointerException.
        if (channel == null)
        {
            throw body.getChannelNotFoundException(channelId);
        }

        long purged = queue.clearQueue(channel.getStoreContext());

        if (!body.getNowait())
        {
            // The client asked for confirmation: reply with the value returned by the purge.
            MethodRegistry methodRegistry = session.getMethodRegistry();
            AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
            session.writeFrame(responseBody.generateFrame(channelId));
        }
    }
}