001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.oldtopic;
022
023 import javax.jms.*;
024
025 public class Publisher implements MessageListener
026 {
027 private final Object _lock = new Object();
028 private final Connection _connection;
029 private final Session _session;
030 private final MessageFactory _factory;
031 private final MessageProducer _publisher;
032 private int _count;
033
034 Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception
035 {
036 _connection = connection;
037 _session = _connection.createSession(false, ackMode);
038 _factory = new MessageFactory(_session, size);
039 _publisher = _factory.createTopicPublisher();
040 _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
041 System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + ".");
042 }
043
044 private void test(Config config) throws Exception
045 {
046 test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup());
047 }
048
049 private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception
050 {
051 _factory.createControlConsumer().setMessageListener(this);
052 _connection.start();
053
054 if(warmup > 0)
055 {
056 System.out.println("Runing warmup (" + warmup + " msgs)");
057 long time = batch(warmup, consumerCount);
058 System.out.println("Warmup completed in " + time + "ms");
059 }
060
061 long[] times = new long[batches];
062 for(int i = 0; i < batches; i++)
063 {
064 if(i > 0) Thread.sleep(delay*1000);
065 times[i] = batch(msgCount, consumerCount);
066 System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms.");
067 }
068
069 long min = min(times);
070 long max = max(times);
071 System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
072
073 //request shutdown
074 _publisher.send(_factory.createShutdownMessage());
075
076 _connection.stop();
077 _connection.close();
078 }
079
080 private long batch(int msgCount, int consumerCount) throws Exception
081 {
082 _count = consumerCount;
083 long start = System.currentTimeMillis();
084 publish(msgCount);
085 waitForCompletion(consumerCount);
086 return System.currentTimeMillis() - start;
087 }
088
089 private void publish(int count) throws Exception
090 {
091
092 //send events
093 for (int i = 0; i < count; i++)
094 {
095 _publisher.send(_factory.createEventMessage());
096 if ((i + 1) % 100 == 0)
097 {
098 System.out.println("Sent " + (i + 1) + " messages");
099 }
100 }
101
102 //request report
103 _publisher.send(_factory.createReportRequestMessage());
104 }
105
106 private void waitForCompletion(int consumers) throws Exception
107 {
108 System.out.println("Waiting for completion...");
109 synchronized (_lock)
110 {
111 while (_count > 0)
112 {
113 _lock.wait();
114 }
115 }
116 }
117
118
119 public void onMessage(Message message)
120 {
121 System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining");
122 if (_count == 0)
123 {
124 synchronized (_lock)
125 {
126 _lock.notify();
127 }
128 }
129 }
130
131 static long min(long[] times)
132 {
133 long min = times.length > 0 ? times[0] : 0;
134 for(int i = 0; i < times.length; i++)
135 {
136 min = Math.min(min, times[i]);
137 }
138 return min;
139 }
140
141 static long max(long[] times)
142 {
143 long max = times.length > 0 ? times[0] : 0;
144 for(int i = 0; i < times.length; i++)
145 {
146 max = Math.max(max, times[i]);
147 }
148 return max;
149 }
150
151 static long avg(long[] times, long min, long max)
152 {
153 long sum = 0;
154 for(int i = 0; i < times.length; i++)
155 {
156 sum += times[i];
157 }
158 sum -= min;
159 sum -= max;
160
161 return (sum / (times.length - 2));
162 }
163
164 public static void main(String[] argv) throws Exception
165 {
166 Config config = new Config();
167 config.setOptions(argv);
168
169 Connection con = config.createConnection();
170 int size = config.getPayload();
171 int ackMode = config.getAckMode();
172 boolean persistent = config.usePersistentMessages();
173 new Publisher(con, size, ackMode, persistent).test(config);
174 }
175 }
|