001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019 package org.apache.qpid.transport.network.io;
020
021 import static org.apache.qpid.transport.util.Functions.mod;
022
023 import java.io.IOException;
024 import java.io.OutputStream;
025 import java.net.Socket;
026 import java.nio.ByteBuffer;
027 import java.util.concurrent.atomic.AtomicBoolean;
028
029 import org.apache.qpid.thread.Threading;
030 import org.apache.qpid.transport.Sender;
031 import org.apache.qpid.transport.SenderException;
032 import org.apache.qpid.transport.TransportException;
033 import org.apache.qpid.transport.util.Logger;
034
035
036 public final class IoSender implements Runnable, Sender<ByteBuffer>
037 {
038
039 private static final Logger log = Logger.get(IoSender.class);
040
041 // by starting here, we ensure that we always test the wraparound
042 // case, we should probably make this configurable somehow so that
043 // we can test other cases as well
044 private final static int START = Integer.MAX_VALUE - 10;
045
046 private final IoTransport transport;
047 private final long timeout;
048 private final Socket socket;
049 private final OutputStream out;
050
051 private final byte[] buffer;
052 private volatile int head = START;
053 private volatile int tail = START;
054 private volatile boolean idle = true;
055 private final Object notFull = new Object();
056 private final Object notEmpty = new Object();
057 private final AtomicBoolean closed = new AtomicBoolean(false);
058 private final Thread senderThread;
059 private long idleTimeout;
060
061 private volatile Throwable exception = null;
062
063
064 public IoSender(IoTransport transport, int bufferSize, long timeout)
065 {
066 this.transport = transport;
067 this.socket = transport.getSocket();
068 this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
069 this.timeout = timeout;
070
071 try
072 {
073 out = socket.getOutputStream();
074 }
075 catch (IOException e)
076 {
077 throw new TransportException("Error getting output stream for socket", e);
078 }
079
080 try
081 {
082 senderThread = Threading.getThreadFactory().createThread(this);
083 }
084 catch(Exception e)
085 {
086 throw new Error("Error creating IOSender thread",e);
087 }
088
089 senderThread.setDaemon(true);
090 senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
091 senderThread.start();
092 }
093
094 private static final int pof2(int n)
095 {
096 int result = 1;
097 while (result < n)
098 {
099 result *= 2;
100 }
101 return result;
102 }
103
104 public void send(ByteBuffer buf)
105 {
106 if (closed.get())
107 {
108 throw new SenderException("sender is closed", exception);
109 }
110
111 final int size = buffer.length;
112 int remaining = buf.remaining();
113
114 while (remaining > 0)
115 {
116 final int hd = head;
117 final int tl = tail;
118
119 if (hd - tl >= size)
120 {
121 flush();
122 synchronized (notFull)
123 {
124 long start = System.currentTimeMillis();
125 long elapsed = 0;
126 while (!closed.get() && head - tail >= size && elapsed < timeout)
127 {
128 try
129 {
130 notFull.wait(timeout - elapsed);
131 }
132 catch (InterruptedException e)
133 {
134 // pass
135 }
136 elapsed += System.currentTimeMillis() - start;
137 }
138
139 if (closed.get())
140 {
141 throw new SenderException("sender is closed", exception);
142 }
143
144 if (head - tail >= size)
145 {
146 throw new SenderException(String.format("write timed out: %s, %s", head, tail));
147 }
148 }
149 continue;
150 }
151
152 final int hd_idx = mod(hd, size);
153 final int tl_idx = mod(tl, size);
154 final int length;
155
156 if (tl_idx > hd_idx)
157 {
158 length = Math.min(tl_idx - hd_idx, remaining);
159 }
160 else
161 {
162 length = Math.min(size - hd_idx, remaining);
163 }
164
165 buf.get(buffer, hd_idx, length);
166 head += length;
167 remaining -= length;
168 }
169 }
170
171 public void flush()
172 {
173 if (idle)
174 {
175 synchronized (notEmpty)
176 {
177 notEmpty.notify();
178 }
179 }
180 }
181
182 public void close()
183 {
184 close(true);
185 }
186
187 void close(boolean reportException)
188 {
189 if (!closed.getAndSet(true))
190 {
191 synchronized (notFull)
192 {
193 notFull.notify();
194 }
195
196 synchronized (notEmpty)
197 {
198 notEmpty.notify();
199 }
200
201 try
202 {
203 if (Thread.currentThread() != senderThread)
204 {
205 senderThread.join(timeout);
206 if (senderThread.isAlive())
207 {
208 throw new SenderException("join timed out");
209 }
210 }
211 transport.getReceiver().close(false);
212 }
213 catch (InterruptedException e)
214 {
215 throw new SenderException(e);
216 }
217
218 if (reportException && exception != null)
219 {
220 throw new SenderException(exception);
221 }
222 }
223 }
224
225 public void run()
226 {
227 final int size = buffer.length;
228 while (true)
229 {
230 final int hd = head;
231 final int tl = tail;
232
233 if (hd == tl)
234 {
235 if (closed.get())
236 {
237 break;
238 }
239
240 idle = true;
241
242 synchronized (notEmpty)
243 {
244 while (head == tail && !closed.get())
245 {
246 try
247 {
248 notEmpty.wait();
249 }
250 catch (InterruptedException e)
251 {
252 // pass
253 }
254 }
255 }
256
257 idle = false;
258
259 continue;
260 }
261
262 final int hd_idx = mod(hd, size);
263 final int tl_idx = mod(tl, size);
264
265 final int length;
266 if (tl_idx < hd_idx)
267 {
268 length = hd_idx - tl_idx;
269 }
270 else
271 {
272 length = size - tl_idx;
273 }
274
275 try
276 {
277 out.write(buffer, tl_idx, length);
278 }
279 catch (IOException e)
280 {
281 log.error(e, "error in write thread");
282 exception = e;
283 close(false);
284 break;
285 }
286 tail += length;
287 if (head - tl >= size)
288 {
289 synchronized (notFull)
290 {
291 notFull.notify();
292 }
293 }
294 }
295 }
296
297 public void setIdleTimeout(long l)
298 {
299 try
300 {
301 socket.setSoTimeout((int)l*2);
302 idleTimeout = l;
303 }
304 catch (Exception e)
305 {
306 throw new SenderException(e);
307 }
308 }
309 }
|