NioHandler.java
001 package org.apache.qpid.transport.network.nio;
002 /*
003  
004  * Licensed to the Apache Software Foundation (ASF) under one
005  * or more contributor license agreements.  See the NOTICE file
006  * distributed with this work for additional information
007  * regarding copyright ownership.  The ASF licenses this file
008  * to you under the Apache License, Version 2.0 (the
009  * "License"); you may not use this file except in compliance
010  * with the License.  You may obtain a copy of the License at
011  
012  *   http://www.apache.org/licenses/LICENSE-2.0
013  
014  * Unless required by applicable law or agreed to in writing,
015  * software distributed under the License is distributed on an
016  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017  * KIND, either express or implied.  See the License for the
018  * specific language governing permissions and limitations
019  * under the License.
020  
021  */
022 
023 
024 import java.io.EOFException;
025 import java.io.IOException;
026 import java.net.InetSocketAddress;
027 import java.net.SocketAddress;
028 import java.net.SocketException;
029 import java.nio.ByteBuffer;
030 import java.nio.channels.SocketChannel;
031 import java.util.Map;
032 import java.util.concurrent.ConcurrentHashMap;
033 import java.util.concurrent.atomic.AtomicInteger;
034 
035 import org.apache.qpid.transport.Connection;
036 import org.apache.qpid.transport.ConnectionDelegate;
037 import org.apache.qpid.transport.Receiver;
038 import org.apache.qpid.transport.network.Assembler;
039 import org.apache.qpid.transport.network.Disassembler;
040 import org.apache.qpid.transport.network.InputHandler;
041 
042 public class NioHandler implements Runnable
043 {
044     private Receiver<ByteBuffer> _receiver;
045     private SocketChannel _ch;
046     private ByteBuffer _readBuf;
047     private static Map<Integer,NioSender> _handlers = new ConcurrentHashMap<Integer,NioSender>();
048     private AtomicInteger _count = new AtomicInteger();
049 
050     private NioHandler(){}
051 
052     public static final Connection connect(String host, int port,
053             ConnectionDelegate delegate)
054     {
055         NioHandler handler = new NioHandler();
056         return handler.connectInternal(host,port,delegate);
057     }
058 
059     private Connection connectInternal(String host, int port,
060             ConnectionDelegate delegate)
061     {
062         try
063         {
064             SocketAddress address = new InetSocketAddress(host,port);
065             _ch = SocketChannel.open();
066             _ch.socket().setReuseAddress(true);
067             _ch.configureBlocking(true);
068             _ch.socket().setTcpNoDelay(true);
069             if (address != null)
070             {
071                 _ch.socket().connect(address);
072             }
073             while (_ch.isConnectionPending())
074             {
075 
076             }
077 
078         }
079         catch (SocketException e)
080         {
081 
082             e.printStackTrace();
083         }
084         catch (IOException e)
085         {
086             e.printStackTrace();
087         }
088 
089         NioSender sender = new NioSender(_ch);
090         Connection con = new Connection();
091         con.setSender(new Disassembler(sender, 64*1024 1));
092         con.setConnectionDelegate(delegate);
093 
094         con.setConnectionId(_count.incrementAndGet());
095         _handlers.put(con.getConnectionId(),sender);
096 
097         _receiver = new InputHandler(new Assembler(con), InputHandler.State.FRAME_HDR);
098 
099         Thread t = new Thread(this);
100         t.start();
101 
102         return con;
103     }
104 
105     public void run()
106     {
107         _readBuf = ByteBuffer.allocate(512);
108         long read = 0;
109         while(_ch.isConnected() && _ch.isOpen())
110         {
111             try
112             {
113                 read = _ch.read(_readBuf);
114                 if (read > 0)
115                 {
116                     _readBuf.flip();
117                     ByteBuffer b = ByteBuffer.allocate(_readBuf.remaining());
118                     b.put(_readBuf);
119                     b.flip();
120                     _readBuf.clear();
121                     _receiver.received(b);
122                 }
123             }
124             catch(Exception e)
125             {
126                 e.printStackTrace();
127             }
128         }
129 
130         //throw new EOFException("The underlying socket/channel has closed");
131     }
132 
133     public static void startBatchingFrames(int connectionId)
134     {
135         NioSender sender = _handlers.get(connectionId);
136         sender.setStartBatching();
137     }
138 
139 
140 }