001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.ping;
022
023 import org.apache.log4j.Logger;
024
025 import org.apache.qpid.requestreply.PingPongProducer;
026 import org.apache.qpid.util.CommandLineParser;
027
028 import org.apache.qpid.junit.extensions.util.MathUtils;
029 import org.apache.qpid.junit.extensions.util.ParsedProperties;
030
031 import javax.jms.*;
032
033 import java.io.BufferedReader;
034 import java.io.IOException;
035 import java.io.InputStreamReader;
036 import java.util.List;
037 import java.util.Properties;
038 import java.util.concurrent.atomic.AtomicInteger;
039
040 /**
041 * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and
042 * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop
043 * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the
044 * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with
045 * failure conditions when using durable messaging.
046 *
047 * <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to
048 * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases
049 * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings
050 * with.
051 *
052 * <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console.
053 *
054 * <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and
055 * additionally accepts the following parameters:
056 *
057 * <p/><table><caption>Parameters</caption>
058 * <tr><th> Parameter <th> Default <th> Comments
059 * <tr><td> numMessages <td> 100 <td> The total number of messages to send.
060 * <tr><td> numMessagesToAction <td> -1 <td> The number of messages to send before taking a custom 'action'.
061 * <tr><td> duration <td> 30S <td> The length of time to ping for. (Format dDhHmMsS, for d days, h hours,
062 * m minutes and s seconds).
063 * </table>
064 *
065 * <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up
066 * when no parameters are specified.
067 *
068 * <p/><table><caption>Parameters</caption>
069 * <tr><th> Parameter <th> Default <th> Comments
070 * <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped.
071 * <tr><td> transacted <td> true <td> Only makes sense to test with transactions.
072 * <tr><td> persistent <td> true <td> Only makes sense to test persistent.
073 * <tr><td> durableDests <td> true <td> Should use durable queues with persistent messages.
074 * <tr><td> commitBatchSize <td> 10
075 * <tr><td> rate <td> 20 <td> Total default test time is 5 seconds.
076 * </table>
077 *
078 * <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits
079 * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will
080 * wait for the second signal before receiving its pings.
081 *
082 * <p/>This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages
083 * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method,
084 * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide
085 * custom behaviour with alternative implementations of this method (for example taking a backup).
086 *
087 * <p><table id="crc"><caption>CRC Card</caption>
088 * <tr><th> Responsibilities <th> Collaborations
089 * <tr><td> Send and receive pings.
090 * <tr><td> Accept user input to signal stop sending.
091 * <tr><td> Accept user input to signal start receiving.
092 * <tr><td> Provide feedback on pings sent versus pings received.
093 * <tr><td> Provide extension point for arbitrary action on a particular message count.
094 * </table>
095 */
096 public class PingDurableClient extends PingPongProducer implements ExceptionListener
097 {
098 private static final Logger log = Logger.getLogger(PingDurableClient.class);
099
100 public static final String NUM_MESSAGES_PROPNAME = "numMessages";
101 public static final String NUM_MESSAGES_DEFAULT = "100";
102 public static final String DURATION_PROPNAME = "duration";
103 public static final String DURATION_DEFAULT = "30S";
104 public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction";
105 public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1";
106
107 /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */
108 private static final long TIME_OUT = 3000;
109
110 static
111 {
112 defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT);
113 defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT);
114 defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false");
115 defaults.setProperty(TRANSACTED_PROPNAME, "true");
116 defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
117 defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
118 defaults.setProperty(RATE_PROPNAME, "20");
119 defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
120 defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT);
121 }
122
123 /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
124 private int numMessages;
125
126 /** Holds the number of messages to send before taking triggering the action. */
127 private int numMessagesToAction;
128
129 /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */
130 private long duration;
131
132 /** Used to indciate that this application should terminate. Set by the shutdown hook. */
133 private boolean terminate = false;
134
135 /**
136 * @throws Exception Any exceptions are allowed to fall through.
137 */
138 public PingDurableClient(Properties overrides) throws Exception
139 {
140 super(overrides);
141 log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called");
142
143 // Extract the additional configuration parameters.
144 ParsedProperties properties = new ParsedProperties(defaults);
145 properties.putAll(overrides);
146
147 numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME);
148 String durationSpec = properties.getProperty(DURATION_PROPNAME);
149 numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME);
150
151 if (durationSpec != null)
152 {
153 duration = MathUtils.parseDuration(durationSpec) * 1000000;
154 }
155 }
156
157 /**
158 * Starts the ping/wait/receive process.
159 *
160 * @param args The command line arguments.
161 */
162 public static void main(String[] args)
163 {
164 try
165 {
166 // Create a ping producer overriding its defaults with all options passed on the command line.
167 Properties options =
168 CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
169 PingDurableClient pingProducer = new PingDurableClient(options);
170
171 // Create a shutdown hook to terminate the ping-pong producer.
172 Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
173
174 // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
175 // pingProducer.getConnection().setExceptionListener(pingProducer);
176
177 // Run the test procedure.
178 int sent = pingProducer.send();
179 pingProducer.closeConnection();
180 pingProducer.waitForUser("Press return to begin receiving the pings.");
181 pingProducer.receive(sent);
182
183 System.exit(0);
184 }
185 catch (Exception e)
186 {
187 System.err.println(e.getMessage());
188 log.error("Top level handler caught execption.", e);
189 System.exit(1);
190 }
191 }
192
193 /**
194 * Performs the main test procedure implemented by this ping client. See the class level comment for details.
195 */
196 protected int send() throws Exception
197 {
198 log.debug("public void sendWaitReceive(): called");
199
200 log.debug("duration = " + duration);
201 log.debug("numMessages = " + numMessages);
202
203 if (duration > 0)
204 {
205 System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds.");
206 }
207
208 if (_rate > 0)
209 {
210 System.out.println("Sending at " + _rate + " messages per second.");
211 }
212
213 if (numMessages > 0)
214 {
215 System.out.println("Sending up to " + numMessages + " messages.");
216 }
217
218 // Establish the connection and the message producer.
219 establishConnection(true, false);
220 _connection.start();
221
222 Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
223
224 // Send pings until a terminating condition is received.
225 boolean endCondition = false;
226 int messagesSent = 0;
227 int messagesCommitted = 0;
228 int messagesNotCommitted = 0;
229 long start = System.nanoTime();
230
231 // Clear console in.
232 clearConsole();
233
234 while (!endCondition)
235 {
236 boolean committed = false;
237
238 try
239 {
240 committed = sendMessage(messagesSent, message) && _transacted;
241
242 messagesSent++;
243 messagesNotCommitted++;
244
245 // Keep count of the number of messsages currently committed and pending commit.
246 if (committed)
247 {
248 log.debug("Adding " + messagesNotCommitted + " messages to the committed count.");
249 messagesCommitted += messagesNotCommitted;
250 messagesNotCommitted = 0;
251
252 System.out.println("Commited: " + messagesCommitted);
253 }
254 }
255 catch (JMSException e)
256 {
257 log.debug("Got JMSException whilst sending.");
258 _publish = false;
259 }
260
261 // Perform the arbitrary action if the number of messages sent has reached the right number.
262 if (messagesSent == numMessagesToAction)
263 {
264 System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = "
265 + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted);
266 takeAction();
267 }
268
269 // Determine if the end condition has been met, based on the number of messages, time passed, errors on
270 // the connection or user input.
271 long now = System.nanoTime();
272
273 if ((duration != 0) && ((now - start) > duration))
274 {
275 System.out.println("Send halted because duration expired.");
276 endCondition = true;
277 }
278 else if ((numMessages != 0) && (messagesSent >= numMessages))
279 {
280 System.out.println("Send halted because # messages completed.");
281 endCondition = true;
282 }
283 else if (System.in.available() > 0)
284 {
285 System.out.println("Send halted by user input.");
286 endCondition = true;
287
288 clearConsole();
289 }
290 else if (!_publish)
291 {
292 System.out.println("Send halted by error on the connection.");
293 endCondition = true;
294 }
295 }
296
297 log.debug("messagesSent = " + messagesSent);
298 log.debug("messagesCommitted = " + messagesCommitted);
299 log.debug("messagesNotCommitted = " + messagesNotCommitted);
300
301 System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted
302 + ", Messages not Committed = " + messagesNotCommitted);
303
304 return messagesSent;
305 }
306
307 protected void closeConnection()
308 {
309 // Clean up the connection.
310 try
311 {
312 close();
313 }
314 catch (JMSException e)
315 {
316 log.debug("There was an error whilst closing the connection: " + e, e);
317 System.out.println("There was an error whilst closing the connection.");
318
319 // Ignore as did best could manage to clean up.
320 }
321 }
322
323 protected void receive(int messagesSent) throws Exception
324 {
325 // Re-establish the connection and the message consumer.
326 _queueJVMSequenceID = new AtomicInteger();
327 _queueSharedID = new AtomicInteger();
328
329 establishConnection(false, true);
330 _consumer[0].setMessageListener(null);
331 _consumerConnection[0].start();
332
333 // Try to receive all of the pings that were successfully sent.
334 int messagesReceived = 0;
335 boolean endCondition = false;
336
337 while (!endCondition)
338 {
339 // Message received = _consumer.receiveNoWait();
340 Message received = _consumer[0].receive(TIME_OUT);
341 log.debug("received = " + received);
342
343 if (received != null)
344 {
345 messagesReceived++;
346 }
347
348 // Determine if the end condition has been met, based on the number of messages and time passed since last
349 // receiving a message.
350 if (received == null)
351 {
352 System.out.println("Timed out.");
353 endCondition = true;
354 }
355 else if (messagesReceived >= messagesSent)
356 {
357 System.out.println("Got all messages.");
358 endCondition = true;
359 }
360 }
361
362 // Ensure messages received are committed.
363 if (_consTransacted)
364 {
365 try
366 {
367 _consumerSession[0].commit();
368 System.out.println("Committed for all messages received.");
369 }
370 catch (JMSException e)
371 {
372 log.debug("Error during commit: " + e, e);
373 System.out.println("Error during commit.");
374 try
375 {
376 _consumerSession[0].rollback();
377 System.out.println("Rolled back on all messages received.");
378 }
379 catch (JMSException e2)
380 {
381 log.debug("Error during rollback: " + e, e);
382 System.out.println("Error on roll back of all messages received.");
383 }
384
385 }
386 }
387
388 log.debug("messagesReceived = " + messagesReceived);
389
390 System.out.println("Messages received: " + messagesReceived);
391
392 // Clean up the connection.
393 close();
394 }
395
396 /**
397 * Clears any pending input from the console.
398 */
399 private void clearConsole()
400 {
401 try
402 {
403 BufferedReader bis = new BufferedReader(new InputStreamReader(System.in));
404
405 // System.in.skip(System.in.available());
406 while (bis.ready())
407 {
408 bis.readLine();
409 }
410 }
411 catch (IOException e)
412 { }
413 }
414
415 /**
416 * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the
417 * effect of making this pinger listen to its own pings.
418 *
419 * @return The ping destinations.
420 */
421 public List<Destination> getReplyDestinations()
422 {
423 return _pingDestinations;
424 }
425
426 /**
427 * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with
428 * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the
429 * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving
430 * message should stop, not that the application should termiante.
431 *
432 * @return A shutdown hook for the ping loop.
433 */
434 public Thread getShutdownHook()
435 {
436 return new Thread(new Runnable()
437 {
438 public void run()
439 {
440 stop();
441 terminate = true;
442 }
443 });
444 }
445
446 /**
447 * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default
448 * implementation does nothing.
449 */
450 public void takeAction()
451 { }
452 }
|