001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport;
022
023 import java.io.IOException;
024 import java.nio.ByteBuffer;
025
026 import org.apache.qpid.transport.network.ConnectionBinding;
027 import org.apache.qpid.transport.network.io.IoAcceptor;
028
029 /**
030 * Sink
031 *
032 */
033
034 public class Sink implements SessionListener
035 {
036
037 private static final String FORMAT_HDR = "%-12s %-18s %-18s %-18s";
038 private static final String FORMAT_ROW = "SSN#%-8X %-18s %-18s %-18s";
039
040 private long interval = 100000;
041 private long start = System.currentTimeMillis();
042 private long count = 0;
043 private long bytes = 0;
044 private long interval_start = start;
045 private long bytes_start = bytes;
046 private long time = start;
047 private int id = System.identityHashCode(this);
048
049 public Sink()
050 {
051 }
052
053 private double msg_rate()
054 {
055 return 1000 * (double) count / (double) (time - start);
056 }
057
058 private double byte_rate()
059 {
060 return (1000 * (double) bytes / (double) (time - start)) / (1024*1024);
061 }
062
063 private double msg_interval_rate()
064 {
065 return 1000 * (double) interval / (double) (time - interval_start);
066 }
067
068 private double byte_interval_rate()
069 {
070 return (1000 * (double) (bytes - bytes_start) / (double) (time - interval_start)) / (1024*1024);
071 }
072
073 private String rates()
074 {
075 return String.format("%.2f/%.2f", msg_rate(), byte_rate());
076 }
077
078 private String interval_rates()
079 {
080 return String.format("%.2f/%.2f", msg_interval_rate(), byte_interval_rate());
081 }
082
083 private String counts()
084 {
085 return String.format("%d/%.2f", count, ((double) bytes)/(1024*1024));
086 }
087
088 public void opened(Session ssn) {}
089
090 public void message(Session ssn, MessageTransfer xfr)
091 {
092 count++;
093 bytes += xfr.getBody().remaining();
094 if ((count % interval) == 0)
095 {
096 time = System.currentTimeMillis();
097 System.out.println
098 (String.format
099 (FORMAT_ROW, id, counts(), rates(), interval_rates()));
100 interval_start = time;
101 bytes_start = bytes;
102 }
103 ssn.processed(xfr);
104 }
105
106 public void exception(Session ssn, SessionException exc)
107 {
108 exc.printStackTrace();
109 }
110
111 public void closed(Session ssn) {}
112
113 public static final void main(String[] args) throws IOException
114 {
115 ConnectionDelegate delegate = new ServerDelegate()
116 {
117 @Override public Session getSession(Connection conn, SessionAttach atc)
118 {
119 Session ssn = super.getSession(conn, atc);
120 ssn.setSessionListener(new Sink());
121 return ssn;
122 }
123 };
124
125 IoAcceptor ioa = new IoAcceptor
126 ("0.0.0.0", 5672, ConnectionBinding.get(delegate));
127 System.out.println
128 (String.format
129 (FORMAT_HDR, "Session", "Count/MBytes", "Cumulative Rate", "Interval Rate"));
130 System.out.println
131 (String.format
132 (FORMAT_HDR, "-------", "------------", "---------------", "-------------"));
133 ioa.start();
134 }
135
136 }
|