001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.management.configuration;
022
023 import java.util.HashMap;
024 import java.util.Map;
025 import java.util.UUID;
026
027 import org.apache.commons.pool.BasePoolableObjectFactory;
028 import org.apache.commons.pool.ObjectPool;
029 import org.apache.commons.pool.impl.GenericObjectPool;
030 import org.apache.commons.pool.impl.GenericObjectPoolFactory;
031 import org.apache.qpid.management.Messages;
032 import org.apache.qpid.transport.Connection;
033 import org.apache.qpid.transport.ConnectionException;
034 import org.apache.qpid.transport.util.Logger;
035
036 /**
037 * Qpid datasource.
038 * Basically it is a connection pool manager used for optimizing broker connections usage.
039 *
040 * @author Andrea Gazzarini
041 */
042 public final class QpidDatasource
043 {
044 private final static Logger LOGGER = Logger.get(QpidDatasource.class);
045
046 /**
047 * A connection decorator used for adding pool interaction behaviour to an existing connection.
048 *
049 * @author Andrea Gazzarini
050 */
051 class PooledConnection extends Connection
052 {
053 private final UUID _brokerId;
054 private boolean _valid;
055
056 /**
057 * Builds a new decorator with the given connection.
058 *
059 * @param brokerId the broker identifier.
060 */
061 private PooledConnection(UUID brokerId)
062 {
063 this._brokerId = brokerId;
064 _valid = true;
065 }
066
067 /**
068 * Returns true if the underlying connection is still valid and can be used.
069 *
070 * @return true if the underlying connection is still valid and can be used.
071 */
072 boolean isValid()
073 {
074 return _valid;
075 }
076
077 void reallyClose()
078 {
079 super.close();
080 }
081
082 /**
083 * Returns the connection to the pool. That is, marks this connections as available.
084 * After that, this connection will be available for further operations.
085 */
086 public void close()
087 {
088 try
089 {
090 pools.get(_brokerId).returnObject(this);
091
092 LOGGER.debug(Messages.QMAN_200006_QPID_CONNECTION_RELEASED, this);
093 }
094 catch (Exception e)
095 {
096 throw new ConnectionException(e);
097 }
098 }
099
100 public void exception(Throwable t)
101 {
102 //super.exception(t);
103 _valid = false;
104 }
105 }
106
107 /**
108 * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
109 * the broker connection(s).
110 *
111 * @author Andrea Gazzarini
112 */
113 class QpidConnectionFactory extends BasePoolableObjectFactory
114 {
115 private final BrokerConnectionData _connectionData;
116 private final UUID _brokerId;
117
118 /**
119 * Builds a new connection factory with the given parameters.
120 *
121 * @param brokerId the broker identifier.
122 * @param connectionData the connecton data.
123 */
124 private QpidConnectionFactory(UUID brokerId, BrokerConnectionData connectionData)
125 {
126 this._connectionData = connectionData;
127 this._brokerId = brokerId;
128 }
129
130 /**
131 * Creates a new underlying connection.
132 */
133 @Override
134 public Connection makeObject () throws Exception
135 {
136 PooledConnection connection = new PooledConnection(_brokerId);
137 connection.connect(
138 _connectionData.getHost(),
139 _connectionData.getPort(),
140 _connectionData.getVirtualHost(),
141 _connectionData.getUsername(),
142 _connectionData.getPassword(),
143 false);
144 return connection;
145 }
146
147 /**
148 * Validates the underlying connection.
149 */
150 @Override
151 public boolean validateObject (Object obj)
152 {
153 PooledConnection connection = (PooledConnection) obj;
154 boolean isValid = connection.isValid();
155
156 LOGGER.debug(Messages.QMAN_200007_TEST_CONNECTION_ON_RESERVE,isValid);
157
158 return isValid;
159 }
160
161 /**
162 * Closes the underlying connection.
163 */
164 @Override
165 public void destroyObject (Object obj) throws Exception
166 {
167 try
168 {
169 PooledConnection connection = (PooledConnection) obj;
170 connection.reallyClose();
171
172 LOGGER.debug(Messages.QMAN_200008_CONNECTION_DESTROYED);
173 } catch (Exception exception)
174 {
175 LOGGER.debug(exception, Messages.QMAN_200009_CONNECTION_DESTROY_FAILURE);
176 }
177 }
178 }
179
180 // Singleton instance.
181 private static QpidDatasource instance = new QpidDatasource();
182
183 // Each entry contains a connection pool for a specific broker.
184 private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>();
185
186 // Private constructor.
187 private QpidDatasource()
188 {
189 }
190
191 /**
192 * Gets an available connection from the pool of the given broker.
193 *
194 * @param brokerId the broker identifier.
195 * @return a valid connection to the broker associated with the given identifier.
196 */
197 public Connection getConnection(UUID brokerId) throws Exception
198 {
199 return (Connection) pools.get(brokerId).borrowObject();
200 }
201
202 /**
203 * Entry point method for retrieving the singleton instance of this datasource.
204 *
205 * @return the qpid datasource singleton instance.
206 */
207 public static QpidDatasource getInstance()
208 {
209 return instance;
210 }
211
212 /**
213 * Adds a connection pool to this datasource.
214 *
215 * @param brokerId the broker identifier that will be associated with the new connection pool.
216 * @param connectionData the broker connection data.
217 * @throws Exception when the pool cannot be created.
218 */
219 void addConnectionPool(UUID brokerId,BrokerConnectionData connectionData) throws Exception
220 {
221 GenericObjectPoolFactory factory = new GenericObjectPoolFactory(
222 new QpidConnectionFactory(brokerId,connectionData),
223 connectionData.getMaxPoolCapacity(),
224 GenericObjectPool.WHEN_EXHAUSTED_BLOCK,
225 connectionData.getMaxWaitTimeout(),-1,
226 true,
227 false);
228
229 ObjectPool pool = factory.createPool();
230
231 // Open connections at startup according to initial capacity param value.
232 int howManyConnectionAtStartup = connectionData.getInitialPoolCapacity();
233 Object [] openStartupList = new Object[howManyConnectionAtStartup];
234
235 // Open...
236 for (int index = 0; index < howManyConnectionAtStartup; index++)
237 {
238 openStartupList[index] = pool.borrowObject();
239 }
240
241 // ...and immediately return them to pool. In this way the pooled connection has been opened.
242 for (int index = 0; index < howManyConnectionAtStartup; index++)
243 {
244 pool.returnObject(openStartupList[index]);
245 }
246
247 pools.put(brokerId,pool);
248 }
249 }
|