001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.handler;
022
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQConnectionException;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.AMQUnknownExchangeType;
027 import org.apache.qpid.framing.*;
028 import org.apache.qpid.protocol.AMQConstant;
029 import org.apache.qpid.server.exchange.Exchange;
030 import org.apache.qpid.server.exchange.ExchangeFactory;
031 import org.apache.qpid.server.exchange.ExchangeRegistry;
032 import org.apache.qpid.server.protocol.AMQProtocolSession;
033 import org.apache.qpid.server.security.access.Permission;
034 import org.apache.qpid.server.state.AMQStateManager;
035 import org.apache.qpid.server.state.StateAwareMethodListener;
036 import org.apache.qpid.server.virtualhost.VirtualHost;
037
/**
 * Handles an incoming <code>Exchange.Declare</code> method frame.
 * <p>
 * The handler looks the requested exchange up in the virtual host's exchange registry,
 * creates and registers it through the virtual host's exchange factory when it does not
 * exist yet, and replies with <code>Exchange.DeclareOk</code> unless the request set the
 * no-wait flag.
 * <p>
 * The class holds no per-request state; a single shared instance is obtained via
 * {@link #getInstance()}.
 */
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
{
    private static final Logger _logger = Logger.getLogger(ExchangeDeclareHandler.class);

    private static final ExchangeDeclareHandler _instance = new ExchangeDeclareHandler();

    /**
     * @return the shared handler instance
     */
    public static ExchangeDeclareHandler getInstance()
    {
        return _instance;
    }

    /** Not instantiable from outside; use {@link #getInstance()}. */
    private ExchangeDeclareHandler()
    {
    }

    /**
     * Processes an exchange declaration.
     *
     * @param stateManager supplies the protocol session the request arrived on
     * @param body         the received declare method body
     * @param channelId    channel on which the request arrived; the reply is written to the same channel
     * @throws AMQException if creation is not authorised (ACCESS_REFUSED), if a passive declare without a
     *                      type names an exchange that does not exist (NOT_FOUND), if the requested exchange
     *                      type is unknown (COMMAND_INVALID), or if an existing exchange is redeclared with
     *                      a different type (NOT_ALLOWED)
     */
    public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException
    {
        AMQProtocolSession session = stateManager.getProtocolSession();
        VirtualHost virtualHost = session.getVirtualHost();
        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
        ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();

        if (!body.getPassive())
        {
            // Perform ACL if request is not passive
            authoriseCreate(virtualHost, session, body);
        }

        if (_logger.isDebugEnabled())
        {
            _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange());
        }

        // Hold the registry lock across lookup and registration so the check-then-create
        // sequence is not interleaved with another thread synchronizing on the same registry.
        synchronized(exchangeRegistry)
        {
            Exchange exchange = exchangeRegistry.getExchange(body.getExchange());

            if (exchange == null)
            {
                if (isPassiveWithoutType(body))
                {
                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
                }

                if (body.getPassive())
                {
                    // A passive request that names a type falls through to creation below, but the
                    // ACL check at the top of this method was skipped for it. Authorise here so that
                    // every path that creates an exchange is subject to the create permission.
                    authoriseCreate(virtualHost, session, body);
                }

                try
                {
                    // NOTE(review): the fourth argument is the passive flag; if the factory expects an
                    // auto-delete flag in that position this should be body.getAutoDelete() - confirm
                    // against ExchangeFactory.createExchange before changing.
                    exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
                                                              body.getType() == null ? null : body.getType().intern(),
                                                              body.getDurable(),
                                                              body.getPassive(), body.getTicket());
                    exchangeRegistry.registerExchange(exchange);
                }
                catch(AMQUnknownExchangeType e)
                {
                    // The failure is about the requested type, so report the type as well as the name.
                    throw body.getConnectionException(AMQConstant.COMMAND_INVALID,
                                                      "Unknown exchange type: " + body.getType()
                                                      + " for exchange: " + body.getExchange(), e);
                }
            }
            else if (!isPassiveWithoutType(body) && !exchange.getType().equals(body.getType()))
            {
                // A passive declare without a type is a pure existence check (see the NOT_FOUND branch
                // above), so it must not be rejected merely because no type was supplied.
                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
            }
        }

        if (!body.getNowait())
        {
            MethodRegistry methodRegistry = session.getMethodRegistry();
            AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
            session.writeFrame(responseBody.generateFrame(channelId));
        }
    }

    /**
     * Tells whether the request is a passive declare that carries no exchange type
     * (type is <code>null</code> or has zero length).
     */
    private static boolean isPassiveWithoutType(ExchangeDeclareBody body)
    {
        return body.getPassive() && ((body.getType() == null) || body.getType().length() == 0);
    }

    /**
     * Asks the virtual host's access manager whether the session may create the exchange
     * described by the request.
     *
     * @throws AMQException a connection exception with ACCESS_REFUSED when permission is denied
     */
    private static void authoriseCreate(VirtualHost virtualHost, AMQProtocolSession session, ExchangeDeclareBody body)
            throws AMQException
    {
        if (!virtualHost.getAccessManager().authoriseCreateExchange(session, body.getAutoDelete(),
                body.getDurable(), body.getExchange(), body.getInternal(), body.getNowait(), body.getPassive(),
                body.getType()))
        {
            throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
        }
    }
}
|