001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.ping;
022
023 import junit.framework.Test;
024 import junit.framework.TestSuite;
025
026 import org.apache.log4j.Logger;
027
028 import org.apache.qpid.requestreply.PingPongProducer;
029
030 import org.apache.qpid.junit.extensions.TimingController;
031 import org.apache.qpid.junit.extensions.TimingControllerAware;
032
033 import javax.jms.JMSException;
034 import javax.jms.Message;
035
036 import java.util.Collections;
037 import java.util.HashMap;
038 import java.util.Map;
039 import java.util.concurrent.atomic.AtomicLong;
040
041 /**
042 * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller
043 * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test
044 * that it extends because it can output timings as replies are received, rather than waiting until all expected replies
045 * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending
046 * and recieving clients working asynchronously.
047 *
048 * <p/><table id="crc"><caption>CRC Card</caption>
049 * <tr><td> Responsibilities <th> Collaborations
050 * <tr><td> Send many ping messages and output timings asynchronously on batches received.
051 * </table>
052 */
053 public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware
054 {
055 private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
056
057 /** Holds the name of the property to get the test results logging batch size. */
058 public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
059
060 /** Holds the default test results logging batch size. */
061 public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000;
062
063 /** Used to hold the timing controller passed from the test runner. */
064 private TimingController _timingController;
065
066 /** Used to generate unique correlation ids for each test run. */
067 private AtomicLong corellationIdGenerator = new AtomicLong();
068
069 /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */
070 private Map<String, PerCorrelationId> perCorrelationIds =
071 Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
072
073 /** Holds the batched results listener, that does logging on batch boundaries. */
074 private BatchedResultsListener batchedResultsListener = null;
075
076 /**
077 * Creates a new asynchronous ping performance test with the specified name.
078 *
079 * @param name The test name.
080 */
081 public PingAsyncTestPerf(String name)
082 {
083 super(name);
084
085 // Sets up the test parameters with defaults.
086 testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
087 Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
088 }
089
090 /**
091 * Compile all the tests into a test suite.
092 * @return The test suite to run. Should only contain testAsyncPingOk method.
093 */
094 public static Test suite()
095 {
096 // Build a new test suite
097 TestSuite suite = new TestSuite("Ping Performance Tests");
098
099 // Run performance tests in read committed mode.
100 suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
101
102 return suite;
103 }
104
105 /**
106 * Accepts a timing controller from the test runner.
107 *
108 * @param timingController The timing controller to register mutliple timings with.
109 */
110 public void setTimingController(TimingController timingController)
111 {
112 _timingController = timingController;
113 }
114
115 /**
116 * Gets the timing controller passed in by the test runner.
117 *
118 * @return The timing controller passed in by the test runner.
119 */
120 public TimingController getTimingController()
121 {
122 return _timingController;
123 }
124
125 /**
126 * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until
127 * all replies have been received or a time out occurs before exiting this method.
128 *
129 * @param numPings The number of pings to send.
130 * @throws Exception pass all errors out to the test harness
131 */
132 public void testAsyncPingOk(int numPings) throws Exception
133 {
134 // _logger.debug("public void testAsyncPingOk(int numPings): called");
135
136 // Ensure that at least one ping was requeusted.
137 if (numPings == 0)
138 {
139 _logger.error("Number of pings requested was zero.");
140 fail("Number of pings requested was zero.");
141 }
142
143 // Get the per thread test setup to run the test through.
144 PerThreadSetup perThreadSetup = threadSetup.get();
145 PingClient pingClient = perThreadSetup._pingClient;
146
147 // Advance the correlation id of messages to send, to make it unique for this run.
148 perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet());
149 // String messageCorrelationId = perThreadSetup._correlationId;
150 // _logger.debug("messageCorrelationId = " + messageCorrelationId);
151
152 // Initialize the count and timing controller for the new correlation id.
153 PerCorrelationId perCorrelationId = new PerCorrelationId();
154 TimingController tc = getTimingController().getControllerForCurrentThread();
155 perCorrelationId._tc = tc;
156 perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
157 perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
158
159 // Send the requested number of messages, and wait until they have all been received.
160 long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
161 int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
162
163 // Check that all the replies were received and log a fail if they were not.
164 if (numReplies < perCorrelationId._expectedCount)
165 {
166 perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount);
167 }
168
169 // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
170 perCorrelationIds.remove(perThreadSetup._correlationId);
171 }
172
173 /**
174 * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
175 */
176 public void threadSetUp()
177 {
178 _logger.debug("public void threadSetUp(): called");
179
180 try
181 {
182 // Call the set up method in the super class. This creates a PingClient pinger.
183 super.threadSetUp();
184
185 // Create the chained message listener, only if it has not already been created. This is set up with the
186 // batch size property, to tell it what batch size to output results on. A synchronized block is used to
187 // ensure that only one thread creates this.
188 synchronized (this)
189 {
190 if (batchedResultsListener == null)
191 {
192 int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
193 batchedResultsListener = new BatchedResultsListener(batchSize);
194 }
195 }
196
197 // Get the set up that the super class created.
198 PerThreadSetup perThreadSetup = threadSetup.get();
199
200 // Register the chained message listener on the pinger to do its asynchronous test timings from.
201 perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
202 }
203 catch (Exception e)
204 {
205 _logger.warn("There was an exception during per thread setup.", e);
206 }
207 }
208
209 /**
210 * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the
211 * pinger, in order to receive notifications about every message received and the number remaining to be
212 * received. Whenever the number remaining crosses a batch size boundary this results listener outputs
213 * a test timing for the actual number of messages received in the current batch.
214 */
215 private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
216 {
217 /** The test results logging batch size. */
218 int _batchSize;
219
220 /**
221 * Creates a results listener on the specified batch size.
222 *
223 * @param batchSize The batch size to use.
224 */
225 public BatchedResultsListener(int batchSize)
226 {
227 _batchSize = batchSize;
228 }
229
230 /**
231 * This callback method is called from all of the pingers that this test creates. It uses the correlation id
232 * from the message to identify the timing controller for the test thread that was responsible for sending those
233 * messages.
234 *
235 * @param message The message.
236 * @param remainingCount The count of messages remaining to be received with a particular correlation id.
237 *
238 * @throws JMSException Any underlying JMSException is allowed to fall through.
239 */
240 public void onMessage(Message message, int remainingCount, long latency) throws JMSException
241 {
242 // Check if a batch boundary has been crossed.
243 if ((remainingCount % _batchSize) == 0)
244 {
245 // Extract the correlation id from the message.
246 String correlationId = message.getJMSCorrelationID();
247
248 /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount
249 + "): called on batch boundary for message id: " + correlationId + " with thread id: "
250 + Thread.currentThread().getId());*/
251
252 // Get the details for the correlation id and check that they are not null. They can become null
253 // if a test times out.
254 PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
255 if (perCorrelationId != null)
256 {
257 // Get the timing controller and expected count for this correlation id.
258 TimingController tc = perCorrelationId._tc;
259 int expected = perCorrelationId._expectedCount;
260
261 // Calculate how many messages were actually received in the last batch. This will be the batch size
262 // except where the number expected is not a multiple of the batch size and this is the first remaining
263 // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
264 // size.
265 int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
266
267 // Register a test result for the correlation id.
268 try
269 {
270 tc.completeTest(true, receivedInBatch);
271 }
272 catch (InterruptedException e)
273 {
274 // Ignore this. It means the test runner wants to stop as soon as possible.
275 _logger.warn("Got InterruptedException.", e);
276 }
277 }
278 // Else ignore, test timed out. Should log a fail here?
279 }
280 }
281 }
282
283 /**
284 * Holds state specific to each correlation id, needed to output test results. This consists of the count of
285 * the total expected number of messages, and the timing controller for the thread sending those message ids.
286 */
287 private static class PerCorrelationId
288 {
289 public int _expectedCount;
290 public TimingController _tc;
291 }
292 }
|