Publisher.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.oldtopic;
022 
023 import javax.jms.*;
024 
025 public class Publisher implements MessageListener
026 {
027     private final Object _lock = new Object();
028     private final Connection _connection;
029     private final Session _session;
030     private final MessageFactory _factory;
031     private final MessageProducer _publisher;
032     private int _count;
033 
034     Publisher(Connection connection, int size, int ackMode, boolean persistentthrows Exception
035     {
036         _connection = connection;
037         _session = _connection.createSession(false, ackMode);
038         _factory = new MessageFactory(_session, size);
039         _publisher = _factory.createTopicPublisher();
040         _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
041         System.out.println("Publishing " (persistent ? "persistent" "non-persistent"" messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode".");
042     }
043 
044     private void test(Config configthrows Exception
045     {
046         test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup());
047     }
048 
049     private void test(int batches, long delay, int msgCount, int consumerCount, int warmupthrows Exception
050     {
051         _factory.createControlConsumer().setMessageListener(this);
052         _connection.start();
053 
054         if(warmup > 0)
055         {
056             System.out.println("Runing warmup (" + warmup + " msgs)");
057             long time = batch(warmup, consumerCount);
058             System.out.println("Warmup completed in " + time + "ms");
059         }
060 
061         long[] times = new long[batches];
062         for(int i = 0; i < batches; i++)
063         {
064             if(i > 0Thread.sleep(delay*1000);
065             times[i= batch(msgCount, consumerCount);
066             System.out.println("Batch " (i+1" of " + batches + " completed in " + times[i" ms.");
067         }
068 
069         long min = min(times);
070         long max = max(times);
071         System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
072 
073         //request shutdown
074         _publisher.send(_factory.createShutdownMessage());
075 
076         _connection.stop();
077         _connection.close();
078     }
079 
080     private long batch(int msgCount, int consumerCountthrows Exception
081     {
082         _count = consumerCount;
083         long start = System.currentTimeMillis();
084         publish(msgCount);
085         waitForCompletion(consumerCount);
086         return System.currentTimeMillis() - start;
087     }
088 
089     private void publish(int countthrows Exception
090     {
091 
092         //send events
093         for (int i = 0; i < count; i++)
094         {
095             _publisher.send(_factory.createEventMessage());
096             if ((i + 1100 == 0)
097             {
098                 System.out.println("Sent " (i + 1" messages");
099             }
100         }
101 
102         //request report
103         _publisher.send(_factory.createReportRequestMessage());
104     }
105 
106     private void waitForCompletion(int consumersthrows Exception
107     {
108         System.out.println("Waiting for completion...");
109         synchronized (_lock)
110         {
111             while (_count > 0)
112             {
113                 _lock.wait();
114             }
115         }
116     }
117 
118 
119     public void onMessage(Message message)
120     {
121         System.out.println("Received report " + _factory.getReport(message" " + --_count + " remaining");
122         if (_count == 0)
123         {
124             synchronized (_lock)
125             {
126                 _lock.notify();
127             }
128         }
129     }
130 
131     static long min(long[] times)
132     {
133         long min = times.length > ? times[00;
134         for(int i = 0; i < times.length; i++)
135         {
136             min = Math.min(min, times[i]);
137         }
138         return min;
139     }
140 
141     static long max(long[] times)
142     {
143         long max = times.length > ? times[00;
144         for(int i = 0; i < times.length; i++)
145         {
146             max = Math.max(max, times[i]);
147         }
148         return max;
149     }
150 
151     static long avg(long[] times, long min, long max)
152     {
153         long sum = 0;
154         for(int i = 0; i < times.length; i++)
155         {
156             sum += times[i];
157         }
158         sum -= min;
159         sum -= max;
160 
161         return (sum / (times.length - 2));
162     }
163 
164     public static void main(String[] argvthrows Exception
165     {
166         Config config = new Config();
167         config.setOptions(argv);
168 
169         Connection con = config.createConnection();
170         int size = config.getPayload();
171         int ackMode = config.getAckMode();
172         boolean persistent = config.usePersistentMessages();
173         new Publisher(con, size, ackMode, persistent).test(config);
174     }
175 }