001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.exchange;
022
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.virtualhost.VirtualHost;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
034
035 public class DefaultExchangeRegistry implements ExchangeRegistry
036 {
037 private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class);
038
039 /**
040 * Maps from exchange name to exchange instance
041 */
042 private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
043
044 private Exchange _defaultExchange;
045 private VirtualHost _host;
046
047 public DefaultExchangeRegistry(VirtualHost host)
048 {
049 //create 'standard' exchanges:
050 _host = host;
051
052 }
053
054 public void initialise() throws AMQException
055 {
056 new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
057 }
058
059 public RoutingTable getRoutingTable()
060 {
061 return _host.getRoutingTable();
062 }
063
064 public void registerExchange(Exchange exchange) throws AMQException
065 {
066 _exchangeMap.put(exchange.getName(), exchange);
067 if (exchange.isDurable())
068 {
069 getRoutingTable().createExchange(exchange);
070 }
071 }
072
073 public void setDefaultExchange(Exchange exchange)
074 {
075 _defaultExchange = exchange;
076 }
077
078 public Exchange getDefaultExchange()
079 {
080 return _defaultExchange;
081 }
082
083 public Collection<AMQShortString> getExchangeNames()
084 {
085 return _exchangeMap.keySet();
086 }
087
088 public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
089 {
090 // TODO: check inUse argument
091 Exchange e = _exchangeMap.remove(name);
092 if (e != null)
093 {
094 if (e.isDurable())
095 {
096 getRoutingTable().removeExchange(e);
097 }
098 e.close();
099 }
100 else
101 {
102 throw new AMQException("Unknown exchange " + name);
103 }
104 }
105
106 public Exchange getExchange(AMQShortString name)
107 {
108 if ((name == null) || name.length() == 0)
109 {
110 return getDefaultExchange();
111 }
112 else
113 {
114 return _exchangeMap.get(name);
115 }
116
117 }
118
119 /**
120 * Routes content through exchanges, delivering it to 1 or more queues.
121 * @param payload
122 * @throws AMQException if something goes wrong delivering data
123 */
124 public void routeContent(IncomingMessage payload) throws AMQException
125 {
126 final AMQShortString exchange = payload.getExchange();
127 final Exchange exch = getExchange(exchange);
128 // there is a small window of opportunity for the exchange to be deleted in between
129 // the BasicPublish being received (where the exchange is validated) and the final
130 // content body being received (which triggers this method)
131 // TODO: check where the exchange is validated
132 if (exch == null)
133 {
134 throw new AMQException("Exchange '" + exchange + "' does not exist");
135 }
136 exch.route(payload);
137 }
138 }
|