PingDurableClient.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.ping;
022 
023 import org.apache.log4j.Logger;
024 
025 import org.apache.qpid.requestreply.PingPongProducer;
026 import org.apache.qpid.util.CommandLineParser;
027 
028 import org.apache.qpid.junit.extensions.util.MathUtils;
029 import org.apache.qpid.junit.extensions.util.ParsedProperties;
030 
031 import javax.jms.*;
032 
033 import java.io.BufferedReader;
034 import java.io.IOException;
035 import java.io.InputStreamReader;
036 import java.util.List;
037 import java.util.Properties;
038 import java.util.concurrent.atomic.AtomicInteger;
039 
040 /**
041  * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and
042  * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop
043  * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the
044  * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with
045  * failure conditions when using durable messaging.
046  *
047  <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to
048  * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases
049  * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings
050  * with.
051  *
052  <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console.
053  *
054  <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and
055  * additionally accepts the following parameters:
056  *
057  <p/><table><caption>Parameters</caption>
058  <tr><th> Parameter           <th> Default  <th> Comments
059  <tr><td> numMessages         <td> 100      <td> The total number of messages to send.
060  <tr><td> numMessagesToAction <td> -1       <td> The number of messages to send before taking a custom 'action'.
061  <tr><td> duration            <td> 30S      <td> The length of time to ping for. (Format dDhHmMsS, for d days, h hours,
062  *                                                 m minutes and s seconds).
063  </table>
064  *
065  <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up
066  * when no parameters are specified.
067  *
068  <p/><table><caption>Parameters</caption>
069  <tr><th> Parameter        <th> Default  <th> Comments
070  <tr><td> uniqueDests      <td> false    <td> Prevents destination names being timestamped.
071  <tr><td> transacted       <td> true     <td> Only makes sense to test with transactions.
072  <tr><td> persistent       <td> true     <td> Only makes sense to test persistent.
073  <tr><td> durableDests     <td> true     <td> Should use durable queues with persistent messages.
074  <tr><td> commitBatchSize  <td> 10
075  <tr><td> rate             <td> 20       <td> Total default test time is 5 seconds.
076  </table>
077  *
078  <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits
079  * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will
080  * wait for the second signal before receiving its pings.
081  *
082  <p/>This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages
083  * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method,
084  * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide
085  * custom behaviour with alternative implementations of this method (for example taking a backup).
086  *
087  <p><table id="crc"><caption>CRC Card</caption>
088  <tr><th> Responsibilities <th> Collaborations
089  <tr><td> Send and receive pings.
090  <tr><td> Accept user input to signal stop sending.
091  <tr><td> Accept user input to signal start receiving.
092  <tr><td> Provide feedback on pings sent versus pings received.
093  <tr><td> Provide extension point for arbitrary action on a particular message count.
094  </table>
095  */
096 public class PingDurableClient extends PingPongProducer implements ExceptionListener
097 {
098     private static final Logger log = Logger.getLogger(PingDurableClient.class);
099 
100     public static final String NUM_MESSAGES_PROPNAME = "numMessages";
101     public static final String NUM_MESSAGES_DEFAULT = "100";
102     public static final String DURATION_PROPNAME = "duration";
103     public static final String DURATION_DEFAULT = "30S";
104     public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction";
105     public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1";
106 
107     /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */
108     private static final long TIME_OUT = 3000;
109 
110     static
111     {
112         defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT);
113         defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT);
114         defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false");
115         defaults.setProperty(TRANSACTED_PROPNAME, "true");
116         defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
117         defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
118         defaults.setProperty(RATE_PROPNAME, "20");
119         defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
120         defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT);
121     }
122 
123     /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
124     private int numMessages;
125 
126     /** Holds the number of messages to send before taking triggering the action. */
127     private int numMessagesToAction;
128 
129     /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */
130     private long duration;
131 
132     /** Used to indciate that this application should terminate. Set by the shutdown hook. */
133     private boolean terminate = false;
134 
135     /**
136      @throws Exception Any exceptions are allowed to fall through.
137      */
138     public PingDurableClient(Properties overridesthrows Exception
139     {
140         super(overrides);
141         log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called");
142 
143         // Extract the additional configuration parameters.
144         ParsedProperties properties = new ParsedProperties(defaults);
145         properties.putAll(overrides);
146 
147         numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME);
148         String durationSpec = properties.getProperty(DURATION_PROPNAME);
149         numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME);
150 
151         if (durationSpec != null)
152         {
153             duration = MathUtils.parseDuration(durationSpec1000000;
154         }
155     }
156 
157     /**
158      * Starts the ping/wait/receive process.
159      *
160      @param args The command line arguments.
161      */
162     public static void main(String[] args)
163     {
164         try
165         {
166             // Create a ping producer overriding its defaults with all options passed on the command line.
167             Properties options =
168                 CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
169             PingDurableClient pingProducer = new PingDurableClient(options);
170 
171             // Create a shutdown hook to terminate the ping-pong producer.
172             Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
173 
174             // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
175             // pingProducer.getConnection().setExceptionListener(pingProducer);
176 
177             // Run the test procedure.
178             int sent = pingProducer.send();
179             pingProducer.closeConnection();
180             pingProducer.waitForUser("Press return to begin receiving the pings.");
181             pingProducer.receive(sent);
182 
183             System.exit(0);
184         }
185         catch (Exception e)
186         {
187             System.err.println(e.getMessage());
188             log.error("Top level handler caught execption.", e);
189             System.exit(1);
190         }
191     }
192 
193     /**
194      * Performs the main test procedure implemented by this ping client. See the class level comment for details.
195      */
196     protected int send() throws Exception
197     {
198         log.debug("public void sendWaitReceive(): called");
199 
200         log.debug("duration = " + duration);
201         log.debug("numMessages = " + numMessages);
202 
203         if (duration > 0)
204         {
205             System.out.println("Sending for up to " (duration / 1000000000f" seconds.");
206         }
207 
208         if (_rate > 0)
209         {
210             System.out.println("Sending at " + _rate + " messages per second.");
211         }
212 
213         if (numMessages > 0)
214         {
215             System.out.println("Sending up to " + numMessages + " messages.");
216         }
217 
218         // Establish the connection and the message producer.
219         establishConnection(true, false);
220         _connection.start();
221 
222         Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
223 
224         // Send pings until a terminating condition is received.
225         boolean endCondition = false;
226         int messagesSent = 0;
227         int messagesCommitted = 0;
228         int messagesNotCommitted = 0;
229         long start = System.nanoTime();
230 
231         // Clear console in.
232         clearConsole();
233 
234         while (!endCondition)
235         {
236             boolean committed = false;
237 
238             try
239             {
240                 committed = sendMessage(messagesSent, message&& _transacted;
241 
242                 messagesSent++;
243                 messagesNotCommitted++;
244 
245                 // Keep count of the number of messsages currently committed and pending commit.
246                 if (committed)
247                 {
248                     log.debug("Adding " + messagesNotCommitted + " messages to the committed count.");
249                     messagesCommitted += messagesNotCommitted;
250                     messagesNotCommitted = 0;
251 
252                     System.out.println("Commited: " + messagesCommitted);
253                 }
254             }
255             catch (JMSException e)
256             {
257                 log.debug("Got JMSException whilst sending.");
258                 _publish = false;
259             }
260 
261             // Perform the arbitrary action if the number of messages sent has reached the right number.
262             if (messagesSent == numMessagesToAction)
263             {
264                 System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = "
265                     + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted);
266                 takeAction();
267             }
268 
269             // Determine if the end condition has been met, based on the number of messages, time passed, errors on
270             // the connection or user input.
271             long now = System.nanoTime();
272 
273             if ((duration != 0&& ((now - start> duration))
274             {
275                 System.out.println("Send halted because duration expired.");
276                 endCondition = true;
277             }
278             else if ((numMessages != 0&& (messagesSent >= numMessages))
279             {
280                 System.out.println("Send halted because # messages completed.");
281                 endCondition = true;
282             }
283             else if (System.in.available() 0)
284             {
285                 System.out.println("Send halted by user input.");
286                 endCondition = true;
287 
288                 clearConsole();
289             }
290             else if (!_publish)
291             {
292                 System.out.println("Send halted by error on the connection.");
293                 endCondition = true;
294             }
295         }
296 
297         log.debug("messagesSent = " + messagesSent);
298         log.debug("messagesCommitted = " + messagesCommitted);
299         log.debug("messagesNotCommitted = " + messagesNotCommitted);
300 
301         System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted
302             ", Messages not Committed = " + messagesNotCommitted);
303 
304         return messagesSent;
305     }
306 
307     protected void closeConnection()
308     {
309         // Clean up the connection.
310         try
311         {
312             close();
313         }
314         catch (JMSException e)
315         {
316             log.debug("There was an error whilst closing the connection: " + e, e);
317             System.out.println("There was an error whilst closing the connection.");
318 
319             // Ignore as did best could manage to clean up.
320         }
321     }
322 
323     protected void receive(int messagesSentthrows Exception
324     {
325         // Re-establish the connection and the message consumer.
326         _queueJVMSequenceID = new AtomicInteger();
327         _queueSharedID = new AtomicInteger();
328 
329         establishConnection(false, true);
330         _consumer[0].setMessageListener(null);
331         _consumerConnection[0].start();
332 
333         // Try to receive all of the pings that were successfully sent.
334         int messagesReceived = 0;
335         boolean endCondition = false;
336 
337         while (!endCondition)
338         {
339             // Message received = _consumer.receiveNoWait();
340             Message received = _consumer[0].receive(TIME_OUT);
341             log.debug("received = " + received);
342 
343             if (received != null)
344             {
345                 messagesReceived++;
346             }
347 
348             // Determine if the end condition has been met, based on the number of messages and time passed since last
349             // receiving a message.
350             if (received == null)
351             {
352                 System.out.println("Timed out.");
353                 endCondition = true;
354             }
355             else if (messagesReceived >= messagesSent)
356             {
357                 System.out.println("Got all messages.");
358                 endCondition = true;
359             }
360         }
361 
362         // Ensure messages received are committed.
363         if (_consTransacted)
364         {
365             try
366             {
367                 _consumerSession[0].commit();
368                 System.out.println("Committed for all messages received.");
369             }
370             catch (JMSException e)
371             {
372                 log.debug("Error during commit: " + e, e);
373                 System.out.println("Error during commit.");
374                 try
375                 {
376                     _consumerSession[0].rollback();
377                     System.out.println("Rolled back on all messages received.");
378                 }
379                 catch (JMSException e2)
380                 {
381                     log.debug("Error during rollback: " + e, e);
382                     System.out.println("Error on roll back of all messages received.");
383                 }
384 
385             }
386         }
387 
388         log.debug("messagesReceived = " + messagesReceived);
389 
390         System.out.println("Messages received: " + messagesReceived);
391 
392         // Clean up the connection.
393         close();
394     }
395 
396     /**
397      * Clears any pending input from the console.
398      */
399     private void clearConsole()
400     {
401         try
402         {
403             BufferedReader bis = new BufferedReader(new InputStreamReader(System.in));
404 
405             // System.in.skip(System.in.available());
406             while (bis.ready())
407             {
408                 bis.readLine();
409             }
410         }
411         catch (IOException e)
412         { }
413     }
414 
415     /**
416      * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the
417      * effect of making this pinger listen to its own pings.
418      *
419      @return The ping destinations.
420      */
421     public List<Destination> getReplyDestinations()
422     {
423         return _pingDestinations;
424     }
425 
426     /**
427      * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with
428      * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the
429      * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving
430      * message should stop, not that the application should termiante.
431      *
432      @return A shutdown hook for the ping loop.
433      */
434     public Thread getShutdownHook()
435     {
436         return new Thread(new Runnable()
437                 {
438                     public void run()
439                     {
440                         stop();
441                         terminate = true;
442                     }
443                 });
444     }
445 
446     /**
447      * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default
448      * implementation does nothing.
449      */
450     public void takeAction()
451     { }
452 }