Sink.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport;
022 
023 import java.io.IOException;
024 import java.nio.ByteBuffer;
025 
026 import org.apache.qpid.transport.network.ConnectionBinding;
027 import org.apache.qpid.transport.network.io.IoAcceptor;
028 
029 /**
030  * Sink
031  *
032  */
033 
034 public class Sink implements SessionListener
035 {
036 
037     private static final String FORMAT_HDR = "%-12s %-18s %-18s %-18s";
038     private static final String FORMAT_ROW = "SSN#%-8X %-18s %-18s %-18s";
039 
040     private long interval = 100000;
041     private long start = System.currentTimeMillis();
042     private long count = 0;
043     private long bytes = 0;
044     private long interval_start = start;
045     private long bytes_start = bytes;
046     private long time = start;
047     private int id = System.identityHashCode(this);
048 
049     public Sink()
050     {
051     }
052 
053     private double msg_rate()
054     {
055         return 1000 (doublecount / (double) (time - start);
056     }
057 
058     private double byte_rate()
059     {
060         return (1000 (doublebytes / (double) (time - start)) (1024*1024);
061     }
062 
063     private double msg_interval_rate()
064     {
065         return 1000 (doubleinterval / (double) (time - interval_start);
066     }
067 
068     private double byte_interval_rate()
069     {
070         return (1000 (double) (bytes - bytes_start(double) (time - interval_start)) (1024*1024);
071     }
072 
073     private String rates()
074     {
075         return String.format("%.2f/%.2f", msg_rate(), byte_rate());
076     }
077 
078     private String interval_rates()
079     {
080         return String.format("%.2f/%.2f", msg_interval_rate(), byte_interval_rate());
081     }
082 
083     private String counts()
084     {
085         return String.format("%d/%.2f", count, ((doublebytes)/(1024*1024));
086     }
087 
088     public void opened(Session ssn) {}
089 
090     public void message(Session ssn, MessageTransfer xfr)
091     {
092         count++;
093         bytes += xfr.getBody().remaining();
094         if ((count % interval== 0)
095         {
096             time = System.currentTimeMillis();
097             System.out.println
098                 (String.format
099                  (FORMAT_ROW, id, counts(), rates(), interval_rates()));
100             interval_start = time;
101             bytes_start = bytes;
102         }
103         ssn.processed(xfr);
104     }
105 
106     public void exception(Session ssn, SessionException exc)
107     {
108         exc.printStackTrace();
109     }
110 
111     public void closed(Session ssn) {}
112 
113     public static final void main(String[] argsthrows IOException
114     {
115         ConnectionDelegate delegate = new ServerDelegate()
116         {
117             @Override public Session getSession(Connection conn, SessionAttach atc)
118             {
119                 Session ssn = super.getSession(conn, atc);
120                 ssn.setSessionListener(new Sink());
121                 return ssn;
122             }
123         };
124 
125         IoAcceptor ioa = new IoAcceptor
126             ("0.0.0.0"5672, ConnectionBinding.get(delegate));
127         System.out.println
128             (String.format
129              (FORMAT_HDR, "Session""Count/MBytes""Cumulative Rate""Interval Rate"));
130         System.out.println
131             (String.format
132              (FORMAT_HDR, "-------""------------""---------------""-------------"));
133         ioa.start();
134     }
135 
136 }