001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.handler;
022
023 import java.util.UUID;
024 import java.util.concurrent.atomic.AtomicInteger;
025
026 import org.apache.log4j.Logger;
027 import org.apache.qpid.AMQException;
028 import org.apache.qpid.framing.AMQShortString;
029 import org.apache.qpid.framing.MethodRegistry;
030 import org.apache.qpid.framing.QueueDeclareBody;
031 import org.apache.qpid.framing.QueueDeclareOkBody;
032 import org.apache.qpid.protocol.AMQConstant;
033 import org.apache.qpid.server.AMQChannel;
034 import org.apache.qpid.server.exchange.Exchange;
035 import org.apache.qpid.server.exchange.ExchangeRegistry;
036 import org.apache.qpid.server.protocol.AMQProtocolSession;
037 import org.apache.qpid.server.queue.AMQQueue;
038 import org.apache.qpid.server.queue.AMQQueueFactory;
039 import org.apache.qpid.server.queue.QueueRegistry;
040 import org.apache.qpid.server.registry.ApplicationRegistry;
041 import org.apache.qpid.server.routing.RoutingTable;
042 import org.apache.qpid.server.state.AMQStateManager;
043 import org.apache.qpid.server.state.StateAwareMethodListener;
044 import org.apache.qpid.server.virtualhost.VirtualHost;
045
046 public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
047 {
048 private static final Logger _logger = Logger.getLogger(QueueDeclareHandler.class);
049
050 private static final QueueDeclareHandler _instance = new QueueDeclareHandler();
051
052 public static QueueDeclareHandler getInstance()
053 {
054 return _instance;
055 }
056
057 public boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
058
059 private final AtomicInteger _counter = new AtomicInteger();
060
061 public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
062 {
063 AMQProtocolSession session = stateManager.getProtocolSession();
064 VirtualHost virtualHost = session.getVirtualHost();
065 ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
066 QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
067 RoutingTable routingTable = virtualHost.getRoutingTable();
068
069
070 if (!body.getPassive())
071 {
072 // Perform ACL if request is not passive
073 if (!virtualHost.getAccessManager().authoriseCreateQueue(session, body.getAutoDelete(), body.getDurable(),
074 body.getExclusive(), body.getNowait(), body.getPassive(), body.getQueue()))
075 {
076 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
077 }
078 }
079
080 final AMQShortString queueName;
081
082 // if we aren't given a queue name, we create one which we return to the client
083
084 if ((body.getQueue() == null) || (body.getQueue().length() == 0))
085 {
086 queueName = createName();
087 }
088 else
089 {
090 queueName = body.getQueue().intern();
091 }
092
093 AMQQueue queue;
094 //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
095
096 synchronized (queueRegistry)
097 {
098
099
100
101 if (((queue = queueRegistry.getQueue(queueName)) == null))
102 {
103
104 if (body.getPassive())
105 {
106 String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
107 throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
108 }
109 else
110 {
111 queue = createQueue(queueName, body, virtualHost, session);
112 if (queue.isDurable() && !queue.isAutoDelete())
113 {
114 routingTable.createQueue(queue, body.getArguments());
115 }
116 queueRegistry.registerQueue(queue);
117 if (autoRegister)
118 {
119 Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
120
121 queue.bind(defaultExchange, queueName, null);
122 _logger.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")");
123 }
124 }
125 }
126 else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
127 {
128 throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
129 + " as exclusive queue with same name "
130 + "declared on another client ID('"
131 + queue.getOwner() + "')");
132 }
133
134 AMQChannel channel = session.getChannel(channelId);
135
136 if (channel == null)
137 {
138 throw body.getChannelNotFoundException(channelId);
139 }
140
141 //set this as the default queue on the channel:
142 channel.setDefaultQueue(queue);
143 }
144
145 if (!body.getNowait())
146 {
147 MethodRegistry methodRegistry = session.getMethodRegistry();
148 QueueDeclareOkBody responseBody =
149 methodRegistry.createQueueDeclareOkBody(queueName,
150 queue.getMessageCount(),
151 queue.getConsumerCount());
152 session.writeFrame(responseBody.generateFrame(channelId));
153
154 _logger.info("Queue " + queueName + " declared successfully");
155 }
156 }
157
158 protected AMQShortString createName()
159 {
160 return new AMQShortString("tmp_" + UUID.randomUUID());
161 }
162
163 protected AMQQueue createQueue(final AMQShortString queueName,
164 QueueDeclareBody body,
165 VirtualHost virtualHost,
166 final AMQProtocolSession session)
167 throws AMQException
168 {
169 final QueueRegistry registry = virtualHost.getQueueRegistry();
170 AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
171
172 final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost,
173 body.getArguments());
174
175
176 if (body.getExclusive() && !body.getDurable())
177 {
178 final AMQProtocolSession.Task deleteQueueTask =
179 new AMQProtocolSession.Task()
180 {
181 public void doTask(AMQProtocolSession session) throws AMQException
182 {
183 if (registry.getQueue(queueName) == queue)
184 {
185 queue.delete();
186 }
187 }
188 };
189
190 session.addSessionCloseTask(deleteQueueTask);
191
192 queue.addQueueDeleteTask(new AMQQueue.Task()
193 {
194 public void doTask(AMQQueue queue)
195 {
196 session.removeSessionCloseTask(deleteQueueTask);
197 }
198 });
199 }// if exclusive and not durable
200
201 return queue;
202 }
203 }
|