001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport.network.io;
022
023 import org.apache.qpid.thread.Threading;
024 import org.apache.qpid.transport.Receiver;
025 import org.apache.qpid.transport.TransportException;
026 import org.apache.qpid.transport.util.Logger;
027
028 import java.io.IOException;
029 import java.io.InputStream;
030 import java.net.Socket;
031 import java.net.SocketException;
032 import java.nio.ByteBuffer;
033 import java.util.concurrent.atomic.AtomicBoolean;
034
035 /**
036 * IoReceiver
037 *
038 */
039
040 final class IoReceiver implements Runnable
041 {
042
043 private static final Logger log = Logger.get(IoReceiver.class);
044
045 private final IoTransport transport;
046 private final Receiver<ByteBuffer> receiver;
047 private final int bufferSize;
048 private final Socket socket;
049 private final long timeout;
050 private final AtomicBoolean closed = new AtomicBoolean(false);
051 private final Thread receiverThread;
052 private final boolean shutdownBroken =
053 ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
054
055 public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
056 int bufferSize, long timeout)
057 {
058 this.transport = transport;
059 this.receiver = receiver;
060 this.bufferSize = bufferSize;
061 this.socket = transport.getSocket();
062 this.timeout = timeout;
063
064 try
065 {
066 receiverThread = Threading.getThreadFactory().createThread(this);
067 }
068 catch(Exception e)
069 {
070 throw new Error("Error creating IOReceiver thread",e);
071 }
072 receiverThread.setDaemon(true);
073 receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
074 receiverThread.start();
075 }
076
077 void close(boolean block)
078 {
079 if (!closed.getAndSet(true))
080 {
081 try
082 {
083 if (shutdownBroken)
084 {
085 socket.close();
086 }
087 else
088 {
089 socket.shutdownInput();
090 }
091 if (block && Thread.currentThread() != receiverThread)
092 {
093 receiverThread.join(timeout);
094 if (receiverThread.isAlive())
095 {
096 throw new TransportException("join timed out");
097 }
098 }
099 }
100 catch (InterruptedException e)
101 {
102 throw new TransportException(e);
103 }
104 catch (IOException e)
105 {
106 throw new TransportException(e);
107 }
108 }
109 }
110
111 public void run()
112 {
113 final int threshold = bufferSize / 2;
114
115 // I set the read buffer size simillar to SO_RCVBUF
116 // Haven't tested with a lower value to see if it's better or worse
117 byte[] buffer = new byte[bufferSize];
118 try
119 {
120 InputStream in = socket.getInputStream();
121 int read = 0;
122 int offset = 0;
123 while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
124 {
125 if (read > 0)
126 {
127 ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
128 receiver.received(b);
129 offset+=read;
130 if (offset > threshold)
131 {
132 offset = 0;
133 buffer = new byte[bufferSize];
134 }
135 }
136 }
137 socket.close();
138 }
139 catch (Throwable t)
140 {
141 if (!(shutdownBroken &&
142 t instanceof SocketException &&
143 t.getMessage().equalsIgnoreCase("socket closed") &&
144 closed.get()))
145 {
146 receiver.exception(t);
147 }
148 }
149 finally
150 {
151 receiver.closed();
152 }
153 }
154
155 }
|