VirtualHost.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.virtualhost;
022 
023 import java.util.Collections;
024 import java.util.List;
025 import java.util.Timer;
026 import java.util.TimerTask;
027 
028 import javax.management.NotCompliantMBeanException;
029 
030 import org.apache.commons.configuration.ConfigurationException;
031 import org.apache.log4j.Logger;
032 import org.apache.qpid.AMQException;
033 import org.apache.qpid.framing.AMQShortString;
034 import org.apache.qpid.server.AMQBrokerManagerMBean;
035 import org.apache.qpid.server.configuration.ExchangeConfiguration;
036 import org.apache.qpid.server.configuration.QueueConfiguration;
037 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
038 import org.apache.qpid.server.connection.ConnectionRegistry;
039 import org.apache.qpid.server.connection.IConnectionRegistry;
040 import org.apache.qpid.server.exchange.DefaultExchangeFactory;
041 import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
042 import org.apache.qpid.server.exchange.Exchange;
043 import org.apache.qpid.server.exchange.ExchangeFactory;
044 import org.apache.qpid.server.exchange.ExchangeRegistry;
045 import org.apache.qpid.server.management.AMQManagedObject;
046 import org.apache.qpid.server.management.ManagedObject;
047 import org.apache.qpid.server.queue.AMQQueue;
048 import org.apache.qpid.server.queue.AMQQueueFactory;
049 import org.apache.qpid.server.queue.DefaultQueueRegistry;
050 import org.apache.qpid.server.queue.QueueRegistry;
051 import org.apache.qpid.server.registry.ApplicationRegistry;
052 import org.apache.qpid.server.routing.RoutingTable;
053 import org.apache.qpid.server.security.access.ACLManager;
054 import org.apache.qpid.server.security.access.Accessable;
055 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
056 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
057 import org.apache.qpid.server.transactionlog.TransactionLog;
058 
059 public class VirtualHost implements Accessable
060 {
061     private static final Logger _logger = Logger.getLogger(VirtualHost.class);
062 
063 
064     private final String _name;
065 
066     private ConnectionRegistry _connectionRegistry;
067 
068     private QueueRegistry _queueRegistry;
069 
070     private ExchangeRegistry _exchangeRegistry;
071 
072     private ExchangeFactory _exchangeFactory;
073 
074     private TransactionLog _transactionLog;
075 
076     private RoutingTable _routingTable;
077 
078     protected VirtualHostMBean _virtualHostMBean;
079 
080     private AMQBrokerManagerMBean _brokerMBean;
081 
082     private AuthenticationManager _authenticationManager;
083 
084     private ACLManager _accessManager;
085 
086     private final Timer _houseKeepingTimer;
087      
088     public void setAccessableName(String name)
089     {
090         _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
091                      + name + ") ignored remains :" + getAccessableName());
092     }
093 
094     public String getAccessableName()
095     {
096         return _name;
097     }
098 
099     public IConnectionRegistry getConnectionRegistry()
100     {
101         return _connectionRegistry;
102     }
103 
104     public RoutingTable getRoutingTable()
105     {
106         return _routingTable;
107     }
108 
109     /**
110      * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
111      * implementaion of an Exchange MBean should extend this class.
112      */
113     public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
114     {
115         public VirtualHostMBean() throws NotCompliantMBeanException
116         {
117             super(ManagedVirtualHost.class, "VirtualHost");
118         }
119 
120         public String getObjectInstanceName()
121         {
122             return _name.toString();
123         }
124 
125         public String getName()
126         {
127             return _name.toString();
128         }
129 
130         public VirtualHost getVirtualHost()
131         {
132             return VirtualHost.this;
133         }
134 
135 
136     // End of MBean class
137 
138     /**
139      * Normal Constructor
140      @param name
141      @param hostConfig
142      @throws Exception
143      */
144     public VirtualHost(VirtualHostConfiguration hostConfigthrows Exception
145     {
146         this(hostConfig, null);
147     }
148 
149     public VirtualHost(VirtualHostConfiguration hostConfig, TransactionLog transactionLogthrows Exception
150     {
151         _name = hostConfig.getName();
152         
153         if (_name == null || _name.length() == 0)
154         {
155             throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
156         }
157 
158         _virtualHostMBean = new VirtualHostMBean();
159 
160         _connectionRegistry = new ConnectionRegistry(this);
161 
162         _houseKeepingTimer = new Timer("Queue-housekeeping-"+_name, true);
163 
164         _queueRegistry = new DefaultQueueRegistry(this);
165         _exchangeFactory = new DefaultExchangeFactory(this);
166         _exchangeRegistry = new DefaultExchangeRegistry(this);
167 
168         if (transactionLog != null)
169         {
170             _transactionLog = transactionLog;
171             if (_transactionLog instanceof RoutingTable)
172             {
173                 _routingTable = (RoutingTable_transactionLog;
174             }
175         }
176         else
177         {
178             initialiseTransactionLog(hostConfig);
179             initialiseRoutingTable(hostConfig);
180         }
181 
182         _exchangeFactory.initialise(hostConfig);
183         _exchangeRegistry.initialise();
184 
185         initialiseModel(hostConfig);
186         
187         _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig);
188 
189         _accessManager = ApplicationRegistry.getInstance().getAccessManager();
190         _accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration());
191         
192         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
193         _brokerMBean.register();
194         initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
195     }
196 
197     private void initialiseHouseKeeping(long period)
198     {
199         /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
200         if(period != 0L)
201         {
202             class RemoveExpiredMessagesTask extends TimerTask
203             {
204                 public void run()
205                 {
206                     for(AMQQueue q : _queueRegistry.getQueues())
207                     {
208 
209                         try
210                         {
211                             q.checkMessageStatus();
212                         }
213                         catch (AMQException e)
214                         {
215                             _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
216                             throw new RuntimeException(e);
217                         }
218                     }
219                 }
220             }
221 
222             _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
223                     period/2,
224                     period);
225         }
226     }
227 
228     //todo we need to move from store.class to transactionlog.class
229     private void initialiseTransactionLog(VirtualHostConfiguration configthrows Exception
230     {
231         String transactionLogClass = config.getTransactionLogClass();
232 
233         Class clazz = Class.forName(transactionLogClass);
234         Object o = clazz.newInstance();
235 
236         if (!(instanceof TransactionLog))
237         {
238             throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class ". Class " + clazz +
239                                          " does not.");
240         }
241         _transactionLog = (TransactionLogo;
242 
243         //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable.
244         if (_transactionLog instanceof RoutingTable)
245         {
246             _routingTable = (RoutingTable)_transactionLog;
247         }
248 
249         _transactionLog.configure(this, "store", config);
250     }
251 
252     //todo we need to move from store.class to transactionlog.class
253     private void initialiseRoutingTable(VirtualHostConfiguration hostConfigthrows Exception
254     {
255         String transactionLogClass = hostConfig.getRoutingTableClass();
256 
257         if (transactionLogClass != null)
258         {
259             Class clazz = Class.forName(transactionLogClass);
260             Object o = clazz.newInstance();
261 
262             if (!(instanceof RoutingTable))
263             {
264                 throw new ClassCastException("RoutingTable class must implement " + RoutingTable.class ". Class " + clazz +
265                                              " does not.");
266             }
267             _routingTable = (RoutingTableo;
268             _routingTable.configure(this, "routingtable", hostConfig);
269         }
270         else
271         {
272             if (_routingTable == null)
273             {
274                 throw new RuntimeException("No Routing Table configured unable to startup.");
275             }
276         }
277     }
278     
279     private void initialiseModel(VirtualHostConfiguration configthrows ConfigurationException, AMQException
280     {
281         _logger.debug("Loading configuration for virtualhost: "+config.getName());
282 
283         List exchangeNames = config.getExchanges();
284 
285         for(Object exchangeNameObj : exchangeNames)
286         {
287             String exchangeName = String.valueOf(exchangeNameObj);
288             configureExchange(config.getExchangeConfiguration(exchangeName));
289         }
290 
291         String[] queueNames = config.getQueueNames();
292 
293         for(Object queueNameObj : queueNames)
294         {
295             String queueName = String.valueOf(queueNameObj);
296             configureQueue(config.getQueueConfiguration(queueName));
297         }
298     }
299 
300     private void configureExchange(ExchangeConfiguration exchangeConfigurationthrows AMQException
301     {
302         AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
303 
304         Exchange exchange;
305         exchange = _exchangeRegistry.getExchange(exchangeName);
306         if(exchange == null)
307         {
308 
309             AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
310             boolean durable = exchangeConfiguration.getDurable();
311             boolean autodelete = exchangeConfiguration.getAutoDelete();
312 
313             Exchange newExchange = _exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
314             _exchangeRegistry.registerExchange(newExchange);
315         }
316     }
317 
318     private void configureQueue(QueueConfiguration queueConfigurationthrows AMQException, ConfigurationException
319     {
320         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
321 
322         if (queue.isDurable())
323         {
324             _routingTable.createQueue(queue);
325         }
326 
327         String exchangeName = queueConfiguration.getExchange();
328 
329         Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null null new AMQShortString(exchangeName));
330 
331         if(exchange == null)
332         {
333             exchange = _exchangeRegistry.getDefaultExchange();
334         }
335 
336         if (exchange == null)
337         {
338             throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
339         }
340 
341         List routingKeys = queueConfiguration.getRoutingKeys();
342         if(routingKeys == null || routingKeys.isEmpty())
343         {
344             routingKeys = Collections.singletonList(queue.getName());
345         }
346 
347         for(Object routingKeyNameObj : routingKeys)
348         {
349             AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
350             queue.bind(exchange, routingKey, null);
351             _logger.info("Queue '" + queue.getName() "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
352         }
353 
354         if(exchange != _exchangeRegistry.getDefaultExchange())
355         {
356             queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName()null);
357         }
358     }
359 
360     public String getName()
361     {
362         return _name;
363     }
364 
365     public QueueRegistry getQueueRegistry()
366     {
367         return _queueRegistry;
368     }
369 
370     public ExchangeRegistry getExchangeRegistry()
371     {
372         return _exchangeRegistry;
373     }
374 
375     public ExchangeFactory getExchangeFactory()
376     {
377         return _exchangeFactory;
378     }
379 
380     public ApplicationRegistry getApplicationRegistry()
381     {
382         throw new UnsupportedOperationException();
383     }
384 
385     public TransactionLog getTransactionLog()
386     {
387         return _transactionLog;
388     }
389 
390     public AuthenticationManager getAuthenticationManager()
391     {
392         return _authenticationManager;
393     }
394 
395     public ACLManager getAccessManager()
396     {
397         return _accessManager;
398     }                                                                   
399 
400     public void close() throws Exception
401     {
402 
403         //Stop Connections
404         _connectionRegistry.close();
405 
406         //Stop the Queues processing
407         if (_queueRegistry != null)
408         {
409             for (AMQQueue queue : _queueRegistry.getQueues())
410             {
411                 queue.stop();
412             }
413         }        
414 
415         //Stop Housekeeping
416         if (_houseKeepingTimer != null)
417         {
418             _houseKeepingTimer.cancel();
419         }        
420 
421         //Close TransactionLog
422         if (_transactionLog != null)
423         {
424             _transactionLog.close();
425         }
426     }
427 
428     public ManagedObject getBrokerMBean()
429     {
430         return _brokerMBean;
431     }
432 
433     public ManagedObject getManagedObject()
434     {
435         return _virtualHostMBean;
436     }
437 }