PingAsyncTestPerf.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.ping;
022 
023 import junit.framework.Test;
024 import junit.framework.TestSuite;
025 
026 import org.apache.log4j.Logger;
027 
028 import org.apache.qpid.requestreply.PingPongProducer;
029 
030 import org.apache.qpid.junit.extensions.TimingController;
031 import org.apache.qpid.junit.extensions.TimingControllerAware;
032 
033 import javax.jms.JMSException;
034 import javax.jms.Message;
035 
036 import java.util.Collections;
037 import java.util.HashMap;
038 import java.util.Map;
039 import java.util.concurrent.atomic.AtomicLong;
040 
041 /**
042  * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller
043  * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test
044  * that it extends because it can output timings as replies are received, rather than waiting until all expected replies
045  * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending
046  * and recieving clients working asynchronously.
047  *
048  <p/><table id="crc"><caption>CRC Card</caption>
049  <tr><td> Responsibilities <th> Collaborations
050  <tr><td> Send many ping messages and output timings asynchronously on batches received.
051  </table>
052  */
053 public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware
054 {
055     private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
056 
057     /** Holds the name of the property to get the test results logging batch size. */
058     public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
059 
060     /** Holds the default test results logging batch size. */
061     public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000;
062 
063     /** Used to hold the timing controller passed from the test runner. */
064     private TimingController _timingController;
065 
066     /** Used to generate unique correlation ids for each test run. */
067     private AtomicLong corellationIdGenerator = new AtomicLong();
068 
069     /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */
070     private Map<String, PerCorrelationId> perCorrelationIds =
071         Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
072 
073     /** Holds the batched results listener, that does logging on batch boundaries. */
074     private BatchedResultsListener batchedResultsListener = null;
075 
076     /**
077      * Creates a new asynchronous ping performance test with the specified name.
078      *
079      @param name The test name.
080      */
081     public PingAsyncTestPerf(String name)
082     {
083         super(name);
084 
085         // Sets up the test parameters with defaults.
086         testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
087             Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
088     }
089 
090     /**
091      * Compile all the tests into a test suite.
092      @return The test suite to run. Should only contain testAsyncPingOk method.
093      */
094     public static Test suite()
095     {
096         // Build a new test suite
097         TestSuite suite = new TestSuite("Ping Performance Tests");
098 
099         // Run performance tests in read committed mode.
100         suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
101 
102         return suite;
103     }
104 
105     /**
106      * Accepts a timing controller from the test runner.
107      *
108      @param timingController The timing controller to register mutliple timings with.
109      */
110     public void setTimingController(TimingController timingController)
111     {
112         _timingController = timingController;
113     }
114 
115     /**
116      * Gets the timing controller passed in by the test runner.
117      *
118      @return The timing controller passed in by the test runner.
119      */
120     public TimingController getTimingController()
121     {
122         return _timingController;
123     }
124 
125     /**
126      * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until
127      * all replies have been received or a time out occurs before exiting this method.
128      *
129      @param numPings The number of pings to send.
130      @throws Exception pass all errors out to the test harness
131      */
132     public void testAsyncPingOk(int numPingsthrows Exception
133     {
134         // _logger.debug("public void testAsyncPingOk(int numPings): called");
135 
136         // Ensure that at least one ping was requeusted.
137         if (numPings == 0)
138         {
139             _logger.error("Number of pings requested was zero.");
140             fail("Number of pings requested was zero.");
141         }
142 
143         // Get the per thread test setup to run the test through.
144         PerThreadSetup perThreadSetup = threadSetup.get();
145         PingClient pingClient = perThreadSetup._pingClient;
146 
147         // Advance the correlation id of messages to send, to make it unique for this run.
148         perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet());
149         // String messageCorrelationId = perThreadSetup._correlationId;
150         // _logger.debug("messageCorrelationId = " + messageCorrelationId);
151 
152         // Initialize the count and timing controller for the new correlation id.
153         PerCorrelationId perCorrelationId = new PerCorrelationId();
154         TimingController tc = getTimingController().getControllerForCurrentThread();
155         perCorrelationId._tc = tc;
156         perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
157         perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
158 
159         // Send the requested number of messages, and wait until they have all been received.
160         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
161         int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
162 
163         // Check that all the replies were received and log a fail if they were not.
164         if (numReplies < perCorrelationId._expectedCount)
165         {
166             perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount);
167         }
168 
169         // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
170         perCorrelationIds.remove(perThreadSetup._correlationId);
171     }
172 
173     /**
174      * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
175      */
176     public void threadSetUp()
177     {
178         _logger.debug("public void threadSetUp(): called");
179 
180         try
181         {
182             // Call the set up method in the super class. This creates a PingClient pinger.
183             super.threadSetUp();
184 
185             // Create the chained message listener, only if it has not already been created.  This is set up with the
186             // batch size property, to tell it what batch size to output results on. A synchronized block is used to
187             // ensure that only one thread creates this.
188             synchronized (this)
189             {
190                 if (batchedResultsListener == null)
191                 {
192                     int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
193                     batchedResultsListener = new BatchedResultsListener(batchSize);
194                 }
195             }
196 
197             // Get the set up that the super class created.
198             PerThreadSetup perThreadSetup = threadSetup.get();
199 
200             // Register the chained message listener on the pinger to do its asynchronous test timings from.
201             perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
202         }
203         catch (Exception e)
204         {
205             _logger.warn("There was an exception during per thread setup.", e);
206         }
207     }
208 
209     /**
210      * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the
211      * pinger, in order to receive notifications about every message received and the number remaining to be
212      * received. Whenever the number remaining crosses a batch size boundary this results listener outputs
213      * a test timing for the actual number of messages received in the current batch.
214      */
215     private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
216     {
217         /** The test results logging batch size. */
218         int _batchSize;
219 
220         /**
221          * Creates a results listener on the specified batch size.
222          *
223          @param batchSize The batch size to use.
224          */
225         public BatchedResultsListener(int batchSize)
226         {
227             _batchSize = batchSize;
228         }
229 
230         /**
231          * This callback method is called from all of the pingers that this test creates. It uses the correlation id
232          * from the message to identify the timing controller for the test thread that was responsible for sending those
233          * messages.
234          *
235          @param message        The message.
236          @param remainingCount The count of messages remaining to be received with a particular correlation id.
237          *
238          @throws JMSException Any underlying JMSException is allowed to fall through.
239          */
240         public void onMessage(Message message, int remainingCount, long latencythrows JMSException
241         {
242             // Check if a batch boundary has been crossed.
243             if ((remainingCount % _batchSize== 0)
244             {
245                 // Extract the correlation id from the message.
246                 String correlationId = message.getJMSCorrelationID();
247 
248                 /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount
249                     + "): called on batch boundary for message id: " + correlationId + " with thread id: "
250                     + Thread.currentThread().getId());*/
251 
252                 // Get the details for the correlation id and check that they are not null. They can become null
253                 // if a test times out.
254                 PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
255                 if (perCorrelationId != null)
256                 {
257                     // Get the timing controller and expected count for this correlation id.
258                     TimingController tc = perCorrelationId._tc;
259                     int expected = perCorrelationId._expectedCount;
260 
261                     // Calculate how many messages were actually received in the last batch. This will be the batch size
262                     // except where the number expected is not a multiple of the batch size and this is the first remaining
263                     // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
264                     // size.
265                     int receivedInBatch = ((expected - remainingCount< _batchSize(expected % _batchSize: _batchSize;
266 
267                     // Register a test result for the correlation id.
268                     try
269                     {
270                         tc.completeTest(true, receivedInBatch);
271                     }
272                     catch (InterruptedException e)
273                     {
274                         // Ignore this. It means the test runner wants to stop as soon as possible.
275                         _logger.warn("Got InterruptedException.", e);
276                     }
277                 }
278                 // Else ignore, test timed out. Should log a fail here?
279             }
280         }
281     }
282 
283     /**
284      * Holds state specific to each correlation id, needed to output test results. This consists of the count of
285      * the total expected number of messages, and the timing controller for the thread sending those message ids.
286      */
287     private static class PerCorrelationId
288     {
289         public int _expectedCount;
290         public TimingController _tc;
291     }
292 }