QpidDatasource.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.management.configuration;
022 
023 import java.util.HashMap;
024 import java.util.Map;
025 import java.util.UUID;
026 
027 import org.apache.commons.pool.BasePoolableObjectFactory;
028 import org.apache.commons.pool.ObjectPool;
029 import org.apache.commons.pool.impl.GenericObjectPool;
030 import org.apache.commons.pool.impl.GenericObjectPoolFactory;
031 import org.apache.qpid.management.Messages;
032 import org.apache.qpid.transport.Connection;
033 import org.apache.qpid.transport.ConnectionException;
034 import org.apache.qpid.transport.util.Logger;
035 
036 /**
037  * Qpid datasource.
038  * Basically it is a connection pool manager used for optimizing broker connections usage.
039  *
040  @author Andrea Gazzarini
041  */
042 public final class QpidDatasource
043 {
044     private final static Logger LOGGER = Logger.get(QpidDatasource.class);
045 
046     /**
047      * A connection decorator used for adding pool interaction behaviour to an existing connection.
048      *
049      @author Andrea Gazzarini
050      */
051     class PooledConnection extends Connection
052     {
053         private final UUID _brokerId;
054         private boolean _valid;
055 
056         /**
057          * Builds a new decorator with the given connection.
058          *
059          @param brokerId the broker identifier.
060          */
061         private PooledConnection(UUID brokerId)
062         {
063             this._brokerId = brokerId;
064             _valid = true;
065         }
066 
067         /**
068          * Returns true if the underlying connection is still valid and can be used.
069          *
070          @return true if the underlying connection is still valid and can be used.
071          */
072         boolean isValid()
073         {
074             return _valid;
075         }
076 
077         void reallyClose()
078         {
079             super.close();
080         }
081 
082         /**
083          * Returns the connection to the pool. That is, marks this connections as available.
084          * After that, this connection will be available for further operations.
085          */
086         public void close()
087         {
088             try
089             {
090                 pools.get(_brokerId).returnObject(this);
091                 
092                 LOGGER.debug(Messages.QMAN_200006_QPID_CONNECTION_RELEASED, this);
093             }
094             catch (Exception e)
095             {
096                 throw new ConnectionException(e);
097             }
098         }
099 
100         public void exception(Throwable t)
101         {
102             //super.exception(t);
103             _valid = false;
104         }
105     }
106 
107     /**
108      * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
109      * the broker connection(s).
110      *
111      @author Andrea Gazzarini
112      */
113     class QpidConnectionFactory extends BasePoolableObjectFactory
114     {
115         private final BrokerConnectionData _connectionData;
116         private final UUID _brokerId;
117 
118         /**
119          * Builds a new connection factory with the given parameters.
120          *
121          @param brokerId the broker identifier.
122          @param connectionData the connecton data.
123          */
124         private QpidConnectionFactory(UUID brokerId, BrokerConnectionData connectionData)
125         {
126             this._connectionData = connectionData;
127             this._brokerId = brokerId;
128         }
129 
130         /**
131          * Creates a new underlying connection.
132          */
133         @Override
134         public Connection makeObject () throws Exception
135         {
136           PooledConnection connection = new PooledConnection(_brokerId);
137             connection.connect(
138                     _connectionData.getHost(),
139                     _connectionData.getPort(),
140                     _connectionData.getVirtualHost(),
141                     _connectionData.getUsername(),
142                     _connectionData.getPassword(),
143         false);
144             return connection;
145         }
146 
147         /**
148          * Validates the underlying connection.
149          */
150         @Override
151         public boolean validateObject (Object obj)
152         {
153             PooledConnection connection = (PooledConnectionobj;
154             boolean isValid = connection.isValid();
155 
156             LOGGER.debug(Messages.QMAN_200007_TEST_CONNECTION_ON_RESERVE,isValid);
157             
158             return isValid;
159         }
160 
161         /**
162          * Closes the underlying connection.
163          */
164         @Override
165         public void destroyObject (Object objthrows Exception
166         {
167             try
168             {
169                 PooledConnection connection = (PooledConnectionobj;
170                 connection.reallyClose();
171                 
172                 LOGGER.debug(Messages.QMAN_200008_CONNECTION_DESTROYED);
173             catch (Exception exception)
174             {
175                 LOGGER.debug(exception, Messages.QMAN_200009_CONNECTION_DESTROY_FAILURE);
176             }
177         }
178     }
179 
180     // Singleton instance.
181     private static QpidDatasource instance = new QpidDatasource();
182 
183     // Each entry contains a connection pool for a specific broker.
184     private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>();
185 
186     // Private constructor.
187     private QpidDatasource()
188     {
189     }
190 
191     /**
192      * Gets an available connection from the pool of the given broker.
193      *
194      @param brokerId the broker identifier.
195      @return a valid connection to the broker associated with the given identifier.
196      */
197     public Connection getConnection(UUID brokerIdthrows Exception
198     {
199         return (Connectionpools.get(brokerId).borrowObject();
200     }
201 
202     /**
203      * Entry point method for retrieving the singleton instance of this datasource.
204      *
205      @return the qpid datasource singleton instance.
206      */
207     public static QpidDatasource getInstance()
208     {
209         return instance;
210     }
211 
212     /**
213      * Adds a connection pool to this datasource.
214      
215      @param brokerId the broker identifier that will be associated with the new connection pool.
216      @param connectionData the broker connection data.
217      @throws Exception when the pool cannot be created.
218      */
219     void addConnectionPool(UUID brokerId,BrokerConnectionData connectionDatathrows Exception 
220     {
221         GenericObjectPoolFactory factory = new GenericObjectPoolFactory(
222                 new QpidConnectionFactory(brokerId,connectionData),
223                 connectionData.getMaxPoolCapacity(),
224                 GenericObjectPool.WHEN_EXHAUSTED_BLOCK,
225                 connectionData.getMaxWaitTimeout(),-1,
226                 true,
227                 false);
228     
229         ObjectPool pool = factory.createPool();
230 
231         // Open connections at startup according to initial capacity param value.
232         int howManyConnectionAtStartup = connectionData.getInitialPoolCapacity()
233         Object [] openStartupList = new Object[howManyConnectionAtStartup];
234         
235         // Open...
236         for (int index  = 0; index < howManyConnectionAtStartup; index++)
237         {
238             openStartupList[index= pool.borrowObject();
239         }
240         
241         // ...and immediately return them to pool. In this way the pooled connection has been opened.
242         for (int index = 0; index < howManyConnectionAtStartup; index++)
243         {
244             pool.returnObject(openStartupList[index]);
245         }
246 
247         pools.put(brokerId,pool);
248     }
249 }