QpidBench.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.tools;
022 
023 import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
024 import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
025 import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
026 
027 import java.lang.reflect.Field;
028 import java.lang.reflect.InvocationTargetException;
029 import java.lang.reflect.Method;
030 import java.nio.ByteBuffer;
031 import java.util.ArrayList;
032 import java.util.List;
033 
034 import javax.jms.DeliveryMode;
035 import javax.jms.Destination;
036 import javax.jms.JMSException;
037 import javax.jms.Message;
038 import javax.jms.MessageConsumer;
039 import javax.jms.MessageListener;
040 import javax.jms.MessageProducer;
041 import javax.jms.TextMessage;
042 
043 import org.apache.qpid.client.AMQConnection;
044 import org.apache.qpid.thread.Threading;
045 import org.apache.qpid.transport.DeliveryProperties;
046 import org.apache.qpid.transport.ExchangeBind;
047 import org.apache.qpid.transport.Header;
048 import org.apache.qpid.transport.MessageAcceptMode;
049 import org.apache.qpid.transport.MessageAcquireMode;
050 import org.apache.qpid.transport.MessageCreditUnit;
051 import org.apache.qpid.transport.MessageDeliveryMode;
052 import org.apache.qpid.transport.MessageFlowMode;
053 import org.apache.qpid.transport.MessageProperties;
054 import org.apache.qpid.transport.MessageSubscribe;
055 import org.apache.qpid.transport.MessageTransfer;
056 import org.apache.qpid.transport.QueueDeclare;
057 import org.apache.qpid.transport.SessionException;
058 import org.apache.qpid.transport.SessionListener;
059 import org.apache.qpid.util.UUIDGen;
060 import org.apache.qpid.util.UUIDs;
061 
062 /**
063  * QpidBench
064  *
065  */
066 
067 public class QpidBench
068 {
069 
070     static enum Mode
071     {
072         PUBLISH, CONSUME, BOTH
073     }
074 
075     private static class Options
076     {
077         private StringBuilder usage = new StringBuilder("qpid-bench <options>");
078 
079         void usage(String name, String description, Object def)
080         {
081             String defval = "";
082             if (def != null)
083             {
084                 defval = String.format(" (%s)", def);
085             }
086             usage.append(String.format("\n  %-15s%-14s %s", name, defval, description));
087         }
088 
089         public String broker = "localhost";
090         public int port = 5672;
091         public long count = 1000000;
092         public long window = 100000;
093         public long sample = window;
094         public int size = 1024;
095         public Mode mode = BOTH;
096         public boolean timestamp = false;
097         public boolean message_id = false;
098         public boolean message_cache = false;
099         public boolean persistent = false;
100         public boolean jms_publish = false;
101         public boolean jms_consume = false;
102         public boolean help = false;
103 
104         {
105             usage("-b, --broker""the broker hostname", broker);
106         }
107 
108         public void parse__broker(String b)
109         {
110             this.broker = b;
111         }
112 
113         public void parse_b(String b)
114         {
115             parse__broker(b);
116         }
117 
118         {
119             usage("-p, --port""the broker port", port);
120         }
121 
122         public void parse__port(String p)
123         {
124             this.port = Integer.parseInt(p);
125         }
126 
127         public void parse_p(String p)
128         {
129             parse__port(p);
130         }
131 
132         {
133             usage("-c, --count""the number of messages to send/receive, 0 means no limit", count);
134         }
135 
136         public void parse__count(String c)
137         {
138             this.count = Long.parseLong(c);
139         }
140 
141         public void parse_c(String c)
142         {
143             parse__count(c);
144         }
145 
146         {
147             usage("-w, --window""the number of messages to send before blocking, 0 disables", window);
148         }
149 
150         public void parse__window(String w)
151         {
152             this.window = Long.parseLong(w);
153         }
154 
155         public void parse_w(String w)
156         {
157             parse__window(w);
158         }
159 
160         {
161             usage("--sample""print stats after this many messages, 0 disables", sample);
162         }
163 
164         public void parse__sample(String s)
165         {
166             this.sample = Long.parseLong(s);
167         }
168 
169         {
170             usage("-i, --interval""sets both --window and --sample", window);
171         }
172 
173         public void parse__interval(String i)
174         {
175             this.window = Long.parseLong(i);
176             this.sample = window;
177         }
178 
179         public void parse_i(String i)
180         {
181             parse__interval(i);
182         }
183 
184         {
185             usage("-s, --size""the message size", size);
186         }
187 
188         public void parse__size(String s)
189         {
190             this.size = Integer.parseInt(s);
191         }
192 
193         public void parse_s(String s)
194         {
195             parse__size(s);
196         }
197 
198         {
199             usage("-m, --mode""one of publish, consume, or both", mode);
200         }
201 
202         public void parse__mode(String m)
203         {
204             if (m.equalsIgnoreCase("publish"))
205             {
206                 this.mode = PUBLISH;
207             }
208             else if (m.equalsIgnoreCase("consume"))
209             {
210                 this.mode = CONSUME;
211             }
212             else if (m.equalsIgnoreCase("both"))
213             {
214                 this.mode = BOTH;
215             }
216             else
217             {
218                 throw new IllegalArgumentException
219                     ("must be one of 'publish', 'consume', or 'both'");
220             }
221         }
222 
223         public void parse_m(String m)
224         {
225             parse__mode(m);
226         }
227 
228         {
229             usage("--timestamp""set timestamps on each message if true", timestamp);
230         }
231 
232         public void parse__timestamp(String t)
233         {
234             this.timestamp = Boolean.parseBoolean(t);
235         }
236 
237         {
238             usage("--mesage-id""set the message-id on each message if true", message_id);
239         }
240 
241         public void parse__message_id(String m)
242         {
243             this.message_id = Boolean.parseBoolean(m);
244         }
245 
246         {
247             usage("--message-cache""reuse the same message for each send if true", message_cache);
248         }
249 
250         public void parse__message_cache(String c)
251         {
252             this.message_cache = Boolean.parseBoolean(c);
253         }
254 
255         {
256             usage("--persistent""set the delivery-mode to persistent if true", persistent);
257         }
258 
259         public void parse__persistent(String p)
260         {
261             this.persistent = Boolean.parseBoolean(p);
262         }
263 
264         {
265             usage("--jms-publish""use the jms client for publish", jms_publish);
266         }
267 
268         public void parse__jms_publish(String jp)
269         {
270             this.jms_publish = Boolean.parseBoolean(jp);
271         }
272 
273         {
274             usage("--jms-consume""use the jms client for consume", jms_consume);
275         }
276 
277         public void parse__jms_consume(String jc)
278         {
279             this.jms_consume = Boolean.parseBoolean(jc);
280         }
281 
282         {
283             usage("--jms""sets both --jms-publish and --jms-consume"false);
284         }
285 
286         public void parse__jms(String j)
287         {
288             this.jms_publish = this.jms_consume = Boolean.parseBoolean(j);
289         }
290 
291         {
292             usage("-h, --help""prints this message"null);
293         }
294 
295         public void parse__help()
296         {
297             this.help = true;
298         }
299 
300         public void parse_h()
301         {
302             parse__help();
303         }
304 
305         public String parse(String ... args)
306         {
307             Class klass = getClass();
308             List<String> arguments = new ArrayList<String>();
309             for (int i = 0; i < args.length; i++)
310             {
311                 String option = args[i];
312 
313                 if (!option.startsWith("-"))
314                 {
315                     arguments.add(option);
316                     continue;
317                 }
318 
319                 String method = "parse" + option.replace('-''_');
320                 try
321                 {
322                     try
323                     {
324                         Method parser = klass.getMethod(method);
325                         parser.invoke(this);
326                     }
327                     catch (NoSuchMethodException e)
328                     {
329                         try
330                         {
331                             Method parser = klass.getMethod(method, String.class);
332 
333                             String value = null;
334                             if (i + < args.length)
335                             {
336                                 value = args[i+1];
337                                 i++;
338                             }
339                             else
340                             {
341                                 return option + " requires a value";
342                             }
343 
344                             parser.invoke(this, value);
345                         }
346                         catch (NoSuchMethodException e2)
347                         {
348                             return "no such option: " + option;
349                         }
350                     }
351                 }
352                 catch (InvocationTargetException e)
353                 {
354                     Throwable t = e.getCause();
355                     return String.format
356                         ("error parsing %s: %s: %s", option, t.getClass().getName(),
357                          t.getMessage());
358                 }
359                 catch (IllegalAccessException e)
360                 {
361                     throw new RuntimeException
362                         ("unable to access parse method: " + option, e);
363                 }
364             }
365 
366             return parseArguments(arguments);
367         }
368 
369         public String parseArguments(List<String> arguments)
370         {
371             if (arguments.size() 0)
372             {
373                 String args = arguments.toString();
374                 return "unrecognized arguments: " + args.substring(1, args.length() 1);
375             }
376             else
377             {
378                 return null;
379             }
380         }
381 
382         public String toString()
383         {
384             Class klass = getClass();
385             Field[] fields = klass.getFields();
386             StringBuilder str = new StringBuilder();
387             for (int i = 0; i < fields.length; i++)
388             {
389                 if (i > 0)
390                 {
391                     str.append("\n");
392                 }
393 
394                 String name = fields[i].getName();
395                 str.append(name);
396                 str.append(" = ");
397                 Object value;
398                 try
399                 {
400                     value = fields[i].get(this);
401                 }
402                 catch (IllegalAccessException e)
403                 {
404                     throw new RuntimeException
405                         ("unable to access field: " + name, e);
406                 }
407                 str.append(value);
408             }
409 
410             return str.toString();
411         }
412     }
413 
414     public static final void main(String[] argsthrows Exception
415     {
416         final Options opts = new Options();
417         String error = opts.parse(args);
418         if (error != null)
419         {
420             System.err.println(error);
421             System.exit(-1);
422             return;
423         }
424 
425         if (opts.help)
426         {
427             System.out.println(opts.usage);
428             return;
429         }
430 
431         System.out.println(opts);
432 
433         switch (opts.mode)
434         {
435         case CONSUME:
436         case BOTH:
437             Runnable r = new Runnable()
438             {
439                 public void run()
440                 {
441                     try
442                     {
443                         if (opts.jms_consume)
444                         {
445                             jms_consumer(opts);
446                         }
447                         else
448                         {
449                             native_consumer(opts);
450                         }
451                     }
452                     catch (Exception e)
453                     {
454                         throw new RuntimeException(e);
455                     }
456                 }
457             };
458            
459             Thread t;
460             try
461             {
462                 t = Threading.getThreadFactory().createThread(r);                      
463             }
464             catch(Exception e)
465             {
466                 throw new Error("Error creating consumer thread",e);
467             }
468             t.start();
469             break;
470         }
471 
472         switch (opts.mode)
473         {
474         case PUBLISH:
475         case BOTH:
476             Runnable r = new Runnable()
477             {
478                 public void run()
479                 {
480                     try
481                     {
482                         if (opts.jms_publish)
483                         {
484                             jms_publisher(opts);
485                         }
486                         else
487                         {
488                             native_publisher(opts);
489                         }
490                     }
491                     catch (Exception e)
492                     {
493                         throw new RuntimeException(e);
494                     }
495                 }
496             };
497             Thread t;
498             try
499             {
500                 t = Threading.getThreadFactory().createThread(r);                      
501             }
502             catch(Exception e)
503             {
504                 throw new Error("Error creating publisher thread",e);
505             }
506             t.start();
507             break;
508         }
509     }
510 
511     private static enum Column
512     {
513         LEFT, RIGHT
514     }
515 
516     private static final void sample(Options opts, Column col, String name, long count,
517                                      long start, long time, long lastTime)
518     {
519         String pfx = "";
520         String sfx = "";
521         if (opts.mode == BOTH)
522         {
523             if (col == Column.RIGHT)
524             {
525                 pfx = "               --                   ";
526             }
527             else
528             {
529                 sfx = "               --";
530             }
531         }
532 
533         if (count == 0)
534         {
535             String stats = String.format("%s: %tc", name, start);
536             System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
537             return;
538         }
539 
540         double cumulative = 1000 (doublecount / (double) (time - start);
541         double interval = 1000 ((doubleopts.sample / (double) (time - lastTime));
542 
543         String stats = String.format
544             ("%s: %d %.2f %.2f", name, count, cumulative, interval);
545         System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
546     }
547 
548     private static final javax.jms.Connection getJMSConnection(Options optsthrows Exception
549     {
550         String url = String.format
551             ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'",
552              opts.broker, opts.port);
553         return new AMQConnection(url);
554     }
555 
556     private static final void jms_publisher(Options optsthrows Exception
557     {
558         javax.jms.Connection conn = getJMSConnection(opts);
559 
560         javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
561         Destination dest = ssn.createQueue("test-queue");
562         Destination echo_dest = ssn.createQueue("echo-queue");
563         MessageProducer prod = ssn.createProducer(dest);
564         MessageConsumer cons = ssn.createConsumer(echo_dest);
565         prod.setDisableMessageID(!opts.message_id);
566         prod.setDisableMessageTimestamp(!opts.timestamp);
567         prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
568 
569         StringBuilder str = new StringBuilder();
570         for (int i = 0; i < opts.size; i++)
571         {
572             str.append((char) (i % 128));
573         }
574 
575         String body = str.toString();
576 
577         TextMessage cached = ssn.createTextMessage();
578         cached.setText(body);
579 
580         conn.start();
581 
582         long count = 0;
583         long lastTime = 0;
584         long start = System.currentTimeMillis();
585         while (opts.count == || count < opts.count)
586         {
587             if (opts.window > && (count % opts.window== && count > 0)
588             {
589                 Message echo = cons.receive();
590             }
591 
592             if (opts.sample > && (count % opts.sample== 0)
593             {
594                 long time = System.currentTimeMillis();
595                 sample(opts, Column.LEFT, "JP", count, start, time, lastTime);
596                 lastTime = time;
597             }
598 
599             TextMessage m;
600             if (opts.message_cache)
601             {
602                 m = cached;
603             }
604             else
605             {
606                 m = ssn.createTextMessage();
607                 m.setText(body);
608             }
609 
610             prod.send(m);
611             count++;
612         }
613 
614         conn.close();
615     }
616 
617     private static final void jms_consumer(final Options optsthrows Exception
618     {
619         final javax.jms.Connection conn = getJMSConnection(opts);
620         javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
621         Destination dest = ssn.createQueue("test-queue");
622         Destination echo_dest = ssn.createQueue("echo-queue");
623         MessageConsumer cons = ssn.createConsumer(dest);
624         final MessageProducer prod = ssn.createProducer(echo_dest);
625         prod.setDisableMessageID(true);
626         prod.setDisableMessageTimestamp(true);
627         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
628         final TextMessage echo = ssn.createTextMessage();
629         echo.setText("ECHO");
630 
631         final Object done = new Object();
632         cons.setMessageListener(new MessageListener()
633         {
634             private long count = 0;
635             private long lastTime = 0;
636             private long start;
637 
638             public void onMessage(Message m)
639             {
640                 if (count == 0)
641                 {
642                     start = System.currentTimeMillis();
643                 }
644 
645                 try
646                 {
647                     boolean sample = opts.sample > && (count % opts.sample== 0;
648                     long time = sample ? System.currentTimeMillis() 0;
649 
650                     if (opts.window > && (count % opts.window== 0)
651                     {
652                         prod.send(echo);
653                     }
654 
655                     if (sample)
656                     {
657                         sample(opts, Column.RIGHT, "JC", count, start, time, lastTime);
658                         lastTime = time;
659                     }
660                 }
661                 catch (JMSException e)
662                 {
663                     throw new RuntimeException(e);
664                 }
665                 count++;
666 
667                 if (opts.count > && count >= opts.count)
668                 {
669                     synchronized (done)
670                     {
671                         done.notify();
672                     }
673                 }
674             }
675         });
676 
677         conn.start();
678         synchronized (done)
679         {
680             done.wait();
681         }
682         conn.close();
683     }
684 
685     private static final org.apache.qpid.transport.Connection getConnection
686         (Options opts)
687     {
688         org.apache.qpid.transport.Connection conn =
689             new org.apache.qpid.transport.Connection();
690         conn.connect(opts.broker, opts.port, null, "guest""guest",false);
691         return conn;
692     }
693 
694     private static abstract class NativeListener implements SessionListener
695     {
696 
697         public void opened(org.apache.qpid.transport.Session ssn) {}
698 
699         public void exception(org.apache.qpid.transport.Session ssn,
700                               SessionException exc)
701         {
702             exc.printStackTrace();
703         }
704 
705         public void closed(org.apache.qpid.transport.Session ssn) {}
706 
707     }
708 
709     private static final void native_publisher(Options optsthrows Exception
710     {
711         final long[] echos = };
712         org.apache.qpid.transport.Connection conn = getConnection(opts);
713         org.apache.qpid.transport.Session ssn = conn.createSession();
714         ssn.setSessionListener(new NativeListener()
715         {
716             public void message(org.apache.qpid.transport.Session ssn,
717                                 MessageTransfer xfr)
718             {
719                 synchronized (echos)
720                 {
721                     echos[0]++;
722                     echos.notify();
723                 }
724                 ssn.processed(xfr);
725             }
726         });
727 
728         ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
729         ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
730         ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
731         ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
732 
733         MessageProperties cached_mp = new MessageProperties();
734         DeliveryProperties cached_dp = new DeliveryProperties();
735         cached_dp.setRoutingKey("test-queue");
736         cached_dp.setDeliveryMode
737             (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
738 
739         int size = opts.size;
740         ByteBuffer body = ByteBuffer.allocate(size);
741         for (int i = 0; i < size; i++)
742         {
743             body.put((bytei);
744         }
745         body.flip();
746 
747         ssn.invoke(new MessageSubscribe()
748                    .queue("echo-queue")
749                    .destination("echo-queue")
750                    .acceptMode(MessageAcceptMode.NONE)
751                    .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
752         ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW);
753         ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
754         ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
755 
756         UUIDGen gen = UUIDs.newGenerator();
757 
758         long count = 0;
759         long lastTime = 0;
760         long start = System.currentTimeMillis();
761         while (opts.count == || count < opts.count)
762         {
763             if (opts.window > && (count % opts.window== && count > 0)
764             {
765                 synchronized (echos)
766                 {
767                     while (echos[0(count/opts.window))
768                     {
769                         echos.wait();
770                     }
771                 }
772             }
773 
774             if (opts.sample > && (count % opts.sample== 0)
775             {
776                 long time = System.currentTimeMillis();
777                 sample(opts, Column.LEFT, "NP", count, start, time, lastTime);
778                 lastTime = time;
779             }
780 
781             MessageProperties mp;
782             DeliveryProperties dp;
783             if (opts.message_cache)
784             {
785                 mp = cached_mp;
786                 dp = cached_dp;
787             }
788             else
789             {
790                 mp = new MessageProperties();
791                 dp = new DeliveryProperties();
792                 dp.setRoutingKey("test-queue");
793                 dp.setDeliveryMode
794                     (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
795 
796             }
797 
798             if (opts.message_id)
799             {
800                 mp.setMessageId(gen.generate());
801             }
802 
803             if (opts.timestamp)
804             {
805                 dp.setTimestamp(System.currentTimeMillis());
806             }
807 
808             ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
809                                 new Header(dp, mp), body.slice());
810             count++;
811         }
812 
813         ssn.messageCancel("echo-queue");
814 
815         ssn.sync();
816         ssn.close();
817         conn.close();
818     }
819 
820     private static final void native_consumer(final Options optsthrows Exception
821     {
822         final DeliveryProperties dp = new DeliveryProperties();
823         final byte[] echo = new byte[0];
824         dp.setRoutingKey("echo-queue");
825         dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
826         final MessageProperties mp = new MessageProperties();
827         final Object done = new Object();
828         org.apache.qpid.transport.Connection conn = getConnection(opts);
829         org.apache.qpid.transport.Session ssn = conn.createSession();
830         ssn.setSessionListener(new NativeListener()
831         {
832             private long count = 0;
833             private long lastTime = 0;
834             private long start;
835 
836             public void message(org.apache.qpid.transport.Session ssn,
837                                 MessageTransfer xfr)
838             {
839                 if (count == 0)
840                 {
841                     start = System.currentTimeMillis();
842                 }
843 
844                 boolean sample = opts.sample > && (count % opts.sample== 0;
845                 long time = sample ? System.currentTimeMillis() 0;
846 
847                 if (opts.window > && (count % opts.window== 0)
848                 {
849                     ssn.messageTransfer("amq.direct",
850                                         MessageAcceptMode.NONE,
851                                         MessageAcquireMode.PRE_ACQUIRED,
852                                         new Header(dp, mp),
853                                         echo);
854                 }
855 
856                 if (sample)
857                 {
858                     sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
859                     lastTime = time;
860                 }
861                 ssn.processed(xfr);
862                 count++;
863 
864                 if (opts.count > && count >= opts.count)
865                 {
866                     synchronized (done)
867                     {
868                         done.notify();
869                     }
870                 }
871             }
872         });
873 
874         ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
875         ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
876         ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
877         ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
878 
879         ssn.invoke(new MessageSubscribe()
880                    .queue("test-queue")
881                    .destination("test-queue")
882                    .acceptMode(MessageAcceptMode.NONE)
883                    .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
884         ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW);
885         ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
886         ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
887 
888         synchronized (done)
889         {
890             done.wait();
891         }
892 
893         ssn.messageCancel("test-queue");
894 
895         ssn.sync();
896         ssn.close();
897         conn.close();
898     }
899 
900 }