Listener.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.oldtopic;
022 import org.apache.log4j.*;
023 import javax.jms.Connection;
024 import javax.jms.Message;
025 import javax.jms.MessageListener;
026 import javax.jms.MessageProducer;
027 import javax.jms.Session;
028 
029 public class Listener implements MessageListener
030 {
031     private final Connection _connection;
032     private final MessageProducer _controller;
033     private final javax.jms.Session _session;
034     private final MessageFactory _factory;
035     private boolean init;
036     private int count;
037     private long start;
038 
039     Listener(Connection connection, int ackModethrows Exception
040     {
041         this(connection, ackMode, null);
042     }
043 
044     Listener(Connection connection, int ackMode, String namethrows Exception
045     {
046         _connection = connection;
047         _session = connection.createSession(false, ackMode);
048         _factory = new MessageFactory(_session);
049 
050         //register for events
051         if(name == null)
052         {
053             _factory.createTopicConsumer().setMessageListener(this);
054         }
055         else
056         {
057             _factory.createDurableTopicConsumer(name).setMessageListener(this);
058         }
059 
060         _connection.start();
061 
062         _controller = _factory.createControlPublisher();
063         System.out.println("Waiting for messages " +
064                 Config.getAckModeDescription(ackMode)
065                 (name == null "" " (subscribed with name " + name + " and client id " + connection.getClientID() ")")
066                 "...");
067 
068     }
069 
070     private void shutdown()
071     {
072         try
073         {
074             _session.close();
075             _connection.stop();
076             _connection.close();
077         }
078         catch(Exception e)
079         {
080             e.printStackTrace(System.out);
081         }
082     }
083 
084     private void report()
085     {
086         try
087         {
088             String msg = getReport();
089             _controller.send(_factory.createReportResponseMessage(msg));
090             System.out.println("Sent report: " + msg);
091         }
092         catch(Exception e)
093         {
094             e.printStackTrace(System.out);
095         }
096     }
097 
098     private String getReport()
099     {
100         long time = (System.currentTimeMillis() - start);
101         return "Received " + count + " in " + time + "ms";
102     }
103 
104     public void onMessage(Message message)
105     {
106         if(!init)
107         {
108             start = System.currentTimeMillis();
109             count = 0;
110             init = true;
111         }
112 
113         if(_factory.isShutdown(message))
114         {
115             shutdown();
116         }
117         else if(_factory.isReport(message))
118         {
119             //send a report:
120             report();
121             init = false;
122         }
123         else if (++count % 100 == 0)
124         {
125             System.out.println("Received " + count + " messages.");
126         }
127     }
128 
129     public static void main(String[] argvthrows Exception
130     {
131         Config config = new Config();
132         config.setOptions(argv);
133 
134         Connection con = config.createConnection();
135         if(config.getClientId() != null)
136         {
137             con.setClientID(config.getClientId());
138         }
139         new Listener(con, config.getAckMode(), config.getSubscriptionId());
140     }
141 }