QueueBindHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.handler;
022 
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.AMQInvalidRoutingKeyException;
026 import org.apache.qpid.framing.*;
027 import org.apache.qpid.protocol.AMQConstant;
028 import org.apache.qpid.server.AMQChannel;
029 import org.apache.qpid.server.exchange.Exchange;
030 import org.apache.qpid.server.exchange.ExchangeRegistry;
031 import org.apache.qpid.server.protocol.AMQProtocolSession;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.server.queue.QueueRegistry;
034 import org.apache.qpid.server.security.access.Permission;
035 import org.apache.qpid.server.state.AMQStateManager;
036 import org.apache.qpid.server.state.StateAwareMethodListener;
037 import org.apache.qpid.server.virtualhost.VirtualHost;
038 
/**
 * Handles {@link QueueBindBody} methods by binding a queue to an exchange
 * under a routing key.
 *
 * <p>The handler keeps no per-call state in fields and is exposed as a single
 * shared instance through {@link #getInstance()}.</p>
 */
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
    private static final Logger _log = Logger.getLogger(QueueBindHandler.class);

    private static final QueueBindHandler _instance = new QueueBindHandler();

    /**
     * @return the shared handler instance
     */
    public static QueueBindHandler getInstance()
    {
        return _instance;
    }

    /** Not instantiable from outside; use {@link #getInstance()}. */
    private QueueBindHandler()
    {
    }

    /**
     * Processes a queue bind request received on the given channel.
     *
     * <p>The queue is taken from the method body, or from the channel's default
     * queue when the body names none. The exchange is looked up in the virtual
     * host's exchange registry. After the access check the queue is bound to
     * the exchange unless an equivalent binding already exists, and a bind-ok
     * frame is written back unless the request set nowait.</p>
     *
     * @param stateManager supplies the protocol session the request arrived on
     * @param body         the bind request (queue, exchange, routing key, arguments, nowait)
     * @param channelId    id of the channel the request arrived on
     * @throws AMQException if the channel, queue or exchange cannot be found, access is
     *                      refused, the routing key is invalid, or the bind itself fails
     */
    public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
    {
        AMQProtocolSession session = stateManager.getProtocolSession();
        VirtualHost virtualHost = session.getVirtualHost();
        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();

        final AMQQueue queue;
        final AMQShortString routingKey;

        if (body.getQueue() == null)
        {
            // No queue named in the request: fall back to the channel's default queue.
            AMQChannel channel = session.getChannel(channelId);

            if (channel == null)
            {
                throw body.getChannelNotFoundException(channelId);
            }

            queue = channel.getDefaultQueue();

            if (queue == null)
            {
                throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
            }

            // With the default queue, a missing routing key defaults to the queue's name.
            if (body.getRoutingKey() == null)
            {
                routingKey = queue.getName();
            }
            else
            {
                routingKey = body.getRoutingKey().intern();
            }
        }
        else
        {
            // Explicitly named queue: a missing routing key becomes the empty string.
            queue = queueRegistry.getQueue(body.getQueue());
            routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
        }

        if (queue == null)
        {
            throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
        }

        final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
        if (exch == null)
        {
            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
        }

        try
        {
            //Perform ACLs
            if (!virtualHost.getAccessManager().authoriseBind(session, exch,
                    queue, routingKey))
            {
                throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
            }

            // Only bind when this exact binding is not already present.
            if (!exch.isBound(routingKey, body.getArguments(), queue))
            {
                queue.bind(exch, routingKey, body.getArguments());
            }
        }
        catch (AMQInvalidRoutingKeyException rke)
        {
            throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
        }
        catch (AMQException e)
        {
            // NOTE(review): the ACCESS_REFUSED exception thrown above is raised inside this
            // try block; if its type extends AMQException it is caught here and re-reported
            // as a CHANNEL_ERROR channel exception - confirm whether that is intended.
            throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
        }

        if (_log.isInfoEnabled())
        {
            _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
        }

        // A nowait request gets no bind-ok response.
        if (!body.getNowait())
        {
            MethodRegistry methodRegistry = session.getMethodRegistry();
            AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
            session.writeFrame(responseBody.generateFrame(channelId));
        }
    }
}