001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.sustained;
022
import org.apache.log4j.Logger;

import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.test.framework.distributedtesting.TestClient;
import org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub;
import org.apache.qpid.test.framework.TestUtils;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
046
047 /**
 * Implements test case 3, basic pub/sub. Sends/receives a specified number of messages to a specified route on the
 * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of
 * messages sent/received.
051 *
052 * <p><table id="crc"><caption>CRC Card</caption>
053 * <tr><th> Responsibilities <th> Collaborations
054 * <tr><td> Supply the name of the test case that this implements.
055 * <tr><td> Accept/Reject invites based on test parameters.
056 * <tr><td> Adapt to assigned roles.
057 * <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports.
058 * </table>
059 */
060 public class SustainedClientTestCase extends TestCase3BasicPubSub implements ExceptionListener, MessageListener
061 {
062 /** Used for debugging. */
063 private static final Logger log = Logger.getLogger(SustainedClientTestCase.class);
064
065 /** Used to log to the console. */
066 private static final Logger console = Logger.getLogger("SustainedTest");
067
068 /** The role to be played by the test. */
069 private Roles role;
070
071 /** The number of receivers connection to use. */
072 private int numReceivers;
073
074 /** The routing key to send them to on the default direct exchange. */
075 private Destination sendDestination;
076
077 /** The routing key to send updates to on the default direct exchange. */
078 private Destination sendUpdateDestination;
079
080 /** The connections to send/receive the test messages on. */
081 private Connection[] connection;
082
083 /** The sessions to send/receive the test messages on. */
084 private Session[] session;
085
086 /** The producer to send the test messages with. */
087 MessageProducer producer;
088
089 /** Adapter that adjusts the send rate based on the updates from clients. */
090 SustainedRateAdapter _rateAdapter;
091
092 /** */
093 int _batchSize;
094
095 private static final long TEN_MILLI_SEC = 10000000;
096 private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
097 private static final int LOG_UPATE_INTERVAL = 10;
098 private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage");
099
100 /**
101 * Should provide the name of the test case that this class implements. The exact names are defined in the interop
102 * testing spec.
103 *
104 * @return The name of the test case that this implements.
105 */
106 public String getName()
107 {
108 log.debug("public String getName(): called");
109
110 return "Perf_SustainedPubSub";
111 }
112
113 /**
114 * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment
115 * message. When this method return the test case will be ready to execute.
116 *
117 * @param role The role to be played; sender or receivers.
118 * @param assignRoleMessage The role assingment message, contains the full test parameters.
119 *
120 * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
121 */
122 public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
123 {
124 log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
125 + "): called");
126
127 // Take note of the role to be played.
128 this.role = role;
129
130 // Extract and retain the test parameters.
131 numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
132 _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
133 String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
134 String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
135 int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
136 String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
137
138 if (log.isDebugEnabled())
139 {
140 log.debug("numReceivers = " + numReceivers);
141 log.debug("_batchSize = " + _batchSize);
142 log.debug("ackMode = " + ackMode);
143 log.debug("sendKey = " + sendKey);
144 log.debug("sendUpdateKey = " + sendUpdateKey);
145 log.debug("role = " + role);
146 }
147
148 switch (role)
149 {
150 // Check if the sender role is being assigned, and set up a single message producer if so.
151 case SENDER:
152 console.info("Creating Sender");
153 // Create a new connection to pass the test messages on.
154 connection = new Connection[1];
155 session = new Session[1];
156
157 connection[0] = TestUtils.createConnection(TestClient.testContextProperties);
158 session[0] = connection[0].createSession(false, ackMode);
159
160 // Extract and retain the test parameters.
161 sendDestination = session[0].createTopic(sendKey);
162
163 connection[0].setExceptionListener(this);
164
165 producer = session[0].createProducer(sendDestination);
166
167 sendUpdateDestination = session[0].createTopic(sendUpdateKey);
168 MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination);
169
170 _rateAdapter = new SustainedRateAdapter(this);
171 updateConsumer.setMessageListener(_rateAdapter);
172
173 break;
174
175 // Otherwise the receivers role is being assigned, so set this up to listen for messages on the required number
176 // of receivers connections.
177 case RECEIVER:
178 console.info("Creating Receiver");
179 // Create the required number of receivers connections.
180 connection = new Connection[numReceivers];
181 session = new Session[numReceivers];
182
183 for (int i = 0; i < numReceivers; i++)
184 {
185 connection[i] = TestUtils.createConnection(TestClient.testContextProperties);
186 session[i] = connection[i].createSession(false, ackMode);
187
188 sendDestination = session[i].createTopic(sendKey);
189
190 sendUpdateDestination = session[i].createTopic(sendUpdateKey);
191
192 MessageConsumer consumer = session[i].createConsumer(sendDestination);
193
194 consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i],
195 sendUpdateDestination));
196 }
197
198 break;
199 }
200
201 // Start all the connection dispatcher threads running.
202 for (int i = 0; i < connection.length; i++)
203 {
204 connection[i].start();
205 }
206 }
207
208 /** Performs the test case actions.
209 * @param numMessages*/
210 public void start(int numMessages) throws JMSException
211 {
212 log.debug("public void start(): called");
213
214 // Check that the sender role is being performed.
215 switch (role)
216 {
217 // Check if the sender role is being assigned, and set up a single message producer if so.
218 case SENDER:
219 _rateAdapter.run();
220 break;
221 case RECEIVER:
222
223 }
224
225 // return from here when you have finished the test.. this will signal the controller and
226 }
227
228 public void terminate() throws JMSException, InterruptedException
229 {
230 if (_rateAdapter != null)
231 {
232 _rateAdapter.stop();
233 }
234 }
235
236 /**
237 * Gets a report on the actions performed by the test case in its assigned role.
238 *
239 * @param session The controlSession to create the report message in.
240 *
241 * @return The report message.
242 *
243 * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
244 */
245 public Message getReport(Session session) throws JMSException
246 {
247 log.debug("public Message getReport(Session controlSession): called");
248
249 // Close the test connections.
250 for (int i = 0; i < connection.length; i++)
251 {
252 connection[i].close();
253 }
254
255 Message report = session.createMessage();
256 report.setStringProperty("CONTROL_TYPE", "REPORT");
257
258 return report;
259 }
260
261 public void onException(JMSException jmsException)
262 {
263 Exception linked = jmsException.getLinkedException();
264
265 if (linked != null)
266 {
267 if (log.isDebugEnabled())
268 {
269 log.debug("Linked Exception:" + linked);
270 }
271
272 if ((linked instanceof AMQNoRouteException) || (linked instanceof AMQNoConsumersException))
273 {
274 if (log.isDebugEnabled())
275 {
276 if (linked instanceof AMQNoConsumersException)
277 {
278 log.warn("No clients currently available for message:"
279 + ((AMQNoConsumersException) linked).getUndeliveredMessage());
280 }
281 else
282 {
283 log.warn("No route for message");
284 }
285 }
286
287 // Tell the rate adapter that there are no clients ready yet
288 _rateAdapter.NO_CLIENTS = true;
289 }
290 }
291 else
292 {
293 log.warn("Exception:" + linked);
294 }
295 }
296
297 /**
298 * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and
299 * 'end' messages.
300 */
301 class SustainedListener implements MessageListener
302 {
303 /** Number of messages received */
304 private long _received = 0;
305 /** The number of messages in the batch */
306 private int _batchSize = 0;
307 /** Record of the when the 'start' messagse was sen */
308 private Long _startTime;
309 /** Message producer to use to send reports */
310 MessageProducer _updater;
311 /** Session to create the report message on */
312 Session _session;
313 /** Record of the client ID used for this SustainedListnener */
314 String _client;
315
316 /**
317 * Main Constructor
318 *
319 * @param clientname The _client id used to identify this connection.
320 * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to
321 * control the interval between sending reports.
322 * @param session The controlSession used for communication.
323 * @param sendDestination The destination that update reports should be sent to.
324 *
325 * @throws JMSException My occur if creatingthe Producer fails
326 */
327 public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination)
328 throws JMSException
329 {
330 _batchSize = batchSize;
331 _client = clientname;
332 _session = session;
333 _updater = session.createProducer(sendDestination);
334 }
335
336 public void onMessage(Message message)
337 {
338 if (log.isDebugEnabled())
339 {
340 log.debug("Message " + _received + "received in listener");
341 }
342
343 if (message instanceof TextMessage)
344 {
345 try
346 {
347 _received++;
348 if (((TextMessage) message).getText().equals("start"))
349 {
350 log.debug("Starting Batch");
351 _startTime = System.nanoTime();
352 }
353 else if (((TextMessage) message).getText().equals("end"))
354 {
355 if (_startTime != null)
356 {
357 long currentTime = System.nanoTime();
358 sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH"));
359 log.debug("End Batch");
360 }
361 }
362 }
363 catch (JMSException e)
364 {
365 // ignore error
366 }
367 }
368
369 }
370
371 /**
372 * sendStatus creates and sends the report back to the publisher
373 *
374 * @param time taken for the the last batch
375 * @param received Total number of messages received.
376 * @param batchNumber the batch number
377 * @throws JMSException if an error occurs during the send
378 */
379 private void sendStatus(long time, long received, int batchNumber) throws JMSException
380 {
381 Message updateMessage = _session.createTextMessage("update");
382 updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
383 updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
384 updateMessage.setLongProperty("RECEIVED", received);
385 updateMessage.setIntProperty("BATCH", batchNumber);
386 updateMessage.setLongProperty("DURATION", time);
387
388 if (log.isInfoEnabled())
389 {
390 log.info("**** SENDING [" + batchNumber + "]**** " + "CLIENT_ID:" + _client + " RECEIVED:" + received
391 + " BATCH:" + batchNumber + " DURATION:" + time);
392 }
393
394 // Output on the main console.info the details of this batch
395 if ((batchNumber % 10) == 0)
396 {
397 console.info("Sending Report [" + batchNumber + "] " + "CLIENT_ID:" + _client + " RECEIVED:" + received
398 + " BATCH:" + batchNumber + " DURATION:" + time);
399 }
400
401 _updater.send(updateMessage);
402 }
403 }
404
405 /**
406 * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second
407 * that are sent through the test system.
408 *
409 * By keeping a record of the messages recevied and the average time taken to process the batch size can be
410 * calculated and so the delay can be adjusted to maintain that rate.
411 *
412 * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no
413 * messages in the batch. Otherwise the delay is used at the end of the batch.
414 */
415 class SustainedRateAdapter implements MessageListener, Runnable
416 {
417 private SustainedClientTestCase _client;
418 private long _batchVariance = Integer.getInteger("batchVariance", 3); // no. batches to allow drifting
419 private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
420 private volatile long _delay; // in nanos
421 private long _sent;
422 private Map<String, Long> _slowClients = new HashMap<String, Long>();
423 private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms
424 private static final long NO_CLIENT_SLEEP = 1000; // 1s
425 private volatile boolean NO_CLIENTS = true;
426 private int _delayShifting;
427 private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5);
428 private boolean _warmedup = false;
429 private static final long EXPECTED_TIME_PER_BATCH = 100000L;
430 private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
431
432 SustainedRateAdapter(SustainedClientTestCase client)
433 {
434 _client = client;
435 }
436
437 public void onMessage(Message message)
438 {
439 if (log.isDebugEnabled())
440 {
441 log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
442 }
443
444 try
445 {
446 String controlType = message.getStringProperty("CONTROL_TYPE");
447
448 // Check if the message is a test invite.
449 if ("UPDATE".equals(controlType))
450 {
451 NO_CLIENTS = false;
452 long duration = message.getLongProperty("DURATION");
453 long totalReceived = message.getLongProperty("RECEIVED");
454 String client = message.getStringProperty("CLIENT_ID");
455 int batchNumber = message.getIntProperty("BATCH");
456
457 if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0))
458 {
459 log.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " Recevied BATCH:"
460 + batchNumber + " DURATION:" + duration);
461 }
462
463 recordSlow(client, totalReceived, batchNumber);
464
465 adjustDelay(client, batchNumber, duration);
466
467 // Warm up completes when:
468 // we haven't warmed up
469 // and the number of batches sent to each client is at least half of the required warmup batches
470 if (!_warmedup && (batchNumber >= _warmUpBatches))
471 {
472 _warmedup = true;
473 _warmup.countDown();
474
475 }
476 }
477 }
478 catch (JMSException e)
479 {
480 //
481 }
482 }
483
484 CountDownLatch _warmup = new CountDownLatch(1);
485
486 int _numBatches = 10000;
487
488 // long[] _timings = new long[_numBatches];
489 private boolean _running = true;
490
491 public void run()
492 {
493 console.info("Warming up");
494
495 doBatch(_warmUpBatches);
496
497 try
498 {
499 // wait for warmup to complete.
500 _warmup.await();
501
502 // set delay to the average length of the batches
503 _delay = _totalDuration / _warmUpBatches / delays.size();
504
505 console.info("Warmup complete delay set : " + _delay + " based on _totalDuration: " + _totalDuration
506 + " over no. batches: " + _warmUpBatches + " with client count: " + delays.size());
507
508 _totalDuration = 0L;
509 _totalReceived = 0L;
510 _sent = 0L;
511 }
512 catch (InterruptedException e)
513 {
514 //
515 }
516
517 doBatch(_numBatches);
518
519 }
520
521 private void doBatch(int batchSize) // long[] timings,
522 {
523 TextMessage testMessage = null;
524 try
525 {
526 testMessage = _client.session[0].createTextMessage("start");
527
528 for (int batch = 0; batch <= batchSize; batch++)
529 // while (_running)
530 {
531 long start = System.nanoTime();
532
533 testMessage.setText("start");
534 testMessage.setIntProperty("BATCH", batch);
535
536 _client.producer.send(testMessage);
537 _rateAdapter.sentMessage();
538
539 testMessage.setText("test");
540 // start at 2 so start and end count as part of batch
541 for (int m = 2; m < _batchSize; m++)
542 {
543 _client.producer.send(testMessage);
544 _rateAdapter.sentMessage();
545 }
546
547 testMessage.setText("end");
548 _client.producer.send(testMessage);
549 _rateAdapter.sentMessage();
550
551 long end = System.nanoTime();
552
553 long sendtime = end - start;
554
555 if (log.isDebugEnabled())
556 {
557 log.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime); // timings[batch]);
558 }
559
560 if ((batch % LOG_UPATE_INTERVAL) == 0)
561 {
562 console.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
563 }
564
565 _rateAdapter.sleepBatch();
566
567 }
568 }
569 catch (JMSException e)
570 {
571 console.error("Runner ended");
572 }
573 }
574
575 private String status()
576 {
577 return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" + " Delay is " + _delay
578 + " resulting in "
579 + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch"));
580 }
581
582 private void sleepBatch()
583 {
584 if (checkForSlowClients())
585 { // if there werwe slow clients we have already slept so don't sleep anymore again.
586 return;
587 }
588
589 if (!SLEEP_PER_MESSAGE)
590 {
591 // per batch sleep.. if sleep is to small to spread over the batch.
592 if (_delay <= (TEN_MILLI_SEC * _batchSize))
593 {
594 sleepLong(_delay);
595 }
596 else
597 {
598 log.info("Not sleeping _delay > ten*batch is:" + _delay);
599 }
600 }
601 }
602
603 public void stop()
604 {
605 _running = false;
606 }
607
608 Map<String, Long> delays = new HashMap<String, Long>();
609 Long _totalReceived = 0L;
610 Long _totalDuration = 0L;
611 int _skipUpdate = 0;
612
613 /**
614 * Adjust the delay for sending messages based on this update from the client
615 *
616 * @param client The client that send this update
617 * @param duration The time taken for the last batch of messagse
618 * @param batchNumber The reported batchnumber from the client
619 */
620 private void adjustDelay(String client, int batchNumber, long duration)
621 {
622 // Retrieve the current total time taken for this client.
623 Long currentTime = delays.get(client);
624
625 // Add the new duration time to this client
626 if (currentTime == null)
627 {
628 currentTime = duration;
629 }
630 else
631 {
632 currentTime += duration;
633 }
634
635 delays.put(client, currentTime);
636
637 long batchesSent = _sent / _batchSize;
638
639 // ensure we don't divide by zero
640 if (batchesSent == 0)
641 {
642 batchesSent = 1L;
643 }
644
645 _totalReceived += _batchSize;
646 _totalDuration += duration;
647
648 // calculate average duration accross clients per batch
649 long averageDuration = _totalDuration / delays.size() / batchesSent;
650
651 // calculate the difference between current send delay and average report delay
652 long diff = (duration) - averageDuration;
653
654 if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0))
655 {
656 log.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + " on batch: "
657 + batchesSent + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: "
658 + averageDuration + " so diff: " + diff + " for : " + client + " Delay is " + _delay + " resulting in "
659 + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch")));
660 }
661
662 // if the averageDuration differs from the current by more than the specified variane then adjust delay.
663 if (Math.abs(diff) > _timeVariance)
664 {
665
666 // if the the _delay is larger than the required duration to send report
667 // speed up
668 if (diff > TEN_MILLI_SEC)
669 {
670 _delay -= TEN_MILLI_SEC;
671
672 if (_delay < 0)
673 {
674 _delay = 0;
675 log.info("Reset _delay to 0");
676 delayStable();
677 }
678 else
679 {
680 delayChanged();
681 }
682
683 }
684 else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance
685 {
686 // the report took longer
687 _delay += TEN_MILLI_SEC;
688 delayChanged();
689 }
690 }
691 else
692 {
693 delayStable();
694 }
695
696 // If we have a consumer that is behind with the batches.
697 if ((batchesSent - batchNumber) > _batchVariance)
698 {
699 log.debug("Increasing _delay as sending more than receiving");
700
701 _delay += 2 * TEN_MILLI_SEC;
702 delayChanged();
703 }
704
705 }
706
707 /** Reset the number of iterations before we say the delay has stabilised. */
708 private void delayChanged()
709 {
710 _delayShifting = REPORTS_WITHOUT_CHANGE;
711 }
712
713 /**
714 * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will
715 * output Delay stabilised
716 */
717 private void delayStable()
718 {
719 _delayShifting--;
720
721 if (_delayShifting < 0)
722 {
723 _delayShifting = 0;
724 console.debug("Delay stabilised:" + _delay);
725 }
726 }
727
728 /**
729 * Checks that the client has received enough messages. If the client has fallen behind then they are put in the
730 * _slowClients lists which will increase the delay.
731 *
732 * @param client The client identifier to check
733 * @param received the number of messages received by that client
734 * @param batchNumber
735 */
736 private void recordSlow(String client, long received, int batchNumber)
737 {
738 if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance)
739 {
740 _slowClients.put(client, received);
741 }
742 else
743 {
744 _slowClients.remove(client);
745 }
746 }
747
748 /** Incrment the number of sent messages and then sleep, if required. */
749 public void sentMessage()
750 {
751
752 _sent++;
753
754 if (_delay > (TEN_MILLI_SEC * _batchSize))
755 {
756 long batchDelay = _delay / _batchSize;
757 // less than 10ms sleep doesn't always work.
758 // _delay is in nano seconds
759 // if (batchDelay < (TEN_MILLI_SEC))
760 // {
761 // sleep(0, (int) batchDelay);
762 // }
763 // else
764 {
765 // if (batchDelay < 30000000000L)
766 {
767 sleepLong(batchDelay);
768 }
769 }
770 }
771 else
772 {
773 if (SLEEP_PER_MESSAGE && (_delay > 0))
774 {
775 sleepLong(_delay / _batchSize);
776 }
777 }
778 }
779
780 /**
781 * Check at the end of each batch and pause sending messages to allow slow clients to catch up.
782 *
783 * @return true if there were slow clients that caught up.
784 */
785 private boolean checkForSlowClients()
786 {
787 // This will allways be true as we are running this at the end of each batchSize
788 // if (_sent % _batchSize == 0)
789 {
790 // Cause test to pause when we have slow
791 if (!_slowClients.isEmpty() || NO_CLIENTS)
792 {
793
794 while (!_slowClients.isEmpty())
795 {
796 if (log.isInfoEnabled() && ((_sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL) == 0))
797 {
798 String clients = "";
799 Iterator it = _slowClients.keySet().iterator();
800 while (it.hasNext())
801 {
802 clients += it.next();
803 if (it.hasNext())
804 {
805 clients += ", ";
806 }
807 }
808
809 log.info("Pausing for slow clients:" + clients);
810 }
811
812 if (console.isDebugEnabled() && ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0))
813 {
814 console.debug(_slowClients.size() + " slow clients.");
815 }
816
817 sleep(PAUSE_SLEEP);
818 }
819
820 if (NO_CLIENTS)
821 {
822 sleep(NO_CLIENT_SLEEP);
823 }
824
825 log.debug("Continuing");
826
827 return true;
828 }
829 else
830 {
831 if ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0)
832 {
833 console.info("Total Delay :" + _delay + " "
834 + ((_delayShifting == 0) ? "Stablised" : ("Not Stablised(" + _delayShifting + ")")));
835 }
836 }
837
838 }
839
840 return false;
841 }
842
843 /**
844 * Sleep normally takes micro-seconds this allows the use of a nano-second value.
845 *
846 * @param delay nanoseconds to sleep for.
847 */
848 private void sleepLong(long delay)
849 {
850 sleep(delay / 1000000, (int) (delay % 1000000));
851 }
852
853 /**
854 * Sleep for the specified micro-seconds.
855 * @param sleep microseconds to sleep for.
856 */
857 private void sleep(long sleep)
858 {
859 sleep(sleep, 0);
860 }
861
862 /**
863 * Perform the sleep , swallowing any InteruptException.
864 *
865 * NOTE: If a sleep request is > 10s then reset only sleep for 5s
866 *
867 * @param milli to sleep for
868 * @param nano sub miliseconds to sleep for
869 */
870 private void sleep(long milli, int nano)
871 {
872 try
873 {
874 log.debug("Sleep:" + milli + ":" + nano);
875 if (milli > 10000)
876 {
877
878 if (_delay == milli)
879 {
880 _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH;
881 log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000)
882 + "s. Reset _totalDuration:" + _totalDuration);
883 }
884 else
885 {
886 log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) + "s");
887 }
888
889 milli = 5000;
890 }
891
892 Thread.sleep(milli, nano);
893 }
894 catch (InterruptedException e)
895 {
896 //
897 }
898 }
899
900 public void setClient(SustainedClientTestCase client)
901 {
902 _client = client;
903 }
904 }
905
906 }
|