001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 /*
022 *
023 * Copyright (c) 2006 The Apache Software Foundation
024 *
025 * Licensed under the Apache License, Version 2.0 (the "License");
026 * you may not use this file except in compliance with the License.
027 * You may obtain a copy of the License at
028 *
029 * http://www.apache.org/licenses/LICENSE-2.0
030 *
031 * Unless required by applicable law or agreed to in writing, software
032 * distributed under the License is distributed on an "AS IS" BASIS,
033 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
034 * See the License for the specific language governing permissions and
035 * limitations under the License.
036 *
037 */
038 package org.apache.qpid.server;
039
040 import javax.management.JMException;
041 import javax.management.MBeanException;
042 import javax.management.MalformedObjectNameException;
043 import javax.management.ObjectName;
044
045 import org.apache.qpid.AMQException;
046 import org.apache.qpid.framing.AMQShortString;
047 import org.apache.qpid.server.exchange.Exchange;
048 import org.apache.qpid.server.exchange.ExchangeFactory;
049 import org.apache.qpid.server.exchange.ExchangeRegistry;
050 import org.apache.qpid.server.management.AMQManagedObject;
051 import org.apache.qpid.server.management.MBeanConstructor;
052 import org.apache.qpid.server.management.MBeanDescription;
053 import org.apache.qpid.server.management.ManagedBroker;
054 import org.apache.qpid.server.management.ManagedObject;
055 import org.apache.qpid.server.queue.AMQQueue;
056 import org.apache.qpid.server.queue.AMQQueueFactory;
057 import org.apache.qpid.server.queue.QueueRegistry;
058 import org.apache.qpid.server.virtualhost.VirtualHost;
059 import org.apache.qpid.server.transactionlog.TransactionLog;
060 import org.apache.qpid.server.routing.RoutingTable;
061
062 /**
063 * This MBean implements the broker management interface and exposes the
064 * Broker level management features like creating and deleting exchanges and queue.
065 */
066 @MBeanDescription("This MBean exposes the broker level management features")
067 public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBroker
068 {
069 private final QueueRegistry _queueRegistry;
070 private final ExchangeRegistry _exchangeRegistry;
071 private final ExchangeFactory _exchangeFactory;
072 private final TransactionLog _tranasctionLog;
073 private final RoutingTable _routingTable;
074
075 private final VirtualHost.VirtualHostMBean _virtualHostMBean;
076
077 @MBeanConstructor("Creates the Broker Manager MBean")
078 public AMQBrokerManagerMBean(VirtualHost.VirtualHostMBean virtualHostMBean) throws JMException
079 {
080 super(ManagedBroker.class, ManagedBroker.TYPE);
081
082 _virtualHostMBean = virtualHostMBean;
083 VirtualHost virtualHost = virtualHostMBean.getVirtualHost();
084
085 _queueRegistry = virtualHost.getQueueRegistry();
086 _exchangeRegistry = virtualHost.getExchangeRegistry();
087 _tranasctionLog = virtualHost.getTransactionLog();
088 _exchangeFactory = virtualHost.getExchangeFactory();
089 _routingTable = virtualHost.getRoutingTable();
090 }
091
092 public String getObjectInstanceName()
093 {
094 return _virtualHostMBean.getVirtualHost().getName();
095 }
096
097 /**
098 * Creates new exchange and registers it with the registry.
099 *
100 * @param exchangeName
101 * @param type
102 * @param durable
103 * @throws JMException
104 */
105 public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException
106 {
107 try
108 {
109 synchronized (_exchangeRegistry)
110 {
111 Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
112 if (exchange == null)
113 {
114 exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type),
115 durable, false, 0);
116 _exchangeRegistry.registerExchange(exchange);
117 }
118 else
119 {
120 throw new JMException("The exchange \"" + exchangeName + "\" already exists.");
121 }
122 }
123 }
124 catch (AMQException ex)
125 {
126 throw new MBeanException(ex, "Error in creating exchange " + exchangeName);
127 }
128 }
129
130 /**
131 * Unregisters the exchange from registry.
132 *
133 * @param exchangeName
134 * @throws JMException
135 */
136 public void unregisterExchange(String exchangeName) throws JMException
137 {
138 // TODO
139 // Check if the exchange is in use.
140 // boolean inUse = false;
141 // Check if there are queue-bindings with the exchange and unregister
142 // when there are no bindings.
143 try
144 {
145 _exchangeRegistry.unregisterExchange(new AMQShortString(exchangeName), false);
146 }
147 catch (AMQException ex)
148 {
149 throw new MBeanException(ex, "Error in unregistering exchange " + exchangeName);
150 }
151 }
152
153 /**
154 * Creates a new queue and registers it with the registry and puts it
155 * in persistance storage if durable queue.
156 *
157 * @param queueName
158 * @param durable
159 * @param owner
160 * @throws JMException
161 */
162 public void createNewQueue(String queueName, String owner, boolean durable) throws JMException
163 {
164 AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
165 if (queue != null)
166 {
167 throw new JMException("The queue \"" + queueName + "\" already exists.");
168 }
169
170 try
171 {
172 AMQShortString ownerShortString = null;
173 if (owner != null)
174 {
175 ownerShortString = new AMQShortString(owner);
176 }
177
178 queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost(),
179 null);
180 if (queue.isDurable() && !queue.isAutoDelete())
181 {
182 _routingTable.createQueue(queue);
183 }
184
185 _queueRegistry.registerQueue(queue);
186 }
187 catch (AMQException ex)
188 {
189 JMException jme = new JMException(ex.getMessage());
190 jme.initCause(ex);
191 throw new MBeanException(jme, "Error in creating queue " + queueName);
192 }
193 }
194
195 private VirtualHost getVirtualHost()
196 {
197 return _virtualHostMBean.getVirtualHost();
198 }
199
200 /**
201 * Deletes the queue from queue registry and persistant storage.
202 *
203 * @param queueName
204 * @throws JMException
205 */
206 public void deleteQueue(String queueName) throws JMException
207 {
208 AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
209 if (queue == null)
210 {
211 throw new JMException("The Queue " + queueName + " is not a registerd queue.");
212 }
213
214 try
215 {
216 queue.delete();
217 _routingTable.removeQueue(queue);
218 }
219 catch (AMQException ex)
220 {
221 JMException jme = new JMException(ex.getMessage());
222 jme.initCause(ex);
223 throw new MBeanException(jme, "Error in deleting queue " + queueName);
224 }
225 }
226
227 public ManagedObject getParentObject()
228 {
229 return _virtualHostMBean;
230 }
231
232 // This will have a single instance for a virtual host, so not having the name property in the ObjectName
233 public ObjectName getObjectName() throws MalformedObjectNameException
234 {
235 return getObjectNameForSingleInstanceMBean();
236 }
237 } // End of MBean class
|