001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.pool;
022
023 import java.util.concurrent.ExecutorService;
024 import java.util.concurrent.Executors;
025 import java.util.concurrent.ThreadPoolExecutor;
026 import java.util.concurrent.TimeUnit;
027 import java.util.concurrent.LinkedBlockingQueue;
028
029 /**
030 * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
031 * the references taken, instantiating the service on the first reference, and shutting it down when the last
032 * reference is released.
033 *
034 * <p/>It is important to ensure that an executor service is correctly shut down as failing to do so prevents the JVM
035 * from terminating due to the existence of non-daemon threads.
036 *
037 * <p/><table id="crc><caption>CRC Card</caption>
038 * <tr><th> Responsibilities <th> Collaborations
039 * <tr><td> Provide a shared exector service. <td> {@link Executors}
040 * <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService}
041 * <tr><td> Track references to the executor service.
042 * <tr><td> Provide configuration of the executor service.
043 * </table>
044 *
045 * @todo Might be more elegant to make this actually implement ExecutorService, providing better hiding of the
046 * implementation details. Also this class introduces a pattern (albeit specific to this usage) that could be
047 * generalized to reference count anything. That is, on first instance call a create method, on release of last
048 * instance call a destroy method. This could definitely be abstracted out as a re-usable piece of code; a
049 * reference counting factory. It could then be re-used to do reference counting in other places (such as
050 * messages). Countable objects have a simple create/destroy life cycle, capturable by an interface that the
051 * ref counting factory can call to manage the lifecycle.
052 *
053 * @todo {@link #_poolSize} should be static?
054 *
055 * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used
056 * further checks are applied to ensure that the exector service has not been shutdown. This passes responsibility
057 * for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it
058 * here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an
059 * isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors.
060 */
061 public class ReferenceCountingExecutorService
062 {
063 /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */
064 private static final int MINIMUM_POOL_SIZE = 4;
065
066 /** Holds the number of processors on the machine. */
067 private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
068
069 /** Defines the thread pool size to use, which is the larger of the number of CPUs or the minimum size. */
070 private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE);
071
072 /**
073 * Holds the singleton instance of this reference counter. This is only created once, statically, so the
074 * {@link #getInstance()} method does not need to be synchronized.
075 */
076 private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService();
077
078 /** This lock is used to ensure that reference counts are updated atomically with create/destroy operations. */
079 private final Object _lock = new Object();
080
081 /** The shared executor service that is reference counted. */
082 private ExecutorService _pool;
083
084 /** Holds the number of references given out to the executor service. */
085 private int _refCount = 0;
086
087 /** Holds the number of executor threads to create. */
088 private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
089
090 private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
091
092 /**
093 * Retrieves the singleton instance of this reference counter.
094 *
095 * @return The singleton instance of this reference counter.
096 */
097 public static ReferenceCountingExecutorService getInstance()
098 {
099 return _instance;
100 }
101
102 /**
103 * Private constructor to ensure that only a singleton instance can be created.
104 */
105 private ReferenceCountingExecutorService()
106 { }
107
108 /**
109 * Provides a reference to a shared executor service, incrementing the reference count.
110 *
111 * @return An executor service.
112 */
113 public ExecutorService acquireExecutorService()
114 {
115 synchronized (_lock)
116 {
117 if (_refCount++ == 0)
118 {
119 // _pool = Executors.newFixedThreadPool(_poolSize);
120
121 // Use a job queue that biases to writes
122 if(_useBiasedPool)
123 {
124 _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
125 0L, TimeUnit.MILLISECONDS,
126 new ReadWriteJobQueue());
127 }
128 else
129 {
130 _pool = Executors.newFixedThreadPool(_poolSize);
131 }
132 }
133
134
135 return _pool;
136 }
137 }
138
139 /**
140 * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
141 * to zero, the executor service is shut down.
142 */
143 public void releaseExecutorService()
144 {
145 synchronized (_lock)
146 {
147 if (--_refCount == 0)
148 {
149 _pool.shutdownNow();
150 }
151 }
152 }
153
154 /**
155 * Provides access to the executor service, without touching the reference count.
156 *
157 * @return The shared executor service, or <tt>null</tt> if none has been instantiated yet.
158 */
159 public ExecutorService getPool()
160 {
161 return _pool;
162 }
163
164 /**
165 * Return the ReferenceCount to this ExecutorService
166 * @return reference count
167 */
168 public int getReferenceCount()
169 {
170 return _refCount;
171 }
172 }
|