ReferenceCountingExecutorService.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.pool;
022 
023 import java.util.concurrent.ExecutorService;
024 import java.util.concurrent.Executors;
025 import java.util.concurrent.ThreadPoolExecutor;
026 import java.util.concurrent.TimeUnit;
027 import java.util.concurrent.LinkedBlockingQueue;
028 
029 /**
030  * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
031  * the references taken, instantiating the service on the first reference, and shutting it down when the last
032  * reference is released.
033  *
034  <p/>It is important to ensure that an executor service is correctly shut down as failing to do so prevents the JVM
035  * from terminating due to the existence of non-daemon threads.
036  *
037  <p/><table id="crc><caption>CRC Card</caption>
038  <tr><th> Responsibilities <th> Collaborations
039  <tr><td> Provide a shared exector service. <td> {@link Executors}
040  <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService}
041  <tr><td> Track references to the executor service.
042  <tr><td> Provide configuration of the executor service.
043  </table>
044  *
045  * @todo Might be more elegant to make this actually implement ExecutorService, providing better hiding of the
046  *       implementation details. Also this class introduces a pattern (albeit specific to this usage) that could be
047  *       generalized to reference count anything. That is, on first instance call a create method, on release of last
048  *       instance call a destroy method. This could definitely be abstracted out as a re-usable piece of code; a
049  *       reference counting factory. It could then be re-used to do reference counting in other places (such as
050  *       messages). Countable objects have a simple create/destroy life cycle, capturable by an interface that the
051  *       ref counting factory can call to manage the lifecycle.
052  *
053  * @todo {@link #_poolSize} should be static?
054  *
055  * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used
056  *       further checks are applied to ensure that the exector service has not been shutdown. This passes responsibility
057  *       for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it
058  *       here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an
059  *       isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors.
060  */
061 public class ReferenceCountingExecutorService
062 {
063     /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */
064     private static final int MINIMUM_POOL_SIZE = 4;
065 
066     /** Holds the number of processors on the machine. */
067     private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
068 
069     /** Defines the thread pool size to use, which is the larger of the number of CPUs or the minimum size. */
070     private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE);
071 
072     /**
073      * Holds the singleton instance of this reference counter. This is only created once, statically, so the
074      {@link #getInstance()} method does not need to be synchronized.
075      */
076     private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService();
077 
078     /** This lock is used to ensure that reference counts are updated atomically with create/destroy operations. */
079     private final Object _lock = new Object();
080 
081     /** The shared executor service that is reference counted. */
082     private ExecutorService _pool;
083 
084     /** Holds the number of references given out to the executor service. */
085     private int _refCount = 0;
086 
087     /** Holds the number of executor threads to create. */
088     private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
089 
090     private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
091 
092     /**
093      * Retrieves the singleton instance of this reference counter.
094      *
095      @return The singleton instance of this reference counter.
096      */
097     public static ReferenceCountingExecutorService getInstance()
098     {
099         return _instance;
100     }
101 
102     /**
103      * Private constructor to ensure that only a singleton instance can be created.
104      */
105     private ReferenceCountingExecutorService()
106     { }
107 
108     /**
109      * Provides a reference to a shared executor service, incrementing the reference count.
110      *
111      @return An executor service.
112      */
113     public ExecutorService acquireExecutorService()
114     {
115         synchronized (_lock)
116         {
117             if (_refCount++ == 0)
118             {
119 //                _pool = Executors.newFixedThreadPool(_poolSize);
120 
121                 // Use a job queue that biases to writes
122                 if(_useBiasedPool)
123                 {
124                     _pool =  new ThreadPoolExecutor(_poolSize, _poolSize,
125                                           0L, TimeUnit.MILLISECONDS,
126                                           new ReadWriteJobQueue());
127                 }
128                 else
129                 {
130                     _pool = Executors.newFixedThreadPool(_poolSize);
131                 }
132             }
133 
134 
135             return _pool;
136         }
137     }
138 
139     /**
140      * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
141      * to zero, the executor service is shut down.
142      */
143     public void releaseExecutorService()
144     {
145         synchronized (_lock)
146         {
147             if (--_refCount == 0)
148             {
149                 _pool.shutdownNow();
150             }
151         }
152     }
153 
154     /**
155      * Provides access to the executor service, without touching the reference count.
156      *
157      @return The shared executor service, or <tt>null</tt> if none has been instantiated yet.
158      */
159     public ExecutorService getPool()
160     {
161         return _pool;
162     }
163 
164     /**
165      * Return the ReferenceCount to this ExecutorService
166      @return reference count
167      */
168     public int getReferenceCount()
169     {
170         return _refCount;
171     }
172 }