BasicConsumeMethodHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.handler;
022 
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.framing.*;
026 import org.apache.qpid.protocol.AMQConstant;
027 import org.apache.qpid.server.AMQChannel;
028 import org.apache.qpid.server.ConsumerTagNotUniqueException;
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.queue.AMQQueue;
031 import org.apache.qpid.server.security.access.Permission;
032 import org.apache.qpid.server.state.AMQStateManager;
033 import org.apache.qpid.server.state.StateAwareMethodListener;
034 import org.apache.qpid.server.virtualhost.VirtualHost;
035 
/**
 * Handles {@link BasicConsumeBody} method frames received on a channel.
 *
 * <p>The handler looks up the channel, resolves the queue to consume from (the queue named
 * in the body, or the channel's default queue when no name is given), asks the virtual
 * host's access manager to authorise the consume, and then subscribes the channel to the
 * queue. Unless the body requests no-wait, a consume-ok body is written back to the session.
 *
 * <p>The class is a stateless singleton; obtain it through {@link #getInstance()}.
 */
public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
{
    private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class);

    private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler();

    /**
     * @return the single shared instance of this handler
     */
    public static BasicConsumeMethodHandler getInstance()
    {
        return _instance;
    }

    /** Private: instances are only handed out through {@link #getInstance()}. */
    private BasicConsumeMethodHandler()
    {
    }

    /**
     * Processes a consume request for the given channel.
     *
     * @param stateManager supplies the protocol session the request arrived on
     * @param body         the consume request (queue, consumer tag, flags and arguments)
     * @param channelId    id of the channel the request was received on
     * @throws AMQException if the channel does not exist, the queue cannot be resolved,
     *                      the consume is not authorised, or the queue's exclusive-consumer
     *                      state prevents the subscription
     */
    public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException
    {
        AMQProtocolSession session = stateManager.getProtocolSession();

        AMQChannel channel = session.getChannel(channelId);

        VirtualHost vHost = session.getVirtualHost();

        if (channel == null)
        {
            throw body.getChannelNotFoundException(channelId);
        }

        if (_logger.isDebugEnabled())
        {
            _logger.debug("BasicConsume: from '" + body.getQueue() +
                          "' for:" + body.getConsumerTag() +
                          " nowait:" + body.getNowait() +
                          " args:" + body.getArguments());
        }

        // No queue name in the request means "use the channel's default queue".
        AMQQueue queue = body.getQueue() == null
                         ? channel.getDefaultQueue()
                         : vHost.getQueueRegistry().getQueue(body.getQueue().intern());

        if (queue == null)
        {
            if (_logger.isDebugEnabled())
            {
                _logger.debug("No queue for '" + body.getQueue() + "'");
            }

            if (body.getQueue() != null)
            {
                // A named queue that does not exist is reported as a channel exception.
                String msg = "No such queue, '" + body.getQueue() + "'";
                throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
            }

            // No name supplied and no default queue is reported as a connection exception.
            String msg = "No queue name provided, no default queue defined.";
            throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg);
        }

        // Check authz
        if (!vHost.getAccessManager().authoriseConsume(session,
                body.getExclusive(), body.getNoAck(),
                body.getNoLocal(), body.getNowait(), queue))
        {
            throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
        }

        // A null tag is passed through as-is to subscribeToQueue.
        final AMQShortString consumerTagName =
                body.getConsumerTag() != null ? body.getConsumerTag().intern() : null;

        try
        {
            AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
                                                                  body.getArguments(), body.getNoLocal(), body.getExclusive());
            if (!body.getNowait())
            {
                MethodRegistry methodRegistry = session.getMethodRegistry();
                AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
                session.writeFrame(responseBody.generateFrame(channelId));
            }
        }
        catch (org.apache.qpid.AMQInvalidArgumentException ise)
        {
            // NOTE(review): the log text says "connection", but a channel-close body is what
            // is written below, on the requesting channel.
            _logger.debug("Closing connection due to invalid selector");

            MethodRegistry methodRegistry = session.getMethodRegistry();
            AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
                                                                               new AMQShortString(ise.getMessage()),
                                                                               body.getClazz(),
                                                                               body.getMethod());
            session.writeFrame(responseBody.generateFrame(channelId));
        }
        catch (ConsumerTagNotUniqueException e)
        {
            AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");

            MethodRegistry methodRegistry = session.getMethodRegistry();
            AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
                                                                                  msg,               // replytext
                                                                                  body.getClazz(),
                                                                                  body.getMethod());
            // The connection-close body is written on channel 0, not on the requesting channel.
            session.writeFrame(responseBody.generateFrame(0));
        }
        catch (AMQQueue.ExistingExclusiveSubscription e)
        {
            throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                           "Cannot subscribe to queue "
                                           + queue.getName()
                                           + " as it already has an existing exclusive consumer");
        }
        catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
        {
            throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                           "Cannot subscribe to queue "
                                           + queue.getName()
                                           + " exclusively as it already has a consumer");
        }
    }
}
173 }