BasicGetMethodHandler.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.
018  *
019  *
020  */
021 
022 package org.apache.qpid.server.handler;
023 
024 import org.apache.log4j.Logger;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.framing.BasicGetBody;
027 import org.apache.qpid.framing.BasicGetEmptyBody;
028 import org.apache.qpid.framing.MethodRegistry;
029 import org.apache.qpid.framing.AMQShortString;
030 import org.apache.qpid.framing.FieldTable;
031 import org.apache.qpid.protocol.AMQConstant;
032 import org.apache.qpid.server.AMQChannel;
033 import org.apache.qpid.server.flow.FlowCreditManager;
034 import org.apache.qpid.server.flow.MessageOnlyCreditManager;
035 import org.apache.qpid.server.subscription.SubscriptionImpl;
036 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
037 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
038 import org.apache.qpid.server.subscription.Subscription;
039 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
040 import org.apache.qpid.server.protocol.AMQProtocolSession;
041 import org.apache.qpid.server.queue.AMQQueue;
042 import org.apache.qpid.server.queue.QueueEntry;
043 import org.apache.qpid.server.queue.SimpleAMQQueue;
044 import org.apache.qpid.server.security.access.Permission;
045 import org.apache.qpid.server.state.AMQStateManager;
046 import org.apache.qpid.server.state.StateAwareMethodListener;
047 import org.apache.qpid.server.virtualhost.VirtualHost;
048 
049 public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
050 {
051     private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
052 
053     private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler();
054 
055     public static BasicGetMethodHandler getInstance()
056     {
057         return _instance;
058     }
059 
060     private BasicGetMethodHandler()
061     {
062     }
063 
064     public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelIdthrows AMQException
065     {
066         AMQProtocolSession session = stateManager.getProtocolSession();
067 
068 
069         VirtualHost vHost = session.getVirtualHost();
070 
071         AMQChannel channel = session.getChannel(channelId);
072         if (channel == null)
073         {
074             throw body.getChannelNotFoundException(channelId);
075         }
076         else
077         {
078             AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
079             if (queue == null)
080             {
081                 _log.info("No queue for '" + body.getQueue() "'");
082                 if(body.getQueue()!=null)
083                 {
084                     throw body.getConnectionException(AMQConstant.NOT_FOUND,
085                                                       "No such queue, '" + body.getQueue()"'");
086                 }
087                 else
088                 {
089                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
090                                                       "No queue name provided, no default queue defined.");
091                 }
092             }
093             else
094             {
095 
096                 //Perform ACLs
097                 if (!vHost.getAccessManager().authoriseConsume(session, body.getNoAck(), queue))
098                 {
099                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
100                 }
101 
102                 if (!performGet(queue,session, channel, !body.getNoAck()))
103                 {
104                     MethodRegistry methodRegistry = session.getMethodRegistry();
105                     // TODO - set clusterId
106                     BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
107 
108 
109                     session.writeFrame(responseBody.generateFrame(channelId));
110                 }
111             }
112         }
113     }
114 
115     public static boolean performGet(final AMQQueue queue,
116                                      final AMQProtocolSession session,
117                                      final AMQChannel channel,
118                                      final boolean acks)
119             throws AMQException
120     {
121 
122         final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
123 
124         final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
125         {
126 
127             int _msg;
128 
129             public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
130             throws AMQException
131             {
132                 singleMessageCredit.useCreditForMessage(entry.getMessage());
133                 session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
134                                                                         deliveryTag, queue.getMessageCount());
135 
136             }
137         };
138         final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
139         {
140 
141             public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
142             {
143                 channel.addUnacknowledgedMessage(entry, deliveryTag, null);
144             }
145         };
146 
147         Subscription sub;
148         if(acks)
149         {
150             sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
151         }
152         else
153         {
154             sub = new GetNoAckSubscription(channel,
155                                                  session,
156                                                  null,
157                                                  null,
158                                                  false,
159                                                  singleMessageCredit,
160                                                  getDeliveryMethod,
161                                                  getRecordMethod);
162         }
163 
164         queue.registerSubscription(sub,false);
165         queue.flushSubscription(sub);
166         queue.unregisterSubscription(sub);
167         return(!singleMessageCredit.hasCredit());
168 
169 
170     }
171 
172     public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
173     {
174         public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
175                                AMQShortString consumerTag, FieldTable filters,
176                                boolean noLocal, FlowCreditManager creditManager,
177                                    ClientDeliveryMethod deliveryMethod,
178                                    RecordDeliveryMethod recordMethod)
179             throws AMQException
180         {
181             super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
182         }
183 
184         public boolean wouldSuspend(QueueEntry msg)
185         {
186             return !getCreditManager().useCreditForMessage(msg.getMessage());
187         }
188 
189     }
190 }