0001 /*
0002 *
0003 * Licensed to the Apache Software Foundation (ASF) under one
0004 * or more contributor license agreements. See the NOTICE file
0005 * distributed with this work for additional information
0006 * regarding copyright ownership. The ASF licenses this file
0007 * to you under the Apache License, Version 2.0 (the
0008 * "License"); you may not use this file except in compliance
0009 * with the License. You may obtain a copy of the License at
0010 *
0011 * http://www.apache.org/licenses/LICENSE-2.0
0012 *
0013 * Unless required by applicable law or agreed to in writing,
0014 * software distributed under the License is distributed on an
0015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016 * KIND, either express or implied. See the License for the
0017 * specific language governing permissions and limitations
0018 * under the License.
0019 *
0020 */
0021 package org.apache.qpid.requestreply;
0022
0023 import org.apache.log4j.Logger;
0024 import org.apache.log4j.NDC;
0025
0026 import org.apache.qpid.test.framework.TestUtils;
0027
0028 import org.apache.qpid.junit.extensions.BatchedThrottle;
0029 import org.apache.qpid.junit.extensions.Throttle;
0030 import org.apache.qpid.junit.extensions.util.CommandLineParser;
0031 import org.apache.qpid.junit.extensions.util.ParsedProperties;
0032
0033 import javax.jms.*;
0034 import javax.naming.Context;
0035 import javax.naming.InitialContext;
0036 import javax.naming.NamingException;
0037
0038 import java.io.*;
0039 import java.net.InetAddress;
0040 import java.text.DateFormat;
0041 import java.text.SimpleDateFormat;
0042 import java.util.*;
0043 import java.util.concurrent.CountDownLatch;
0044 import java.util.concurrent.SynchronousQueue;
0045 import java.util.concurrent.TimeUnit;
0046 import java.util.concurrent.atomic.AtomicInteger;
0047 import java.util.concurrent.atomic.AtomicLong;
0048
0049 /**
0050 * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
0051 * either be generated by another client (see {@link PingPongBouncer}), or an extension of it may be used that listens
0052 * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping
0053 * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour
0054 * configurable.
0055 *
0056 * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This
0057 * means that this class has to do some work to correlate pings with pongs; it expects the original message correlation
0058 * id in the ping to be bounced back in the reply correlation id.
0059 *
0060 * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It
0061 * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within
0062 * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover
0063 * testing. A complete list of accepted parameters, default values and comments on their usage is provided here:
0064 *
0065 * <p/><table><caption>Parameters</caption>
0066 * <tr><th> Parameter <th> Default <th> Comments
0067 * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
0068 * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
0069 * <tr><td> persistent <td> false <td> Determines whether persistent delivery is used.
0070 * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
0071 * <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to.
0072 * <tr><td> virtualHost <td> "" <td> Determines the virtual host to send all pings over. Empty by default.
0073 * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
0074 * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
0075 * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
0076 * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch.
0077 * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch.
0078 * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send.
0079 * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send.
0080 * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once.
0081 * <tr><td> username <td> guest <td> The username to access the broker with.
0082 * <tr><td> password <td> guest <td> The password to access the broker with.
0083 * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
0084 * <tr><td> destinationCount <td> 1 <td> The number of destinations to send pings to.
0085 * <tr><td> numConsumers <td> 1 <td> The number of consumers on each destination.
0086 * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
0087 * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
0088 * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
0089 * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
0090 * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
0091 * 0 - SESSION_TRANSACTED
0092 * 1 - AUTO_ACKNOWLEDGE
0093 * 2 - CLIENT_ACKNOWLEDGE
0094 * 3 - DUPS_OK_ACKNOWLEDGE
0095 * 257 - NO_ACKNOWLEDGE
0096 * 258 - PRE_ACKNOWLEDGE
0097 * <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value
0098 * as the 'transacted' option if not separately defined.
0099 * <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same
0100 * value as 'ackMode' if not separately defined.
0101 * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received.
0102 * Limits the volume of messages currently buffered on the client
0103 * or broker. Can help scale test clients by limiting amount of buffered
0104 * data to avoid out of memory errors.
0105 * </table>
0106 *
0107 * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
0108 * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by
0109 * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also
0110 * registered to terminate the ping-pong loop cleanly.
0111 *
0112 * <p/><table id="crc"><caption>CRC Card</caption>
0113 * <tr><th> Responsibilities <th> Collaborations
0114 * <tr><td> Provide a ping and wait for all responses cycle.
0115 * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
0116 * </table>
0117 *
0118 * @todo Use read/write lock in the onMessage, not for reading/writing but to make use of a shared and exclusive lock pair.
0119 * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a
0120 * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last
0121 * message waits until all other messages have been handled before releasing producers but allows messages to be
0122 * processed concurrently, unlike the current synchronized block.
0123 */
0124 public class PingPongProducer implements Runnable, ExceptionListener
0125 {
0126 /** Used for debugging. */
0127 private static final Logger log = Logger.getLogger(PingPongProducer.class);
0128
0129 /** Holds the name of the property to determine whether or not client id is overridden at connection time. */
0130 public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId";
0131
0132 /** Holds the default value of the override client id flag. */
0133 public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false";
0134
0135 /** Holds the name of the property to define the JNDI factory name with. */
0136 public static final String FACTORY_NAME_PROPNAME = "factoryName";
0137
0138 /** Holds the default JNDI name of the connection factory. */
0139 public static final String FACTORY_NAME_DEAFULT = "local";
0140
0141 /** Holds the name of the property to set the JNDI initial context properties with. */
0142 public static final String FILE_PROPERTIES_PROPNAME = "properties";
0143
0144 /** Holds the default file name of the JNDI initial context properties. */
0145 public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties";
0146
0147 /** Holds the name of the property to get the test message size from. */
0148 public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
0149
0150 /** Used to set up a default message size. */
0151 public static final int MESSAGE_SIZE_DEAFULT = 0;
0152
0153 /** Holds the name of the property to get the ping queue name from. */
0154 public static final String PING_QUEUE_NAME_PROPNAME = "destinationName";
0155
0156 /** Holds the name of the default destination to send pings on. */
0157 public static final String PING_QUEUE_NAME_DEFAULT = "ping";
0158
0159 /** Holds the name of the property to get the queue name postfix from. */
0160 public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix";
0161
0162 /** Holds the default queue name postfix value. */
0163 public static final String QUEUE_NAME_POSTFIX_DEFAULT = "";
0164
0165 /** Holds the name of the property to get the test delivery mode from. */
0166 public static final String PERSISTENT_MODE_PROPNAME = "persistent";
0167
0168 /** Holds the message delivery mode to use for the test. */
0169 public static final boolean PERSISTENT_MODE_DEFAULT = false;
0170
0171 /** Holds the name of the property to get the test transactional mode from. */
0172 public static final String TRANSACTED_PROPNAME = "transacted";
0173
0174 /** Holds the transactional mode to use for the test. */
0175 public static final boolean TRANSACTED_DEFAULT = false;
0176
0177 /** Holds the name of the property to get the test consumer transacted mode from. */
0178 public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
0179
0180 /** Holds the consumer transactional mode default setting. */
0181 public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
0182
0183 /** Holds the name of the property to get the test broker url from. */
0184 public static final String BROKER_PROPNAME = "broker";
0185
0186 /** Holds the default broker url for the test. */
0187 public static final String BROKER_DEFAULT = "tcp://localhost:5672";
0188
0189 /** Holds the name of the property to get the test broker virtual path. */
0190 public static final String VIRTUAL_HOST_PROPNAME = "virtualHost";
0191
0192 /** Holds the default virtual path for the test. */
0193 public static final String VIRTUAL_HOST_DEFAULT = "";
0194
0195 /** Holds the name of the property to get the message rate from. */
0196 public static final String RATE_PROPNAME = "rate";
0197
0198 /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
0199 public static final int RATE_DEFAULT = 0;
0200
0201 /** Holds the name of the property to get the verbose mode flag from. */
0202 public static final String VERBOSE_PROPNAME = "verbose";
0203
0204 /** Holds the default verbose mode. */
0205 public static final boolean VERBOSE_DEFAULT = false;
0206
0207 /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */
0208 public static final String PUBSUB_PROPNAME = "pubsub";
0209
0210 /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
0211 public static final boolean PUBSUB_DEFAULT = false;
0212
0213 /** Holds the name of the property to get the fail after commit flag from. */
0214 public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit";
0215
0216 /** Holds the default failover after commit test flag. */
0217 public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false;
0218
0219 /** Holds the name of the property to get the fail before commit flag from. */
0220 public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit";
0221
0222 /** Holds the default failover before commit test flag. */
0223 public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false;
0224
0225 /** Holds the name of the property to get the fail after send flag from. */
0226 public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend";
0227
0228 /** Holds the default failover after send test flag. */
0229 public static final boolean FAIL_AFTER_SEND_DEFAULT = false;
0230
0231 /** Holds the name of the property to get the fail before send flag from. */
0232 public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend";
0233
0234 /** Holds the default failover before send test flag. */
0235 public static final boolean FAIL_BEFORE_SEND_DEFAULT = false;
0236
0237 /** Holds the name of the property to get the fail once flag from. */
0238 public static final String FAIL_ONCE_PROPNAME = "failOnce";
0239
0240 /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */
0241 public static final boolean FAIL_ONCE_DEFAULT = true;
0242
0243 /** Holds the name of the property to get the broker access username from. */
0244 public static final String USERNAME_PROPNAME = "username";
0245
0246 /** Holds the default broker log on username. */
0247 public static final String USERNAME_DEFAULT = "guest";
0248
0249 /** Holds the name of the property to get the broker access password from. */
0250 public static final String PASSWORD_PROPNAME = "password";
0251
0252 /** Holds the default broker log on password. */
0253 public static final String PASSWORD_DEFAULT = "guest";
0254
0255 /** Holds the name of the property to get the message selector from. */
0256 public static final String SELECTOR_PROPNAME = "selector";
0257
0258 /** Holds the default message selector. */
0259 public static final String SELECTOR_DEFAULT = "";
0260
0261 /** Holds the name of the property to get the destination count from. */
0262 public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
0263
0264 /** Defines the default number of destinations to ping. */
0265 public static final int DESTINATION_COUNT_DEFAULT = 1;
0266
0267 /** Holds the name of the property to get the number of consumers per destination from. */
0268 public static final String NUM_CONSUMERS_PROPNAME = "numConsumers";
0269
0270 /** Defines the default number consumers per destination. */
0271 public static final int NUM_CONSUMERS_DEFAULT = 1;
0272
0273 /** Holds the name of the property to get the waiting timeout for response messages. */
0274 public static final String TIMEOUT_PROPNAME = "timeout";
0275
0276 /** Default time to wait before assuming that a ping has timed out. */
0277 public static final long TIMEOUT_DEFAULT = 30000;
0278
0279 /** Holds the name of the property to get the commit batch size from. */
0280 public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize";
0281
0282 /** Defines the default number of pings to send in each transaction when running transactionally. */
0283 public static final int TX_BATCH_SIZE_DEFAULT = 1;
0284
0285 /** Holds the name of the property to get the unique destinations flag from. */
0286 public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests";
0287
0288 /** Defines the default value for the unique destinations property. */
0289 public static final boolean UNIQUE_DESTS_DEFAULT = true;
0290
0291 /** Holds the name of the property to get the durable destinations flag from. */
0292 public static final String DURABLE_DESTS_PROPNAME = "durableDests";
0293
0294 /** Defines the default value of the durable destinations flag. */
0295 public static final boolean DURABLE_DESTS_DEFAULT = false;
0296
0297 /** Holds the name of the property to get the message acknowledgement mode from. */
0298 public static final String ACK_MODE_PROPNAME = "ackMode";
0299
0300 /** Defines the default message acknowledgement mode. */
0301 public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
0302
0303 /** Holds the name of the property to get the consumers message acknowledgement mode from. */
0304 public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
0305
0306 /** Defines the default consumers message acknowledgement mode. */
0307 public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
0308
0309 /** Holds the name of the property to get the maximum pending message size setting from. */
0310 public static final String MAX_PENDING_PROPNAME = "maxPending";
0311
0312 /** Defines the default value for the maximum pending message size setting. 0 means no limit. */
0313 public static final int MAX_PENDING_DEFAULT = 0;
0314
0315 /** Defines the default prefetch size to use when consuming messages. */
0316 public static final int PREFETCH_DEFAULT = 100;
0317
0318 /** Defines the default value of the no local flag to use when consuming messages. */
0319 public static final boolean NO_LOCAL_DEFAULT = false;
0320
0321 /** Defines the default value of the exclusive flag to use when consuming messages. */
0322 public static final boolean EXCLUSIVE_DEFAULT = false;
0323
0324 /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
0325 public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
0326
0327 /** Holds the default configuration properties. */
0328 public static ParsedProperties defaults = new ParsedProperties();
0329
static
{
    // Populate the shared default configuration. The calls are independent of one another (each targets a
    // different property name) and are grouped here by the aspect of the test that they configure.
    // NOTE(review): setPropertyIfNull is presumed, from its name, to leave already-set properties untouched.

    // JNDI lookup and broker connection settings.
    defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT);
    defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT);
    defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT);
    defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
    defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
    defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
    defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);

    // Destination naming, topology and consumer counts.
    defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
    defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT);
    defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
    defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
    defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
    defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
    defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
    defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT);

    // Delivery mode, transactions and acknowledgement.
    defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
    defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
    defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT);
    defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
    defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT);
    defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);

    // Message size, send rate, pending data limit and reply timeout.
    defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
    defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
    defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
    defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);

    // Failover testing prompts.
    defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
    defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
    defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
    defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT);
    defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);

    // Diagnostics.
    defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
}
0364
0365 /** Allows setting of client ID on the connection, rather than through the connection URL. */
0366 protected boolean _overrideClientId;
0367
0368 /** Holds the JNDI name of the JMS connection factory. */
0369 protected String _factoryName;
0370
0371 /** Holds the name of the properties file to configure JNDI with. */
0372 protected String _fileProperties;
0373
0374 /** Holds the broker url. */
0375 protected String _brokerDetails;
0376
0377 /** Holds the username to access the broker with. */
0378 protected String _username;
0379
0380 /** Holds the password to access the broker with. */
0381 protected String _password;
0382
0383 /** Holds the virtual host on the broker to run the tests through. */
0384 protected String _virtualpath;
0385
0386 /** Holds the root name from which to generate test destination names. */
0387 protected String _destinationName;
0388
0389 /** Holds the default queue name postfix value. */
0390 protected String _queueNamePostfix;
0391
0392 /** Holds the message selector to filter the pings with. */
0393 protected String _selector;
0394
0395 /** Holds the producers transactional mode flag. */
0396 protected boolean _transacted;
0397
0398 /** Holds the consumers transactional mode flag. */
0399 protected boolean _consTransacted;
0400
0401 /** Determines whether this producer sends persistent messages. */
0402 protected boolean _persistent;
0403
0404 /** Holds the acknowledgement mode used for the producers. */
0405 protected int _ackMode;
0406
0407 /** Holds the acknowledgement mode setting for the consumers. */
0408 protected int _consAckMode;
0409
0410 /** Determines what size of messages this producer sends. */
0411 protected int _messageSize;
0412
0413 /** Used to indicate that the ping loop should print out whenever it pings. */
0414 protected boolean _verbose;
0415
0416 /** Flag used to indicate if this is a point to point or pub/sub ping client. */
0417 protected boolean _isPubSub;
0418
0419 /** Flag used to indicate if the destinations should be unique client. */
0420 protected boolean _isUnique;
0421
0422 /** Flag used to indicate that durable destination should be used. */
0423 protected boolean _isDurable;
0424
0425 /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
0426 protected boolean _failBeforeCommit;
0427
0428 /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
0429 protected boolean _failAfterCommit;
0430
0431 /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
0432 protected boolean _failBeforeSend;
0433
0434 /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
0435 protected boolean _failAfterSend;
0436
0437 /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
0438 protected boolean _failOnce;
0439
0440 /** Holds the number of sends that should be performed in every transaction when using transactions. */
0441 protected int _txBatchSize;
0442
0443 /** Holds the number of destinations to ping. */
0444 protected int _noOfDestinations;
0445
0446 /** Holds the number of consumers per destination. */
0447 protected int _noOfConsumers;
0448
0449 /** Holds the maximum send rate in hertz. */
0450 protected int _rate;
0451
0452 /**
0453 * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended
0454 * if this limit is breached.
0455 */
0456 protected int _maxPendingSize;
0457
0458 /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
0459 private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
0460
0461 /** A source for providing sequential unique ids for instances of this class to be identified with. */
0462 private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0);
0463
0464 /** Holds this instances unique id. */
0465 private int instanceId;
0466
0467 /**
0468 * Holds a map from message ids to latches on which threads wait for replies. This map is shared across multiple
0469 * ping producers on the same JVM.
0470 */
0471 private static Map<String, PerCorrelationId> perCorrelationIds =
0472 Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
0473
0474 /** A convenient formatter to use when time stamping output. */
0475 protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
0476
0477 /** Holds the connection for the message producer. */
0478 protected Connection _connection;
0479
0480 /** Holds the consumer connections. */
0481 protected Connection[] _consumerConnection;
0482
0483 /** Holds the controlSession on which ping replies are received. */
0484 protected Session[] _consumerSession;
0485
0486 /** Holds the producer controlSession, needed to create ping messages. */
0487 protected Session _producerSession;
0488
0489 /** Holds the destination where the response messages will arrive. */
0490 protected Destination _replyDestination;
0491
0492 /** Holds the set of destinations that this ping producer pings. */
0493 protected List<Destination> _pingDestinations;
0494
0495 /** Used to restrict the sending rate to a specified limit. */
0496 protected Throttle _rateLimiter;
0497
0498 /** Holds a message listener that this message listener chains all its messages to. */
0499 protected ChainedMessageListener _chainedMessageListener = null;
0500
0501 /**
0502 * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
0503 * creating multiple ping producers in the same JVM.
0504 */
0505 protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
0506
0507 /**
0508 * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
0509 * on the same JVM using this id generator will allow them to ping on the same queues.
0510 */
0511 protected AtomicInteger _queueSharedID = new AtomicInteger();
0512
0513 /** Used to tell the ping loop when to terminate, it only runs while this is true. */
0514 protected boolean _publish = true;
0515
0516 /** Holds the message producer to send the pings through. */
0517 protected MessageProducer _producer;
0518
0519 /** Holds the message consumer to receive the ping replies through. */
0520 protected MessageConsumer[] _consumer;
0521
0522 /** The prompt to display when asking the user to kill the broker for failover testing. */
0523 private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
0524
0525 /** Holds the name for this test client to be identified to the broker with. */
0526 private String _clientID;
0527
0528 /** Keeps count of the total messages sent purely for debugging purposes. */
0529 private static AtomicInteger numSent = new AtomicInteger();
0530
0531 /**
0532 * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
0533 * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a
0534 * fair SynchronousQueue because that provides fair scheduling, to ensure that all producer threads get an
0535 * equal chance to produce messages.
0536 */
0537 static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true);
0538
0539 /** Keeps a count of the number of message currently sent but not received. */
0540 static AtomicInteger _unreceived = new AtomicInteger(0);
0541
0542 /**
0543 * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
0544 * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
0545 * it, to send and recieve its pings and replies on.
0546 *
0547 * @param overrides Properties containing any desired overrides to the defaults.
0548 *
0549 * @throws Exception Any exceptions are allowed to fall through.
0550 */
0551 public PingPongProducer(Properties overrides) throws Exception
0552 {
0553 // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
0554 instanceId = _instanceIdGenerator.getAndIncrement();
0555
0556 // Create a set of parsed properties from the defaults overriden by the passed in values.
0557 ParsedProperties properties = new ParsedProperties(defaults);
0558 properties.putAll(overrides);
0559
0560 // Extract the configuration properties to set the pinger up with.
0561 _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME);
0562 _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME);
0563 _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME);
0564 _brokerDetails = properties.getProperty(BROKER_PROPNAME);
0565 _username = properties.getProperty(USERNAME_PROPNAME);
0566 _password = properties.getProperty(PASSWORD_PROPNAME);
0567 _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
0568 _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
0569 _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME);
0570 _selector = properties.getProperty(SELECTOR_PROPNAME);
0571 _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
0572 _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
0573 _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
0574 _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
0575 _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
0576 _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME);
0577 _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME);
0578 _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME);
0579 _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME);
0580 _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
0581 _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
0582 _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
0583 _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME);
0584 _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
0585 _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
0586 _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
0587 _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
0588 _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
0589 _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
0590 _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
0591
0592 // Check that one or more destinations were specified.
0593 if (_noOfDestinations < 1)
0594 {
0595 throw new IllegalArgumentException("There must be at least one destination.");
0596 }
0597
0598 // Set up a throttle to control the send rate, if a rate > 0 is specified.
0599 if (_rate > 0)
0600 {
0601 _rateLimiter = new BatchedThrottle();
0602 _rateLimiter.setRate(_rate);
0603 }
0604
0605 // Create the connection and message producers/consumers.
0606 // establishConnection(true, true);
0607 }
0608
0609 /**
0610 * Establishes a connection to the broker and creates message consumers and producers based on the parameters
0611 * that this ping client was created with.
0612 *
0613 * @param producer Flag to indicate whether or not the producer should be set up.
0614 * @param consumer Flag to indicate whether or not the consumers should be set up.
0615 *
0616 * @throws Exception Any exceptions are allowed to fall through.
0617 */
0618 public void establishConnection(boolean producer, boolean consumer) throws Exception
0619 {
0620 // log.debug("public void establishConnection(): called");
0621
0622 // Generate a unique identifying name for this client, based on it ip address and the current time.
0623 InetAddress address = InetAddress.getLocalHost();
0624 // _clientID = address.getHostName() + System.currentTimeMillis();
0625 _clientID = "perftest_" + instanceId;
0626
0627 // Create a connection to the broker.
0628 createConnection(_clientID);
0629
0630 // Create transactional or non-transactional sessions, based on the command line arguments.
0631 _producerSession = _connection.createSession(_transacted, _ackMode);
0632
0633 _consumerSession = new Session[_noOfConsumers];
0634
0635 for (int i = 0; i < _noOfConsumers; i++)
0636 {
0637 _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode);
0638 }
0639
0640 // Create the destinations to send pings to and receive replies from.
0641 _replyDestination = _consumerSession[0].createTemporaryQueue();
0642 createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
0643
0644 // Create the message producer only if instructed to.
0645 if (producer)
0646 {
0647 createProducer();
0648 }
0649
0650 // Create the message consumer only if instructed to.
0651 if (consumer)
0652 {
0653 createReplyConsumers(getReplyDestinations(), _selector);
0654 }
0655 }
0656
0657 /**
0658 * Establishes a connection to the broker, based on the configuration parameters that this ping client was
0659 * created with.
0660 *
0661 * @param clientID The clients identifier.
0662 *
0663 * @throws JMSException Underlying exceptions allowed to fall through.
0664 * @throws NamingException Underlying exceptions allowed to fall through.
0665 * @throws IOException Underlying exceptions allowed to fall through.
0666 */
0667 protected void createConnection(String clientID) throws JMSException, NamingException, IOException
0668 {
0669 // _log.debug("protected void createConnection(String clientID = " + clientID + "): called");
0670
0671 // _log.debug("Creating a connection for the message producer.");
0672 File propsFile = new File(_fileProperties);
0673 InputStream is = new FileInputStream(propsFile);
0674 Properties properties = new Properties();
0675 properties.load(is);
0676
0677 Context context = new InitialContext(properties);
0678 ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName);
0679 _connection = factory.createConnection(_username, _password);
0680
0681 if (_overrideClientId)
0682 {
0683 _connection.setClientID(clientID);
0684 }
0685
0686 // _log.debug("Creating " + _noOfConsumers + " connections for the consumers.");
0687
0688 _consumerConnection = new Connection[_noOfConsumers];
0689
0690 for (int i = 0; i < _noOfConsumers; i++)
0691 {
0692 _consumerConnection[i] = factory.createConnection(_username, _password);
0693 // _consumerConnection[i].setClientID(clientID);
0694 }
0695 }
0696
0697 /**
0698 * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
0699 * to be started to bounce the pings back again.
0700 *
0701 * @param args The command line arguments.
0702 */
0703 public static void main(String[] args)
0704 {
0705 try
0706 {
0707 Properties options =
0708 CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
0709
0710 // Create a ping producer overriding its defaults with all options passed on the command line.
0711 PingPongProducer pingProducer = new PingPongProducer(options);
0712 pingProducer.establishConnection(true, true);
0713
0714 // Start the ping producers dispatch thread running.
0715 pingProducer._connection.start();
0716
0717 // Create a shutdown hook to terminate the ping-pong producer.
0718 Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
0719
0720 // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
0721 pingProducer._connection.setExceptionListener(pingProducer);
0722
0723 // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
0724 Thread pingThread = new Thread(pingProducer);
0725 pingThread.run();
0726 pingThread.join();
0727 }
0728 catch (Exception e)
0729 {
0730 System.err.println(e.getMessage());
0731 log.error("Top level handler caught execption.", e);
0732 System.exit(1);
0733 }
0734 }
0735
0736 /**
0737 * Convenience method for a short pause.
0738 *
0739 * @param sleepTime The time in milliseconds to pause for.
0740 */
0741 public static void pause(long sleepTime)
0742 {
0743 if (sleepTime > 0)
0744 {
0745 try
0746 {
0747 Thread.sleep(sleepTime);
0748 }
0749 catch (InterruptedException ie)
0750 { }
0751 }
0752 }
0753
0754 /**
0755 * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to
0756 * destination of this pinger.
0757 *
0758 * @return The single reply to destination of this pinger, wrapped in a list.
0759 */
0760 public List<Destination> getReplyDestinations()
0761 {
0762 // log.debug("public List<Destination> getReplyDestinations(): called");
0763
0764 List<Destination> replyDestinations = new ArrayList<Destination>();
0765 replyDestinations.add(_replyDestination);
0766
0767 // log.debug("replyDestinations = " + replyDestinations);
0768
0769 return replyDestinations;
0770 }
0771
0772 /**
0773 * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
0774 * flag is set accoring the ping producer creation options.
0775 *
0776 * @throws JMSException Any JMSExceptions are allowed to fall through.
0777 */
0778 public void createProducer() throws JMSException
0779 {
0780 // log.debug("public void createProducer(): called");
0781
0782 _producer = (MessageProducer) _producerSession.createProducer(null);
0783 _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
0784
0785 // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
0786 }
0787
0788 /**
0789 * Creates consumers for the specified number of destinations. The destinations themselves are also created by this
0790 * method.
0791 *
0792 * @param noOfDestinations The number of destinations to create consumers for.
0793 * @param selector The message selector to filter the consumers with.
0794 * @param rootName The root of the name, or actual name if only one is being created.
0795 * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the
0796 * numbering with all pingers on the same JVM.
0797 * @param durable If the destinations are durable topics.
0798 *
0799 * @throws JMSException Any JMSExceptions are allowed to fall through.
0800 */
0801 public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
0802 boolean durable) throws JMSException
0803 {
0804 /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
0805 + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
0806 + durable + "): called");*/
0807
0808 _pingDestinations = new ArrayList<Destination>();
0809
0810 // Create the desired number of ping destinations and consumers for them.
0811 // log.debug("Creating " + noOfDestinations + " destinations to ping.");
0812
0813 for (int i = 0; i < noOfDestinations; i++)
0814 {
0815 Destination destination;
0816 String id;
0817
0818 // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
0819 if (unique)
0820 {
0821 // log.debug("Creating unique destinations.");
0822 id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
0823 }
0824 else
0825 {
0826 // log.debug("Creating shared destinations.");
0827 id = "_" + _queueSharedID.incrementAndGet();
0828 }
0829
0830 // Check if this is a pub/sub pinger, in which case create topics.
0831 if (_isPubSub)
0832 {
0833 destination = _producerSession.createTopic(rootName + id);
0834 // log.debug("Created non-durable topic " + destination);
0835
0836 if (durable)
0837 {
0838 _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID());
0839 }
0840 }
0841 // Otherwise this is a p2p pinger, in which case create queues.
0842 else
0843 {
0844 destination = _producerSession.createQueue(rootName + id + _queueNamePostfix);
0845 // log.debug("Created queue " + destination);
0846 }
0847
0848 // Keep the destination.
0849 _pingDestinations.add(destination);
0850 }
0851 }
0852
0853 /**
0854 * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
0855 *
0856 * @param destinations The destinations to listen to.
0857 * @param selector A selector to filter the messages with.
0858 *
0859 * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
0860 */
0861 public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
0862 {
0863 /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
0864 + ", String selector = " + selector + "): called");*/
0865
0866 log.debug("There are " + destinations.size() + " destinations.");
0867 log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
0868 log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
0869
0870 for (Destination destination : destinations)
0871 {
0872 _consumer = new MessageConsumer[_noOfConsumers];
0873
0874 for (int i = 0; i < _noOfConsumers; i++)
0875 {
0876 // Create a consumer for the destination and set this pinger to listen to its messages.
0877 _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT);
0878
0879 final int consumerNo = i;
0880
0881 _consumer[i].setMessageListener(new MessageListener()
0882 {
0883 public void onMessage(Message message)
0884 {
0885 onMessageWithConsumerNo(message, consumerNo);
0886 }
0887 });
0888
0889 log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
0890 }
0891 }
0892 }
0893
/**
 * Handles a reply message received by one of this pingers consumers. Redelivered messages, and messages whose
 * correlation id has no entry in the per correlation id map, are ignored with a warning.
 *
 * <p/>For an expected reply this method: restarts the timeout timer for the correlation id; when a max pending
 * size is set, decrements the count of unreceived messages and polls the send pause monitor once the estimated
 * unreceived size is below that limit; counts down the latch for the correlation id; commits the consumer session,
 * or acknowledges the message when the consumer ack mode is 2, on transaction batch size boundaries; forwards the
 * message, the remaining count and the ping time to the chained message listener if one is set; and finally, on
 * the last expected message, counts down the latch once more.
 *
 * <p/>Any JMSException raised whilst doing this is logged as a warning and not propagated.
 *
 * @param message    The received message.
 * @param consumerNo The consumer number within this test pinger instance. It is used to select the consumer
 *                   session to commit.
 */
public void onMessageWithConsumerNo(Message message, int consumerNo)
{
    // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called");
    try
    {
        // The ping time is the difference between the local nano time now and the nano time stamped onto the
        // message when it was sent.
        long now = System.nanoTime();
        long timestamp = getTimestamp(message);
        long pingTime = now - timestamp;

        // NDC.push("id" + instanceId + "/cons" + consumerNo);

        // Extract the messages correlation id.
        String correlationID = message.getJMSCorrelationID();
        // log.debug("correlationID = " + correlationID);

        // int num = message.getIntProperty("MSG_NUM");
        // log.info("Message " + num + " received.");

        boolean isRedelivered = message.getJMSRedelivered();
        // log.debug("isRedelivered = " + isRedelivered);

        if (!isRedelivered)
        {
            // Countdown on the traffic light if there is one for the matching correlation id.
            PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);

            if (perCorrelationId != null)
            {
                CountDownLatch trafficLight = perCorrelationId.trafficLight;

                // Restart the timeout timer on every message.
                perCorrelationId.timeOutStart = System.nanoTime();

                // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);

                // Release waiting senders if there are some and using maxPending limit.
                if ((_maxPendingSize > 0))
                {
                    // Decrement the count of sent but not yet received messages.
                    // The size estimate is the count multiplied by the message size (a size of zero is counted as
                    // one), divided by the number of consumers per destination in pub/sub mode.
                    int unreceived = _unreceived.decrementAndGet();
                    int unreceivedSize =
                        (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
                        / (_isPubSub ? getConsumersPerDestination() : 1);

                    // log.debug("unreceived = " + unreceived);
                    // log.debug("unreceivedSize = " + unreceivedSize);

                    // synchronized (_sendPauseMonitor)
                    // {
                    // NOTE(review): presumably this poll takes the object offered by a sender blocked in
                    // sendMessage, which ends that senders wait early - confirm against the monitors type.
                    if (unreceivedSize < _maxPendingSize)
                    {
                        _sendPauseMonitor.poll();
                    }
                    // }
                }

                // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
                // method simultaneously with the same correlation id. Decrementing the latch in a synchronized block
                // ensures that each thread will get a unique value for the remaining messages.
                long trueCount;
                long remainingCount;

                synchronized (trafficLight)
                {
                    trafficLight.countDown();

                    // The remaining count is one less than the latch count, as the latch count is only taken from
                    // one to zero after this message has been fully handled, at the end of this block.
                    trueCount = trafficLight.getCount();
                    remainingCount = trueCount - 1;

                    // NDC.push("/rem" + remainingCount);

                    // log.debug("remainingCount = " + remainingCount);
                    // log.debug("trueCount = " + trueCount);

                    // Commit on transaction batch size boundaries. At this point in time the waiting producer
                    // remains blocked, even on the last message.
                    // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
                    // each batch boundary. For pub/sub each consumer gets every message so no division is done.
                    // When running in client ack mode, an ack is done instead of a commit, on the commit batch
                    // size boundaries.
                    long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
                    // log.debug("commitCount = " + commitCount);

                    if ((commitCount % _txBatchSize) == 0)
                    {
                        // An ack mode value of 2 is the JMS client acknowledge mode.
                        if (_consAckMode == 2)
                        {
                            // log.debug("Doing client ack for consumer " + consumerNo + ".");
                            message.acknowledge();
                        }
                        else
                        {
                            // log.debug("Trying commit for consumer " + consumerNo + ".");
                            commitTx(_consumerSession[consumerNo]);
                            // log.info("Tx committed on consumer " + consumerNo);
                        }
                    }

                    // Forward the message and remaining count to any interested chained message listener.
                    if (_chainedMessageListener != null)
                    {
                        _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime);
                    }

                    // Check if this is the last message, in which case release any waiting producers. This is done
                    // after the transaction has been committed and any listeners notified.
                    if (trueCount == 1)
                    {
                        trafficLight.countDown();
                    }
                }
            }
            else
            {
                log.warn("Got unexpected message with correlationId: " + correlationID);
            }
        }
        else
        {
            log.warn("Got redelivered message, ignoring.");
        }
    }
    catch (JMSException e)
    {
        // Failures are reported but not propagated out of the listener.
        log.warn("There was a JMSException: " + e.getMessage(), e);
    }
    finally
    {
        // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending");
        // NDC.clear();
    }
}
1034
1035 /**
1036 * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
1037 * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
1038 * the correlation id.
1039 *
1040 * @param message The message to send. If this is null, one is generated.
1041 * @param numPings The number of ping messages to send.
1042 * @param timeout The timeout in milliseconds.
1043 * @param messageCorrelationId The message correlation id. If this is null, one is generated.
1044 *
1045 * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
1046 * for all prematurely.
1047 *
1048 * @throws JMSException All underlying JMSExceptions are allowed to fall through.
1049 * @throws InterruptedException When interrupted by a timeout
1050 */
1051 public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
1052 throws JMSException, InterruptedException
1053 {
1054 /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
1055 + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
1056
1057 // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
1058 if (messageCorrelationId == null)
1059 {
1060 messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
1061 }
1062
1063 try
1064 {
1065 // NDC.push("prod");
1066
1067 // Create a count down latch to count the number of replies with. This is created before the messages are
1068 // sent so that the replies cannot be received before the count down is created.
1069 // One is added to this, so that the last reply becomes a special case. The special case is that the
1070 // chained message listener must be called before this sender can be unblocked, but that decrementing the
1071 // countdown needs to be done before the chained listener can be called.
1072 PerCorrelationId perCorrelationId = new PerCorrelationId();
1073
1074 perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
1075 perCorrelationIds.put(messageCorrelationId, perCorrelationId);
1076
1077 // Set up the current time as the start time for pinging on the correlation id. This is used to determine
1078 // timeouts.
1079 perCorrelationId.timeOutStart = System.nanoTime();
1080
1081 // Send the specifed number of messages.
1082 pingNoWaitForReply(message, numPings, messageCorrelationId);
1083
1084 boolean timedOut;
1085 boolean allMessagesReceived;
1086 int numReplies;
1087
1088 do
1089 {
1090 // Block the current thread until replies to all the messages are received, or it times out.
1091 perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
1092
1093 // Work out how many replies were receieved.
1094 numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
1095
1096 allMessagesReceived = numReplies == getExpectedNumPings(numPings);
1097
1098 // log.debug("numReplies = " + numReplies);
1099 // log.debug("allMessagesReceived = " + allMessagesReceived);
1100
1101 // Recheck the timeout condition.
1102 long now = System.nanoTime();
1103 long lastMessageReceievedAt = perCorrelationId.timeOutStart;
1104 timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
1105
1106 // log.debug("now = " + now);
1107 // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
1108 }
1109 while (!timedOut && !allMessagesReceived);
1110
1111 if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
1112 {
1113 log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
1114 }
1115 else if (_verbose)
1116 {
1117 log.info("Got all replies on id, " + messageCorrelationId);
1118 }
1119
1120 // commitTx(_consumerSession);
1121
1122 // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
1123
1124 return numReplies;
1125 }
1126 // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived,
1127 // so will be a memory leak if this is not done.
1128 finally
1129 {
1130 // NDC.pop();
1131 perCorrelationIds.remove(messageCorrelationId);
1132 }
1133 }
1134
1135 /**
1136 * Sends the specified number of ping messages and does not wait for correlating replies.
1137 *
1138 * @param message The message to send.
1139 * @param numPings The number of pings to send.
1140 * @param messageCorrelationId A correlation id to place on all messages sent.
1141 *
1142 * @throws JMSException All underlying JMSExceptions are allowed to fall through.
1143 */
1144 public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
1145 {
1146 /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
1147 + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
1148
1149 if (message == null)
1150 {
1151 message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
1152 }
1153
1154 message.setJMSCorrelationID(messageCorrelationId);
1155
1156 // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
1157 // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
1158 // needed.
1159 boolean committed = false;
1160
1161 // Send all of the ping messages.
1162 for (int i = 0; i < numPings; i++)
1163 {
1164 // Re-timestamp the message.
1165 // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
1166
1167 // Send the message, passing in the message count.
1168 committed = sendMessage(i, message);
1169
1170 // Spew out per message timings on every message sonly in verbose mode.
1171 /*if (_verbose)
1172 {
1173 log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
1174 }*/
1175 }
1176
1177 // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
1178 if (!committed)
1179 {
1180 commitTx(_producerSession);
1181 }
1182 }
1183
/**
 * Sends the specified message, applies rate limiting and possibly commits the current transaction. The count of
 * messages sent so far must be specified and is used to round robin the ping destinations (where there are more
 * than one), and to determine if the transaction batch size has been reached and the sent messages should be
 * committed.
 *
 * <p/>When a max pending size is set, the send is first held back for as long as the estimated size of the sent
 * but not yet received messages is over that limit. The message is time stamped immediately before it is sent.
 *
 * @param i       The count of messages sent so far in a loop of multiple calls to this send method.
 * @param message The message to send.
 *
 * @return The result of the commit call when this send fell on a transaction batch size boundary,
 *         <tt>false</tt> otherwise.
 *
 * @throws JMSException     All underlying JMSExceptions are allowed to fall through.
 * @throws RuntimeException If the send had to wait on the max pending limit more than the allowed number of times,
 *                          or if the thread was interrupted during such a wait.
 */
protected boolean sendMessage(int i, Message message) throws JMSException
{
    try
    {
        NDC.push("id" + instanceId + "/prod");

        // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
        // log.debug("_txBatchSize = " + _txBatchSize);

        // Round robin the destinations as the messages are sent.
        Destination destination = _pingDestinations.get(i % _pingDestinations.size());

        // Prompt the user to kill the broker when doing failover testing.
        _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);

        // Get the test setup for the correlation id.
        // Note that the looked up test setup is not used any further within this method.
        String correlationID = message.getJMSCorrelationID();
        PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);

        // If necessary, wait until the max pending message size comes within its limit.
        if (_maxPendingSize > 0)
        {
            synchronized (_sendPauseMonitor)
            {
                // Used to keep track of the number of times that send has to wait.
                int numWaits = 0;

                // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
                // the test timeout.
                // Each wait lasts for up to 10000 milliseconds, hence the division of the timeout by 10000.
                int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);

                while (true)
                {
                    // Get the size estimate of sent but not yet received messages.
                    // This is the count multiplied by the message size (a size of zero is counted as one), divided
                    // by the number of consumers per destination in pub/sub mode.
                    int unreceived = _unreceived.get();
                    int unreceivedSize =
                        (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
                        / (_isPubSub ? getConsumersPerDestination() : 1);

                    // log.debug("unreceived = " + unreceived);
                    // log.debug("unreceivedSize = " + unreceivedSize);
                    // log.debug("_maxPendingSize = " + _maxPendingSize);

                    if (unreceivedSize > _maxPendingSize)
                    {
                        // log.debug("unreceived size estimate over limit = " + unreceivedSize);

                        // Fail the test if the send has had to wait more than the maximum allowed number of times.
                        if (numWaits > waitLimit)
                        {
                            String errorMessage =
                                "Send has had to wait for the unreceivedSize (" + unreceivedSize
                                + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
                                + " times.";
                            log.warn(errorMessage);
                            throw new RuntimeException(errorMessage);
                        }

                        // Wait on the send pause barrier for the limit to be re-established.
                        // NOTE(review): presumably the offer blocks until the reply handler polls the monitor, or
                        // the 10000 millisecond timeout expires - confirm against the monitors type.
                        try
                        {
                            long start = System.nanoTime();
                            // _sendPauseMonitor.wait(10000);
                            _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS);
                            long end = System.nanoTime();

                            // Count the wait only if it was for > 99% of the requested wait time.
                            // Waits that ended sooner than that are not counted towards the wait limit.
                            if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99)
                            {
                                numWaits++;
                            }
                        }
                        catch (InterruptedException e)
                        {
                            // Restore the interrupted status
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    }
                    else
                    {
                        break;
                    }
                }
            }
        }

        // Send the message either to its round robin destination, or its default destination.
        // int num = numSent.incrementAndGet();
        // message.setIntProperty("MSG_NUM", num);
        // The time stamp is set as late as possible before the send, so that time spent waiting above is not
        // included in the measured ping time.
        setTimestamp(message);

        if (destination == null)
        {
            _producer.send(message);
        }
        else
        {
            _producer.send(destination, message);
        }

        // Increase the unreceived size, this may actually happen after the message is received.
        // The unreceived size is incremented by the number of consumers that will get a copy of the message,
        // in pub/sub mode.
        if (_maxPendingSize > 0)
        {
            // The new count is only captured for the commented out debug logging below.
            int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
            // log.debug("newUnreceivedCount = " + newUnreceivedCount);
        }

        // Apply message rate throttling if a rate limit has been set up.
        if (_rateLimiter != null)
        {
            _rateLimiter.throttle();
        }

        // Call commit every time the commit batch size is reached.
        boolean committed = false;

        // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
        if (((i + 1) % _txBatchSize) == 0)
        {
            // log.debug("Trying commit on producer session.");
            committed = commitTx(_producerSession);
        }

        return committed;
    }
    finally
    {
        // Clears the whole diagnostic context of this thread, not only the entry pushed at the top of this method.
        NDC.clear();
    }
}
1330
1331 /**
1332 * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the
1333 * test that the failure has occurred, before the method returns.
1334 *
1335 * @param failFlag The fail flag to test.
1336 *
1337 * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only
1338 * used once, then reset.
1339 */
1340 private boolean waitForUserToPromptOnFailure(boolean failFlag)
1341 {
1342 if (failFlag)
1343 {
1344 if (_failOnce)
1345 {
1346 failFlag = false;
1347 }
1348
1349 // log.debug("Failing Before Send");
1350 waitForUser(KILL_BROKER_PROMPT);
1351 }
1352
1353 return failFlag;
1354 }
1355
1356 /**
1357 * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch
1358 * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will
1359 * terminate the pinger.
1360 */
1361 public void pingLoop()
1362 {
1363 try
1364 {
1365 // Generate a sample message and time stamp it.
1366 Message msg = getTestMessage(_replyDestination, _messageSize, _persistent);
1367 // setTimestamp(msg);
1368
1369 // Send the message and wait for a reply.
1370 pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
1371 }
1372 catch (JMSException e)
1373 {
1374 _publish = false;
1375 // log.debug("There was a JMSException: " + e.getMessage(), e);
1376 }
1377 catch (InterruptedException e)
1378 {
1379 _publish = false;
1380 // log.debug("There was an interruption: " + e.getMessage(), e);
1381 }
1382 }
1383
1384 /**
1385 * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
1386 * here.
1387 *
1388 * @param messageListener The chained message listener.
1389 */
1390 public void setChainedMessageListener(ChainedMessageListener messageListener)
1391 {
1392 _chainedMessageListener = messageListener;
1393 }
1394
1395 /** Removes any chained message listeners from this pinger. */
1396 public void removeChainedMessageListener()
1397 {
1398 _chainedMessageListener = null;
1399 }
1400
1401 /**
1402 * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
1403 *
1404 * @param replyQueue The reply-to destination for the message.
1405 * @param messageSize The desired size of the message in bytes.
1406 * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
1407 *
1408 * @return A freshly generated test message.
1409 *
1410 * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
1411 */
1412 public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
1413 {
1414 // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
1415 return TestUtils.createTestMessageOfSize(_producerSession, messageSize);
1416 }
1417
1418 /**
1419 * Sets the current time in nanoseconds as the timestamp on the message.
1420 *
1421 * @param msg The message to timestamp.
1422 *
1423 * @throws JMSException Any JMSExceptions are allowed to fall through.
1424 */
1425 protected void setTimestamp(Message msg) throws JMSException
1426 {
1427 /*if (((AMQSession)_producerSession).isStrictAMQP())
1428 {
1429 ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime());
1430 }
1431 else
1432 {*/
1433 msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
1434 // }
1435 }
1436
1437 /**
1438 * Extracts the nanosecond timestamp from a message.
1439 *
1440 * @param msg The message to extract the time stamp from.
1441 *
1442 * @return The timestamp in nanos.
1443 *
1444 * @throws JMSException Any JMSExceptions are allowed to fall through.
1445 */
1446 protected long getTimestamp(Message msg) throws JMSException
1447 {
1448 /*if (((AMQSession)_producerSession).isStrictAMQP())
1449 {
1450 Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
1451
1452 return (value == null) ? 0L : value;
1453 }
1454 else
1455 {*/
1456 return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
1457 // }
1458 }
1459
1460 /**
1461 * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag
1462 * has been cleared.
1463 */
1464 public void stop()
1465 {
1466 _publish = false;
1467 }
1468
1469 /**
1470 * Starts the producer and consumer connections.
1471 *
1472 * @throws JMSException Any JMSExceptions are allowed to fall through.
1473 */
1474 public void start() throws JMSException
1475 {
1476 // log.debug("public void start(): called");
1477
1478 _connection.start();
1479 // log.debug("Producer started.");
1480
1481 for (int i = 0; i < _noOfConsumers; i++)
1482 {
1483 _consumerConnection[i].start();
1484 // log.debug("Consumer " + i + " started.");
1485 }
1486 }
1487
1488 /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
1489 public void run()
1490 {
1491 // Keep running until the publish flag is cleared.
1492 while (_publish)
1493 {
1494 pingLoop();
1495 }
1496 }
1497
1498 /**
1499 * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
1500 * connection, this clears the publish flag which in turn will halt the ping loop.
1501 *
1502 * @param e The exception that triggered this callback method.
1503 */
1504 public void onException(JMSException e)
1505 {
1506 // log.debug("public void onException(JMSException e = " + e + "): called", e);
1507 _publish = false;
1508 }
1509
1510 /**
1511 * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
1512 * with the runtime system as a shutdown hook.
1513 *
1514 * @return A shutdown hook for the ping loop.
1515 */
1516 public Thread getShutdownHook()
1517 {
1518 return new Thread(new Runnable()
1519 {
1520 public void run()
1521 {
1522 stop();
1523 }
1524 });
1525 }
1526
1527 /**
1528 * Closes all of the producer and consumer connections.
1529 *
1530 * @throws JMSException All JMSException are allowed to fall through.
1531 */
1532 public void close() throws JMSException
1533 {
1534 // log.debug("public void close(): called");
1535
1536 try
1537 {
1538 if (_connection != null)
1539 {
1540 // log.debug("Before close producer connection.");
1541 _connection.close();
1542 // log.debug("Closed producer connection.");
1543 }
1544
1545 for (int i = 0; i < _noOfConsumers; i++)
1546 {
1547 if (_consumerConnection[i] != null)
1548 {
1549 // log.debug("Before close consumer connection " + i + ".");
1550 _consumerConnection[i].close();
1551 // log.debug("Closed consumer connection " + i + ".");
1552 }
1553 }
1554 }
1555 finally
1556 {
1557 _connection = null;
1558 _producerSession = null;
1559 _consumerSession = null;
1560 _consumerConnection = null;
1561 _producer = null;
1562 _consumer = null;
1563 _pingDestinations = null;
1564 _replyDestination = null;
1565 }
1566 }
1567
1568 /**
1569 * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a
1570 * transactional controlSession, this method does nothing (unless the failover after send flag is set).
1571 *
1572 * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
1573 * applied. This flag applies whether the pinger is transactional or not.
1574 *
1575 * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit
1576 * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
1577 * commit is applied. These flags will only apply if using a transactional pinger.
1578 *
1579 * @param session The controlSession to commit
1580 *
1581 * @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not.
1582 *
1583 * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
1584 *
1585 * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
1586 * method, because commits only apply to transactional pingers, but fail after send applied to transactional and
1587 * non-transactional alike.
1588 */
1589 protected boolean commitTx(Session session) throws JMSException
1590 {
1591 // log.debug("protected void commitTx(Session session): called");
1592
1593 boolean committed = false;
1594
1595 _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend);
1596
1597 if (session.getTransacted())
1598 {
1599 // log.debug("Session is transacted.");
1600
1601 try
1602 {
1603 _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit);
1604
1605 long start = System.nanoTime();
1606 session.commit();
1607 committed = true;
1608 // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
1609
1610 _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit);
1611
1612 // log.debug("Session Commited.");
1613 }
1614 catch (JMSException e)
1615 {
1616 // log.debug("JMSException on commit:" + e.getMessage(), e);
1617
1618 try
1619 {
1620 session.rollback();
1621 // log.debug("Message rolled back.");
1622 }
1623 catch (JMSException jmse)
1624 {
1625 // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
1626
1627 // Both commit and rollback failed. Throw the rollback exception.
1628 throw jmse;
1629 }
1630 }
1631 }
1632
1633 return committed;
1634 }
1635
1636 /**
1637 * Outputs a prompt to the console and waits for the user to press return.
1638 *
1639 * @param prompt The prompt to display on the console.
1640 */
1641 public void waitForUser(String prompt)
1642 {
1643 System.out.println(prompt);
1644
1645 try
1646 {
1647 System.in.read();
1648 }
1649 catch (IOException e)
1650 {
1651 // Ignored.
1652 }
1653
1654 System.out.println("Continuing.");
1655 }
1656
1657 /**
1658 * Gets the number of consumers that are listening to each destination in the test.
1659 *
1660 * @return int The number of consumers subscribing to each topic.
1661 */
1662 public int getConsumersPerDestination()
1663 {
1664 return _noOfConsumers;
1665 }
1666
1667 /**
1668 * Calculates how many pings are expected to be received for the given number sent.
1669 *
1670 * @param numpings The number of pings that will be sent.
1671 *
1672 * @return The number that should be received, for the test to pass.
1673 */
1674 public int getExpectedNumPings(int numpings)
1675 {
1676 // Wow, I'm freaking sorry about this return here...
1677 return ((_failAfterSend || _failBeforeCommit) ? numpings - 1: numpings) *
1678 (_isPubSub ? getConsumersPerDestination() : 1);
1679 }
1680
1681 /**
1682 * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link
1683 * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link
1684 * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
1685 * messages with that correlation id.
1686 *
1687 * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be
1688 * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
1689 * still blocked.
1690 */
1691 public static interface ChainedMessageListener
1692 {
1693 /**
1694 * Notifies interested listeners about message arrival and important test stats, the number of messages
1695 * remaining in the test, and the messages send timestamp.
1696 *
1697 * @param message The newly arrived message.
1698 * @param remainingCount The number of messages left to complete the test.
1699 * @param latency The nanosecond latency of the message.
1700 *
1701 * @throws JMSException Any JMS exceptions is allowed to fall through.
1702 */
1703 public void onMessage(Message message, int remainingCount, long latency) throws JMSException;
1704 }
1705
1706 /**
1707 * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be
1708 * added to this: read/write lock to make onMessage more concurrent as described in class header comment.
1709 */
1710 protected static class PerCorrelationId
1711 {
1712 /** Holds a countdown on number of expected messages. */
1713 CountDownLatch trafficLight;
1714
1715 /** Holds the last timestamp that the timeout was reset to. */
1716 Long timeOutStart;
1717 }
1718 }
|