QueueDeclareHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.handler;
022 
023 import java.util.UUID;
024 import java.util.concurrent.atomic.AtomicInteger;
025 
026 import org.apache.log4j.Logger;
027 import org.apache.qpid.AMQException;
028 import org.apache.qpid.framing.AMQShortString;
029 import org.apache.qpid.framing.MethodRegistry;
030 import org.apache.qpid.framing.QueueDeclareBody;
031 import org.apache.qpid.framing.QueueDeclareOkBody;
032 import org.apache.qpid.protocol.AMQConstant;
033 import org.apache.qpid.server.AMQChannel;
034 import org.apache.qpid.server.exchange.Exchange;
035 import org.apache.qpid.server.exchange.ExchangeRegistry;
036 import org.apache.qpid.server.protocol.AMQProtocolSession;
037 import org.apache.qpid.server.queue.AMQQueue;
038 import org.apache.qpid.server.queue.AMQQueueFactory;
039 import org.apache.qpid.server.queue.QueueRegistry;
040 import org.apache.qpid.server.registry.ApplicationRegistry;
041 import org.apache.qpid.server.routing.RoutingTable;
042 import org.apache.qpid.server.state.AMQStateManager;
043 import org.apache.qpid.server.state.StateAwareMethodListener;
044 import org.apache.qpid.server.virtualhost.VirtualHost;
045 
/**
 * State-aware listener for {@link QueueDeclareBody} methods.
 *
 * For each received declare it resolves (or creates) the named queue on the session's virtual host,
 * makes that queue the default queue of the channel the method arrived on and, unless the client
 * asked for no-wait, replies with a {@link QueueDeclareOkBody}.
 *
 * A single shared instance is exposed through {@link #getInstance()}.
 */
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
    private static final Logger _logger = Logger.getLogger(QueueDeclareHandler.class);

    private static final QueueDeclareHandler _instance = new QueueDeclareHandler();

    /**
     * @return the shared handler instance
     */
    public static QueueDeclareHandler getInstance()
    {
        return _instance;
    }

    /**
     * When true, every queue created by this handler is also bound to the virtual host's default
     * exchange using the queue name as routing key. Initialised from the application configuration.
     */
    public boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();

    // Not referenced anywhere in this class; generated queue names are UUID based (see createName()).
    private final AtomicInteger _counter = new AtomicInteger();

    /**
     * Processes a queue declare request.
     *
     * @param stateManager supplies the protocol session the method was received on
     * @param body         the declare method as sent by the client
     * @param channelId    id of the channel the method was received on
     *
     * @throws AMQException if the access manager refuses a non-passive declare (connection exception,
     *                      ACCESS_REFUSED), if a passive declare names a queue that is not registered
     *                      (channel exception, NOT_FOUND), if the queue exists with an owner other than
     *                      this session's context key (channel exception, ALREADY_EXISTS), or if
     *                      the channel id is unknown to the session
     */
    public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
    {
        AMQProtocolSession session = stateManager.getProtocolSession();
        VirtualHost virtualHost = session.getVirtualHost();
        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
        RoutingTable routingTable = virtualHost.getRoutingTable();

        if (!body.getPassive())
        {
            // Perform ACL if request is not passive
            if (!virtualHost.getAccessManager().authoriseCreateQueue(session, body.getAutoDelete(), body.getDurable(),
                    body.getExclusive(), body.getNowait(), body.getPassive(), body.getQueue()))
            {
                throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
            }
        }

        final AMQShortString queueName;

        // if we aren't given a queue name, we create one which we return to the client
        if ((body.getQueue() == null) || (body.getQueue().length() == 0))
        {
            queueName = createName();
        }
        else
        {
            queueName = body.getQueue().intern();
        }

        AMQQueue queue;
        //TODO: do we need to check that the queue already exists with exactly the same "configuration"?

        // The lookup, the creation/registration and the default-queue assignment all happen while
        // holding the registry's monitor.
        synchronized (queueRegistry)
        {
            queue = queueRegistry.getQueue(queueName);

            if (queue == null)
            {
                if (body.getPassive())
                {
                    // A passive declare never creates anything; it only reports whether the queue exists.
                    String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
                    throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
                }
                else
                {
                    queue = createQueue(queueName, body, virtualHost, session);

                    // Only durable queues that are not auto-delete are handed to the routing table.
                    if (queue.isDurable() && !queue.isAutoDelete())
                    {
                        routingTable.createQueue(queue, body.getArguments());
                    }

                    queueRegistry.registerQueue(queue);

                    if (autoRegister)
                    {
                        Exchange defaultExchange = exchangeRegistry.getDefaultExchange();

                        queue.bind(defaultExchange, queueName, null);
                        _logger.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")");
                    }
                }
            }
            else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
            {
                // The queue exists and has an owner that is not this session's context key.
                throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
                                                                           + " as exclusive queue with same name "
                                                                           + "declared on another client ID('"
                                                                           + queue.getOwner() + "')");
            }

            AMQChannel channel = session.getChannel(channelId);

            if (channel == null)
            {
                throw body.getChannelNotFoundException(channelId);
            }

            //set this as the default queue on the channel:
            channel.setDefaultQueue(queue);
        }

        // With no-wait set the client expects no reply, so no Declare-Ok frame is written.
        if (!body.getNowait())
        {
            MethodRegistry methodRegistry = session.getMethodRegistry();
            QueueDeclareOkBody responseBody =
                    methodRegistry.createQueueDeclareOkBody(queueName,
                                                            queue.getMessageCount(),
                                                            queue.getConsumerCount());
            session.writeFrame(responseBody.generateFrame(channelId));

            _logger.info("Queue " + queueName + " declared successfully");
        }
    }

    /**
     * Generates a name for a queue declared without one.
     *
     * @return a new name of the form <code>tmp_&lt;random UUID&gt;</code>
     */
    protected AMQShortString createName()
    {
        return new AMQShortString("tmp_" + UUID.randomUUID());
    }

    /**
     * Builds the queue described by the declare method. The queue is not registered here; the caller
     * is responsible for adding it to the registry.
     *
     * If the declare is exclusive, the session's context key is passed to the factory as the owner.
     * If it is exclusive and not durable, a session-close task is installed that deletes the queue
     * (provided the registry still maps the name to this very queue instance), and a queue-delete
     * task is installed that removes that session-close task again once the queue has been deleted.
     *
     * @param queueName   name of the queue to create
     * @param body        the declare method supplying the durable/exclusive/auto-delete flags and arguments
     * @param virtualHost virtual host the queue belongs to
     * @param session     session on which the declare was received
     *
     * @return the newly created queue
     *
     * @throws AMQException if the queue factory fails to create the queue
     */
    protected AMQQueue createQueue(final AMQShortString queueName,
                                   QueueDeclareBody body,
                                   VirtualHost virtualHost,
                                   final AMQProtocolSession session)
            throws AMQException
    {
        final QueueRegistry registry = virtualHost.getQueueRegistry();
        AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;

        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost,
                                                                  body.getArguments());

        if (body.getExclusive() && !body.getDurable())
        {
            final AMQProtocolSession.Task deleteQueueTask =
                    new AMQProtocolSession.Task()
                    {
                        public void doTask(AMQProtocolSession session) throws AMQException
                        {
                            // Identity comparison: only delete if the name still refers to this queue
                            // instance and has not been re-declared as a different queue meanwhile.
                            if (registry.getQueue(queueName) == queue)
                            {
                                queue.delete();
                            }
                        }
                    };

            session.addSessionCloseTask(deleteQueueTask);

            // Once the queue is deleted, the close task above is no longer needed.
            queue.addQueueDeleteTask(new AMQQueue.Task()
            {
                public void doTask(AMQQueue queue)
                {
                    session.removeSessionCloseTask(deleteQueueTask);
                }
            });
        }// if exclusive and not durable

        return queue;
    }
}