IoReceiver.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport.network.io;
022 
023 import org.apache.qpid.thread.Threading;
024 import org.apache.qpid.transport.Receiver;
025 import org.apache.qpid.transport.TransportException;
026 import org.apache.qpid.transport.util.Logger;
027 
028 import java.io.IOException;
029 import java.io.InputStream;
030 import java.net.Socket;
031 import java.net.SocketException;
032 import java.nio.ByteBuffer;
033 import java.util.concurrent.atomic.AtomicBoolean;
034 
035 /**
036  * IoReceiver
037  *
038  */
039 
040 final class IoReceiver implements Runnable
041 {
042 
043     private static final Logger log = Logger.get(IoReceiver.class);
044 
045     private final IoTransport transport;
046     private final Receiver<ByteBuffer> receiver;
047     private final int bufferSize;
048     private final Socket socket;
049     private final long timeout;
050     private final AtomicBoolean closed = new AtomicBoolean(false);
051     private final Thread receiverThread;
052     private final boolean shutdownBroken =
053         ((StringSystem.getProperties().get("os.name")).matches("(?i).*windows.*");
054 
055     public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
056                       int bufferSize, long timeout)
057     {
058         this.transport = transport;
059         this.receiver = receiver;
060         this.bufferSize = bufferSize;
061         this.socket = transport.getSocket();
062         this.timeout = timeout;
063         
064         try
065         {
066             receiverThread = Threading.getThreadFactory().createThread(this);                      
067         }
068         catch(Exception e)
069         {
070             throw new Error("Error creating IOReceiver thread",e);
071         }
072         receiverThread.setDaemon(true);
073         receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
074         receiverThread.start();
075     }
076 
077     void close(boolean block)
078     {
079         if (!closed.getAndSet(true))
080         {
081             try
082             {
083                 if (shutdownBroken)
084                 {
085                    socket.close();
086                 }
087                 else
088                 {
089                     socket.shutdownInput();
090                 }
091                 if (block && Thread.currentThread() != receiverThread)
092                 {
093                     receiverThread.join(timeout);
094                     if (receiverThread.isAlive())
095                     {
096                         throw new TransportException("join timed out");
097                     }
098                 }
099             }
100             catch (InterruptedException e)
101             {
102                 throw new TransportException(e);
103             }
104             catch (IOException e)
105             {
106                 throw new TransportException(e);
107             }
108         }
109     }
110 
111     public void run()
112     {
113         final int threshold = bufferSize / 2;
114 
115         // I set the read buffer size simillar to SO_RCVBUF
116         // Haven't tested with a lower value to see if it's better or worse
117         byte[] buffer = new byte[bufferSize];
118         try
119         {
120             InputStream in = socket.getInputStream();
121             int read = 0;
122             int offset = 0;
123             while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
124             {
125                 if (read > 0)
126                 {
127                     ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
128                     receiver.received(b);
129                     offset+=read;
130                     if (offset > threshold)
131                     {
132                         offset = 0;
133                         buffer = new byte[bufferSize];
134                     }
135                 }
136             }
137             socket.close();
138         }
139         catch (Throwable t)
140         {
141             if (!(shutdownBroken &&
142                   instanceof SocketException &&
143                   t.getMessage().equalsIgnoreCase("socket closed"&&
144                   closed.get()))
145             {
146                 receiver.exception(t);
147             }
148         }
149         finally
150         {
151             receiver.closed();
152         }
153     }
154 
155 }