001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.topic;
022
023 import javax.jms.*;
024
025 public class Publisher implements MessageListener
026 {
027 private final Object _lock = new Object();
028 private final Connection _connection;
029 private final Session _session;
030 private final MessageFactory _factory;
031 private final MessageProducer _publisher;
032 private int _count;
033
034 Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception
035 {
036 _connection = connection;
037 _session = _connection.createSession(false, ackMode);
038 _factory = new MessageFactory(_session, size);
039 _publisher = _factory.createTopicPublisher();
040 _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
041 System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + ".");
042 }
043
044 private void test(Config config) throws Exception
045 {
046 test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup());
047 }
048
049 private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception
050 {
051 _factory.createControlConsumer().setMessageListener(this);
052 _connection.start();
053
054 if (warmup > 0)
055 {
056 System.out.println("Runing warmup (" + warmup + " msgs)");
057 long time = batch(warmup, consumerCount);
058 System.out.println("Warmup completed in " + time + "ms");
059 }
060
061 long[] times = new long[batches];
062 for (int i = 0; i < batches; i++)
063 {
064 if (i > 0)
065 {
066 Thread.sleep(delay * 1000);
067 }
068 times[i] = batch(msgCount, consumerCount);
069 System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms.");
070 }
071
072 long min = min(times);
073 long max = max(times);
074 System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
075
076 //request shutdown
077 _publisher.send(_factory.createShutdownMessage());
078
079 _connection.stop();
080 _connection.close();
081 }
082
083 private long batch(int msgCount, int consumerCount) throws Exception
084 {
085 _count = consumerCount;
086 long start = System.currentTimeMillis();
087 publish(msgCount);
088 waitForCompletion(consumerCount);
089 return System.currentTimeMillis() - start;
090 }
091
092 private void publish(int count) throws Exception
093 {
094
095 //send events
096 for (int i = 0; i < count; i++)
097 {
098 _publisher.send(_factory.createEventMessage());
099 if ((i + 1) % 100 == 0)
100 {
101 System.out.println("Sent " + (i + 1) + " messages");
102 }
103 }
104
105 //request report
106 _publisher.send(_factory.createReportRequestMessage());
107 }
108
109 private void waitForCompletion(int consumers) throws Exception
110 {
111 System.out.println("Waiting for completion...");
112 synchronized (_lock)
113 {
114 while (_count > 0)
115 {
116 _lock.wait();
117 }
118 }
119 }
120
121
122 public void onMessage(Message message)
123 {
124 System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining");
125 if (_count == 0)
126 {
127 synchronized (_lock)
128 {
129 _lock.notify();
130 }
131 }
132 }
133
134 static long min(long[] times)
135 {
136 long min = times.length > 0 ? times[0] : 0;
137 for (int i = 0; i < times.length; i++)
138 {
139 min = Math.min(min, times[i]);
140 }
141 return min;
142 }
143
144 static long max(long[] times)
145 {
146 long max = times.length > 0 ? times[0] : 0;
147 for (int i = 0; i < times.length; i++)
148 {
149 max = Math.max(max, times[i]);
150 }
151 return max;
152 }
153
154 static long avg(long[] times, long min, long max)
155 {
156 long sum = 0;
157 for (int i = 0; i < times.length; i++)
158 {
159 sum += times[i];
160 }
161
162 int adjustment = 0;
163
164 // Remove min and max if we have run enough batches.
165 if (times.length > 2)
166 {
167 sum -= min;
168 sum -= max;
169 adjustment = 2;
170 }
171
172 return (sum / (times.length - adjustment));
173 }
174
175 public static void main(String[] argv) throws Exception
176 {
177 Config config = new Config();
178 config.setOptions(argv);
179
180 Connection con = config.createConnection();
181 int size = config.getPayload();
182 int ackMode = config.getAckMode();
183 boolean persistent = config.usePersistentMessages();
184 new Publisher(con, size, ackMode, persistent).test(config);
185 }
186 }
|