001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.handler;
022
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.AMQInvalidRoutingKeyException;
026 import org.apache.qpid.framing.*;
027 import org.apache.qpid.protocol.AMQConstant;
028 import org.apache.qpid.server.AMQChannel;
029 import org.apache.qpid.server.exchange.Exchange;
030 import org.apache.qpid.server.exchange.ExchangeRegistry;
031 import org.apache.qpid.server.protocol.AMQProtocolSession;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.server.queue.QueueRegistry;
034 import org.apache.qpid.server.security.access.Permission;
035 import org.apache.qpid.server.state.AMQStateManager;
036 import org.apache.qpid.server.state.StateAwareMethodListener;
037 import org.apache.qpid.server.virtualhost.VirtualHost;
038
039 public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
040 {
041 private static final Logger _log = Logger.getLogger(QueueBindHandler.class);
042
043 private static final QueueBindHandler _instance = new QueueBindHandler();
044
045 public static QueueBindHandler getInstance()
046 {
047 return _instance;
048 }
049
050 private QueueBindHandler()
051 {
052 }
053
054 public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
055 {
056 AMQProtocolSession session = stateManager.getProtocolSession();
057 VirtualHost virtualHost = session.getVirtualHost();
058 ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
059 QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
060
061
062 final AMQQueue queue;
063 final AMQShortString routingKey;
064
065 if (body.getQueue() == null)
066 {
067 AMQChannel channel = session.getChannel(channelId);
068
069 if (channel == null)
070 {
071 throw body.getChannelNotFoundException(channelId);
072 }
073
074 queue = channel.getDefaultQueue();
075
076 if (queue == null)
077 {
078 throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
079 }
080
081 if (body.getRoutingKey() == null)
082 {
083 routingKey = queue.getName();
084 }
085 else
086 {
087 routingKey = body.getRoutingKey().intern();
088 }
089 }
090 else
091 {
092 queue = queueRegistry.getQueue(body.getQueue());
093 routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
094 }
095
096 if (queue == null)
097 {
098 throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
099 }
100 final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
101 if (exch == null)
102 {
103 throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
104 }
105
106
107 try
108 {
109
110 //Perform ACLs
111 if (!virtualHost.getAccessManager().authoriseBind(session, exch,
112 queue, routingKey))
113 {
114 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
115 }
116
117 if (!exch.isBound(routingKey, body.getArguments(), queue))
118 {
119 queue.bind(exch, routingKey, body.getArguments());
120 }
121 }
122 catch (AMQInvalidRoutingKeyException rke)
123 {
124 throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
125 }
126 catch (AMQException e)
127 {
128 throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
129 }
130
131 if (_log.isInfoEnabled())
132 {
133 _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
134 }
135 if (!body.getNowait())
136 {
137 MethodRegistry methodRegistry = session.getMethodRegistry();
138 AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
139 session.writeFrame(responseBody.generateFrame(channelId));
140
141 }
142 }
143 }
|