DefaultExchangeRegistry.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.exchange;
022 
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.framing.AMQShortString;
026 import org.apache.qpid.server.protocol.ExchangeInitialiser;
027 import org.apache.qpid.server.queue.IncomingMessage;
028 import org.apache.qpid.server.virtualhost.VirtualHost;
029 import org.apache.qpid.server.routing.RoutingTable;
030 
031 import java.util.Collection;
032 import java.util.concurrent.ConcurrentHashMap;
033 import java.util.concurrent.ConcurrentMap;
034 
035 public class DefaultExchangeRegistry implements ExchangeRegistry
036 {
037     private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class);
038 
039     /**
040      * Maps from exchange name to exchange instance
041      */
042     private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
043 
044     private Exchange _defaultExchange;
045     private VirtualHost _host;
046 
047     public DefaultExchangeRegistry(VirtualHost host)
048     {
049         //create 'standard' exchanges:
050         _host = host;
051 
052     }
053 
054     public void initialise() throws AMQException
055     {
056         new ExchangeInitialiser().initialise(_host.getExchangeFactory()this);
057     }
058 
059     public RoutingTable getRoutingTable()
060     {
061         return _host.getRoutingTable();
062     }
063 
064     public void registerExchange(Exchange exchangethrows AMQException
065     {
066         _exchangeMap.put(exchange.getName(), exchange);
067         if (exchange.isDurable())
068         {
069             getRoutingTable().createExchange(exchange);
070         }
071     }
072 
073     public void setDefaultExchange(Exchange exchange)
074     {
075         _defaultExchange = exchange;
076     }
077 
078     public Exchange getDefaultExchange()
079     {
080         return _defaultExchange;
081     }
082 
083     public Collection<AMQShortString> getExchangeNames()
084     {
085         return _exchangeMap.keySet();
086     }
087 
088     public void unregisterExchange(AMQShortString name, boolean inUsethrows AMQException
089     {
090         // TODO: check inUse argument
091         Exchange e = _exchangeMap.remove(name);
092         if (e != null)
093         {
094             if (e.isDurable())
095             {
096                 getRoutingTable().removeExchange(e);
097             }
098             e.close();
099         }
100         else
101         {
102             throw new AMQException("Unknown exchange " + name);
103         }
104     }
105 
106     public Exchange getExchange(AMQShortString name)
107     {
108         if ((name == null|| name.length() == 0)
109         {
110             return getDefaultExchange();
111         }
112         else
113         {
114             return _exchangeMap.get(name);
115         }
116 
117     }
118 
119     /**
120      * Routes content through exchanges, delivering it to 1 or more queues.
121      @param payload
122      @throws AMQException if something goes wrong delivering data
123      */
124     public void routeContent(IncomingMessage payloadthrows AMQException
125     {
126         final AMQShortString exchange = payload.getExchange();
127         final Exchange exch = getExchange(exchange);
128         // there is a small window of opportunity for the exchange to be deleted in between
129         // the BasicPublish being received (where the exchange is validated) and the final
130         // content body being received (which triggers this method)
131         // TODO: check where the exchange is validated
132         if (exch == null)
133         {
134             throw new AMQException("Exchange '" + exchange + "' does not exist");
135         }
136         exch.route(payload);
137     }
138 }