ExchangeDeclareHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.handler;
022 
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQConnectionException;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.AMQUnknownExchangeType;
027 import org.apache.qpid.framing.*;
028 import org.apache.qpid.protocol.AMQConstant;
029 import org.apache.qpid.server.exchange.Exchange;
030 import org.apache.qpid.server.exchange.ExchangeFactory;
031 import org.apache.qpid.server.exchange.ExchangeRegistry;
032 import org.apache.qpid.server.protocol.AMQProtocolSession;
033 import org.apache.qpid.server.security.access.Permission;
034 import org.apache.qpid.server.state.AMQStateManager;
035 import org.apache.qpid.server.state.StateAwareMethodListener;
036 import org.apache.qpid.server.virtualhost.VirtualHost;
037 
038 public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
039 {
040     private static final Logger _logger = Logger.getLogger(ExchangeDeclareHandler.class);
041 
042     private static final ExchangeDeclareHandler _instance = new ExchangeDeclareHandler();
043 
044     public static ExchangeDeclareHandler getInstance()
045     {
046         return _instance;
047     }
048 
049 
050 
051     private ExchangeDeclareHandler()
052     {
053     }
054 
055     public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelIdthrows AMQException
056     {
057         AMQProtocolSession session = stateManager.getProtocolSession();
058         VirtualHost virtualHost = session.getVirtualHost();
059         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
060         ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
061 
062         if (!body.getPassive())
063         {
064             // Perform ACL if request is not passive
065             if (!virtualHost.getAccessManager().authoriseCreateExchange(session, body.getAutoDelete(),
066                     body.getDurable(), body.getExchange(), body.getInternal(), body.getNowait(), body.getPassive(),
067                     body.getType()))
068             {
069                 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
070             }
071 
072         }
073 
074         if (_logger.isDebugEnabled())
075         {
076             _logger.debug("Request to declare exchange of type " + body.getType() " with name " + body.getExchange());
077         }
078         synchronized(exchangeRegistry)
079         {
080             Exchange exchange = exchangeRegistry.getExchange(body.getExchange());
081 
082 
083 
084             if (exchange == null)
085             {
086                 if(body.getPassive() && ((body.getType() == null|| body.getType().length() ==0))
087                 {
088                     throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
089                 }
090                 else
091                 {
092                     try
093                     {
094 
095                     exchange = exchangeFactory.createExchange(body.getExchange() == null null : body.getExchange().intern(),
096                                                               body.getType() == null null : body.getType().intern(),
097                                                               body.getDurable(),
098                                                               body.getPassive(), body.getTicket());
099                     exchangeRegistry.registerExchange(exchange);
100                     }
101                     catch(AMQUnknownExchangeType e)
102                     {
103                         throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e);
104                     }
105                 }
106             }
107             else if (!exchange.getType().equals(body.getType()))
108             {
109 
110                 throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() " of type " + exchange.getType() " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
111             }
112 
113         }
114         if(!body.getNowait())
115         {
116             MethodRegistry methodRegistry = session.getMethodRegistry();
117             AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
118             session.writeFrame(responseBody.generateFrame(channelId));
119 
120         }
121     }
122 }