Job.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.pool;
022 
023 import java.util.concurrent.ConcurrentLinkedQueue;
024 import java.util.concurrent.atomic.AtomicBoolean;
025 
026 import org.apache.mina.common.IoSession;
027 
028 /**
029  * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
030  * The {@link Event}s themselves provide methods to process themselves, so processing a job simply consists of sequentially
031  * processing all of its aggregated events.
032  *
033  * The constructor accepts a maximum number of events for the job, and only runs up to that maximum number when
034  * processing the job, but the add method does not enforce this maximum. In other words, not all the enqueued events
035  * may be processed in each run of the job, several runs may be required to clear the queue.
036  *
037  <p/><table id="crc"><caption>CRC Card</caption>
038  <tr><th> Responsibilities <th> Collaborations
039  <tr><td> Aggregate many coninuations together into a single continuation.
040  <tr><td> Sequentially process aggregated continuations. <td> {@link Event}
041  <tr><td> Provide running and completion status of the aggregate continuation.
042  <tr><td> Execute a terminal continuation upon job completion. <td> {@link JobCompletionHandler}
043  </table>
044  *
045  * @todo Could make Job implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as a
046  *       continuation. Job is a continuation that aggregates other continuations and as such is a usefull re-usable
047  *       piece of code. There may be other palces than the mina filter chain where continuation batching is used within
048  *       qpid, so abstracting this out could provide a usefull building block. This also opens the way to different
049  *       kinds of job with a common interface, e.g. parallel or sequential jobs etc.
050  *
051  * @todo For better re-usability could make the completion handler optional. Only run it when one is set.
052  */
053 public class Job implements ReadWriteRunnable
054 {
055     /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
056     private final int _maxEvents;
057 
058     /** The Mina session. */
059     private final IoSession _session;
060 
061     /** Holds the queue of events that make up the job. */
062     private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
063 
064     /** Holds a status flag, that indicates when the job is actively running. */
065     private final AtomicBoolean _active = new AtomicBoolean();
066 
067     /** Holds the completion continuation, called upon completion of a run of the job. */
068     private final JobCompletionHandler _completionHandler;
069 
070     private final boolean _readJob;
071 
072     /**
073      * Creates a new job that aggregates many continuations together.
074      *
075      @param session           The Mina session.
076      @param completionHandler The per job run, terminal continuation.
077      @param maxEvents         The maximum number of aggregated continuations to process per run of the job.
078      @param readJob
079      */
080     Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
081     {
082         _session = session;
083         _completionHandler = completionHandler;
084         _maxEvents = maxEvents;
085         _readJob = readJob;
086     }
087 
088     /**
089      * Enqueus a continuation for sequential processing by this job.
090      *
091      @param evt The continuation to enqueue.
092      */
093     void add(Event evt)
094     {
095         _eventQueue.add(evt);
096     }
097 
098     /**
099      * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job.
100      */
101     boolean processAll()
102     {
103         // limit the number of events processed in one run
104         int i = _maxEvents;
105         while--i != )
106         {
107             Event e = _eventQueue.poll();
108             if (e == null)
109             {
110                 return true;
111             }
112             else
113             {
114                 e.process(_session);
115             }
116         }
117         return false;
118     }
119 
120     /**
121      * Tests if there are no more enqueued continuations to process.
122      *
123      @return <tt>true</tt> if there are no enqueued continuations in this job, <tt>false</tt> otherwise.
124      */
125     public boolean isComplete()
126     {
127         return _eventQueue.peek() == null;
128     }
129 
130     /**
131      * Marks this job as active if it is inactive. This method is thread safe.
132      *
133      @return <tt>true</tt> if this job was inactive and has now been marked as active, <tt>false</tt> otherwise.
134      */
135     public boolean activate()
136     {
137         return _active.compareAndSet(false, true);
138     }
139 
140     /**
141      * Marks this job as inactive. This method is thread safe.
142      */
143     public void deactivate()
144     {
145         _active.set(false);
146     }
147 
148     /**
149      * Processes a batch of aggregated continuations, marks this job as inactive and call the terminal continuation.
150      */
151     public void run()
152     {
153         if(processAll())
154         {
155             deactivate();
156             _completionHandler.completed(_session, this);
157         }
158         else
159         {
160             _completionHandler.notCompleted(_session, this);
161         }
162     }
163 
164     public boolean isReadJob()
165     {
166         return _readJob;
167     }
168 
169     public boolean isRead()
170     {
171         return _readJob;
172     }
173 
174     public boolean isWrite()
175     {
176         return !_readJob;
177     }
178 
179 
180     /**
181      * Another interface for a continuation.
182      *
183      * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
184      *       Runnable or a custom Continuation interface.
185      */
186     static interface JobCompletionHandler
187     {
188         public void completed(IoSession session, Job job);
189 
190         public void notCompleted(final IoSession session, final Job job);
191     }
192 }