SustainedClientTestCase.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.sustained;
022 
023 import org.apache.log4j.Logger;
024 
025 import org.apache.qpid.client.AMQNoConsumersException;
026 import org.apache.qpid.client.AMQNoRouteException;
027 import org.apache.qpid.test.framework.distributedtesting.TestClient;
028 import org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub;
029 import org.apache.qpid.test.framework.TestUtils;
030 
031 import javax.jms.Connection;
032 import javax.jms.Destination;
033 import javax.jms.ExceptionListener;
034 import javax.jms.JMSException;
035 import javax.jms.Message;
036 import javax.jms.MessageConsumer;
037 import javax.jms.MessageListener;
038 import javax.jms.MessageProducer;
039 import javax.jms.Session;
040 import javax.jms.TextMessage;
041 
042 import java.util.HashMap;
043 import java.util.Iterator;
044 import java.util.Map;
045 import java.util.concurrent.CountDownLatch;
046 
047 /**
048  * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
049  * default topic exchange, using the specified number of receivers connections. Produces reports on the actual number of
050  * messages sent/received.
051  *
052  <p><table id="crc"><caption>CRC Card</caption>
053  <tr><th> Responsibilities <th> Collaborations
054  <tr><td> Supply the name of the test case that this implements.
055  <tr><td> Accept/Reject invites based on test parameters.
056  <tr><td> Adapt to assigned roles.
057  <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports.
058  </table>
059  */
060 public class SustainedClientTestCase extends TestCase3BasicPubSub implements ExceptionListener, MessageListener
061 {
062     /** Used for debugging. */
063     private static final Logger log = Logger.getLogger(SustainedClientTestCase.class);
064 
065     /** Used to log to the console. */
066     private static final Logger console = Logger.getLogger("SustainedTest");
067 
068     /** The role to be played by the test. */
069     private Roles role;
070 
071     /** The number of receivers connection to use. */
072     private int numReceivers;
073 
074     /** The routing key to send them to on the default direct exchange. */
075     private Destination sendDestination;
076 
077     /** The routing key to send updates to on the default direct exchange. */
078     private Destination sendUpdateDestination;
079 
080     /** The connections to send/receive the test messages on. */
081     private Connection[] connection;
082 
083     /** The sessions to send/receive the test messages on. */
084     private Session[] session;
085 
086     /** The producer to send the test messages with. */
087     MessageProducer producer;
088 
089     /** Adapter that adjusts the send rate based on the updates from clients. */
090     SustainedRateAdapter _rateAdapter;
091 
092     /**  */
093     int _batchSize;
094 
095     private static final long TEN_MILLI_SEC = 10000000;
096     private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
097     private static final int LOG_UPATE_INTERVAL = 10;
098     private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage");
099 
100     /**
101      * Should provide the name of the test case that this class implements. The exact names are defined in the interop
102      * testing spec.
103      *
104      @return The name of the test case that this implements.
105      */
106     public String getName()
107     {
108         log.debug("public String getName(): called");
109 
110         return "Perf_SustainedPubSub";
111     }
112 
113     /**
114      * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment
115      * message. When this method return the test case will be ready to execute.
116      *
117      @param role              The role to be played; sender or receivers.
118      @param assignRoleMessage The role assingment message, contains the full test parameters.
119      *
120      @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
121      */
122     public void assignRole(Roles role, Message assignRoleMessagethrows JMSException
123     {
124         log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
125             "): called");
126 
127         // Take note of the role to be played.
128         this.role = role;
129 
130         // Extract and retain the test parameters.
131         numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
132         _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
133         String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
134         String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
135         int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
136         String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
137 
138         if (log.isDebugEnabled())
139         {
140             log.debug("numReceivers = " + numReceivers);
141             log.debug("_batchSize = " + _batchSize);
142             log.debug("ackMode = " + ackMode);
143             log.debug("sendKey = " + sendKey);
144             log.debug("sendUpdateKey = " + sendUpdateKey);
145             log.debug("role = " + role);
146         }
147 
148         switch (role)
149         {
150         // Check if the sender role is being assigned, and set up a single message producer if so.
151         case SENDER:
152             console.info("Creating Sender");
153             // Create a new connection to pass the test messages on.
154             connection = new Connection[1];
155             session = new Session[1];
156 
157             connection[0= TestUtils.createConnection(TestClient.testContextProperties);
158             session[0= connection[0].createSession(false, ackMode);
159 
160             // Extract and retain the test parameters.
161             sendDestination = session[0].createTopic(sendKey);
162 
163             connection[0].setExceptionListener(this);
164 
165             producer = session[0].createProducer(sendDestination);
166 
167             sendUpdateDestination = session[0].createTopic(sendUpdateKey);
168             MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination);
169 
170             _rateAdapter = new SustainedRateAdapter(this);
171             updateConsumer.setMessageListener(_rateAdapter);
172 
173             break;
174 
175         // Otherwise the receivers role is being assigned, so set this up to listen for messages on the required number
176         // of receivers connections.
177         case RECEIVER:
178             console.info("Creating Receiver");
179             // Create the required number of receivers connections.
180             connection = new Connection[numReceivers];
181             session = new Session[numReceivers];
182 
183             for (int i = 0; i < numReceivers; i++)
184             {
185                 connection[i= TestUtils.createConnection(TestClient.testContextProperties);
186                 session[i= connection[i].createSession(false, ackMode);
187 
188                 sendDestination = session[i].createTopic(sendKey);
189 
190                 sendUpdateDestination = session[i].createTopic(sendUpdateKey);
191 
192                 MessageConsumer consumer = session[i].createConsumer(sendDestination);
193 
194                 consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i],
195                         sendUpdateDestination));
196             }
197 
198             break;
199         }
200 
201         // Start all the connection dispatcher threads running.
202         for (int i = 0; i < connection.length; i++)
203         {
204             connection[i].start();
205         }
206     }
207 
208     /** Performs the test case actions.
209      @param numMessages*/
210     public void start(int numMessagesthrows JMSException
211     {
212         log.debug("public void start(): called");
213 
214         // Check that the sender role is being performed.
215         switch (role)
216         {
217         // Check if the sender role is being assigned, and set up a single message producer if so.
218         case SENDER:
219             _rateAdapter.run();
220             break;
221         case RECEIVER:
222 
223         }
224 
225         // return from here when you have finished the test.. this will signal the controller and
226     }
227 
228     public void terminate() throws JMSException, InterruptedException
229     {
230         if (_rateAdapter != null)
231         {
232             _rateAdapter.stop();
233         }
234     }
235 
236     /**
237      * Gets a report on the actions performed by the test case in its assigned role.
238      *
239      @param session The controlSession to create the report message in.
240      *
241      @return The report message.
242      *
243      @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
244      */
245     public Message getReport(Session sessionthrows JMSException
246     {
247         log.debug("public Message getReport(Session controlSession): called");
248 
249         // Close the test connections.
250         for (int i = 0; i < connection.length; i++)
251         {
252             connection[i].close();
253         }
254 
255         Message report = session.createMessage();
256         report.setStringProperty("CONTROL_TYPE""REPORT");
257 
258         return report;
259     }
260 
261     public void onException(JMSException jmsException)
262     {
263         Exception linked = jmsException.getLinkedException();
264 
265         if (linked != null)
266         {
267             if (log.isDebugEnabled())
268             {
269                 log.debug("Linked Exception:" + linked);
270             }
271 
272             if ((linked instanceof AMQNoRouteException|| (linked instanceof AMQNoConsumersException))
273             {
274                 if (log.isDebugEnabled())
275                 {
276                     if (linked instanceof AMQNoConsumersException)
277                     {
278                         log.warn("No clients currently available for message:"
279                             ((AMQNoConsumersExceptionlinked).getUndeliveredMessage());
280                     }
281                     else
282                     {
283                         log.warn("No route for message");
284                     }
285                 }
286 
287                 // Tell the rate adapter that there are no clients ready yet
288                 _rateAdapter.NO_CLIENTS = true;
289             }
290         }
291         else
292         {
293             log.warn("Exception:" + linked);
294         }
295     }
296 
297     /**
298      * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and
299      * 'end' messages.
300      */
301     class SustainedListener implements MessageListener
302     {
303         /** Number of messages received */
304         private long _received = 0;
305         /** The number of messages in the batch */
306         private int _batchSize = 0;
307         /** Record of the when the 'start' messagse was sen */
308         private Long _startTime;
309         /** Message producer to use to send reports */
310         MessageProducer _updater;
311         /** Session to create the report message on */
312         Session _session;
313         /** Record of the client ID used for this SustainedListnener */
314         String _client;
315 
316         /**
317          * Main Constructor
318          *
319          @param clientname      The _client id used to identify this connection.
320          @param batchSize       The number of messages that are to be sent per batch. Note: This is not used to
321          *                        control the interval between sending reports.
322          @param session         The controlSession used for communication.
323          @param sendDestination The destination that update reports should be sent to.
324          *
325          @throws JMSException My occur if creatingthe Producer fails
326          */
327         public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination)
328             throws JMSException
329         {
330             _batchSize = batchSize;
331             _client = clientname;
332             _session = session;
333             _updater = session.createProducer(sendDestination);
334         }
335 
336         public void onMessage(Message message)
337         {
338             if (log.isDebugEnabled())
339             {
340                 log.debug("Message " + _received + "received in listener");
341             }
342 
343             if (message instanceof TextMessage)
344             {
345                 try
346                 {
347                     _received++;
348                     if (((TextMessagemessage).getText().equals("start"))
349                     {
350                         log.debug("Starting Batch");
351                         _startTime = System.nanoTime();
352                     }
353                     else if (((TextMessagemessage).getText().equals("end"))
354                     {
355                         if (_startTime != null)
356                         {
357                             long currentTime = System.nanoTime();
358                             sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH"));
359                             log.debug("End Batch");
360                         }
361                     }
362                 }
363                 catch (JMSException e)
364                 {
365                     // ignore error
366                 }
367             }
368 
369         }
370 
371         /**
372          * sendStatus creates and sends the report back to the publisher
373          *
374          @param time     taken for the the last batch
375          @param received Total number of messages received.
376          @param batchNumber the batch number
377          @throws JMSException if an error occurs during the send
378          */
379         private void sendStatus(long time, long received, int batchNumberthrows JMSException
380         {
381             Message updateMessage = _session.createTextMessage("update");
382             updateMessage.setStringProperty("CLIENT_ID"":" + _client);
383             updateMessage.setStringProperty("CONTROL_TYPE""UPDATE");
384             updateMessage.setLongProperty("RECEIVED", received);
385             updateMessage.setIntProperty("BATCH", batchNumber);
386             updateMessage.setLongProperty("DURATION", time);
387 
388             if (log.isInfoEnabled())
389             {
390                 log.info("**** SENDING [" + batchNumber + "]**** " "CLIENT_ID:" + _client + " RECEIVED:" + received
391                     " BATCH:" + batchNumber + " DURATION:" + time);
392             }
393 
394             // Output on the main console.info the details of this batch
395             if ((batchNumber % 10== 0)
396             {
397                 console.info("Sending Report [" + batchNumber + "] " "CLIENT_ID:" + _client + " RECEIVED:" + received
398                     " BATCH:" + batchNumber + " DURATION:" + time);
399             }
400 
401             _updater.send(updateMessage);
402         }
403     }
404 
405     /**
406      * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second
407      * that are sent through the test system.
408      *
409      * By keeping a record of the messages recevied and the average time taken to process the batch size can be
410      * calculated and so the delay can be adjusted to maintain that rate.
411      *
412      * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no
413      * messages in the batch. Otherwise the delay is used at the end of the batch.
414      */
415     class SustainedRateAdapter implements MessageListener, Runnable
416     {
417         private SustainedClientTestCase _client;
418         private long _batchVariance = Integer.getInteger("batchVariance"3)// no. batches to allow drifting
419         private long _timeVariance = TEN_MILLI_SEC * 5// no. nanos between send and report delay (10ms)
420         private volatile long _delay; // in nanos
421         private long _sent;
422         private Map<String, Long> _slowClients = new HashMap<String, Long>();
423         private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000// 10 ms
424         private static final long NO_CLIENT_SLEEP = 1000// 1s
425         private volatile boolean NO_CLIENTS = true;
426         private int _delayShifting;
427         private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount"5);
428         private boolean _warmedup = false;
429         private static final long EXPECTED_TIME_PER_BATCH = 100000L;
430         private int _warmUpBatches = Integer.getInteger("warmUpBatches"10);
431 
432         SustainedRateAdapter(SustainedClientTestCase client)
433         {
434             _client = client;
435         }
436 
437         public void onMessage(Message message)
438         {
439             if (log.isDebugEnabled())
440             {
441                 log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
442             }
443 
444             try
445             {
446                 String controlType = message.getStringProperty("CONTROL_TYPE");
447 
448                 // Check if the message is a test invite.
449                 if ("UPDATE".equals(controlType))
450                 {
451                     NO_CLIENTS = false;
452                     long duration = message.getLongProperty("DURATION");
453                     long totalReceived = message.getLongProperty("RECEIVED");
454                     String client = message.getStringProperty("CLIENT_ID");
455                     int batchNumber = message.getIntProperty("BATCH");
456 
457                     if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL== 0))
458                     {
459                         log.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " Recevied BATCH:"
460                             + batchNumber + " DURATION:" + duration);
461                     }
462 
463                     recordSlow(client, totalReceived, batchNumber);
464 
465                     adjustDelay(client, batchNumber, duration);
466 
467                     // Warm up completes when:
468                     // we haven't warmed up
469                     // and the number of batches sent to each client is at least half of the required warmup batches
470                     if (!_warmedup && (batchNumber >= _warmUpBatches))
471                     {
472                         _warmedup = true;
473                         _warmup.countDown();
474 
475                     }
476                 }
477             }
478             catch (JMSException e)
479             {
480                 //
481             }
482         }
483 
484         CountDownLatch _warmup = new CountDownLatch(1);
485 
486         int _numBatches = 10000;
487 
488         // long[] _timings = new long[_numBatches];
489         private boolean _running = true;
490 
491         public void run()
492         {
493             console.info("Warming up");
494 
495             doBatch(_warmUpBatches);
496 
497             try
498             {
499                 // wait for warmup to complete.
500                 _warmup.await();
501 
502                 // set delay to the average length of the batches
503                 _delay = _totalDuration / _warmUpBatches / delays.size();
504 
505                 console.info("Warmup complete delay set : " + _delay + " based on _totalDuration: " + _totalDuration
506                     " over no. batches: " + _warmUpBatches + " with client count: " + delays.size());
507 
508                 _totalDuration = 0L;
509                 _totalReceived = 0L;
510                 _sent = 0L;
511             }
512             catch (InterruptedException e)
513             {
514                 //
515             }
516 
517             doBatch(_numBatches);
518 
519         }
520 
521         private void doBatch(int batchSize// long[] timings,
522         {
523             TextMessage testMessage = null;
524             try
525             {
526                 testMessage = _client.session[0].createTextMessage("start");
527 
528                 for (int batch = 0; batch <= batchSize; batch++)
529                 // while (_running)
530                 {
531                     long start = System.nanoTime();
532 
533                     testMessage.setText("start");
534                     testMessage.setIntProperty("BATCH", batch);
535 
536                     _client.producer.send(testMessage);
537                     _rateAdapter.sentMessage();
538 
539                     testMessage.setText("test");
540                     // start at 2 so start and end count as part of batch
541                     for (int m = 2; m < _batchSize; m++)
542                     {
543                         _client.producer.send(testMessage);
544                         _rateAdapter.sentMessage();
545                     }
546 
547                     testMessage.setText("end");
548                     _client.producer.send(testMessage);
549                     _rateAdapter.sentMessage();
550 
551                     long end = System.nanoTime();
552 
553                     long sendtime = end - start;
554 
555                     if (log.isDebugEnabled())
556                     {
557                         log.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime)// timings[batch]);
558                     }
559 
560                     if ((batch % LOG_UPATE_INTERVAL== 0)
561                     {
562                         console.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
563                     }
564 
565                     _rateAdapter.sleepBatch();
566 
567                 }
568             }
569             catch (JMSException e)
570             {
571                 console.error("Runner ended");
572             }
573         }
574 
575         private String status()
576         {
577             return " TotalDuration: " + _totalDuration + " for " + delays.size() " consumers" " Delay is " + _delay
578                 " resulting in "
579                 ((_delay > (TEN_MILLI_SEC * _batchSize)) ((_delay / _batchSize"/msg"(_delay + "/batch"));
580         }
581 
582         private void sleepBatch()
583         {
584             if (checkForSlowClients())
585             // if there werwe slow clients we have already slept so don't sleep anymore again.
586                 return;
587             }
588 
589             if (!SLEEP_PER_MESSAGE)
590             {
591                 // per batch sleep.. if sleep is to small to spread over the batch.
592                 if (_delay <= (TEN_MILLI_SEC * _batchSize))
593                 {
594                     sleepLong(_delay);
595                 }
596                 else
597                 {
598                     log.info("Not sleeping _delay > ten*batch is:" + _delay);
599                 }
600             }
601         }
602 
603         public void stop()
604         {
605             _running = false;
606         }
607 
608         Map<String, Long> delays = new HashMap<String, Long>();
609         Long _totalReceived = 0L;
610         Long _totalDuration = 0L;
611         int _skipUpdate = 0;
612 
613         /**
614          * Adjust the delay for sending messages based on this update from the client
615          *
616          @param client        The client that send this update
617          @param duration      The time taken for the last batch of messagse
618          @param batchNumber   The reported batchnumber from the client
619          */
620         private void adjustDelay(String client, int batchNumber, long duration)
621         {
622             // Retrieve the current total time taken for this client.
623             Long currentTime = delays.get(client);
624 
625             // Add the new duration time to this client
626             if (currentTime == null)
627             {
628                 currentTime = duration;
629             }
630             else
631             {
632                 currentTime += duration;
633             }
634 
635             delays.put(client, currentTime);
636 
637             long batchesSent = _sent / _batchSize;
638 
639             // ensure we don't divide by zero
640             if (batchesSent == 0)
641             {
642                 batchesSent = 1L;
643             }
644 
645             _totalReceived += _batchSize;
646             _totalDuration += duration;
647 
648             // calculate average duration accross clients per batch
649             long averageDuration = _totalDuration / delays.size() / batchesSent;
650 
651             // calculate the difference between current send delay and average report delay
652             long diff = (duration- averageDuration;
653 
654             if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL== 0))
655             {
656                 log.info("TotalDuration:" + _totalDuration + " for " + delays.size() " consumers." " on batch: "
657                     + batchesSent + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: "
658                     + averageDuration + " so diff: " + diff + " for : " + client + " Delay is " + _delay + " resulting in "
659                     ((_delay > (TEN_MILLI_SEC * _batchSize)) ((_delay / _batchSize"/msg"(_delay + "/batch")));
660             }
661 
662             // if the averageDuration differs from the current by more than the specified variane then adjust delay.
663             if (Math.abs(diff> _timeVariance)
664             {
665 
666                 // if the the _delay is larger than the required duration to send report
667                 // speed up
668                 if (diff > TEN_MILLI_SEC)
669                 {
670                     _delay -= TEN_MILLI_SEC;
671 
672                     if (_delay < 0)
673                     {
674                         _delay = 0;
675                         log.info("Reset _delay to 0");
676                         delayStable();
677                     }
678                     else
679                     {
680                         delayChanged();
681                     }
682 
683                 }
684                 else if (diff < 0// diff < 0 diff cannot be 0 as it is > _timeVariance
685                 {
686                     // the report took longer
687                     _delay += TEN_MILLI_SEC;
688                     delayChanged();
689                 }
690             }
691             else
692             {
693                 delayStable();
694             }
695 
696             // If we have a consumer that is behind with the batches.
697             if ((batchesSent - batchNumber> _batchVariance)
698             {
699                 log.debug("Increasing _delay as sending more than receiving");
700 
701                 _delay += * TEN_MILLI_SEC;
702                 delayChanged();
703             }
704 
705         }
706 
707         /** Reset the number of iterations before we say the delay has stabilised. */
708         private void delayChanged()
709         {
710             _delayShifting = REPORTS_WITHOUT_CHANGE;
711         }
712 
713         /**
714          * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will
715          * output Delay stabilised
716          */
717         private void delayStable()
718         {
719             _delayShifting--;
720 
721             if (_delayShifting < 0)
722             {
723                 _delayShifting = 0;
724                 console.debug("Delay stabilised:" + _delay);
725             }
726         }
727 
728         /**
729          * Checks that the client has received enough messages. If the client has fallen behind then they are put in the
730          * _slowClients lists which will increase the delay.
731          *
732          @param client   The client identifier to check
733          @param received the number of messages received by that client
734          @param batchNumber
735          */
736         private void recordSlow(String client, long received, int batchNumber)
737         {
738             if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance)
739             {
740                 _slowClients.put(client, received);
741             }
742             else
743             {
744                 _slowClients.remove(client);
745             }
746         }
747 
748         /** Incrment the number of sent messages and then sleep, if required. */
749         public void sentMessage()
750         {
751 
752             _sent++;
753 
754             if (_delay > (TEN_MILLI_SEC * _batchSize))
755             {
756                 long batchDelay = _delay / _batchSize;
757                 // less than 10ms sleep doesn't always work.
758                 // _delay is in nano seconds
759                 // if (batchDelay < (TEN_MILLI_SEC))
760                 // {
761                 // sleep(0, (int) batchDelay);
762                 // }
763                 // else
764                 {
765                     // if (batchDelay < 30000000000L)
766                     {
767                         sleepLong(batchDelay);
768                     }
769                 }
770             }
771             else
772             {
773                 if (SLEEP_PER_MESSAGE && (_delay > 0))
774                 {
775                     sleepLong(_delay / _batchSize);
776                 }
777             }
778         }
779 
780         /**
781          * Check at the end of each batch and pause sending messages to allow slow clients to catch up.
782          *
783          @return true if there were slow clients that caught up.
784          */
785         private boolean checkForSlowClients()
786         {
787             // This will allways be true as we are running this at the end of each batchSize
788             // if (_sent % _batchSize == 0)
789             {
790                 // Cause test to pause when we have slow
791                 if (!_slowClients.isEmpty() || NO_CLIENTS)
792                 {
793 
794                     while (!_slowClients.isEmpty())
795                     {
796                         if (log.isInfoEnabled() && ((_sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL== 0))
797                         {
798                             String clients = "";
799                             Iterator it = _slowClients.keySet().iterator();
800                             while (it.hasNext())
801                             {
802                                 clients += it.next();
803                                 if (it.hasNext())
804                                 {
805                                     clients += ", ";
806                                 }
807                             }
808 
809                             log.info("Pausing for slow clients:" + clients);
810                         }
811 
812                         if (console.isDebugEnabled() && ((_sent / _batchSize % LOG_UPATE_INTERVAL== 0))
813                         {
814                             console.debug(_slowClients.size() " slow clients.");
815                         }
816 
817                         sleep(PAUSE_SLEEP);
818                     }
819 
820                     if (NO_CLIENTS)
821                     {
822                         sleep(NO_CLIENT_SLEEP);
823                     }
824 
825                     log.debug("Continuing");
826 
827                     return true;
828                 }
829                 else
830                 {
831                     if ((_sent / _batchSize % LOG_UPATE_INTERVAL== 0)
832                     {
833                         console.info("Total Delay :" + _delay + " "
834                             ((_delayShifting == 0"Stablised" ("Not Stablised(" + _delayShifting + ")")));
835                     }
836                 }
837 
838             }
839 
840             return false;
841         }
842 
843         /**
844          * Sleep normally takes micro-seconds this allows the use of a nano-second value.
845          *
846          @param delay nanoseconds to sleep for.
847          */
848         private void sleepLong(long delay)
849         {
850             sleep(delay / 1000000(int) (delay % 1000000));
851         }
852 
853         /**
854          * Sleep for the specified micro-seconds.
855          @param sleep microseconds to sleep for.
856          */
857         private void sleep(long sleep)
858         {
859             sleep(sleep, 0);
860         }
861 
862         /**
863          * Perform the sleep , swallowing any InteruptException.
864          *
865          * NOTE: If a sleep request is > 10s then reset only sleep for 5s
866          *
867          @param milli to sleep for
868          @param nano sub miliseconds to sleep for
869          */
870         private void sleep(long milli, int nano)
871         {
872             try
873             {
874                 log.debug("Sleep:" + milli + ":" + nano);
875                 if (milli > 10000)
876                 {
877 
878                     if (_delay == milli)
879                     {
880                         _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH;
881                         log.error("Sleeping for more than 10 seconds adjusted to 5s!:" (milli / 1000)
882                             "s. Reset _totalDuration:" + _totalDuration);
883                     }
884                     else
885                     {
886                         log.error("Sleeping for more than 10 seconds adjusted to 5s!:" (milli / 1000"s");
887                     }
888 
889                     milli = 5000;
890                 }
891 
892                 Thread.sleep(milli, nano);
893             }
894             catch (InterruptedException e)
895             {
896                 //
897             }
898         }
899 
900         public void setClient(SustainedClientTestCase client)
901         {
902             _client = client;
903         }
904     }
905 
906 }