001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.pool;
022
023 import java.util.concurrent.ConcurrentLinkedQueue;
024 import java.util.concurrent.atomic.AtomicBoolean;
025
026 import org.apache.mina.common.IoSession;
027
028 /**
029 * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
030 * The {@link Event}s themselves provide methods to process themselves, so processing a job simply consists of sequentially
031 * processing all of its aggregated events.
032 *
033 * The constructor accepts a maximum number of events for the job, and only runs up to that maximum number when
034 * processing the job, but the add method does not enforce this maximum. In other words, not all the enqueued events
035 * may be processed in each run of the job, several runs may be required to clear the queue.
036 *
037 * <p/><table id="crc"><caption>CRC Card</caption>
038 * <tr><th> Responsibilities <th> Collaborations
039 * <tr><td> Aggregate many coninuations together into a single continuation.
040 * <tr><td> Sequentially process aggregated continuations. <td> {@link Event}
041 * <tr><td> Provide running and completion status of the aggregate continuation.
042 * <tr><td> Execute a terminal continuation upon job completion. <td> {@link JobCompletionHandler}
043 * </table>
044 *
045 * @todo Could make Job implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as a
046 * continuation. Job is a continuation that aggregates other continuations and as such is a usefull re-usable
047 * piece of code. There may be other palces than the mina filter chain where continuation batching is used within
048 * qpid, so abstracting this out could provide a usefull building block. This also opens the way to different
049 * kinds of job with a common interface, e.g. parallel or sequential jobs etc.
050 *
051 * @todo For better re-usability could make the completion handler optional. Only run it when one is set.
052 */
053 public class Job implements ReadWriteRunnable
054 {
055 /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
056 private final int _maxEvents;
057
058 /** The Mina session. */
059 private final IoSession _session;
060
061 /** Holds the queue of events that make up the job. */
062 private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
063
064 /** Holds a status flag, that indicates when the job is actively running. */
065 private final AtomicBoolean _active = new AtomicBoolean();
066
067 /** Holds the completion continuation, called upon completion of a run of the job. */
068 private final JobCompletionHandler _completionHandler;
069
070 private final boolean _readJob;
071
072 /**
073 * Creates a new job that aggregates many continuations together.
074 *
075 * @param session The Mina session.
076 * @param completionHandler The per job run, terminal continuation.
077 * @param maxEvents The maximum number of aggregated continuations to process per run of the job.
078 * @param readJob
079 */
080 Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
081 {
082 _session = session;
083 _completionHandler = completionHandler;
084 _maxEvents = maxEvents;
085 _readJob = readJob;
086 }
087
088 /**
089 * Enqueus a continuation for sequential processing by this job.
090 *
091 * @param evt The continuation to enqueue.
092 */
093 void add(Event evt)
094 {
095 _eventQueue.add(evt);
096 }
097
098 /**
099 * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job.
100 */
101 boolean processAll()
102 {
103 // limit the number of events processed in one run
104 int i = _maxEvents;
105 while( --i != 0 )
106 {
107 Event e = _eventQueue.poll();
108 if (e == null)
109 {
110 return true;
111 }
112 else
113 {
114 e.process(_session);
115 }
116 }
117 return false;
118 }
119
120 /**
121 * Tests if there are no more enqueued continuations to process.
122 *
123 * @return <tt>true</tt> if there are no enqueued continuations in this job, <tt>false</tt> otherwise.
124 */
125 public boolean isComplete()
126 {
127 return _eventQueue.peek() == null;
128 }
129
130 /**
131 * Marks this job as active if it is inactive. This method is thread safe.
132 *
133 * @return <tt>true</tt> if this job was inactive and has now been marked as active, <tt>false</tt> otherwise.
134 */
135 public boolean activate()
136 {
137 return _active.compareAndSet(false, true);
138 }
139
140 /**
141 * Marks this job as inactive. This method is thread safe.
142 */
143 public void deactivate()
144 {
145 _active.set(false);
146 }
147
148 /**
149 * Processes a batch of aggregated continuations, marks this job as inactive and call the terminal continuation.
150 */
151 public void run()
152 {
153 if(processAll())
154 {
155 deactivate();
156 _completionHandler.completed(_session, this);
157 }
158 else
159 {
160 _completionHandler.notCompleted(_session, this);
161 }
162 }
163
164 public boolean isReadJob()
165 {
166 return _readJob;
167 }
168
169 public boolean isRead()
170 {
171 return _readJob;
172 }
173
174 public boolean isWrite()
175 {
176 return !_readJob;
177 }
178
179
180 /**
181 * Another interface for a continuation.
182 *
183 * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
184 * Runnable or a custom Continuation interface.
185 */
186 static interface JobCompletionHandler
187 {
188 public void completed(IoSession session, Job job);
189
190 public void notCompleted(final IoSession session, final Job job);
191 }
192 }
|