001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.oldtopic;
022 import org.apache.log4j.*;
023 import javax.jms.Connection;
024 import javax.jms.Message;
025 import javax.jms.MessageListener;
026 import javax.jms.MessageProducer;
027 import javax.jms.Session;
028
029 public class Listener implements MessageListener
030 {
031 private final Connection _connection;
032 private final MessageProducer _controller;
033 private final javax.jms.Session _session;
034 private final MessageFactory _factory;
035 private boolean init;
036 private int count;
037 private long start;
038
039 Listener(Connection connection, int ackMode) throws Exception
040 {
041 this(connection, ackMode, null);
042 }
043
044 Listener(Connection connection, int ackMode, String name) throws Exception
045 {
046 _connection = connection;
047 _session = connection.createSession(false, ackMode);
048 _factory = new MessageFactory(_session);
049
050 //register for events
051 if(name == null)
052 {
053 _factory.createTopicConsumer().setMessageListener(this);
054 }
055 else
056 {
057 _factory.createDurableTopicConsumer(name).setMessageListener(this);
058 }
059
060 _connection.start();
061
062 _controller = _factory.createControlPublisher();
063 System.out.println("Waiting for messages " +
064 Config.getAckModeDescription(ackMode)
065 + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
066 + "...");
067
068 }
069
070 private void shutdown()
071 {
072 try
073 {
074 _session.close();
075 _connection.stop();
076 _connection.close();
077 }
078 catch(Exception e)
079 {
080 e.printStackTrace(System.out);
081 }
082 }
083
084 private void report()
085 {
086 try
087 {
088 String msg = getReport();
089 _controller.send(_factory.createReportResponseMessage(msg));
090 System.out.println("Sent report: " + msg);
091 }
092 catch(Exception e)
093 {
094 e.printStackTrace(System.out);
095 }
096 }
097
098 private String getReport()
099 {
100 long time = (System.currentTimeMillis() - start);
101 return "Received " + count + " in " + time + "ms";
102 }
103
104 public void onMessage(Message message)
105 {
106 if(!init)
107 {
108 start = System.currentTimeMillis();
109 count = 0;
110 init = true;
111 }
112
113 if(_factory.isShutdown(message))
114 {
115 shutdown();
116 }
117 else if(_factory.isReport(message))
118 {
119 //send a report:
120 report();
121 init = false;
122 }
123 else if (++count % 100 == 0)
124 {
125 System.out.println("Received " + count + " messages.");
126 }
127 }
128
129 public static void main(String[] argv) throws Exception
130 {
131 Config config = new Config();
132 config.setOptions(argv);
133
134 Connection con = config.createConnection();
135 if(config.getClientId() != null)
136 {
137 con.setClientID(config.getClientId());
138 }
139 new Listener(con, config.getAckMode(), config.getSubscriptionId());
140 }
141 }
|