001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.requestreply;
022
023 import java.io.IOException;
024 import java.net.InetAddress;
025 import java.text.SimpleDateFormat;
026 import java.util.Date;
027
028 import javax.jms.*;
029
030 import org.apache.log4j.Logger;
031
032 import org.apache.qpid.client.AMQConnection;
033 import org.apache.qpid.client.AMQQueue;
034 import org.apache.qpid.client.AMQTopic;
035 import org.apache.qpid.jms.ConnectionListener;
036 import org.apache.qpid.jms.Session;
037 import org.apache.qpid.topic.Config;
038 import org.apache.qpid.exchange.ExchangeDefaults;
039
040 /**
041 * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
042 * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
043 * too.
044 *
045 * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
046 * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
047 * temporary queue or the correlation id to correlate the original message to the reply.
048 *
049 * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
050 * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
051 * be disabled for real timing tests as writing to the console will slow things down.
052 *
053 * <p><table id="crc"><caption>CRC Card</caption>
054 * <tr><th> Responsibilities <th> Collaborations
055 * <tr><td> Bounce back messages to their reply to destination.
056 * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
057 * </table>
058 *
059 * @todo Replace the command line parsing with a neater tool.
060 *
061 * @todo Make verbose accept a number of messages, only prints to console every X messages.
062 */
063 public class PingPongBouncer implements MessageListener
064 {
065 private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
066
067 /** The default prefetch size for the message consumer. */
068 private static final int PREFETCH = 1;
069
070 /** The default no local flag for the message consumer. */
071 private static final boolean NO_LOCAL = true;
072
073 private static final String DEFAULT_DESTINATION_NAME = "ping";
074
075 /** The default exclusive flag for the message consumer. */
076 private static final boolean EXCLUSIVE = false;
077
078 /** A convenient formatter to use when time stamping output. */
079 protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
080
081 /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
082 private boolean _verbose = false;
083
084 /** Determines whether this bounce back client bounces back messages persistently. */
085 private boolean _persistent = false;
086
087 private Destination _consumerDestination;
088
089 /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
090 private Destination _lastResponseDest;
091
092 /** The producer for sending replies with. */
093 private MessageProducer _replyProducer;
094
095 /** The consumer controlSession. */
096 private Session _consumerSession;
097
098 /** The producer controlSession. */
099 private Session _producerSession;
100
101 /** Holds the connection to the broker. */
102 private AMQConnection _connection;
103
104 /** Flag used to indicate if this is a point to point or pub/sub ping client. */
105 private boolean _isPubSub = false;
106
107 /**
108 * This flag is used to indicate that the user should be prompted to kill a broker, in order to test
109 * failover, immediately before committing a transaction.
110 */
111 protected boolean _failBeforeCommit = false;
112
113 /**
114 * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test
115 * failover, immediate after committing a transaction.
116 */
117 protected boolean _failAfterCommit = false;
118
119 /**
120 * Creates a PingPongBouncer on the specified producer and consumer sessions.
121 *
122 * @param brokerDetails The addresses of the brokers to connect to.
123 * @param username The broker username.
124 * @param password The broker password.
125 * @param virtualpath The virtual host name within the broker.
126 * @param destinationName The name of the queue to receive pings on
127 * (or root of the queue name where many queues are generated).
128 * @param persistent A flag to indicate that persistent message should be used.
129 * @param transacted A flag to indicate that pings should be sent within transactions.
130 * @param selector A message selector to filter received pings with.
131 * @param verbose A flag to indicate that message timings should be sent to the console.
132 *
133 * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
134 */
135 public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
136 String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose,
137 boolean pubsub) throws Exception
138 {
139 // Create a client id to uniquely identify this client.
140 InetAddress address = InetAddress.getLocalHost();
141 String clientId = address.getHostName() + System.currentTimeMillis();
142 _verbose = verbose;
143 _persistent = persistent;
144 setPubSub(pubsub);
145 // Connect to the broker.
146 setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
147 _logger.info("Connected with URL:" + getConnection().toURL());
148
149 // Set up the failover notifier.
150 getConnection().setConnectionListener(new FailoverNotifier());
151
152 // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the
153 // command line option.
154 _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
155 _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
156
157 // Create the queue to listen for message on.
158 createConsumerDestination(destinationName);
159 MessageConsumer consumer =
160 _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
161
162 // Create a producer for the replies, without a default destination.
163 _replyProducer = _producerSession.createProducer(null);
164 _replyProducer.setDisableMessageTimestamp(true);
165 _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
166
167 // Set this up to listen for messages on the queue.
168 consumer.setMessageListener(this);
169 }
170
171 /**
172 * Starts a stand alone ping-pong client running in verbose mode.
173 *
174 * @param args
175 */
176 public static void main(String[] args) throws Exception
177 {
178 System.out.println("Starting...");
179
180 // Display help on the command line.
181 if (args.length == 0)
182 {
183 _logger.info("Running test with default values...");
184 //usage();
185 //System.exit(0);
186 }
187
188 // Extract all command line parameters.
189 Config config = new Config();
190 config.setOptions(args);
191 String brokerDetails = config.getHost() + ":" + config.getPort();
192 String virtualpath = "test";
193 String destinationName = config.getDestination();
194 if (destinationName == null)
195 {
196 destinationName = DEFAULT_DESTINATION_NAME;
197 }
198
199 String selector = config.getSelector();
200 boolean transacted = config.isTransacted();
201 boolean persistent = config.usePersistentMessages();
202 boolean pubsub = config.isPubSub();
203 boolean verbose = true;
204
205 //String selector = null;
206
207 // Instantiate the ping pong client with the command line options and start it running.
208 PingPongBouncer pingBouncer =
209 new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted,
210 selector, verbose, pubsub);
211 pingBouncer.getConnection().start();
212
213 System.out.println("Waiting...");
214 }
215
216 private static void usage()
217 {
218 System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n"
219 + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n"
220 + "-persistent : (true/false). Default is false\n"
221 + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n");
222 }
223
224 /**
225 * This is a callback method that is notified of all messages for which this has been registered as a message
226 * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
227 * destination of the message.
228 *
229 * @param message The message that triggered this callback.
230 */
231 public void onMessage(Message message)
232 {
233 try
234 {
235 String messageCorrelationId = message.getJMSCorrelationID();
236 if (_verbose)
237 {
238 _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
239 + messageCorrelationId);
240 }
241
242 // Get the reply to destination from the message and check it is set.
243 Destination responseDest = message.getJMSReplyTo();
244
245 if (responseDest == null)
246 {
247 _logger.debug("Cannot send reply because reply-to destination is null.");
248
249 return;
250 }
251
252 // Spew out some timing information if verbose mode is on.
253 if (_verbose)
254 {
255 Long timestamp = message.getLongProperty("timestamp");
256
257 if (timestamp != null)
258 {
259 long diff = System.currentTimeMillis() - timestamp;
260 _logger.info("Time to bounce point: " + diff);
261 }
262 }
263
264 // Correlate the reply to the original.
265 message.setJMSCorrelationID(messageCorrelationId);
266
267 // Send the receieved message as the pong reply.
268 _replyProducer.send(responseDest, message);
269
270 if (_verbose)
271 {
272 _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
273 + messageCorrelationId);
274 }
275
276 // Commit the transaction if running in transactional mode.
277 commitTx(_producerSession);
278 }
279 catch (JMSException e)
280 {
281 _logger.debug("There was a JMSException: " + e.getMessage(), e);
282 }
283 }
284
285 /**
286 * Gets the underlying connection that this ping client is running on.
287 *
288 * @return The underlying connection that this ping client is running on.
289 */
290 public AMQConnection getConnection()
291 {
292 return _connection;
293 }
294
295 /**
296 * Sets the connection that this ping client is using.
297 *
298 * @param connection The ping connection.
299 */
300 public void setConnection(AMQConnection connection)
301 {
302 this._connection = connection;
303 }
304
305 /**
306 * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic.
307 *
308 * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
309 */
310 public void setPubSub(boolean pubsub)
311 {
312 _isPubSub = pubsub;
313 }
314
315 /**
316 * Checks whether this client is a p2p or pub/sub ping client.
317 *
318 * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
319 */
320 public boolean isPubSub()
321 {
322 return _isPubSub;
323 }
324
325 /**
326 * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not
327 * a transactional controlSession, this method does nothing.
328 *
329 * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
330 * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
331 * after the commit is applied.
332 *
333 * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
334 */
335 protected void commitTx(Session session) throws JMSException
336 {
337 if (session.getTransacted())
338 {
339 try
340 {
341 if (_failBeforeCommit)
342 {
343 _logger.debug("Failing Before Commit");
344 doFailover();
345 }
346
347 session.commit();
348
349 if (_failAfterCommit)
350 {
351 _logger.debug("Failing After Commit");
352 doFailover();
353 }
354
355 _logger.debug("Session Commited.");
356 }
357 catch (JMSException e)
358 {
359 _logger.trace("JMSException on commit:" + e.getMessage(), e);
360
361 try
362 {
363 session.rollback();
364 _logger.debug("Message rolled back.");
365 }
366 catch (JMSException jmse)
367 {
368 _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
369
370 // Both commit and rollback failed. Throw the rollback exception.
371 throw jmse;
372 }
373 }
374 }
375 }
376
377 /**
378 * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block
379 * until the user supplied some input on the terminal.
380 *
381 * @param broker The name of the broker to terminate.
382 */
383 protected void doFailover(String broker)
384 {
385 System.out.println("Kill Broker " + broker + " now.");
386 try
387 {
388 System.in.read();
389 }
390 catch (IOException e)
391 { }
392
393 System.out.println("Continuing.");
394 }
395
396 /**
397 * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
398 * until the user supplied some input on the terminal.
399 */
400 protected void doFailover()
401 {
402 System.out.println("Kill Broker now.");
403 try
404 {
405 System.in.read();
406 }
407 catch (IOException e)
408 { }
409
410 System.out.println("Continuing.");
411
412 }
413
414 private void createConsumerDestination(String name)
415 {
416 if (isPubSub())
417 {
418 _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
419 }
420 else
421 {
422 _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
423 }
424 }
425
426 /**
427 * A connection listener that logs out any failover complete events. Could do more interesting things with this
428 * at some point...
429 */
430 public static class FailoverNotifier implements ConnectionListener
431 {
432 public void bytesSent(long count)
433 { }
434
435 public void bytesReceived(long count)
436 { }
437
438 public boolean preFailover(boolean redirect)
439 {
440 return true;
441 }
442
443 public boolean preResubscribe()
444 {
445 return true;
446 }
447
448 public void failoverComplete()
449 {
450 _logger.info("App got failover complete callback.");
451 }
452 }
453 }
|