Publisher.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.topic;
022 
023 import javax.jms.*;
024 
025 public class Publisher implements MessageListener
026 {
027     private final Object _lock = new Object();
028     private final Connection _connection;
029     private final Session _session;
030     private final MessageFactory _factory;
031     private final MessageProducer _publisher;
032     private int _count;
033 
034     Publisher(Connection connection, int size, int ackMode, boolean persistentthrows Exception
035     {
036         _connection = connection;
037         _session = _connection.createSession(false, ackMode);
038         _factory = new MessageFactory(_session, size);
039         _publisher = _factory.createTopicPublisher();
040         _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
041         System.out.println("Publishing " (persistent ? "persistent" "non-persistent"" messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode".");
042     }
043 
044     private void test(Config configthrows Exception
045     {
046         test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup());
047     }
048 
049     private void test(int batches, long delay, int msgCount, int consumerCount, int warmupthrows Exception
050     {
051         _factory.createControlConsumer().setMessageListener(this);
052         _connection.start();
053 
054         if (warmup > 0)
055         {
056             System.out.println("Runing warmup (" + warmup + " msgs)");
057             long time = batch(warmup, consumerCount);
058             System.out.println("Warmup completed in " + time + "ms");
059         }
060 
061         long[] times = new long[batches];
062         for (int i = 0; i < batches; i++)
063         {
064             if (i > 0)
065             {
066                 Thread.sleep(delay * 1000);
067             }
068             times[i= batch(msgCount, consumerCount);
069             System.out.println("Batch " (i + 1" of " + batches + " completed in " + times[i" ms.");
070         }
071 
072         long min = min(times);
073         long max = max(times);
074         System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
075 
076         //request shutdown
077         _publisher.send(_factory.createShutdownMessage());
078 
079         _connection.stop();
080         _connection.close();
081     }
082 
083     private long batch(int msgCount, int consumerCountthrows Exception
084     {
085         _count = consumerCount;
086         long start = System.currentTimeMillis();
087         publish(msgCount);
088         waitForCompletion(consumerCount);
089         return System.currentTimeMillis() - start;
090     }
091 
092     private void publish(int countthrows Exception
093     {
094 
095         //send events
096         for (int i = 0; i < count; i++)
097         {
098             _publisher.send(_factory.createEventMessage());
099             if ((i + 1100 == 0)
100             {
101                 System.out.println("Sent " (i + 1" messages");
102             }
103         }
104 
105         //request report
106         _publisher.send(_factory.createReportRequestMessage());
107     }
108 
109     private void waitForCompletion(int consumersthrows Exception
110     {
111         System.out.println("Waiting for completion...");
112         synchronized (_lock)
113         {
114             while (_count > 0)
115             {
116                 _lock.wait();
117             }
118         }
119     }
120 
121 
122     public void onMessage(Message message)
123     {
124         System.out.println("Received report " + _factory.getReport(message" " + --_count + " remaining");
125         if (_count == 0)
126         {
127             synchronized (_lock)
128             {
129                 _lock.notify();
130             }
131         }
132     }
133 
134     static long min(long[] times)
135     {
136         long min = times.length > ? times[00;
137         for (int i = 0; i < times.length; i++)
138         {
139             min = Math.min(min, times[i]);
140         }
141         return min;
142     }
143 
144     static long max(long[] times)
145     {
146         long max = times.length > ? times[00;
147         for (int i = 0; i < times.length; i++)
148         {
149             max = Math.max(max, times[i]);
150         }
151         return max;
152     }
153 
154     static long avg(long[] times, long min, long max)
155     {
156         long sum = 0;
157         for (int i = 0; i < times.length; i++)
158         {
159             sum += times[i];
160         }
161 
162         int adjustment = 0;
163 
164         // Remove min and max if we have run enough batches.
165         if (times.length > 2)
166         {
167             sum -= min;
168             sum -= max;
169             adjustment = 2;
170         }
171 
172         return (sum / (times.length - adjustment));
173     }
174 
175     public static void main(String[] argvthrows Exception
176     {
177         Config config = new Config();
178         config.setOptions(argv);
179 
180         Connection con = config.createConnection();
181         int size = config.getPayload();
182         int ackMode = config.getAckMode();
183         boolean persistent = config.usePersistentMessages();
184         new Publisher(con, size, ackMode, persistent).test(config);
185     }
186 }