PingPongProducer.java
0001 /*
0002  *
0003  * Licensed to the Apache Software Foundation (ASF) under one
0004  * or more contributor license agreements.  See the NOTICE file
0005  * distributed with this work for additional information
0006  * regarding copyright ownership.  The ASF licenses this file
0007  * to you under the Apache License, Version 2.0 (the
0008  * "License"); you may not use this file except in compliance
0009  * with the License.  You may obtain a copy of the License at
0010  *
0011  *   http://www.apache.org/licenses/LICENSE-2.0
0012  *
0013  * Unless required by applicable law or agreed to in writing,
0014  * software distributed under the License is distributed on an
0015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016  * KIND, either express or implied.  See the License for the
0017  * specific language governing permissions and limitations
0018  * under the License.
0019  *
0020  */
0021 package org.apache.qpid.requestreply;
0022 
0023 import org.apache.log4j.Logger;
0024 import org.apache.log4j.NDC;
0025 
0026 import org.apache.qpid.test.framework.TestUtils;
0027 
0028 import org.apache.qpid.junit.extensions.BatchedThrottle;
0029 import org.apache.qpid.junit.extensions.Throttle;
0030 import org.apache.qpid.junit.extensions.util.CommandLineParser;
0031 import org.apache.qpid.junit.extensions.util.ParsedProperties;
0032 
0033 import javax.jms.*;
0034 import javax.naming.Context;
0035 import javax.naming.InitialContext;
0036 import javax.naming.NamingException;
0037 
0038 import java.io.*;
0039 import java.net.InetAddress;
0040 import java.text.DateFormat;
0041 import java.text.SimpleDateFormat;
0042 import java.util.*;
0043 import java.util.concurrent.CountDownLatch;
0044 import java.util.concurrent.SynchronousQueue;
0045 import java.util.concurrent.TimeUnit;
0046 import java.util.concurrent.atomic.AtomicInteger;
0047 import java.util.concurrent.atomic.AtomicLong;
0048 
0049 /**
0050  * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
0051  * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens
0052  * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping
0053  * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour
0054  * configurable.
0055  *
0056  <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This
0057  * means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation
0058  * id in the ping to be bounced back in the reply correlation id.
0059  *
0060  <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It
0061  * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within
0062  * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover
0063  * testing. A complete list of accepted parameters, default values and comments on their usage is provided here:
0064  *
0065  <p/><table><caption>Parameters</caption>
0066  <tr><th> Parameter        <th> Default  <th> Comments
0067  <tr><td> messageSize      <td> 0        <td> Message size in bytes. Not including any headers.
0068  <tr><td> destinationName  <td> ping     <td> The root name to use to generate destination names to ping.
0069  <tr><td> persistent       <td> false    <td> Determines whether peristent delivery is used.
0070  <tr><td> transacted       <td> false    <td> Determines whether messages are sent/received in transactions.
0071  <tr><td> broker           <td> tcp://localhost:5672 <td> Determines the broker to connect to.
0072  <tr><td> virtualHost      <td> test     <td> Determines the virtual host to send all ping over.
0073  <tr><td> rate             <td> 0        <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
0074  <tr><td> verbose          <td> false    <td> The verbose flag for debugging. Prints to console on every message.
0075  <tr><td> pubsub           <td> false    <td> Whether to ping topics or queues. Uses p2p by default.
0076  <tr><td> failAfterCommit  <td> false    <td> Whether to prompt user to kill broker after a commit batch.
0077  <tr><td> failBeforeCommit <td> false    <td> Whether to prompt user to kill broker before a commit batch.
0078  <tr><td> failAfterSend    <td> false    <td> Whether to prompt user to kill broker after a send.
0079  <tr><td> failBeforeSend   <td> false    <td> Whether to prompt user to kill broker before a send.
0080  <tr><td> failOnce         <td> true     <td> Whether to prompt for failover only once.
0081  <tr><td> username         <td> guest    <td> The username to access the broker with.
0082  <tr><td> password         <td> guest    <td> The password to access the broker with.
0083  <tr><td> selector         <td> null     <td> Not used. Defines a message selector to filter pings with.
0084  <tr><td> destinationCount <td> 1        <td> The number of destinations to send pings to.
0085  <tr><td> numConsumers     <td> 1        <td> The number of consumers on each destination.
0086  <tr><td> timeout          <td> 30000    <td> In milliseconds. The timeout to stop waiting for replies.
0087  <tr><td> commitBatchSize  <td> 1        <td> The number of messages per transaction in transactional mode.
0088  <tr><td> uniqueDests      <td> true     <td> Whether each receivers only listens to one ping destination or all.
0089  <tr><td> durableDests     <td> false    <td> Whether or not durable destinations are used.
0090  <tr><td> ackMode          <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
0091  *                                               0 - SESSION_TRANSACTED
0092  *                                               1 - AUTO_ACKNOWLEDGE
0093  *                                               2 - CLIENT_ACKNOWLEDGE
0094  *                                               3 - DUPS_OK_ACKNOWLEDGE
0095  *                                               257 - NO_ACKNOWLEDGE
0096  *                                               258 - PRE_ACKNOWLEDGE
0097  <tr><td> consTransacted   <td> false    <td> Whether or not consumers use transactions. Defaults to the same value
0098  *                                              as the 'transacted' option if not seperately defined.
0099  <tr><td> consAckMode      <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same
0100  *                                              value as 'ackMode' if not seperately defined.
0101  <tr><td> maxPending       <td> 0        <td> The maximum size in bytes, of messages sent but not yet received.
0102  *                                              Limits the volume of messages currently buffered on the client
0103  *                                              or broker. Can help scale test clients by limiting amount of buffered
0104  *                                              data to avoid out of memory errors.
0105  </table>
0106  *
0107  <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
0108  * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by
0109  * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also
0110  * registered to terminate the ping-pong loop cleanly.
0111  *
0112  <p/><table id="crc"><caption>CRC Card</caption>
0113  <tr><th> Responsibilities <th> Collaborations
0114  <tr><td> Provide a ping and wait for all responses cycle.
0115  <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
0116  </table>
0117  *
0118  * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair.
0119  *       Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a
0120  *       block that obtains the write lock for the very last message, releases any waiting producer. Means that the last
0121  *       message waits until all other messages have been handled before releasing producers but allows messages to be
0122  *       processed concurrently, unlike the current synchronized block.
0123  */
0124 public class PingPongProducer implements Runnable, ExceptionListener
0125 {
0126     /** Used for debugging. */
0127     private static final Logger log = Logger.getLogger(PingPongProducer.class);
0128 
0129     /** Holds the name of the property to determine whether of not client id is overridden at connection time.  */
0130     public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId";
0131 
0132     /** Holds the default value of the override client id flag. */
0133     public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false";
0134 
0135     /** Holds the name of the property to define the JNDI factory name with. */
0136     public static final String FACTORY_NAME_PROPNAME = "factoryName";
0137 
0138     /** Holds the default JNDI name of the connection factory. */
0139     public static final String FACTORY_NAME_DEAFULT = "local";
0140 
0141     /** Holds the name of the property to set the JNDI initial context properties with. */
0142     public static final String FILE_PROPERTIES_PROPNAME = "properties";
0143 
0144     /** Holds the default file name of the JNDI initial context properties. */
0145     public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties";
0146 
0147     /** Holds the name of the property to get the test message size from. */
0148     public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
0149 
0150     /** Used to set up a default message size. */
0151     public static final int MESSAGE_SIZE_DEAFULT = 0;
0152 
0153     /** Holds the name of the property to get the ping queue name from. */
0154     public static final String PING_QUEUE_NAME_PROPNAME = "destinationName";
0155 
0156     /** Holds the name of the default destination to send pings on. */
0157     public static final String PING_QUEUE_NAME_DEFAULT = "ping";
0158 
0159     /** Holds the name of the property to get the queue name postfix from. */
0160     public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix";
0161 
0162     /** Holds the default queue name postfix value. */
0163     public static final String QUEUE_NAME_POSTFIX_DEFAULT = "";
0164 
0165     /** Holds the name of the property to get the test delivery mode from. */
0166     public static final String PERSISTENT_MODE_PROPNAME = "persistent";
0167 
0168     /** Holds the message delivery mode to use for the test. */
0169     public static final boolean PERSISTENT_MODE_DEFAULT = false;
0170 
0171     /** Holds the name of the property to get the test transactional mode from. */
0172     public static final String TRANSACTED_PROPNAME = "transacted";
0173 
0174     /** Holds the transactional mode to use for the test. */
0175     public static final boolean TRANSACTED_DEFAULT = false;
0176 
0177     /** Holds the name of the property to get the test consumer transacted mode from. */
0178     public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
0179 
0180     /** Holds the consumer transactional mode default setting. */
0181     public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
0182 
0183     /** Holds the name of the property to get the test broker url from. */
0184     public static final String BROKER_PROPNAME = "broker";
0185 
0186     /** Holds the default broker url for the test. */
0187     public static final String BROKER_DEFAULT = "tcp://localhost:5672";
0188 
0189     /** Holds the name of the property to get the test broker virtual path. */
0190     public static final String VIRTUAL_HOST_PROPNAME = "virtualHost";
0191 
0192     /** Holds the default virtual path for the test. */
0193     public static final String VIRTUAL_HOST_DEFAULT = "";
0194 
0195     /** Holds the name of the property to get the message rate from. */
0196     public static final String RATE_PROPNAME = "rate";
0197 
0198     /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
0199     public static final int RATE_DEFAULT = 0;
0200 
0201     /** Holds the name of the property to get the verbose mode proeprty from. */
0202     public static final String VERBOSE_PROPNAME = "verbose";
0203 
0204     /** Holds the default verbose mode. */
0205     public static final boolean VERBOSE_DEFAULT = false;
0206 
0207     /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */
0208     public static final String PUBSUB_PROPNAME = "pubsub";
0209 
0210     /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
0211     public static final boolean PUBSUB_DEFAULT = false;
0212 
0213     /** Holds the name of the property to get the fail after commit flag from. */
0214     public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit";
0215 
0216     /** Holds the default failover after commit test flag. */
0217     public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false;
0218 
0219     /** Holds the name of the proeprty to get the fail before commit flag from. */
0220     public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit";
0221 
0222     /** Holds the default failover before commit test flag. */
0223     public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false;
0224 
0225     /** Holds the name of the proeprty to get the fail after send flag from. */
0226     public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend";
0227 
0228     /** Holds the default failover after send test flag. */
0229     public static final boolean FAIL_AFTER_SEND_DEFAULT = false;
0230 
0231     /** Holds the name of the property to get the fail before send flag from. */
0232     public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend";
0233 
0234     /** Holds the default failover before send test flag. */
0235     public static final boolean FAIL_BEFORE_SEND_DEFAULT = false;
0236 
0237     /** Holds the name of the property to get the fail once flag from. */
0238     public static final String FAIL_ONCE_PROPNAME = "failOnce";
0239 
0240     /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */
0241     public static final boolean FAIL_ONCE_DEFAULT = true;
0242 
0243     /** Holds the name of the property to get the broker access username from. */
0244     public static final String USERNAME_PROPNAME = "username";
0245 
0246     /** Holds the default broker log on username. */
0247     public static final String USERNAME_DEFAULT = "guest";
0248 
0249     /** Holds the name of the property to get the broker access password from. */
0250     public static final String PASSWORD_PROPNAME = "password";
0251 
0252     /** Holds the default broker log on password. */
0253     public static final String PASSWORD_DEFAULT = "guest";
0254 
0255     /** Holds the name of the proeprty to get the. */
0256     public static final String SELECTOR_PROPNAME = "selector";
0257 
0258     /** Holds the default message selector. */
0259     public static final String SELECTOR_DEFAULT = "";
0260 
0261     /** Holds the name of the property to get the destination count from. */
0262     public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
0263 
0264     /** Defines the default number of destinations to ping. */
0265     public static final int DESTINATION_COUNT_DEFAULT = 1;
0266 
0267     /** Holds the name of the property to get the number of consumers per destination from. */
0268     public static final String NUM_CONSUMERS_PROPNAME = "numConsumers";
0269 
0270     /** Defines the default number consumers per destination. */
0271     public static final int NUM_CONSUMERS_DEFAULT = 1;
0272 
0273     /** Holds the name of the property to get the waiting timeout for response messages. */
0274     public static final String TIMEOUT_PROPNAME = "timeout";
0275 
0276     /** Default time to wait before assuming that a ping has timed out. */
0277     public static final long TIMEOUT_DEFAULT = 30000;
0278 
0279     /** Holds the name of the property to get the commit batch size from. */
0280     public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize";
0281 
0282     /** Defines the default number of pings to send in each transaction when running transactionally. */
0283     public static final int TX_BATCH_SIZE_DEFAULT = 1;
0284 
0285     /** Holds the name of the property to get the unique destinations flag from. */
0286     public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests";
0287 
0288     /** Defines the default value for the unique destinations property. */
0289     public static final boolean UNIQUE_DESTS_DEFAULT = true;
0290 
0291     /** Holds the name of the property to get the durable destinations flag from. */
0292     public static final String DURABLE_DESTS_PROPNAME = "durableDests";
0293 
0294     /** Defines the default value of the durable destinations flag. */
0295     public static final boolean DURABLE_DESTS_DEFAULT = false;
0296 
0297     /** Holds the name of the proeprty to get the message acknowledgement mode from. */
0298     public static final String ACK_MODE_PROPNAME = "ackMode";
0299 
0300     /** Defines the default message acknowledgement mode. */
0301     public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
0302 
0303     /** Holds the name of the property to get the consumers message acknowledgement mode from. */
0304     public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
0305 
0306     /** Defines the default consumers message acknowledgement mode. */
0307     public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
0308 
0309     /** Holds the name of the property to get the maximum pending message size setting from. */
0310     public static final String MAX_PENDING_PROPNAME = "maxPending";
0311 
0312     /** Defines the default value for the maximum pending message size setting. 0 means no limit. */
0313     public static final int MAX_PENDING_DEFAULT = 0;
0314 
0315     /** Defines the default prefetch size to use when consuming messages. */
0316     public static final int PREFETCH_DEFAULT = 100;
0317 
0318     /** Defines the default value of the no local flag to use when consuming messages. */
0319     public static final boolean NO_LOCAL_DEFAULT = false;
0320 
0321     /** Defines the default value of the exclusive flag to use when consuming messages. */
0322     public static final boolean EXCLUSIVE_DEFAULT = false;
0323 
0324     /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
0325     public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
0326 
0327     /** Holds the default configuration properties. */
0328     public static ParsedProperties defaults = new ParsedProperties();
0329 
0330     static
0331     {
0332         defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT);
0333         defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT);
0334         defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT);
0335         defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
0336         defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
0337         defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
0338         defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
0339         defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
0340         defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT);
0341         defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
0342         defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
0343         defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT);
0344         defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
0345         defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
0346         defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT);
0347         defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
0348         defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
0349         defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
0350         defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
0351         defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
0352         defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
0353         defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
0354         defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
0355         defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT);
0356         defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);
0357         defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
0358         defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
0359         defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT);
0360         defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
0361         defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
0362         defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
0363     }
0364 
0365     /** Allows setting of client ID on the connection, rather than through the connection URL. */
0366     protected boolean _overrideClientId;
0367 
0368     /** Holds the JNDI name of the JMS connection factory. */
0369     protected String _factoryName;
0370 
0371     /** Holds the name of the properties file to configure JNDI with. */
0372     protected String _fileProperties;
0373 
0374     /** Holds the broker url. */
0375     protected String _brokerDetails;
0376 
0377     /** Holds the username to access the broker with. */
0378     protected String _username;
0379 
0380     /** Holds the password to access the broker with. */
0381     protected String _password;
0382 
0383     /** Holds the virtual host on the broker to run the tests through. */
0384     protected String _virtualpath;
0385 
0386     /** Holds the root name from which to generate test destination names. */
0387     protected String _destinationName;
0388 
0389     /** Holds the default queue name postfix value. */
0390     protected String _queueNamePostfix;
0391 
0392     /** Holds the message selector to filter the pings with. */
0393     protected String _selector;
0394 
0395     /** Holds the producers transactional mode flag. */
0396     protected boolean _transacted;
0397 
0398     /** Holds the consumers transactional mode flag. */
0399     protected boolean _consTransacted;
0400 
0401     /** Determines whether this producer sends persistent messages. */
0402     protected boolean _persistent;
0403 
0404     /** Holds the acknowledgement mode used for the producers. */
0405     protected int _ackMode;
0406 
0407     /** Holds the acknowledgement mode setting for the consumers. */
0408     protected int _consAckMode;
0409 
0410     /** Determines what size of messages this producer sends. */
0411     protected int _messageSize;
0412 
0413     /** Used to indicate that the ping loop should print out whenever it pings. */
0414     protected boolean _verbose;
0415 
0416     /** Flag used to indicate if this is a point to point or pub/sub ping client. */
0417     protected boolean _isPubSub;
0418 
0419     /** Flag used to indicate if the destinations should be unique client. */
0420     protected boolean _isUnique;
0421 
0422     /** Flag used to indicate that durable destination should be used. */
0423     protected boolean _isDurable;
0424 
0425     /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
0426     protected boolean _failBeforeCommit;
0427 
0428     /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
0429     protected boolean _failAfterCommit;
0430 
0431     /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
0432     protected boolean _failBeforeSend;
0433 
0434     /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
0435     protected boolean _failAfterSend;
0436 
0437     /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
0438     protected boolean _failOnce;
0439 
0440     /** Holds the number of sends that should be performed in every transaction when using transactions. */
0441     protected int _txBatchSize;
0442 
0443     /** Holds the number of destinations to ping. */
0444     protected int _noOfDestinations;
0445 
0446     /** Holds the number of consumers per destination. */
0447     protected int _noOfConsumers;
0448 
0449     /** Holds the maximum send rate in herz. */
0450     protected int _rate;
0451 
0452     /**
0453      * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended
0454      * if this limit is breached.
0455      */
0456     protected int _maxPendingSize;
0457 
0458     /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
0459     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
0460 
0461     /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */
0462     private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0);
0463 
0464     /** Holds this instances unique id. */
0465     private int instanceId;
0466 
0467     /**
0468      * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple
0469      * ping producers on the same JVM.
0470      */
0471     private static Map<String, PerCorrelationId> perCorrelationIds =
0472         Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
0473 
0474     /** A convenient formatter to use when time stamping output. */
0475     protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
0476 
0477     /** Holds the connection for the message producer. */
0478     protected Connection _connection;
0479 
0480     /** Holds the consumer connections. */
0481     protected Connection[] _consumerConnection;
0482 
0483     /** Holds the controlSession on which ping replies are received. */
0484     protected Session[] _consumerSession;
0485 
0486     /** Holds the producer controlSession, needed to create ping messages. */
0487     protected Session _producerSession;
0488 
0489     /** Holds the destination where the response messages will arrive. */
0490     protected Destination _replyDestination;
0491 
0492     /** Holds the set of destinations that this ping producer pings. */
0493     protected List<Destination> _pingDestinations;
0494 
0495     /** Used to restrict the sending rate to a specified limit. */
0496     protected Throttle _rateLimiter;
0497 
0498     /** Holds a message listener that this message listener chains all its messages to. */
0499     protected ChainedMessageListener _chainedMessageListener = null;
0500 
0501     /**
0502      * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
0503      * creating multiple ping producers in the same JVM.
0504      */
0505     protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
0506 
0507     /**
0508      * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
0509      * on the same JVM using this id generator will allow them to ping on the same queues.
0510      */
0511     protected AtomicInteger _queueSharedID = new AtomicInteger();
0512 
0513     /** Used to tell the ping loop when to terminate, it only runs while this is true. */
0514     protected boolean _publish = true;
0515 
0516     /** Holds the message producer to send the pings through. */
0517     protected MessageProducer _producer;
0518 
0519     /** Holds the message consumer to receive the ping replies through. */
0520     protected MessageConsumer[] _consumer;
0521 
0522     /** The prompt to display when asking the user to kill the broker for failover testing. */
0523     private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
0524 
0525     /** Holds the name for this test client to be identified to the broker with. */
0526     private String _clientID;
0527 
0528     /** Keeps count of the total messages sent purely for debugging purposes. */
0529     private static AtomicInteger numSent = new AtomicInteger();
0530 
0531     /**
0532      * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
0533      * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a
0534      * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an
0535      * equal chance to produce messages.
0536      */
0537     static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true);
0538 
0539     /** Keeps a count of the number of message currently sent but not received. */
0540     static AtomicInteger _unreceived = new AtomicInteger(0);
0541 
0542     /**
0543      * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
0544      * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
0545      * it, to send and recieve its pings and replies on.
0546      *
0547      @param overrides Properties containing any desired overrides to the defaults.
0548      *
0549      @throws Exception Any exceptions are allowed to fall through.
0550      */
0551     public PingPongProducer(Properties overridesthrows Exception
0552     {
0553         // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
0554         instanceId = _instanceIdGenerator.getAndIncrement();
0555 
0556         // Create a set of parsed properties from the defaults overriden by the passed in values.
0557         ParsedProperties properties = new ParsedProperties(defaults);
0558         properties.putAll(overrides);
0559 
0560         // Extract the configuration properties to set the pinger up with.
0561         _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME);
0562         _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME);
0563         _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME);
0564         _brokerDetails = properties.getProperty(BROKER_PROPNAME);
0565         _username = properties.getProperty(USERNAME_PROPNAME);
0566         _password = properties.getProperty(PASSWORD_PROPNAME);
0567         _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
0568         _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
0569         _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME);
0570         _selector = properties.getProperty(SELECTOR_PROPNAME);
0571         _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
0572         _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
0573         _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
0574         _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
0575         _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
0576         _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME);
0577         _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME);
0578         _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME);
0579         _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME);
0580         _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
0581         _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
0582         _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
0583         _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME);
0584         _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
0585         _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
0586         _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
0587         _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
0588         _ackMode = _transacted ? : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
0589         _consAckMode = _consTransacted ? : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
0590         _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
0591 
0592         // Check that one or more destinations were specified.
0593         if (_noOfDestinations < 1)
0594         {
0595             throw new IllegalArgumentException("There must be at least one destination.");
0596         }
0597 
0598         // Set up a throttle to control the send rate, if a rate > 0 is specified.
0599         if (_rate > 0)
0600         {
0601             _rateLimiter = new BatchedThrottle();
0602             _rateLimiter.setRate(_rate);
0603         }
0604 
0605         // Create the connection and message producers/consumers.
0606         // establishConnection(true, true);
0607     }
0608 
0609     /**
0610      * Establishes a connection to the broker and creates message consumers and producers based on the parameters
0611      * that this ping client was created with.
0612      *
0613      @param producer Flag to indicate whether or not the producer should be set up.
0614      @param consumer Flag to indicate whether or not the consumers should be set up.
0615      *
0616      @throws Exception Any exceptions are allowed to fall through.
0617      */
0618     public void establishConnection(boolean producer, boolean consumerthrows Exception
0619     {
0620         // log.debug("public void establishConnection(): called");
0621 
0622         // Generate a unique identifying name for this client, based on it ip address and the current time.
0623         InetAddress address = InetAddress.getLocalHost();
0624         // _clientID = address.getHostName() + System.currentTimeMillis();
0625         _clientID = "perftest_" + instanceId;
0626 
0627         // Create a connection to the broker.
0628         createConnection(_clientID);
0629 
0630         // Create transactional or non-transactional sessions, based on the command line arguments.
0631         _producerSession = _connection.createSession(_transacted, _ackMode);
0632 
0633         _consumerSession = new Session[_noOfConsumers];
0634 
0635         for (int i = 0; i < _noOfConsumers; i++)
0636         {
0637             _consumerSession[i= _consumerConnection[i].createSession(_consTransacted, _consAckMode);
0638         }
0639 
0640         // Create the destinations to send pings to and receive replies from.
0641         _replyDestination = _consumerSession[0].createTemporaryQueue();
0642         createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
0643 
0644         // Create the message producer only if instructed to.
0645         if (producer)
0646         {
0647             createProducer();
0648         }
0649 
0650         // Create the message consumer only if instructed to.
0651         if (consumer)
0652         {
0653             createReplyConsumers(getReplyDestinations(), _selector);
0654         }
0655     }
0656 
0657     /**
0658      * Establishes a connection to the broker, based on the configuration parameters that this ping client was
0659      * created with.
0660      *
0661      @param clientID The clients identifier.
0662      *
0663      @throws JMSException Underlying exceptions allowed to fall through.
0664      @throws NamingException Underlying exceptions allowed to fall through.
0665      @throws IOException Underlying exceptions allowed to fall through.
0666      */
0667     protected void createConnection(String clientIDthrows JMSException, NamingException, IOException
0668     {
0669         // _log.debug("protected void createConnection(String clientID = " + clientID + "): called");
0670 
0671         // _log.debug("Creating a connection for the message producer.");
0672         File propsFile = new File(_fileProperties);
0673         InputStream is = new FileInputStream(propsFile);
0674         Properties properties = new Properties();
0675         properties.load(is);
0676 
0677         Context context = new InitialContext(properties);
0678         ConnectionFactory factory = (ConnectionFactorycontext.lookup(_factoryName);
0679         _connection = factory.createConnection(_username, _password);
0680 
0681         if (_overrideClientId)
0682         {
0683             _connection.setClientID(clientID);
0684         }
0685 
0686         // _log.debug("Creating " + _noOfConsumers + " connections for the consumers.");
0687 
0688         _consumerConnection = new Connection[_noOfConsumers];
0689 
0690         for (int i = 0; i < _noOfConsumers; i++)
0691         {
0692             _consumerConnection[i= factory.createConnection(_username, _password);
0693             // _consumerConnection[i].setClientID(clientID);
0694         }
0695     }
0696 
0697     /**
0698      * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
0699      * to be started to bounce the pings back again.
0700      *
0701      @param args The command line arguments.
0702      */
0703     public static void main(String[] args)
0704     {
0705         try
0706         {
0707             Properties options =
0708                 CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
0709 
0710             // Create a ping producer overriding its defaults with all options passed on the command line.
0711             PingPongProducer pingProducer = new PingPongProducer(options);
0712             pingProducer.establishConnection(true, true);
0713 
0714             // Start the ping producers dispatch thread running.
0715             pingProducer._connection.start();
0716 
0717             // Create a shutdown hook to terminate the ping-pong producer.
0718             Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
0719 
0720             // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
0721             pingProducer._connection.setExceptionListener(pingProducer);
0722 
0723             // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
0724             Thread pingThread = new Thread(pingProducer);
0725             pingThread.run();
0726             pingThread.join();
0727         }
0728         catch (Exception e)
0729         {
0730             System.err.println(e.getMessage());
0731             log.error("Top level handler caught execption.", e);
0732             System.exit(1);
0733         }
0734     }
0735 
0736     /**
0737      * Convenience method for a short pause.
0738      *
0739      @param sleepTime The time in milliseconds to pause for.
0740      */
0741     public static void pause(long sleepTime)
0742     {
0743         if (sleepTime > 0)
0744         {
0745             try
0746             {
0747                 Thread.sleep(sleepTime);
0748             }
0749             catch (InterruptedException ie)
0750             { }
0751         }
0752     }
0753 
0754     /**
0755      * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to
0756      * destination of this pinger.
0757      *
0758      @return The single reply to destination of this pinger, wrapped in a list.
0759      */
0760     public List<Destination> getReplyDestinations()
0761     {
0762         // log.debug("public List<Destination> getReplyDestinations(): called");
0763 
0764         List<Destination> replyDestinations = new ArrayList<Destination>();
0765         replyDestinations.add(_replyDestination);
0766 
0767         // log.debug("replyDestinations = " + replyDestinations);
0768 
0769         return replyDestinations;
0770     }
0771 
0772     /**
0773      * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
0774      * flag is set accoring the ping producer creation options.
0775      *
0776      @throws JMSException Any JMSExceptions are allowed to fall through.
0777      */
0778     public void createProducer() throws JMSException
0779     {
0780         // log.debug("public void createProducer(): called");
0781 
0782         _producer = (MessageProducer_producerSession.createProducer(null);
0783         _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
0784 
0785         // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
0786     }
0787 
0788     /**
0789      * Creates consumers for the specified number of destinations. The destinations themselves are also created by this
0790      * method.
0791      *
0792      @param noOfDestinations The number of destinations to create consumers for.
0793      @param selector         The message selector to filter the consumers with.
0794      @param rootName         The root of the name, or actual name if only one is being created.
0795      @param unique           <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the
0796      *                         numbering with all pingers on the same JVM.
0797      @param durable          If the destinations are durable topics.
0798      *
0799      @throws JMSException Any JMSExceptions are allowed to fall through.
0800      */
0801     public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
0802         boolean durablethrows JMSException
0803     {
0804         /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
0805             + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
0806             + durable + "): called");*/
0807 
0808         _pingDestinations = new ArrayList<Destination>();
0809 
0810         // Create the desired number of ping destinations and consumers for them.
0811         // log.debug("Creating " + noOfDestinations + " destinations to ping.");
0812 
0813         for (int i = 0; i < noOfDestinations; i++)
0814         {
0815             Destination destination;
0816             String id;
0817 
0818             // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
0819             if (unique)
0820             {
0821                 // log.debug("Creating unique destinations.");
0822                 id = "_" + _queueJVMSequenceID.incrementAndGet() "_" + _connection.getClientID();
0823             }
0824             else
0825             {
0826                 // log.debug("Creating shared destinations.");
0827                 id = "_" + _queueSharedID.incrementAndGet();
0828             }
0829 
0830             // Check if this is a pub/sub pinger, in which case create topics.
0831             if (_isPubSub)
0832             {
0833                 destination = _producerSession.createTopic(rootName + id);
0834                 // log.debug("Created non-durable topic " + destination);
0835 
0836                 if (durable)
0837                 {
0838                     _producerSession.createDurableSubscriber((Topicdestination, _connection.getClientID());
0839                 }
0840             }
0841             // Otherwise this is a p2p pinger, in which case create queues.
0842             else
0843             {
0844                 destination = _producerSession.createQueue(rootName + id + _queueNamePostfix);
0845                 // log.debug("Created queue " + destination);
0846             }
0847 
0848             // Keep the destination.
0849             _pingDestinations.add(destination);
0850         }
0851     }
0852 
0853     /**
0854      * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
0855      *
0856      @param destinations The destinations to listen to.
0857      @param selector     A selector to filter the messages with.
0858      *
0859      @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
0860      */
0861     public void createReplyConsumers(Collection<Destination> destinations, String selectorthrows JMSException
0862     {
0863         /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
0864             + ", String selector = " + selector + "): called");*/
0865 
0866         log.debug("There are " + destinations.size() " destinations.");
0867         log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
0868         log.debug("Total number of consumers is: " (destinations.size() * _noOfConsumers));
0869 
0870         for (Destination destination : destinations)
0871         {
0872             _consumer = new MessageConsumer[_noOfConsumers];
0873 
0874             for (int i = 0; i < _noOfConsumers; i++)
0875             {
0876                 // Create a consumer for the destination and set this pinger to listen to its messages.
0877                 _consumer[i= _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT);
0878 
0879                 final int consumerNo = i;
0880 
0881                 _consumer[i].setMessageListener(new MessageListener()
0882                     {
0883                         public void onMessage(Message message)
0884                         {
0885                             onMessageWithConsumerNo(message, consumerNo);
0886                         }
0887                     });
0888 
0889                 log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
0890             }
0891         }
0892     }
0893 
0894     /**
0895      * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
0896      * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the
0897      * replies map.
0898      *
0899      @param message    The received message.
0900      @param consumerNo The consumer number within this test pinger instance.
0901      */
0902     public void onMessageWithConsumerNo(Message message, int consumerNo)
0903     {
0904         // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called");
0905         try
0906         {
0907             long now = System.nanoTime();
0908             long timestamp = getTimestamp(message);
0909             long pingTime = now - timestamp;
0910 
0911             // NDC.push("id" + instanceId + "/cons" + consumerNo);
0912 
0913             // Extract the messages correlation id.
0914             String correlationID = message.getJMSCorrelationID();
0915             // log.debug("correlationID = " + correlationID);
0916 
0917             // int num = message.getIntProperty("MSG_NUM");
0918             // log.info("Message " + num + " received.");
0919 
0920             boolean isRedelivered = message.getJMSRedelivered();
0921             // log.debug("isRedelivered = " + isRedelivered);
0922 
0923             if (!isRedelivered)
0924             {
0925                 // Countdown on the traffic light if there is one for the matching correlation id.
0926                 PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
0927 
0928                 if (perCorrelationId != null)
0929                 {
0930                     CountDownLatch trafficLight = perCorrelationId.trafficLight;
0931 
0932                     // Restart the timeout timer on every message.
0933                     perCorrelationId.timeOutStart = System.nanoTime();
0934 
0935                     // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
0936 
0937                     // Release waiting senders if there are some and using maxPending limit.
0938                     if ((_maxPendingSize > 0))
0939                     {
0940                         // Decrement the count of sent but not yet received messages.
0941                         int unreceived = _unreceived.decrementAndGet();
0942                         int unreceivedSize =
0943                             (unreceived * ((_messageSize == 0: _messageSize))
0944                             (_isPubSub ? getConsumersPerDestination() 1);
0945 
0946                         // log.debug("unreceived = " + unreceived);
0947                         // log.debug("unreceivedSize = " + unreceivedSize);
0948 
0949                         // synchronized (_sendPauseMonitor)
0950                         // {
0951                         if (unreceivedSize < _maxPendingSize)
0952                         {
0953                             _sendPauseMonitor.poll();
0954                         }
0955                         // }
0956                     }
0957 
0958                     // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
0959                     // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
0960                     // ensures that each thread will get a unique value for the remaining messages.
0961                     long trueCount;
0962                     long remainingCount;
0963 
0964                     synchronized (trafficLight)
0965                     {
0966                         trafficLight.countDown();
0967 
0968                         trueCount = trafficLight.getCount();
0969                         remainingCount = trueCount - 1;
0970 
0971                         // NDC.push("/rem" + remainingCount);
0972 
0973                         // log.debug("remainingCount = " + remainingCount);
0974                         // log.debug("trueCount = " + trueCount);
0975 
0976                         // Commit on transaction batch size boundaries. At this point in time the waiting producer
0977                         // remains blocked, even on the last message.
0978                         // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
0979                         // each batch boundary. For pub/sub each consumer gets every message so no division is done.
0980                         // When running in client ack mode, an ack is done instead of a commit, on the commit batch
0981                         // size boundaries.
0982                         long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
0983                         // log.debug("commitCount = " + commitCount);
0984 
0985                         if ((commitCount % _txBatchSize== 0)
0986                         {
0987                             if (_consAckMode == 2)
0988                             {
0989                                 // log.debug("Doing client ack for consumer " + consumerNo + ".");
0990                                 message.acknowledge();
0991                             }
0992                             else
0993                             {
0994                                 // log.debug("Trying commit for consumer " + consumerNo + ".");
0995                                 commitTx(_consumerSession[consumerNo]);
0996                                 // log.info("Tx committed on consumer " + consumerNo);
0997                             }
0998                         }
0999 
1000                         // Forward the message and remaining count to any interested chained message listener.
1001                         if (_chainedMessageListener != null)
1002                         {
1003                             _chainedMessageListener.onMessage(message, (intremainingCount, pingTime);
1004                         }
1005 
1006                         // Check if this is the last message, in which case release any waiting producers. This is done
1007                         // after the transaction has been committed and any listeners notified.
1008                         if (trueCount == 1)
1009                         {
1010                             trafficLight.countDown();
1011                         }
1012                     }
1013                 }
1014                 else
1015                 {
1016                     log.warn("Got unexpected message with correlationId: " + correlationID);
1017                 }
1018             }
1019             else
1020             {
1021                 log.warn("Got redelivered message, ignoring.");
1022             }
1023         }
1024         catch (JMSException e)
1025         {
1026             log.warn("There was a JMSException: " + e.getMessage(), e);
1027         }
1028         finally
1029         {
1030             // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending");
1031             // NDC.clear();
1032         }
1033     }
1034 
1035     /**
1036      * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
1037      * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
1038      * the correlation id.
1039      *
1040      @param message              The message to send. If this is null, one is generated.
1041      @param numPings             The number of ping messages to send.
1042      @param timeout              The timeout in milliseconds.
1043      @param messageCorrelationId The message correlation id. If this is null, one is generated.
1044      *
1045      @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
1046      *         for all prematurely.
1047      *
1048      @throws JMSException         All underlying JMSExceptions are allowed to fall through.
1049      @throws InterruptedException When interrupted by a timeout
1050      */
1051     public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
1052         throws JMSException, InterruptedException
1053     {
1054         /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
1055             + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
1056 
1057         // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
1058         if (messageCorrelationId == null)
1059         {
1060             messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
1061         }
1062 
1063         try
1064         {
1065             // NDC.push("prod");
1066 
1067             // Create a count down latch to count the number of replies with. This is created before the messages are
1068             // sent so that the replies cannot be received before the count down is created.
1069             // One is added to this, so that the last reply becomes a special case. The special case is that the
1070             // chained message listener must be called before this sender can be unblocked, but that decrementing the
1071             // countdown needs to be done before the chained listener can be called.
1072             PerCorrelationId perCorrelationId = new PerCorrelationId();
1073 
1074             perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings1);
1075             perCorrelationIds.put(messageCorrelationId, perCorrelationId);
1076 
1077             // Set up the current time as the start time for pinging on the correlation id. This is used to determine
1078             // timeouts.
1079             perCorrelationId.timeOutStart = System.nanoTime();
1080 
1081             // Send the specifed number of messages.
1082             pingNoWaitForReply(message, numPings, messageCorrelationId);
1083 
1084             boolean timedOut;
1085             boolean allMessagesReceived;
1086             int numReplies;
1087 
1088             do
1089             {
1090                 // Block the current thread until replies to all the messages are received, or it times out.
1091                 perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
1092 
1093                 // Work out how many replies were receieved.
1094                 numReplies = getExpectedNumPings(numPings(intperCorrelationId.trafficLight.getCount();
1095 
1096                 allMessagesReceived = numReplies == getExpectedNumPings(numPings);
1097 
1098                 // log.debug("numReplies = " + numReplies);
1099                 // log.debug("allMessagesReceived = " + allMessagesReceived);
1100 
1101                 // Recheck the timeout condition.
1102                 long now = System.nanoTime();
1103                 long lastMessageReceievedAt = perCorrelationId.timeOutStart;
1104                 timedOut = (now - lastMessageReceievedAt(timeout * 1000000);
1105 
1106                 // log.debug("now = " + now);
1107                 // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
1108             }
1109             while (!timedOut && !allMessagesReceived);
1110 
1111             if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
1112             {
1113                 log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
1114             }
1115             else if (_verbose)
1116             {
1117                 log.info("Got all replies on id, " + messageCorrelationId);
1118             }
1119 
1120             // commitTx(_consumerSession);
1121 
1122             // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
1123 
1124             return numReplies;
1125         }
1126         // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived,
1127         // so will be a memory leak if this is not done.
1128         finally
1129         {
1130             // NDC.pop();
1131             perCorrelationIds.remove(messageCorrelationId);
1132         }
1133     }
1134 
1135     /**
1136      * Sends the specified number of ping messages and does not wait for correlating replies.
1137      *
1138      @param message              The message to send.
1139      @param numPings             The number of pings to send.
1140      @param messageCorrelationId A correlation id to place on all messages sent.
1141      *
1142      @throws JMSException All underlying JMSExceptions are allowed to fall through.
1143      */
1144     public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationIdthrows JMSException
1145     {
1146         /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
1147             + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
1148 
1149         if (message == null)
1150         {
1151             message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
1152         }
1153 
1154         message.setJMSCorrelationID(messageCorrelationId);
1155 
1156         // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
1157         // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
1158         // needed.
1159         boolean committed = false;
1160 
1161         // Send all of the ping messages.
1162         for (int i = 0; i < numPings; i++)
1163         {
1164             // Re-timestamp the message.
1165             // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
1166 
1167             // Send the message, passing in the message count.
1168             committed = sendMessage(i, message);
1169 
1170             // Spew out per message timings on every message sonly in verbose mode.
1171             /*if (_verbose)
1172             {
1173                 log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
1174             }*/
1175         }
1176 
1177         // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
1178         if (!committed)
1179         {
1180             commitTx(_producerSession);
1181         }
1182     }
1183 
1184     /**
1185      * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of
1186      * messages sent so far must be specified and is used to round robin the ping destinations (where there are more
1187      * than one), and to determine if the transaction batch size has been reached and the sent messages should be
1188      * committed.
1189      *
1190      @param i       The count of messages sent so far in a loop of multiple calls to this send method.
1191      @param message The message to send.
1192      *
1193      @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise.
1194      *
1195      @throws JMSException All underlyiung JMSExceptions are allowed to fall through.
1196      */
1197     protected boolean sendMessage(int i, Message messagethrows JMSException
1198     {
1199         try
1200         {
1201             NDC.push("id" + instanceId + "/prod");
1202 
1203             // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
1204             // log.debug("_txBatchSize = " + _txBatchSize);
1205 
1206             // Round robin the destinations as the messages are sent.
1207             Destination destination = _pingDestinations.get(i % _pingDestinations.size());
1208 
1209             // Prompt the user to kill the broker when doing failover testing.
1210             _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
1211 
1212             // Get the test setup for the correlation id.
1213             String correlationID = message.getJMSCorrelationID();
1214             PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
1215 
1216             // If necessary, wait until the max pending message size comes within its limit.
1217             if (_maxPendingSize > 0)
1218             {
1219                 synchronized (_sendPauseMonitor)
1220                 {
1221                     // Used to keep track of the number of times that send has to wait.
1222                     int numWaits = 0;
1223 
1224                     // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
1225                     // the test timeout.
1226                     int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
1227 
1228                     while (true)
1229                     {
1230                         // Get the size estimate of sent but not yet received messages.
1231                         int unreceived = _unreceived.get();
1232                         int unreceivedSize =
1233                             (unreceived * ((_messageSize == 0: _messageSize))
1234                             (_isPubSub ? getConsumersPerDestination() 1);
1235 
1236                         // log.debug("unreceived = " + unreceived);
1237                         // log.debug("unreceivedSize = " + unreceivedSize);
1238                         // log.debug("_maxPendingSize = " + _maxPendingSize);
1239 
1240                         if (unreceivedSize > _maxPendingSize)
1241                         {
1242                             // log.debug("unreceived size estimate over limit = " + unreceivedSize);
1243 
1244                             // Fail the test if the send has had to wait more than the maximum allowed number of times.
1245                             if (numWaits > waitLimit)
1246                             {
1247                                 String errorMessage =
1248                                     "Send has had to wait for the unreceivedSize (" + unreceivedSize
1249                                     ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
1250                                     " times.";
1251                                 log.warn(errorMessage);
1252                                 throw new RuntimeException(errorMessage);
1253                             }
1254 
1255                             // Wait on the send pause barrier for the limit to be re-established.
1256                             try
1257                             {
1258                                 long start = System.nanoTime();
1259                                 // _sendPauseMonitor.wait(10000);
1260                                 _sendPauseMonitor.offer(new Object()10000, TimeUnit.MILLISECONDS);
1261                                 long end = System.nanoTime();
1262 
1263                                 // Count the wait only if it was for > 99% of the requested wait time.
1264                                 if (((float) (end - start(float) (10000 1000000L)) 0.99)
1265                                 {
1266                                     numWaits++;
1267                                 }
1268                             }
1269                             catch (InterruptedException e)
1270                             {
1271                                 // Restore the interrupted status
1272                                 Thread.currentThread().interrupt();
1273                                 throw new RuntimeException(e);
1274                             }
1275                         }
1276                         else
1277                         {
1278                             break;
1279                         }
1280                     }
1281                 }
1282             }
1283 
1284             // Send the message either to its round robin destination, or its default destination.
1285             // int num = numSent.incrementAndGet();
1286             // message.setIntProperty("MSG_NUM", num);
1287             setTimestamp(message);
1288 
1289             if (destination == null)
1290             {
1291                 _producer.send(message);
1292             }
1293             else
1294             {
1295                 _producer.send(destination, message);
1296             }
1297 
1298             // Increase the unreceived size, this may actually happen after the message is received.
1299             // The unreceived size is incremented by the number of consumers that will get a copy of the message,
1300             // in pub/sub mode.
1301             if (_maxPendingSize > 0)
1302             {
1303                 int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() 1);
1304                 // log.debug("newUnreceivedCount = " + newUnreceivedCount);
1305             }
1306 
1307             // Apply message rate throttling if a rate limit has been set up.
1308             if (_rateLimiter != null)
1309             {
1310                 _rateLimiter.throttle();
1311             }
1312 
1313             // Call commit every time the commit batch size is reached.
1314             boolean committed = false;
1315 
1316             // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
1317             if (((i + 1% _txBatchSize== 0)
1318             {
1319                 // log.debug("Trying commit on producer session.");
1320                 committed = commitTx(_producerSession);
1321             }
1322 
1323             return committed;
1324         }
1325         finally
1326         {
1327             NDC.clear();
1328         }
1329     }
1330 
1331     /**
1332      * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the
1333      * test that the failure has occurred, before the method returns.
1334      *
1335      @param failFlag The fail flag to test.
1336      *
1337      @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only
1338      *         used once, then reset.
1339      */
1340     private boolean waitForUserToPromptOnFailure(boolean failFlag)
1341     {
1342         if (failFlag)
1343         {
1344             if (_failOnce)
1345             {
1346                 failFlag = false;
1347             }
1348 
1349             // log.debug("Failing Before Send");
1350             waitForUser(KILL_BROKER_PROMPT);
1351         }
1352 
1353         return failFlag;
1354     }
1355 
1356     /**
1357      * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch
1358      * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will
1359      * terminate the pinger.
1360      */
1361     public void pingLoop()
1362     {
1363         try
1364         {
1365             // Generate a sample message and time stamp it.
1366             Message msg = getTestMessage(_replyDestination, _messageSize, _persistent);
1367             // setTimestamp(msg);
1368 
1369             // Send the message and wait for a reply.
1370             pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
1371         }
1372         catch (JMSException e)
1373         {
1374             _publish = false;
1375             // log.debug("There was a JMSException: " + e.getMessage(), e);
1376         }
1377         catch (InterruptedException e)
1378         {
1379             _publish = false;
1380             // log.debug("There was an interruption: " + e.getMessage(), e);
1381         }
1382     }
1383 
1384     /**
1385      * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
1386      * here.
1387      *
1388      @param messageListener The chained message listener.
1389      */
1390     public void setChainedMessageListener(ChainedMessageListener messageListener)
1391     {
1392         _chainedMessageListener = messageListener;
1393     }
1394 
1395     /** Removes any chained message listeners from this pinger. */
1396     public void removeChainedMessageListener()
1397     {
1398         _chainedMessageListener = null;
1399     }
1400 
1401     /**
1402      * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
1403      *
1404      @param replyQueue  The reply-to destination for the message.
1405      @param messageSize The desired size of the message in bytes.
1406      @param persistent  <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
1407      *
1408      @return A freshly generated test message.
1409      *
1410      @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
1411      */
1412     public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistentthrows JMSException
1413     {
1414         // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
1415         return TestUtils.createTestMessageOfSize(_producerSession, messageSize);
1416     }
1417 
1418     /**
1419      * Sets the current time in nanoseconds as the timestamp on the message.
1420      *
1421      @param msg The message to timestamp.
1422      *
1423      @throws JMSException Any JMSExceptions are allowed to fall through.
1424      */
1425     protected void setTimestamp(Message msgthrows JMSException
1426     {
1427         /*if (((AMQSession)_producerSession).isStrictAMQP())
1428         {
1429             ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime());
1430         }
1431         else
1432         {*/
1433         msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
1434         // }
1435     }
1436 
1437     /**
1438      * Extracts the nanosecond timestamp from a message.
1439      *
1440      @param msg The message to extract the time stamp from.
1441      *
1442      @return The timestamp in nanos.
1443      *
1444      @throws JMSException Any JMSExceptions are allowed to fall through.
1445      */
1446     protected long getTimestamp(Message msgthrows JMSException
1447     {
1448         /*if (((AMQSession)_producerSession).isStrictAMQP())
1449         {
1450             Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
1451 
1452             return (value == null) ? 0L : value;
1453         }
1454         else
1455         {*/
1456         return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
1457             // }
1458     }
1459 
1460     /**
1461      * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag
1462      * has been cleared.
1463      */
1464     public void stop()
1465     {
1466         _publish = false;
1467     }
1468 
1469     /**
1470      * Starts the producer and consumer connections.
1471      *
1472      @throws JMSException Any JMSExceptions are allowed to fall through.
1473      */
1474     public void start() throws JMSException
1475     {
1476         // log.debug("public void start(): called");
1477 
1478         _connection.start();
1479         // log.debug("Producer started.");
1480 
1481         for (int i = 0; i < _noOfConsumers; i++)
1482         {
1483             _consumerConnection[i].start();
1484             // log.debug("Consumer " + i + " started.");
1485         }
1486     }
1487 
1488     /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
1489     public void run()
1490     {
1491         // Keep running until the publish flag is cleared.
1492         while (_publish)
1493         {
1494             pingLoop();
1495         }
1496     }
1497 
1498     /**
1499      * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
1500      * connection, this clears the publish flag which in turn will halt the ping loop.
1501      *
1502      @param e The exception that triggered this callback method.
1503      */
1504     public void onException(JMSException e)
1505     {
1506         // log.debug("public void onException(JMSException e = " + e + "): called", e);
1507         _publish = false;
1508     }
1509 
1510     /**
1511      * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
1512      * with the runtime system as a shutdown hook.
1513      *
1514      @return A shutdown hook for the ping loop.
1515      */
1516     public Thread getShutdownHook()
1517     {
1518         return new Thread(new Runnable()
1519                 {
1520                     public void run()
1521                     {
1522                         stop();
1523                     }
1524                 });
1525     }
1526 
1527     /**
1528      * Closes all of the producer and consumer connections.
1529      *
1530      @throws JMSException All JMSException are allowed to fall through.
1531      */
1532     public void close() throws JMSException
1533     {
1534         // log.debug("public void close(): called");
1535 
1536         try
1537         {
1538             if (_connection != null)
1539             {
1540                 // log.debug("Before close producer connection.");
1541                 _connection.close();
1542                 // log.debug("Closed producer connection.");
1543             }
1544 
1545             for (int i = 0; i < _noOfConsumers; i++)
1546             {
1547                 if (_consumerConnection[i!= null)
1548                 {
1549                     // log.debug("Before close consumer connection " + i + ".");
1550                     _consumerConnection[i].close();
1551                     // log.debug("Closed consumer connection " + i + ".");
1552                 }
1553             }
1554         }
1555         finally
1556         {
1557             _connection = null;
1558             _producerSession = null;
1559             _consumerSession = null;
1560             _consumerConnection = null;
1561             _producer = null;
1562             _consumer = null;
1563             _pingDestinations = null;
1564             _replyDestination = null;
1565         }
1566     }
1567 
1568     /**
1569      * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a
1570      * transactional controlSession, this method does nothing (unless the failover after send flag is set).
1571      *
1572      <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
1573      * applied. This flag applies whether the pinger is transactional or not.
1574      *
1575      <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit
1576      * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
1577      * commit is applied. These flags will only apply if using a transactional pinger.
1578      *
1579      @param session The controlSession to commit
1580      *
1581      @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not.
1582      *
1583      @throws javax.jms.JMSException If the commit fails and then the rollback fails.
1584      *
1585      * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
1586      * method, because commits only apply to transactional pingers, but fail after send applied to transactional and
1587      * non-transactional alike.
1588      */
1589     protected boolean commitTx(Session sessionthrows JMSException
1590     {
1591         // log.debug("protected void commitTx(Session session): called");
1592 
1593         boolean committed = false;
1594 
1595         _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend);
1596 
1597         if (session.getTransacted())
1598         {
1599             // log.debug("Session is transacted.");
1600 
1601             try
1602             {
1603                 _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit);
1604 
1605                 long start = System.nanoTime();
1606                 session.commit();
1607                 committed = true;
1608                 // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
1609 
1610                 _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit);
1611 
1612                 // log.debug("Session Commited.");
1613             }
1614             catch (JMSException e)
1615             {
1616                 // log.debug("JMSException on commit:" + e.getMessage(), e);
1617 
1618                 try
1619                 {
1620                     session.rollback();
1621                     // log.debug("Message rolled back.");
1622                 }
1623                 catch (JMSException jmse)
1624                 {
1625                     // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
1626 
1627                     // Both commit and rollback failed. Throw the rollback exception.
1628                     throw jmse;
1629                 }
1630             }
1631         }
1632 
1633         return committed;
1634     }
1635 
1636     /**
1637      * Outputs a prompt to the console and waits for the user to press return.
1638      *
1639      @param prompt The prompt to display on the console.
1640      */
1641     public void waitForUser(String prompt)
1642     {
1643         System.out.println(prompt);
1644 
1645         try
1646         {
1647             System.in.read();
1648         }
1649         catch (IOException e)
1650         {
1651             // Ignored.
1652         }
1653 
1654         System.out.println("Continuing.");
1655     }
1656 
1657     /**
1658      * Gets the number of consumers that are listening to each destination in the test.
1659      *
1660      @return int The number of consumers subscribing to each topic.
1661      */
1662     public int getConsumersPerDestination()
1663     {
1664         return _noOfConsumers;
1665     }
1666 
1667     /**
1668      * Calculates how many pings are expected to be received for the given number sent.
1669      *
1670      @param numpings The number of pings that will be sent.
1671      *
1672      @return The number that should be received, for the test to pass.
1673      */
1674     public int getExpectedNumPings(int numpings)
1675     {
1676         // Wow, I'm freaking sorry about this return here...
1677         return ((_failAfterSend || _failBeforeCommit? numpings - 1: numpings*
1678                                     (_isPubSub ? getConsumersPerDestination() 1);
1679     }
1680 
1681     /**
1682      * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link
1683      * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link
1684      * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
1685      * messages with that correlation id.
1686      *
1687      <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be
1688      * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
1689      * still blocked.
1690      */
1691     public static interface ChainedMessageListener
1692     {
1693         /**
1694          * Notifies interested listeners about message arrival and important test stats, the number of messages
1695          * remaining in the test, and the messages send timestamp.
1696          *
1697          @param message        The newly arrived message.
1698          @param remainingCount The number of messages left to complete the test.
1699          @param latency        The nanosecond latency of the message.
1700          *
1701          @throws JMSException Any JMS exceptions is allowed to fall through.
1702          */
1703         public void onMessage(Message message, int remainingCount, long latencythrows JMSException;
1704     }
1705 
1706     /**
1707      * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be
1708      * added to this: read/write lock to make onMessage more concurrent as described in class header comment.
1709      */
1710     protected static class PerCorrelationId
1711     {
1712         /** Holds a countdown on number of expected messages. */
1713         CountDownLatch trafficLight;
1714 
1715         /** Holds the last timestamp that the timeout was reset to. */
1716         Long timeOutStart;
1717     }
1718 }