PingPongBouncer.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.requestreply;
022 
023 import java.io.IOException;
024 import java.net.InetAddress;
025 import java.text.SimpleDateFormat;
026 import java.util.Date;
027 
028 import javax.jms.*;
029 
030 import org.apache.log4j.Logger;
031 
032 import org.apache.qpid.client.AMQConnection;
033 import org.apache.qpid.client.AMQQueue;
034 import org.apache.qpid.client.AMQTopic;
035 import org.apache.qpid.jms.ConnectionListener;
036 import org.apache.qpid.jms.Session;
037 import org.apache.qpid.topic.Config;
038 import org.apache.qpid.exchange.ExchangeDefaults;
039 
040 /**
041  * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
042  * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
043  * too.
044  *
045  <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
046  * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
047  * temporary queue or the correlation id to correlate the original message to the reply.
048  *
049  <p/>There is a verbose mode flag which causes information about each ping to be output to the console
050  * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
051  * be disabled for real timing tests as writing to the console will slow things down.
052  *
053  <p><table id="crc"><caption>CRC Card</caption>
054  <tr><th> Responsibilities <th> Collaborations
055  <tr><td> Bounce back messages to their reply to destination.
056  <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
057  </table>
058  *
059  * @todo Replace the command line parsing with a neater tool.
060  *
061  * @todo Make verbose accept a number of messages, only prints to console every X messages.
062  */
063 public class PingPongBouncer implements MessageListener
064 {
065     private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
066 
067     /** The default prefetch size for the message consumer. */
068     private static final int PREFETCH = 1;
069 
070     /** The default no local flag for the message consumer. */
071     private static final boolean NO_LOCAL = true;
072 
073     private static final String DEFAULT_DESTINATION_NAME = "ping";
074 
075     /** The default exclusive flag for the message consumer. */
076     private static final boolean EXCLUSIVE = false;
077 
078     /** A convenient formatter to use when time stamping output. */
079     protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
080 
081     /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
082     private boolean _verbose = false;
083 
084     /** Determines whether this bounce back client bounces back messages persistently. */
085     private boolean _persistent = false;
086 
087     private Destination _consumerDestination;
088 
089     /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
090     private Destination _lastResponseDest;
091 
092     /** The producer for sending replies with. */
093     private MessageProducer _replyProducer;
094 
095     /** The consumer controlSession. */
096     private Session _consumerSession;
097 
098     /** The producer controlSession. */
099     private Session _producerSession;
100 
101     /** Holds the connection to the broker. */
102     private AMQConnection _connection;
103 
104     /** Flag used to indicate if this is a point to point or pub/sub ping client. */
105     private boolean _isPubSub = false;
106 
107     /**
108      * This flag is used to indicate that the user should be prompted to kill a broker, in order to test
109      * failover, immediately before committing a transaction.
110      */
111     protected boolean _failBeforeCommit = false;
112 
113     /**
114      * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test
115      * failover, immediate after committing a transaction.
116      */
117     protected boolean _failAfterCommit = false;
118 
119     /**
120      * Creates a PingPongBouncer on the specified producer and consumer sessions.
121      *
122      @param brokerDetails The addresses of the brokers to connect to.
123      @param username        The broker username.
124      @param password        The broker password.
125      @param virtualpath     The virtual host name within the broker.
126      @param destinationName The name of the queue to receive pings on
127      *                        (or root of the queue name where many queues are generated).
128      @param persistent      A flag to indicate that persistent message should be used.
129      @param transacted      A flag to indicate that pings should be sent within transactions.
130      @param selector        A message selector to filter received pings with.
131      @param verbose         A flag to indicate that message timings should be sent to the console.
132      *
133      @throws Exception All underlying exceptions allowed to fall through. This is only test code...
134      */
135     public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
136                            String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose,
137                            boolean pubsubthrows Exception
138     {
139         // Create a client id to uniquely identify this client.
140         InetAddress address = InetAddress.getLocalHost();
141         String clientId = address.getHostName() + System.currentTimeMillis();
142         _verbose = verbose;
143         _persistent = persistent;
144         setPubSub(pubsub);
145         // Connect to the broker.
146         setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
147         _logger.info("Connected with URL:" + getConnection().toURL());
148 
149         // Set up the failover notifier.
150         getConnection().setConnectionListener(new FailoverNotifier());
151 
152         // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the
153         // command line option.
154         _consumerSession = (SessiongetConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
155         _producerSession = (SessiongetConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
156 
157         // Create the queue to listen for message on.
158         createConsumerDestination(destinationName);
159         MessageConsumer consumer =
160             _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
161 
162         // Create a producer for the replies, without a default destination.
163         _replyProducer = _producerSession.createProducer(null);
164         _replyProducer.setDisableMessageTimestamp(true);
165         _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
166 
167         // Set this up to listen for messages on the queue.
168         consumer.setMessageListener(this);
169     }
170 
171     /**
172      * Starts a stand alone ping-pong client running in verbose mode.
173      *
174      @param args
175      */
176     public static void main(String[] argsthrows Exception
177     {
178         System.out.println("Starting...");
179 
180         // Display help on the command line.
181         if (args.length == 0)
182         {
183             _logger.info("Running test with default values...");
184             //usage();
185             //System.exit(0);
186         }
187 
188         // Extract all command line parameters.
189         Config config = new Config();
190         config.setOptions(args);
191         String brokerDetails = config.getHost() ":" + config.getPort();
192         String virtualpath = "test";
193         String destinationName = config.getDestination();
194         if (destinationName == null)
195         {
196             destinationName = DEFAULT_DESTINATION_NAME;
197         }
198 
199         String selector = config.getSelector();
200         boolean transacted = config.isTransacted();
201         boolean persistent = config.usePersistentMessages();
202         boolean pubsub = config.isPubSub();
203         boolean verbose = true;
204 
205         //String selector = null;
206 
207         // Instantiate the ping pong client with the command line options and start it running.
208         PingPongBouncer pingBouncer =
209             new PingPongBouncer(brokerDetails, "guest""guest", virtualpath, destinationName, persistent, transacted,
210                                 selector, verbose, pubsub);
211         pingBouncer.getConnection().start();
212 
213         System.out.println("Waiting...");
214     }
215 
216     private static void usage()
217     {
218         System.err.println("Usage: PingPongBouncer \n" "-host : broker host\n" "-port : broker port\n"
219                            "-destinationname : queue/topic name\n" "-transacted : (true/false). Default is false\n"
220                            "-persistent : (true/false). Default is false\n"
221                            "-pubsub     : (true/false). Default is false\n" "-selector   : selector string\n");
222     }
223 
224     /**
225      * This is a callback method that is notified of all messages for which this has been registered as a message
226      * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
227      * destination of the message.
228      *
229      @param message The message that triggered this callback.
230      */
231     public void onMessage(Message message)
232     {
233         try
234         {
235             String messageCorrelationId = message.getJMSCorrelationID();
236             if (_verbose)
237             {
238                 _logger.info(timestampFormatter.format(new Date()) ": Got ping with correlation id, "
239                              + messageCorrelationId);
240             }
241 
242             // Get the reply to destination from the message and check it is set.
243             Destination responseDest = message.getJMSReplyTo();
244 
245             if (responseDest == null)
246             {
247                 _logger.debug("Cannot send reply because reply-to destination is null.");
248 
249                 return;
250             }
251 
252             // Spew out some timing information if verbose mode is on.
253             if (_verbose)
254             {
255                 Long timestamp = message.getLongProperty("timestamp");
256 
257                 if (timestamp != null)
258                 {
259                     long diff = System.currentTimeMillis() - timestamp;
260                     _logger.info("Time to bounce point: " + diff);
261                 }
262             }
263 
264             // Correlate the reply to the original.
265             message.setJMSCorrelationID(messageCorrelationId);
266 
267             // Send the receieved message as the pong reply.
268             _replyProducer.send(responseDest, message);
269 
270             if (_verbose)
271             {
272                 _logger.info(timestampFormatter.format(new Date()) ": Sent reply with correlation id, "
273                              + messageCorrelationId);
274             }
275 
276             // Commit the transaction if running in transactional mode.
277             commitTx(_producerSession);
278         }
279         catch (JMSException e)
280         {
281             _logger.debug("There was a JMSException: " + e.getMessage(), e);
282         }
283     }
284 
285     /**
286      * Gets the underlying connection that this ping client is running on.
287      *
288      @return The underlying connection that this ping client is running on.
289      */
290     public AMQConnection getConnection()
291     {
292         return _connection;
293     }
294 
295     /**
296      * Sets the connection that this ping client is using.
297      *
298      @param connection The ping connection.
299      */
300     public void setConnection(AMQConnection connection)
301     {
302         this._connection = connection;
303     }
304 
305     /**
306      * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic.
307      *
308      @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
309      */
310     public void setPubSub(boolean pubsub)
311     {
312         _isPubSub = pubsub;
313     }
314 
315     /**
316      * Checks whether this client is a p2p or pub/sub ping client.
317      *
318      @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
319      */
320     public boolean isPubSub()
321     {
322         return _isPubSub;
323     }
324 
325     /**
326      * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not
327      * a transactional controlSession, this method does nothing.
328      *
329      <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
330      * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
331      * after the commit is applied.
332      *
333      @throws javax.jms.JMSException If the commit fails and then the rollback fails.
334      */
335     protected void commitTx(Session sessionthrows JMSException
336     {
337         if (session.getTransacted())
338         {
339             try
340             {
341                 if (_failBeforeCommit)
342                 {
343                     _logger.debug("Failing Before Commit");
344                     doFailover();
345                 }
346 
347                 session.commit();
348 
349                 if (_failAfterCommit)
350                 {
351                     _logger.debug("Failing After Commit");
352                     doFailover();
353                 }
354 
355                 _logger.debug("Session Commited.");
356             }
357             catch (JMSException e)
358             {
359                 _logger.trace("JMSException on commit:" + e.getMessage(), e);
360 
361                 try
362                 {
363                     session.rollback();
364                     _logger.debug("Message rolled back.");
365                 }
366                 catch (JMSException jmse)
367                 {
368                     _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
369 
370                     // Both commit and rollback failed. Throw the rollback exception.
371                     throw jmse;
372                 }
373             }
374         }
375     }
376 
377     /**
378      * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block
379      * until the user supplied some input on the terminal.
380      *
381      @param broker The name of the broker to terminate.
382      */
383     protected void doFailover(String broker)
384     {
385         System.out.println("Kill Broker " + broker + " now.");
386         try
387         {
388             System.in.read();
389         }
390         catch (IOException e)
391         { }
392 
393         System.out.println("Continuing.");
394     }
395 
396     /**
397      * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
398      * until the user supplied some input on the terminal.
399      */
400     protected void doFailover()
401     {
402         System.out.println("Kill Broker now.");
403         try
404         {
405             System.in.read();
406         }
407         catch (IOException e)
408         { }
409 
410         System.out.println("Continuing.");
411 
412     }
413 
414     private void createConsumerDestination(String name)
415     {
416         if (isPubSub())
417         {
418             _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
419         }
420         else
421         {
422             _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
423         }
424     }
425 
426     /**
427      * A connection listener that logs out any failover complete events. Could do more interesting things with this
428      * at some point...
429      */
430     public static class FailoverNotifier implements ConnectionListener
431     {
432         public void bytesSent(long count)
433         { }
434 
435         public void bytesReceived(long count)
436         { }
437 
438         public boolean preFailover(boolean redirect)
439         {
440             return true;
441         }
442 
443         public boolean preResubscribe()
444         {
445             return true;
446         }
447 
448         public void failoverComplete()
449         {
450             _logger.info("App got failover complete callback.");
451         }
452     }
453 }