001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.ping;
022
023 import junit.framework.Test;
024 import junit.framework.TestSuite;
025
026 import org.apache.log4j.Logger;
027
028 import org.apache.qpid.client.AMQSession;
029 import org.apache.qpid.requestreply.PingPongProducer;
030
031 import org.apache.qpid.junit.extensions.TimingController;
032 import org.apache.qpid.junit.extensions.TimingControllerAware;
033 import org.apache.qpid.junit.extensions.util.ParsedProperties;
034
035 import javax.jms.JMSException;
036 import javax.jms.Message;
037
038 import java.util.Collections;
039 import java.util.HashMap;
040 import java.util.Map;
041 import java.util.concurrent.atomic.AtomicLong;
042
043 /**
044 * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing
045 * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for
046 * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from
047 * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than
048 * waiting until all expected replies are received.
049 *
050 * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test
051 * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The
052 * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the
053 * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
054 *
055 * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the
056 * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput
057 * (messages / time) can be calculated in order to examine the relationship between throughput and latency.
058 *
059 * <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities <th> Collaborations <tr><td> Send many ping
060 * messages and output timings for sampled individual pings. </table>
061 */
062 public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware
063 {
064 private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class);
065
066 /** Holds the name of the property to get the test results logging batch size. */
067 public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
068
069 /** Holds the default test results logging batch size. */
070 public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
071
072 /** Used to hold the timing controller passed from the test runner. */
073 private TimingController _timingController;
074
075 /** Used to generate unique correlation ids for each test run. */
076 private AtomicLong corellationIdGenerator = new AtomicLong();
077
078 /**
079 * Holds test specifics by correlation id. This consists of the expected number of messages and the timing
080 * controler.
081 */
082 private Map<String, PerCorrelationId> perCorrelationIds =
083 Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
084
085 /** Holds the batched results listener, that does logging on batch boundaries. */
086 private BatchedResultsListener batchedResultsListener = null;
087
088 /**
089 * Creates a new asynchronous ping performance test with the specified name.
090 *
091 * @param name The test name.
092 */
093 public PingLatencyTestPerf(String name)
094 {
095 super(name);
096
097 // Sets up the test parameters with defaults.
098 ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
099 Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
100 }
101
102 /** Compile all the tests into a test suite. */
103 public static Test suite()
104 {
105 // Build a new test suite
106 TestSuite suite = new TestSuite("Ping Latency Tests");
107
108 // Run performance tests in read committed mode.
109 suite.addTest(new PingLatencyTestPerf("testPingLatency"));
110
111 return suite;
112 }
113
114 /**
115 * Accepts a timing controller from the test runner.
116 *
117 * @param timingController The timing controller to register mutliple timings with.
118 */
119 public void setTimingController(TimingController timingController)
120 {
121 _timingController = timingController;
122 }
123
124 /**
125 * Gets the timing controller passed in by the test runner.
126 *
127 * @return The timing controller passed in by the test runner.
128 */
129 public TimingController getTimingController()
130 {
131 return _timingController;
132 }
133
134 /**
135 * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all
136 * replies have been received or a time out occurs before exiting this method.
137 *
138 * @param numPings The number of pings to send.
139 */
140 public void testPingLatency(int numPings) throws Exception
141 {
142 _logger.debug("public void testPingLatency(int numPings): called");
143
144 // Ensure that at least one ping was requeusted.
145 if (numPings == 0)
146 {
147 _logger.error("Number of pings requested was zero.");
148 }
149
150 // Get the per thread test setup to run the test through.
151 PerThreadSetup perThreadSetup = threadSetup.get();
152 PingClient pingClient = perThreadSetup._pingClient;
153
154 // Advance the correlation id of messages to send, to make it unique for this run.
155 String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
156 _logger.debug("messageCorrelationId = " + messageCorrelationId);
157
158 // Initialize the count and timing controller for the new correlation id.
159 PerCorrelationId perCorrelationId = new PerCorrelationId();
160 TimingController tc = getTimingController().getControllerForCurrentThread();
161 perCorrelationId._tc = tc;
162 perCorrelationId._expectedCount = numPings;
163 perCorrelationIds.put(messageCorrelationId, perCorrelationId);
164
165 // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
166 // messages.
167 pingClient.setChainedMessageListener(batchedResultsListener);
168
169 // Generate a sample message of the specified size.
170 Message msg =
171 pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
172 testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
173 testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
174
175 // Send the requested number of messages, and wait until they have all been received.
176 long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
177 int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
178
179 // Check that all the replies were received and log a fail if they were not.
180 if (numReplies < numPings)
181 {
182 tc.completeTest(false, 0);
183 }
184
185 // Remove the chained message listener from the ping producer.
186 pingClient.removeChainedMessageListener();
187
188 // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
189 perCorrelationIds.remove(messageCorrelationId);
190 }
191
192 /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */
193 public void threadSetUp()
194 {
195 _logger.debug("public void threadSetUp(): called");
196
197 try
198 {
199 // Call the set up method in the super class. This creates a PingClient pinger.
200 super.threadSetUp();
201
202 // Create the chained message listener, only if it has not already been created. This is set up with the
203 // batch size property, to tell it what batch size to output results on. A synchronized block is used to
204 // ensure that only one thread creates this.
205 synchronized (this)
206 {
207 if (batchedResultsListener == null)
208 {
209 int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
210 batchedResultsListener = new BatchedResultsListener(batchSize);
211 }
212 }
213
214 // Get the set up that the super class created.
215 PerThreadSetup perThreadSetup = threadSetup.get();
216
217 // Register the chained message listener on the pinger to do its asynchronous test timings from.
218 perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
219 }
220 catch (Exception e)
221 {
222 _logger.warn("There was an exception during per thread setup.", e);
223 }
224 }
225
226 /**
227 * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
228 * be attached to the pinger, in order to receive notifications about every message received and the number
229 * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener
230 * outputs a test timing for the actual number of messages received in the current batch.
231 */
232 private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
233 {
234 /** The test results logging batch size. */
235 int _batchSize;
236 private boolean _strictAMQP;
237
238 /**
239 * Creates a results listener on the specified batch size.
240 *
241 * @param batchSize The batch size to use.
242 */
243 public BatchedResultsListener(int batchSize)
244 {
245 _batchSize = batchSize;
246 _strictAMQP =
247 Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP,
248 AMQSession.STRICT_AMQP_DEFAULT));
249 }
250
251 /**
252 * This callback method is called from all of the pingers that this test creates. It uses the correlation id
253 * from the message to identify the timing controller for the test thread that was responsible for sending those
254 * messages.
255 *
256 * @param message The message.
257 * @param remainingCount The count of messages remaining to be received with a particular correlation id.
258 *
259 * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through.
260 */
261 public void onMessage(Message message, int remainingCount, long latency) throws JMSException
262 {
263 _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
264
265 // Check if a batch boundary has been crossed.
266 if ((remainingCount % _batchSize) == 0)
267 {
268 // Extract the correlation id from the message.
269 String correlationId = message.getJMSCorrelationID();
270
271 // Get the details for the correlation id and check that they are not null. They can become null
272 // if a test times out.
273 PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
274 if (perCorrelationId != null)
275 {
276 // Get the timing controller and expected count for this correlation id.
277 TimingController tc = perCorrelationId._tc;
278 int expected = perCorrelationId._expectedCount;
279
280 // Calculate how many messages were actually received in the last batch. This will be the batch size
281 // except where the number expected is not a multiple of the batch size and this is the first remaining
282 // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
283 // size.
284 int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
285
286 // Register a test result for the correlation id.
287 try
288 {
289 tc.completeTest(true, receivedInBatch, latency);
290 }
291 catch (InterruptedException e)
292 {
293 // Ignore this. It means the test runner wants to stop as soon as possible.
294 _logger.warn("Got InterruptedException.", e);
295 }
296 }
297 // Else ignore, test timed out. Should log a fail here?
298 }
299 }
300 }
301
302 /**
303 * Holds state specific to each correlation id, needed to output test results. This consists of the count of the
304 * total expected number of messages, and the timing controller for the thread sending those message ids.
305 */
306 private static class PerCorrelationId
307 {
308 public int _expectedCount;
309 public TimingController _tc;
310 }
311 }
|