001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 *
020 */
021
022 package org.apache.qpid.server.handler;
023
024 import org.apache.log4j.Logger;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.framing.BasicGetBody;
027 import org.apache.qpid.framing.BasicGetEmptyBody;
028 import org.apache.qpid.framing.MethodRegistry;
029 import org.apache.qpid.framing.AMQShortString;
030 import org.apache.qpid.framing.FieldTable;
031 import org.apache.qpid.protocol.AMQConstant;
032 import org.apache.qpid.server.AMQChannel;
033 import org.apache.qpid.server.flow.FlowCreditManager;
034 import org.apache.qpid.server.flow.MessageOnlyCreditManager;
035 import org.apache.qpid.server.subscription.SubscriptionImpl;
036 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
037 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
038 import org.apache.qpid.server.subscription.Subscription;
039 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
040 import org.apache.qpid.server.protocol.AMQProtocolSession;
041 import org.apache.qpid.server.queue.AMQQueue;
042 import org.apache.qpid.server.queue.QueueEntry;
043 import org.apache.qpid.server.queue.SimpleAMQQueue;
044 import org.apache.qpid.server.security.access.Permission;
045 import org.apache.qpid.server.state.AMQStateManager;
046 import org.apache.qpid.server.state.StateAwareMethodListener;
047 import org.apache.qpid.server.virtualhost.VirtualHost;
048
049 public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
050 {
051 private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
052
053 private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler();
054
055 public static BasicGetMethodHandler getInstance()
056 {
057 return _instance;
058 }
059
060 private BasicGetMethodHandler()
061 {
062 }
063
064 public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
065 {
066 AMQProtocolSession session = stateManager.getProtocolSession();
067
068
069 VirtualHost vHost = session.getVirtualHost();
070
071 AMQChannel channel = session.getChannel(channelId);
072 if (channel == null)
073 {
074 throw body.getChannelNotFoundException(channelId);
075 }
076 else
077 {
078 AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
079 if (queue == null)
080 {
081 _log.info("No queue for '" + body.getQueue() + "'");
082 if(body.getQueue()!=null)
083 {
084 throw body.getConnectionException(AMQConstant.NOT_FOUND,
085 "No such queue, '" + body.getQueue()+ "'");
086 }
087 else
088 {
089 throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
090 "No queue name provided, no default queue defined.");
091 }
092 }
093 else
094 {
095
096 //Perform ACLs
097 if (!vHost.getAccessManager().authoriseConsume(session, body.getNoAck(), queue))
098 {
099 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
100 }
101
102 if (!performGet(queue,session, channel, !body.getNoAck()))
103 {
104 MethodRegistry methodRegistry = session.getMethodRegistry();
105 // TODO - set clusterId
106 BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
107
108
109 session.writeFrame(responseBody.generateFrame(channelId));
110 }
111 }
112 }
113 }
114
115 public static boolean performGet(final AMQQueue queue,
116 final AMQProtocolSession session,
117 final AMQChannel channel,
118 final boolean acks)
119 throws AMQException
120 {
121
122 final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
123
124 final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
125 {
126
127 int _msg;
128
129 public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
130 throws AMQException
131 {
132 singleMessageCredit.useCreditForMessage(entry.getMessage());
133 session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
134 deliveryTag, queue.getMessageCount());
135
136 }
137 };
138 final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
139 {
140
141 public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
142 {
143 channel.addUnacknowledgedMessage(entry, deliveryTag, null);
144 }
145 };
146
147 Subscription sub;
148 if(acks)
149 {
150 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
151 }
152 else
153 {
154 sub = new GetNoAckSubscription(channel,
155 session,
156 null,
157 null,
158 false,
159 singleMessageCredit,
160 getDeliveryMethod,
161 getRecordMethod);
162 }
163
164 queue.registerSubscription(sub,false);
165 queue.flushSubscription(sub);
166 queue.unregisterSubscription(sub);
167 return(!singleMessageCredit.hasCredit());
168
169
170 }
171
172 public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
173 {
174 public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
175 AMQShortString consumerTag, FieldTable filters,
176 boolean noLocal, FlowCreditManager creditManager,
177 ClientDeliveryMethod deliveryMethod,
178 RecordDeliveryMethod recordMethod)
179 throws AMQException
180 {
181 super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
182 }
183
184 public boolean wouldSuspend(QueueEntry msg)
185 {
186 return !getCreditManager().useCreditForMessage(msg.getMessage());
187 }
188
189 }
190 }
|