001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.virtualhost;
022
023 import java.util.Collections;
024 import java.util.List;
025 import java.util.Timer;
026 import java.util.TimerTask;
027
028 import javax.management.NotCompliantMBeanException;
029
030 import org.apache.commons.configuration.ConfigurationException;
031 import org.apache.log4j.Logger;
032 import org.apache.qpid.AMQException;
033 import org.apache.qpid.framing.AMQShortString;
034 import org.apache.qpid.server.AMQBrokerManagerMBean;
035 import org.apache.qpid.server.configuration.ExchangeConfiguration;
036 import org.apache.qpid.server.configuration.QueueConfiguration;
037 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
038 import org.apache.qpid.server.connection.ConnectionRegistry;
039 import org.apache.qpid.server.connection.IConnectionRegistry;
040 import org.apache.qpid.server.exchange.DefaultExchangeFactory;
041 import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
042 import org.apache.qpid.server.exchange.Exchange;
043 import org.apache.qpid.server.exchange.ExchangeFactory;
044 import org.apache.qpid.server.exchange.ExchangeRegistry;
045 import org.apache.qpid.server.management.AMQManagedObject;
046 import org.apache.qpid.server.management.ManagedObject;
047 import org.apache.qpid.server.queue.AMQQueue;
048 import org.apache.qpid.server.queue.AMQQueueFactory;
049 import org.apache.qpid.server.queue.DefaultQueueRegistry;
050 import org.apache.qpid.server.queue.QueueRegistry;
051 import org.apache.qpid.server.registry.ApplicationRegistry;
052 import org.apache.qpid.server.routing.RoutingTable;
053 import org.apache.qpid.server.security.access.ACLManager;
054 import org.apache.qpid.server.security.access.Accessable;
055 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
056 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
057 import org.apache.qpid.server.transactionlog.TransactionLog;
058
059 public class VirtualHost implements Accessable
060 {
061 private static final Logger _logger = Logger.getLogger(VirtualHost.class);
062
063
064 private final String _name;
065
066 private ConnectionRegistry _connectionRegistry;
067
068 private QueueRegistry _queueRegistry;
069
070 private ExchangeRegistry _exchangeRegistry;
071
072 private ExchangeFactory _exchangeFactory;
073
074 private TransactionLog _transactionLog;
075
076 private RoutingTable _routingTable;
077
078 protected VirtualHostMBean _virtualHostMBean;
079
080 private AMQBrokerManagerMBean _brokerMBean;
081
082 private AuthenticationManager _authenticationManager;
083
084 private ACLManager _accessManager;
085
086 private final Timer _houseKeepingTimer;
087
088 public void setAccessableName(String name)
089 {
090 _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
091 + name + ") ignored remains :" + getAccessableName());
092 }
093
094 public String getAccessableName()
095 {
096 return _name;
097 }
098
099 public IConnectionRegistry getConnectionRegistry()
100 {
101 return _connectionRegistry;
102 }
103
104 public RoutingTable getRoutingTable()
105 {
106 return _routingTable;
107 }
108
109 /**
110 * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
111 * implementaion of an Exchange MBean should extend this class.
112 */
113 public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
114 {
115 public VirtualHostMBean() throws NotCompliantMBeanException
116 {
117 super(ManagedVirtualHost.class, "VirtualHost");
118 }
119
120 public String getObjectInstanceName()
121 {
122 return _name.toString();
123 }
124
125 public String getName()
126 {
127 return _name.toString();
128 }
129
130 public VirtualHost getVirtualHost()
131 {
132 return VirtualHost.this;
133 }
134
135
136 } // End of MBean class
137
138 /**
139 * Normal Constructor
140 * @param name
141 * @param hostConfig
142 * @throws Exception
143 */
144 public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception
145 {
146 this(hostConfig, null);
147 }
148
149 public VirtualHost(VirtualHostConfiguration hostConfig, TransactionLog transactionLog) throws Exception
150 {
151 _name = hostConfig.getName();
152
153 if (_name == null || _name.length() == 0)
154 {
155 throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
156 }
157
158 _virtualHostMBean = new VirtualHostMBean();
159
160 _connectionRegistry = new ConnectionRegistry(this);
161
162 _houseKeepingTimer = new Timer("Queue-housekeeping-"+_name, true);
163
164 _queueRegistry = new DefaultQueueRegistry(this);
165 _exchangeFactory = new DefaultExchangeFactory(this);
166 _exchangeRegistry = new DefaultExchangeRegistry(this);
167
168 if (transactionLog != null)
169 {
170 _transactionLog = transactionLog;
171 if (_transactionLog instanceof RoutingTable)
172 {
173 _routingTable = (RoutingTable) _transactionLog;
174 }
175 }
176 else
177 {
178 initialiseTransactionLog(hostConfig);
179 initialiseRoutingTable(hostConfig);
180 }
181
182 _exchangeFactory.initialise(hostConfig);
183 _exchangeRegistry.initialise();
184
185 initialiseModel(hostConfig);
186
187 _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig);
188
189 _accessManager = ApplicationRegistry.getInstance().getAccessManager();
190 _accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration());
191
192 _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
193 _brokerMBean.register();
194 initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
195 }
196
197 private void initialiseHouseKeeping(long period)
198 {
199 /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
200 if(period != 0L)
201 {
202 class RemoveExpiredMessagesTask extends TimerTask
203 {
204 public void run()
205 {
206 for(AMQQueue q : _queueRegistry.getQueues())
207 {
208
209 try
210 {
211 q.checkMessageStatus();
212 }
213 catch (AMQException e)
214 {
215 _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
216 throw new RuntimeException(e);
217 }
218 }
219 }
220 }
221
222 _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
223 period/2,
224 period);
225 }
226 }
227
228 //todo we need to move from store.class to transactionlog.class
229 private void initialiseTransactionLog(VirtualHostConfiguration config) throws Exception
230 {
231 String transactionLogClass = config.getTransactionLogClass();
232
233 Class clazz = Class.forName(transactionLogClass);
234 Object o = clazz.newInstance();
235
236 if (!(o instanceof TransactionLog))
237 {
238 throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
239 " does not.");
240 }
241 _transactionLog = (TransactionLog) o;
242
243 //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable.
244 if (_transactionLog instanceof RoutingTable)
245 {
246 _routingTable = (RoutingTable)_transactionLog;
247 }
248
249 _transactionLog.configure(this, "store", config);
250 }
251
252 //todo we need to move from store.class to transactionlog.class
253 private void initialiseRoutingTable(VirtualHostConfiguration hostConfig) throws Exception
254 {
255 String transactionLogClass = hostConfig.getRoutingTableClass();
256
257 if (transactionLogClass != null)
258 {
259 Class clazz = Class.forName(transactionLogClass);
260 Object o = clazz.newInstance();
261
262 if (!(o instanceof RoutingTable))
263 {
264 throw new ClassCastException("RoutingTable class must implement " + RoutingTable.class + ". Class " + clazz +
265 " does not.");
266 }
267 _routingTable = (RoutingTable) o;
268 _routingTable.configure(this, "routingtable", hostConfig);
269 }
270 else
271 {
272 if (_routingTable == null)
273 {
274 throw new RuntimeException("No Routing Table configured unable to startup.");
275 }
276 }
277 }
278
279 private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
280 {
281 _logger.debug("Loading configuration for virtualhost: "+config.getName());
282
283 List exchangeNames = config.getExchanges();
284
285 for(Object exchangeNameObj : exchangeNames)
286 {
287 String exchangeName = String.valueOf(exchangeNameObj);
288 configureExchange(config.getExchangeConfiguration(exchangeName));
289 }
290
291 String[] queueNames = config.getQueueNames();
292
293 for(Object queueNameObj : queueNames)
294 {
295 String queueName = String.valueOf(queueNameObj);
296 configureQueue(config.getQueueConfiguration(queueName));
297 }
298 }
299
300 private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException
301 {
302 AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
303
304 Exchange exchange;
305 exchange = _exchangeRegistry.getExchange(exchangeName);
306 if(exchange == null)
307 {
308
309 AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
310 boolean durable = exchangeConfiguration.getDurable();
311 boolean autodelete = exchangeConfiguration.getAutoDelete();
312
313 Exchange newExchange = _exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
314 _exchangeRegistry.registerExchange(newExchange);
315 }
316 }
317
318 private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
319 {
320 AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
321
322 if (queue.isDurable())
323 {
324 _routingTable.createQueue(queue);
325 }
326
327 String exchangeName = queueConfiguration.getExchange();
328
329 Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
330
331 if(exchange == null)
332 {
333 exchange = _exchangeRegistry.getDefaultExchange();
334 }
335
336 if (exchange == null)
337 {
338 throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
339 }
340
341 List routingKeys = queueConfiguration.getRoutingKeys();
342 if(routingKeys == null || routingKeys.isEmpty())
343 {
344 routingKeys = Collections.singletonList(queue.getName());
345 }
346
347 for(Object routingKeyNameObj : routingKeys)
348 {
349 AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
350 queue.bind(exchange, routingKey, null);
351 _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
352 }
353
354 if(exchange != _exchangeRegistry.getDefaultExchange())
355 {
356 queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null);
357 }
358 }
359
360 public String getName()
361 {
362 return _name;
363 }
364
365 public QueueRegistry getQueueRegistry()
366 {
367 return _queueRegistry;
368 }
369
370 public ExchangeRegistry getExchangeRegistry()
371 {
372 return _exchangeRegistry;
373 }
374
375 public ExchangeFactory getExchangeFactory()
376 {
377 return _exchangeFactory;
378 }
379
380 public ApplicationRegistry getApplicationRegistry()
381 {
382 throw new UnsupportedOperationException();
383 }
384
385 public TransactionLog getTransactionLog()
386 {
387 return _transactionLog;
388 }
389
390 public AuthenticationManager getAuthenticationManager()
391 {
392 return _authenticationManager;
393 }
394
395 public ACLManager getAccessManager()
396 {
397 return _accessManager;
398 }
399
400 public void close() throws Exception
401 {
402
403 //Stop Connections
404 _connectionRegistry.close();
405
406 //Stop the Queues processing
407 if (_queueRegistry != null)
408 {
409 for (AMQQueue queue : _queueRegistry.getQueues())
410 {
411 queue.stop();
412 }
413 }
414
415 //Stop Housekeeping
416 if (_houseKeepingTimer != null)
417 {
418 _houseKeepingTimer.cancel();
419 }
420
421 //Close TransactionLog
422 if (_transactionLog != null)
423 {
424 _transactionLog.close();
425 }
426 }
427
428 public ManagedObject getBrokerMBean()
429 {
430 return _brokerMBean;
431 }
432
433 public ManagedObject getManagedObject()
434 {
435 return _virtualHostMBean;
436 }
437 }
|