001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.handler;
022
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.framing.*;
026 import org.apache.qpid.protocol.AMQConstant;
027 import org.apache.qpid.server.AMQChannel;
028 import org.apache.qpid.server.ConsumerTagNotUniqueException;
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.queue.AMQQueue;
031 import org.apache.qpid.server.security.access.Permission;
032 import org.apache.qpid.server.state.AMQStateManager;
033 import org.apache.qpid.server.state.StateAwareMethodListener;
034 import org.apache.qpid.server.virtualhost.VirtualHost;
035
036 public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
037 {
038 private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class);
039
040 private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler();
041
042 public static BasicConsumeMethodHandler getInstance()
043 {
044 return _instance;
045 }
046
047 private BasicConsumeMethodHandler()
048 {
049 }
050
051 public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException
052 {
053 AMQProtocolSession session = stateManager.getProtocolSession();
054
055
056
057
058 AMQChannel channel = session.getChannel(channelId);
059
060 VirtualHost vHost = session.getVirtualHost();
061
062 if (channel == null)
063 {
064 throw body.getChannelNotFoundException(channelId);
065 }
066 else
067 {
068 if (_logger.isDebugEnabled())
069 {
070 _logger.debug("BasicConsume: from '" + body.getQueue() +
071 "' for:" + body.getConsumerTag() +
072 " nowait:" + body.getNowait() +
073 " args:" + body.getArguments());
074 }
075
076 AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue().intern());
077
078 if (queue == null)
079 {
080 if (_logger.isDebugEnabled())
081 {
082 _logger.debug("No queue for '" + body.getQueue() + "'");
083 }
084 if (body.getQueue() != null)
085 {
086 String msg = "No such queue, '" + body.getQueue() + "'";
087 throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
088 }
089 else
090 {
091 String msg = "No queue name provided, no default queue defined.";
092 throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg);
093 }
094 }
095 else
096 {
097
098 final AMQShortString consumerTagName;
099
100 // Check authz
101 if (!vHost.getAccessManager().authoriseConsume(session,
102 body.getExclusive(), body.getNoAck(),
103 body.getNoLocal(), body.getNowait(), queue))
104 {
105 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
106 }
107
108 if (body.getConsumerTag() != null)
109 {
110 consumerTagName = body.getConsumerTag().intern();
111 }
112 else
113 {
114 consumerTagName = null;
115 }
116
117 try
118 {
119 AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
120 body.getArguments(), body.getNoLocal(), body.getExclusive());
121 if (!body.getNowait())
122 {
123 MethodRegistry methodRegistry = session.getMethodRegistry();
124 AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
125 session.writeFrame(responseBody.generateFrame(channelId));
126
127 }
128
129
130 }
131 catch (org.apache.qpid.AMQInvalidArgumentException ise)
132 {
133 _logger.debug("Closing connection due to invalid selector");
134
135 MethodRegistry methodRegistry = session.getMethodRegistry();
136 AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
137 new AMQShortString(ise.getMessage()),
138 body.getClazz(),
139 body.getMethod());
140 session.writeFrame(responseBody.generateFrame(channelId));
141
142
143 }
144 catch (ConsumerTagNotUniqueException e)
145 {
146 AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
147
148 MethodRegistry methodRegistry = session.getMethodRegistry();
149 AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
150 msg, // replytext
151 body.getClazz(),
152 body.getMethod());
153 session.writeFrame(responseBody.generateFrame(0));
154 }
155 catch (AMQQueue.ExistingExclusiveSubscription e)
156 {
157 throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
158 "Cannot subscribe to queue "
159 + queue.getName()
160 + " as it already has an existing exclusive consumer");
161 }
162 catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
163 {
164 throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
165 "Cannot subscribe to queue "
166 + queue.getName()
167 + " exclusively as it already has a consumer");
168 }
169
170 }
171 }
172 }
173 }
|