001 package org.apache.qpid.transport.network.nio;
002 /*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 *
021 */
022
023
024 import java.io.EOFException;
025 import java.io.IOException;
026 import java.net.InetSocketAddress;
027 import java.net.SocketAddress;
028 import java.net.SocketException;
029 import java.nio.ByteBuffer;
030 import java.nio.channels.SocketChannel;
031 import java.util.Map;
032 import java.util.concurrent.ConcurrentHashMap;
033 import java.util.concurrent.atomic.AtomicInteger;
034
035 import org.apache.qpid.transport.Connection;
036 import org.apache.qpid.transport.ConnectionDelegate;
037 import org.apache.qpid.transport.Receiver;
038 import org.apache.qpid.transport.network.Assembler;
039 import org.apache.qpid.transport.network.Disassembler;
040 import org.apache.qpid.transport.network.InputHandler;
041
042 public class NioHandler implements Runnable
043 {
044 private Receiver<ByteBuffer> _receiver;
045 private SocketChannel _ch;
046 private ByteBuffer _readBuf;
047 private static Map<Integer,NioSender> _handlers = new ConcurrentHashMap<Integer,NioSender>();
048 private AtomicInteger _count = new AtomicInteger();
049
050 private NioHandler(){}
051
052 public static final Connection connect(String host, int port,
053 ConnectionDelegate delegate)
054 {
055 NioHandler handler = new NioHandler();
056 return handler.connectInternal(host,port,delegate);
057 }
058
059 private Connection connectInternal(String host, int port,
060 ConnectionDelegate delegate)
061 {
062 try
063 {
064 SocketAddress address = new InetSocketAddress(host,port);
065 _ch = SocketChannel.open();
066 _ch.socket().setReuseAddress(true);
067 _ch.configureBlocking(true);
068 _ch.socket().setTcpNoDelay(true);
069 if (address != null)
070 {
071 _ch.socket().connect(address);
072 }
073 while (_ch.isConnectionPending())
074 {
075
076 }
077
078 }
079 catch (SocketException e)
080 {
081
082 e.printStackTrace();
083 }
084 catch (IOException e)
085 {
086 e.printStackTrace();
087 }
088
089 NioSender sender = new NioSender(_ch);
090 Connection con = new Connection();
091 con.setSender(new Disassembler(sender, 64*1024 - 1));
092 con.setConnectionDelegate(delegate);
093
094 con.setConnectionId(_count.incrementAndGet());
095 _handlers.put(con.getConnectionId(),sender);
096
097 _receiver = new InputHandler(new Assembler(con), InputHandler.State.FRAME_HDR);
098
099 Thread t = new Thread(this);
100 t.start();
101
102 return con;
103 }
104
105 public void run()
106 {
107 _readBuf = ByteBuffer.allocate(512);
108 long read = 0;
109 while(_ch.isConnected() && _ch.isOpen())
110 {
111 try
112 {
113 read = _ch.read(_readBuf);
114 if (read > 0)
115 {
116 _readBuf.flip();
117 ByteBuffer b = ByteBuffer.allocate(_readBuf.remaining());
118 b.put(_readBuf);
119 b.flip();
120 _readBuf.clear();
121 _receiver.received(b);
122 }
123 }
124 catch(Exception e)
125 {
126 e.printStackTrace();
127 }
128 }
129
130 //throw new EOFException("The underlying socket/channel has closed");
131 }
132
133 public static void startBatchingFrames(int connectionId)
134 {
135 NioSender sender = _handlers.get(connectionId);
136 sender.setStartBatching();
137 }
138
139
140 }
|