001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.tools;
022
023 import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
024 import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
025 import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
026
027 import java.lang.reflect.Field;
028 import java.lang.reflect.InvocationTargetException;
029 import java.lang.reflect.Method;
030 import java.nio.ByteBuffer;
031 import java.util.ArrayList;
032 import java.util.List;
033
034 import javax.jms.DeliveryMode;
035 import javax.jms.Destination;
036 import javax.jms.JMSException;
037 import javax.jms.Message;
038 import javax.jms.MessageConsumer;
039 import javax.jms.MessageListener;
040 import javax.jms.MessageProducer;
041 import javax.jms.TextMessage;
042
043 import org.apache.qpid.client.AMQConnection;
044 import org.apache.qpid.thread.Threading;
045 import org.apache.qpid.transport.DeliveryProperties;
046 import org.apache.qpid.transport.ExchangeBind;
047 import org.apache.qpid.transport.Header;
048 import org.apache.qpid.transport.MessageAcceptMode;
049 import org.apache.qpid.transport.MessageAcquireMode;
050 import org.apache.qpid.transport.MessageCreditUnit;
051 import org.apache.qpid.transport.MessageDeliveryMode;
052 import org.apache.qpid.transport.MessageFlowMode;
053 import org.apache.qpid.transport.MessageProperties;
054 import org.apache.qpid.transport.MessageSubscribe;
055 import org.apache.qpid.transport.MessageTransfer;
056 import org.apache.qpid.transport.QueueDeclare;
057 import org.apache.qpid.transport.SessionException;
058 import org.apache.qpid.transport.SessionListener;
059 import org.apache.qpid.util.UUIDGen;
060 import org.apache.qpid.util.UUIDs;
061
062 /**
063 * QpidBench
064 *
065 */
066
067 public class QpidBench
068 {
069
070 static enum Mode
071 {
072 PUBLISH, CONSUME, BOTH
073 }
074
075 private static class Options
076 {
077 private StringBuilder usage = new StringBuilder("qpid-bench <options>");
078
079 void usage(String name, String description, Object def)
080 {
081 String defval = "";
082 if (def != null)
083 {
084 defval = String.format(" (%s)", def);
085 }
086 usage.append(String.format("\n %-15s%-14s %s", name, defval, description));
087 }
088
089 public String broker = "localhost";
090 public int port = 5672;
091 public long count = 1000000;
092 public long window = 100000;
093 public long sample = window;
094 public int size = 1024;
095 public Mode mode = BOTH;
096 public boolean timestamp = false;
097 public boolean message_id = false;
098 public boolean message_cache = false;
099 public boolean persistent = false;
100 public boolean jms_publish = false;
101 public boolean jms_consume = false;
102 public boolean help = false;
103
104 {
105 usage("-b, --broker", "the broker hostname", broker);
106 }
107
108 public void parse__broker(String b)
109 {
110 this.broker = b;
111 }
112
113 public void parse_b(String b)
114 {
115 parse__broker(b);
116 }
117
118 {
119 usage("-p, --port", "the broker port", port);
120 }
121
122 public void parse__port(String p)
123 {
124 this.port = Integer.parseInt(p);
125 }
126
127 public void parse_p(String p)
128 {
129 parse__port(p);
130 }
131
132 {
133 usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count);
134 }
135
136 public void parse__count(String c)
137 {
138 this.count = Long.parseLong(c);
139 }
140
141 public void parse_c(String c)
142 {
143 parse__count(c);
144 }
145
146 {
147 usage("-w, --window", "the number of messages to send before blocking, 0 disables", window);
148 }
149
150 public void parse__window(String w)
151 {
152 this.window = Long.parseLong(w);
153 }
154
155 public void parse_w(String w)
156 {
157 parse__window(w);
158 }
159
160 {
161 usage("--sample", "print stats after this many messages, 0 disables", sample);
162 }
163
164 public void parse__sample(String s)
165 {
166 this.sample = Long.parseLong(s);
167 }
168
169 {
170 usage("-i, --interval", "sets both --window and --sample", window);
171 }
172
173 public void parse__interval(String i)
174 {
175 this.window = Long.parseLong(i);
176 this.sample = window;
177 }
178
179 public void parse_i(String i)
180 {
181 parse__interval(i);
182 }
183
184 {
185 usage("-s, --size", "the message size", size);
186 }
187
188 public void parse__size(String s)
189 {
190 this.size = Integer.parseInt(s);
191 }
192
193 public void parse_s(String s)
194 {
195 parse__size(s);
196 }
197
198 {
199 usage("-m, --mode", "one of publish, consume, or both", mode);
200 }
201
202 public void parse__mode(String m)
203 {
204 if (m.equalsIgnoreCase("publish"))
205 {
206 this.mode = PUBLISH;
207 }
208 else if (m.equalsIgnoreCase("consume"))
209 {
210 this.mode = CONSUME;
211 }
212 else if (m.equalsIgnoreCase("both"))
213 {
214 this.mode = BOTH;
215 }
216 else
217 {
218 throw new IllegalArgumentException
219 ("must be one of 'publish', 'consume', or 'both'");
220 }
221 }
222
223 public void parse_m(String m)
224 {
225 parse__mode(m);
226 }
227
228 {
229 usage("--timestamp", "set timestamps on each message if true", timestamp);
230 }
231
232 public void parse__timestamp(String t)
233 {
234 this.timestamp = Boolean.parseBoolean(t);
235 }
236
237 {
238 usage("--mesage-id", "set the message-id on each message if true", message_id);
239 }
240
241 public void parse__message_id(String m)
242 {
243 this.message_id = Boolean.parseBoolean(m);
244 }
245
246 {
247 usage("--message-cache", "reuse the same message for each send if true", message_cache);
248 }
249
250 public void parse__message_cache(String c)
251 {
252 this.message_cache = Boolean.parseBoolean(c);
253 }
254
255 {
256 usage("--persistent", "set the delivery-mode to persistent if true", persistent);
257 }
258
259 public void parse__persistent(String p)
260 {
261 this.persistent = Boolean.parseBoolean(p);
262 }
263
264 {
265 usage("--jms-publish", "use the jms client for publish", jms_publish);
266 }
267
268 public void parse__jms_publish(String jp)
269 {
270 this.jms_publish = Boolean.parseBoolean(jp);
271 }
272
273 {
274 usage("--jms-consume", "use the jms client for consume", jms_consume);
275 }
276
277 public void parse__jms_consume(String jc)
278 {
279 this.jms_consume = Boolean.parseBoolean(jc);
280 }
281
282 {
283 usage("--jms", "sets both --jms-publish and --jms-consume", false);
284 }
285
286 public void parse__jms(String j)
287 {
288 this.jms_publish = this.jms_consume = Boolean.parseBoolean(j);
289 }
290
291 {
292 usage("-h, --help", "prints this message", null);
293 }
294
295 public void parse__help()
296 {
297 this.help = true;
298 }
299
300 public void parse_h()
301 {
302 parse__help();
303 }
304
305 public String parse(String ... args)
306 {
307 Class klass = getClass();
308 List<String> arguments = new ArrayList<String>();
309 for (int i = 0; i < args.length; i++)
310 {
311 String option = args[i];
312
313 if (!option.startsWith("-"))
314 {
315 arguments.add(option);
316 continue;
317 }
318
319 String method = "parse" + option.replace('-', '_');
320 try
321 {
322 try
323 {
324 Method parser = klass.getMethod(method);
325 parser.invoke(this);
326 }
327 catch (NoSuchMethodException e)
328 {
329 try
330 {
331 Method parser = klass.getMethod(method, String.class);
332
333 String value = null;
334 if (i + 1 < args.length)
335 {
336 value = args[i+1];
337 i++;
338 }
339 else
340 {
341 return option + " requires a value";
342 }
343
344 parser.invoke(this, value);
345 }
346 catch (NoSuchMethodException e2)
347 {
348 return "no such option: " + option;
349 }
350 }
351 }
352 catch (InvocationTargetException e)
353 {
354 Throwable t = e.getCause();
355 return String.format
356 ("error parsing %s: %s: %s", option, t.getClass().getName(),
357 t.getMessage());
358 }
359 catch (IllegalAccessException e)
360 {
361 throw new RuntimeException
362 ("unable to access parse method: " + option, e);
363 }
364 }
365
366 return parseArguments(arguments);
367 }
368
369 public String parseArguments(List<String> arguments)
370 {
371 if (arguments.size() > 0)
372 {
373 String args = arguments.toString();
374 return "unrecognized arguments: " + args.substring(1, args.length() - 1);
375 }
376 else
377 {
378 return null;
379 }
380 }
381
382 public String toString()
383 {
384 Class klass = getClass();
385 Field[] fields = klass.getFields();
386 StringBuilder str = new StringBuilder();
387 for (int i = 0; i < fields.length; i++)
388 {
389 if (i > 0)
390 {
391 str.append("\n");
392 }
393
394 String name = fields[i].getName();
395 str.append(name);
396 str.append(" = ");
397 Object value;
398 try
399 {
400 value = fields[i].get(this);
401 }
402 catch (IllegalAccessException e)
403 {
404 throw new RuntimeException
405 ("unable to access field: " + name, e);
406 }
407 str.append(value);
408 }
409
410 return str.toString();
411 }
412 }
413
414 public static final void main(String[] args) throws Exception
415 {
416 final Options opts = new Options();
417 String error = opts.parse(args);
418 if (error != null)
419 {
420 System.err.println(error);
421 System.exit(-1);
422 return;
423 }
424
425 if (opts.help)
426 {
427 System.out.println(opts.usage);
428 return;
429 }
430
431 System.out.println(opts);
432
433 switch (opts.mode)
434 {
435 case CONSUME:
436 case BOTH:
437 Runnable r = new Runnable()
438 {
439 public void run()
440 {
441 try
442 {
443 if (opts.jms_consume)
444 {
445 jms_consumer(opts);
446 }
447 else
448 {
449 native_consumer(opts);
450 }
451 }
452 catch (Exception e)
453 {
454 throw new RuntimeException(e);
455 }
456 }
457 };
458
459 Thread t;
460 try
461 {
462 t = Threading.getThreadFactory().createThread(r);
463 }
464 catch(Exception e)
465 {
466 throw new Error("Error creating consumer thread",e);
467 }
468 t.start();
469 break;
470 }
471
472 switch (opts.mode)
473 {
474 case PUBLISH:
475 case BOTH:
476 Runnable r = new Runnable()
477 {
478 public void run()
479 {
480 try
481 {
482 if (opts.jms_publish)
483 {
484 jms_publisher(opts);
485 }
486 else
487 {
488 native_publisher(opts);
489 }
490 }
491 catch (Exception e)
492 {
493 throw new RuntimeException(e);
494 }
495 }
496 };
497 Thread t;
498 try
499 {
500 t = Threading.getThreadFactory().createThread(r);
501 }
502 catch(Exception e)
503 {
504 throw new Error("Error creating publisher thread",e);
505 }
506 t.start();
507 break;
508 }
509 }
510
511 private static enum Column
512 {
513 LEFT, RIGHT
514 }
515
516 private static final void sample(Options opts, Column col, String name, long count,
517 long start, long time, long lastTime)
518 {
519 String pfx = "";
520 String sfx = "";
521 if (opts.mode == BOTH)
522 {
523 if (col == Column.RIGHT)
524 {
525 pfx = " -- ";
526 }
527 else
528 {
529 sfx = " --";
530 }
531 }
532
533 if (count == 0)
534 {
535 String stats = String.format("%s: %tc", name, start);
536 System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
537 return;
538 }
539
540 double cumulative = 1000 * (double) count / (double) (time - start);
541 double interval = 1000 * ((double) opts.sample / (double) (time - lastTime));
542
543 String stats = String.format
544 ("%s: %d %.2f %.2f", name, count, cumulative, interval);
545 System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
546 }
547
548 private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception
549 {
550 String url = String.format
551 ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'",
552 opts.broker, opts.port);
553 return new AMQConnection(url);
554 }
555
556 private static final void jms_publisher(Options opts) throws Exception
557 {
558 javax.jms.Connection conn = getJMSConnection(opts);
559
560 javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
561 Destination dest = ssn.createQueue("test-queue");
562 Destination echo_dest = ssn.createQueue("echo-queue");
563 MessageProducer prod = ssn.createProducer(dest);
564 MessageConsumer cons = ssn.createConsumer(echo_dest);
565 prod.setDisableMessageID(!opts.message_id);
566 prod.setDisableMessageTimestamp(!opts.timestamp);
567 prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
568
569 StringBuilder str = new StringBuilder();
570 for (int i = 0; i < opts.size; i++)
571 {
572 str.append((char) (i % 128));
573 }
574
575 String body = str.toString();
576
577 TextMessage cached = ssn.createTextMessage();
578 cached.setText(body);
579
580 conn.start();
581
582 long count = 0;
583 long lastTime = 0;
584 long start = System.currentTimeMillis();
585 while (opts.count == 0 || count < opts.count)
586 {
587 if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
588 {
589 Message echo = cons.receive();
590 }
591
592 if (opts.sample > 0 && (count % opts.sample) == 0)
593 {
594 long time = System.currentTimeMillis();
595 sample(opts, Column.LEFT, "JP", count, start, time, lastTime);
596 lastTime = time;
597 }
598
599 TextMessage m;
600 if (opts.message_cache)
601 {
602 m = cached;
603 }
604 else
605 {
606 m = ssn.createTextMessage();
607 m.setText(body);
608 }
609
610 prod.send(m);
611 count++;
612 }
613
614 conn.close();
615 }
616
617 private static final void jms_consumer(final Options opts) throws Exception
618 {
619 final javax.jms.Connection conn = getJMSConnection(opts);
620 javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
621 Destination dest = ssn.createQueue("test-queue");
622 Destination echo_dest = ssn.createQueue("echo-queue");
623 MessageConsumer cons = ssn.createConsumer(dest);
624 final MessageProducer prod = ssn.createProducer(echo_dest);
625 prod.setDisableMessageID(true);
626 prod.setDisableMessageTimestamp(true);
627 prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
628 final TextMessage echo = ssn.createTextMessage();
629 echo.setText("ECHO");
630
631 final Object done = new Object();
632 cons.setMessageListener(new MessageListener()
633 {
634 private long count = 0;
635 private long lastTime = 0;
636 private long start;
637
638 public void onMessage(Message m)
639 {
640 if (count == 0)
641 {
642 start = System.currentTimeMillis();
643 }
644
645 try
646 {
647 boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
648 long time = sample ? System.currentTimeMillis() : 0;
649
650 if (opts.window > 0 && (count % opts.window) == 0)
651 {
652 prod.send(echo);
653 }
654
655 if (sample)
656 {
657 sample(opts, Column.RIGHT, "JC", count, start, time, lastTime);
658 lastTime = time;
659 }
660 }
661 catch (JMSException e)
662 {
663 throw new RuntimeException(e);
664 }
665 count++;
666
667 if (opts.count > 0 && count >= opts.count)
668 {
669 synchronized (done)
670 {
671 done.notify();
672 }
673 }
674 }
675 });
676
677 conn.start();
678 synchronized (done)
679 {
680 done.wait();
681 }
682 conn.close();
683 }
684
685 private static final org.apache.qpid.transport.Connection getConnection
686 (Options opts)
687 {
688 org.apache.qpid.transport.Connection conn =
689 new org.apache.qpid.transport.Connection();
690 conn.connect(opts.broker, opts.port, null, "guest", "guest",false);
691 return conn;
692 }
693
694 private static abstract class NativeListener implements SessionListener
695 {
696
697 public void opened(org.apache.qpid.transport.Session ssn) {}
698
699 public void exception(org.apache.qpid.transport.Session ssn,
700 SessionException exc)
701 {
702 exc.printStackTrace();
703 }
704
705 public void closed(org.apache.qpid.transport.Session ssn) {}
706
707 }
708
709 private static final void native_publisher(Options opts) throws Exception
710 {
711 final long[] echos = { 0 };
712 org.apache.qpid.transport.Connection conn = getConnection(opts);
713 org.apache.qpid.transport.Session ssn = conn.createSession();
714 ssn.setSessionListener(new NativeListener()
715 {
716 public void message(org.apache.qpid.transport.Session ssn,
717 MessageTransfer xfr)
718 {
719 synchronized (echos)
720 {
721 echos[0]++;
722 echos.notify();
723 }
724 ssn.processed(xfr);
725 }
726 });
727
728 ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
729 ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
730 ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
731 ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
732
733 MessageProperties cached_mp = new MessageProperties();
734 DeliveryProperties cached_dp = new DeliveryProperties();
735 cached_dp.setRoutingKey("test-queue");
736 cached_dp.setDeliveryMode
737 (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
738
739 int size = opts.size;
740 ByteBuffer body = ByteBuffer.allocate(size);
741 for (int i = 0; i < size; i++)
742 {
743 body.put((byte) i);
744 }
745 body.flip();
746
747 ssn.invoke(new MessageSubscribe()
748 .queue("echo-queue")
749 .destination("echo-queue")
750 .acceptMode(MessageAcceptMode.NONE)
751 .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
752 ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW);
753 ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
754 ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
755
756 UUIDGen gen = UUIDs.newGenerator();
757
758 long count = 0;
759 long lastTime = 0;
760 long start = System.currentTimeMillis();
761 while (opts.count == 0 || count < opts.count)
762 {
763 if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
764 {
765 synchronized (echos)
766 {
767 while (echos[0] < (count/opts.window))
768 {
769 echos.wait();
770 }
771 }
772 }
773
774 if (opts.sample > 0 && (count % opts.sample) == 0)
775 {
776 long time = System.currentTimeMillis();
777 sample(opts, Column.LEFT, "NP", count, start, time, lastTime);
778 lastTime = time;
779 }
780
781 MessageProperties mp;
782 DeliveryProperties dp;
783 if (opts.message_cache)
784 {
785 mp = cached_mp;
786 dp = cached_dp;
787 }
788 else
789 {
790 mp = new MessageProperties();
791 dp = new DeliveryProperties();
792 dp.setRoutingKey("test-queue");
793 dp.setDeliveryMode
794 (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
795
796 }
797
798 if (opts.message_id)
799 {
800 mp.setMessageId(gen.generate());
801 }
802
803 if (opts.timestamp)
804 {
805 dp.setTimestamp(System.currentTimeMillis());
806 }
807
808 ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
809 new Header(dp, mp), body.slice());
810 count++;
811 }
812
813 ssn.messageCancel("echo-queue");
814
815 ssn.sync();
816 ssn.close();
817 conn.close();
818 }
819
820 private static final void native_consumer(final Options opts) throws Exception
821 {
822 final DeliveryProperties dp = new DeliveryProperties();
823 final byte[] echo = new byte[0];
824 dp.setRoutingKey("echo-queue");
825 dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
826 final MessageProperties mp = new MessageProperties();
827 final Object done = new Object();
828 org.apache.qpid.transport.Connection conn = getConnection(opts);
829 org.apache.qpid.transport.Session ssn = conn.createSession();
830 ssn.setSessionListener(new NativeListener()
831 {
832 private long count = 0;
833 private long lastTime = 0;
834 private long start;
835
836 public void message(org.apache.qpid.transport.Session ssn,
837 MessageTransfer xfr)
838 {
839 if (count == 0)
840 {
841 start = System.currentTimeMillis();
842 }
843
844 boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
845 long time = sample ? System.currentTimeMillis() : 0;
846
847 if (opts.window > 0 && (count % opts.window) == 0)
848 {
849 ssn.messageTransfer("amq.direct",
850 MessageAcceptMode.NONE,
851 MessageAcquireMode.PRE_ACQUIRED,
852 new Header(dp, mp),
853 echo);
854 }
855
856 if (sample)
857 {
858 sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
859 lastTime = time;
860 }
861 ssn.processed(xfr);
862 count++;
863
864 if (opts.count > 0 && count >= opts.count)
865 {
866 synchronized (done)
867 {
868 done.notify();
869 }
870 }
871 }
872 });
873
874 ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
875 ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
876 ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
877 ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
878
879 ssn.invoke(new MessageSubscribe()
880 .queue("test-queue")
881 .destination("test-queue")
882 .acceptMode(MessageAcceptMode.NONE)
883 .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
884 ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW);
885 ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
886 ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
887
888 synchronized (done)
889 {
890 done.wait();
891 }
892
893 ssn.messageCancel("test-queue");
894
895 ssn.sync();
896 ssn.close();
897 conn.close();
898 }
899
900 }
|