PingLatencyTestPerf.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.ping;
022 
023 import junit.framework.Test;
024 import junit.framework.TestSuite;
025 
026 import org.apache.log4j.Logger;
027 
028 import org.apache.qpid.client.AMQSession;
029 import org.apache.qpid.requestreply.PingPongProducer;
030 
031 import org.apache.qpid.junit.extensions.TimingController;
032 import org.apache.qpid.junit.extensions.TimingControllerAware;
033 import org.apache.qpid.junit.extensions.util.ParsedProperties;
034 
035 import javax.jms.JMSException;
036 import javax.jms.Message;
037 
038 import java.util.Collections;
039 import java.util.HashMap;
040 import java.util.Map;
041 import java.util.concurrent.atomic.AtomicLong;
042 
043 /**
044  * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing
045  * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for
046  * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from
047  * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than
048  * waiting until all expected replies are received.
049  *
050  <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test
051  * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The
052  * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the
053  * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
054  *
055  <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the
056  * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput
057  * (messages / time) can be calculated in order to examine the relationship between throughput and latency.
058  *
059  <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities <th> Collaborations <tr><td> Send many ping
060  * messages and output timings for sampled individual pings. </table>
061  */
062 public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware
063 {
064     private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class);
065 
066     /** Holds the name of the property to get the test results logging batch size. */
067     public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
068 
069     /** Holds the default test results logging batch size. */
070     public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
071 
072     /** Used to hold the timing controller passed from the test runner. */
073     private TimingController _timingController;
074 
075     /** Used to generate unique correlation ids for each test run. */
076     private AtomicLong corellationIdGenerator = new AtomicLong();
077 
078     /**
079      * Holds test specifics by correlation id. This consists of the expected number of messages and the timing
080      * controler.
081      */
082     private Map<String, PerCorrelationId> perCorrelationIds =
083         Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
084 
085     /** Holds the batched results listener, that does logging on batch boundaries. */
086     private BatchedResultsListener batchedResultsListener = null;
087 
088     /**
089      * Creates a new asynchronous ping performance test with the specified name.
090      *
091      @param name The test name.
092      */
093     public PingLatencyTestPerf(String name)
094     {
095         super(name);
096 
097         // Sets up the test parameters with defaults.
098         ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
099             Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
100     }
101 
102     /** Compile all the tests into a test suite. */
103     public static Test suite()
104     {
105         // Build a new test suite
106         TestSuite suite = new TestSuite("Ping Latency Tests");
107 
108         // Run performance tests in read committed mode.
109         suite.addTest(new PingLatencyTestPerf("testPingLatency"));
110 
111         return suite;
112     }
113 
114     /**
115      * Accepts a timing controller from the test runner.
116      *
117      @param timingController The timing controller to register mutliple timings with.
118      */
119     public void setTimingController(TimingController timingController)
120     {
121         _timingController = timingController;
122     }
123 
124     /**
125      * Gets the timing controller passed in by the test runner.
126      *
127      @return The timing controller passed in by the test runner.
128      */
129     public TimingController getTimingController()
130     {
131         return _timingController;
132     }
133 
134     /**
135      * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all
136      * replies have been received or a time out occurs before exiting this method.
137      *
138      @param numPings The number of pings to send.
139      */
140     public void testPingLatency(int numPingsthrows Exception
141     {
142         _logger.debug("public void testPingLatency(int numPings): called");
143 
144         // Ensure that at least one ping was requeusted.
145         if (numPings == 0)
146         {
147             _logger.error("Number of pings requested was zero.");
148         }
149 
150         // Get the per thread test setup to run the test through.
151         PerThreadSetup perThreadSetup = threadSetup.get();
152         PingClient pingClient = perThreadSetup._pingClient;
153 
154         // Advance the correlation id of messages to send, to make it unique for this run.
155         String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
156         _logger.debug("messageCorrelationId = " + messageCorrelationId);
157 
158         // Initialize the count and timing controller for the new correlation id.
159         PerCorrelationId perCorrelationId = new PerCorrelationId();
160         TimingController tc = getTimingController().getControllerForCurrentThread();
161         perCorrelationId._tc = tc;
162         perCorrelationId._expectedCount = numPings;
163         perCorrelationIds.put(messageCorrelationId, perCorrelationId);
164 
165         // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
166         // messages.
167         pingClient.setChainedMessageListener(batchedResultsListener);
168 
169         // Generate a sample message of the specified size.
170         Message msg =
171             pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
172                 testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
173                 testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
174 
175         // Send the requested number of messages, and wait until they have all been received.
176         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
177         int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
178 
179         // Check that all the replies were received and log a fail if they were not.
180         if (numReplies < numPings)
181         {
182             tc.completeTest(false, 0);
183         }
184 
185         // Remove the chained message listener from the ping producer.
186         pingClient.removeChainedMessageListener();
187 
188         // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
189         perCorrelationIds.remove(messageCorrelationId);
190     }
191 
192     /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */
193     public void threadSetUp()
194     {
195         _logger.debug("public void threadSetUp(): called");
196 
197         try
198         {
199             // Call the set up method in the super class. This creates a PingClient pinger.
200             super.threadSetUp();
201 
202             // Create the chained message listener, only if it has not already been created.  This is set up with the
203             // batch size property, to tell it what batch size to output results on. A synchronized block is used to
204             // ensure that only one thread creates this.
205             synchronized (this)
206             {
207                 if (batchedResultsListener == null)
208                 {
209                     int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
210                     batchedResultsListener = new BatchedResultsListener(batchSize);
211                 }
212             }
213 
214             // Get the set up that the super class created.
215             PerThreadSetup perThreadSetup = threadSetup.get();
216 
217             // Register the chained message listener on the pinger to do its asynchronous test timings from.
218             perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
219         }
220         catch (Exception e)
221         {
222             _logger.warn("There was an exception during per thread setup.", e);
223         }
224     }
225 
226     /**
227      * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
228      * be attached to the pinger, in order to receive notifications about every message received and the number
229      * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener
230      * outputs a test timing for the actual number of messages received in the current batch.
231      */
232     private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
233     {
234         /** The test results logging batch size. */
235         int _batchSize;
236         private boolean _strictAMQP;
237 
238         /**
239          * Creates a results listener on the specified batch size.
240          *
241          @param batchSize The batch size to use.
242          */
243         public BatchedResultsListener(int batchSize)
244         {
245             _batchSize = batchSize;
246             _strictAMQP =
247                 Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP,
248                         AMQSession.STRICT_AMQP_DEFAULT));
249         }
250 
251         /**
252          * This callback method is called from all of the pingers that this test creates. It uses the correlation id
253          * from the message to identify the timing controller for the test thread that was responsible for sending those
254          * messages.
255          *
256          @param message        The message.
257          @param remainingCount The count of messages remaining to be received with a particular correlation id.
258          *
259          @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through.
260          */
261         public void onMessage(Message message, int remainingCount, long latencythrows JMSException
262         {
263             _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
264 
265             // Check if a batch boundary has been crossed.
266             if ((remainingCount % _batchSize== 0)
267             {
268                 // Extract the correlation id from the message.
269                 String correlationId = message.getJMSCorrelationID();
270 
271                 // Get the details for the correlation id and check that they are not null. They can become null
272                 // if a test times out.
273                 PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
274                 if (perCorrelationId != null)
275                 {
276                     // Get the timing controller and expected count for this correlation id.
277                     TimingController tc = perCorrelationId._tc;
278                     int expected = perCorrelationId._expectedCount;
279 
280                     // Calculate how many messages were actually received in the last batch. This will be the batch size
281                     // except where the number expected is not a multiple of the batch size and this is the first remaining
282                     // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
283                     // size.
284                     int receivedInBatch = ((expected - remainingCount< _batchSize(expected % _batchSize: _batchSize;
285 
286                     // Register a test result for the correlation id.
287                     try
288                     {
289                         tc.completeTest(true, receivedInBatch, latency);
290                     }
291                     catch (InterruptedException e)
292                     {
293                         // Ignore this. It means the test runner wants to stop as soon as possible.
294                         _logger.warn("Got InterruptedException.", e);
295                     }
296                 }
297                 // Else ignore, test timed out. Should log a fail here?
298             }
299         }
300     }
301 
302     /**
303      * Holds state specific to each correlation id, needed to output test results. This consists of the count of the
304      * total expected number of messages, and the timing controller for the thread sending those message ids.
305      */
306     private static class PerCorrelationId
307     {
308         public int _expectedCount;
309         public TimingController _tc;
310     }
311 }