001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 */
020 package org.apache.mina.transport.socket.nio;
021
022 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
023 import org.apache.mina.common.ConnectFuture;
024 import org.apache.mina.common.ExceptionMonitor;
025 import org.apache.mina.common.IoConnector;
026 import org.apache.mina.common.IoConnectorConfig;
027 import org.apache.mina.common.IoHandler;
028 import org.apache.mina.common.IoServiceConfig;
029 import org.apache.mina.common.support.AbstractIoFilterChain;
030 import org.apache.mina.common.support.DefaultConnectFuture;
031 import org.apache.mina.util.NamePreservingRunnable;
032 import org.apache.mina.util.NewThreadExecutor;
033 import org.apache.mina.util.Queue;
034
035 import java.io.IOException;
036 import java.net.ConnectException;
037 import java.net.InetSocketAddress;
038 import java.net.SocketAddress;
039 import java.nio.channels.SelectionKey;
040 import java.nio.channels.Selector;
041 import java.nio.channels.SocketChannel;
042 import java.util.Iterator;
043 import java.util.Set;
044
045 /**
046 * {@link IoConnector} for socket transport (TCP/IP).
047 *
048 * @author The Apache Directory Project (mina-dev@directory.apache.org)
049 * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
050 */
051 public class MultiThreadSocketConnector extends SocketConnector
052 {
053 /** @noinspection StaticNonFinalField */
054 private static volatile int nextId = 0;
055
056 private final Object lock = new Object();
057 private final int id = nextId++;
058 private final String threadName = "SocketConnector-" + id;
059 private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
060 private final Queue connectQueue = new Queue();
061 private final MultiThreadSocketIoProcessor[] ioProcessors;
062 private final int processorCount;
063 private final Executor executor;
064
065 /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
066 private Selector selector;
067 private Worker worker;
068 private int processorDistributor = 0;
069 private int workerTimeout = 60; // 1 min.
070
071 /** Create a connector with a single processing thread using a NewThreadExecutor */
072 public MultiThreadSocketConnector()
073 {
074 this(1, new NewThreadExecutor());
075 }
076
077 /**
078 * Create a connector with the desired number of processing threads
079 *
080 * @param processorCount Number of processing threads
081 * @param executor Executor to use for launching threads
082 */
083 public MultiThreadSocketConnector(int processorCount, Executor executor)
084 {
085 if (processorCount < 1)
086 {
087 throw new IllegalArgumentException("Must have at least one processor");
088 }
089
090 this.executor = executor;
091 this.processorCount = processorCount;
092 ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
093
094 for (int i = 0; i < processorCount; i++)
095 {
096 ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
097 }
098 }
099
100 /**
101 * How many seconds to keep the connection thread alive between connection requests
102 *
103 * @return Number of seconds to keep connection thread alive
104 */
105 public int getWorkerTimeout()
106 {
107 return workerTimeout;
108 }
109
110 /**
111 * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
112 *
113 * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
114 */
115 public void setWorkerTimeout(int workerTimeout)
116 {
117 if (workerTimeout < 0)
118 {
119 throw new IllegalArgumentException("Must be >= 0");
120 }
121 this.workerTimeout = workerTimeout;
122 }
123
124 public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
125 {
126 return connect(address, null, handler, config);
127 }
128
129 public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
130 IoHandler handler, IoServiceConfig config)
131 {
132 if (address == null)
133 {
134 throw new NullPointerException("address");
135 }
136 if (handler == null)
137 {
138 throw new NullPointerException("handler");
139 }
140
141 if (!(address instanceof InetSocketAddress))
142 {
143 throw new IllegalArgumentException("Unexpected address type: "
144 + address.getClass());
145 }
146
147 if (localAddress != null && !(localAddress instanceof InetSocketAddress))
148 {
149 throw new IllegalArgumentException("Unexpected local address type: "
150 + localAddress.getClass());
151 }
152
153 if (config == null)
154 {
155 config = getDefaultConfig();
156 }
157
158 SocketChannel ch = null;
159 boolean success = false;
160 try
161 {
162 ch = SocketChannel.open();
163 ch.socket().setReuseAddress(true);
164 if (localAddress != null)
165 {
166 ch.socket().bind(localAddress);
167 }
168
169 ch.configureBlocking(false);
170
171 if (ch.connect(address))
172 {
173 DefaultConnectFuture future = new DefaultConnectFuture();
174 newSession(ch, handler, config, future);
175 success = true;
176 return future;
177 }
178
179 success = true;
180 }
181 catch (IOException e)
182 {
183 return DefaultConnectFuture.newFailedFuture(e);
184 }
185 finally
186 {
187 if (!success && ch != null)
188 {
189 try
190 {
191 ch.close();
192 }
193 catch (IOException e)
194 {
195 ExceptionMonitor.getInstance().exceptionCaught(e);
196 }
197 }
198 }
199
200 ConnectionRequest request = new ConnectionRequest(ch, handler, config);
201 synchronized (lock)
202 {
203 try
204 {
205 startupWorker();
206 }
207 catch (IOException e)
208 {
209 try
210 {
211 ch.close();
212 }
213 catch (IOException e2)
214 {
215 ExceptionMonitor.getInstance().exceptionCaught(e2);
216 }
217
218 return DefaultConnectFuture.newFailedFuture(e);
219 }
220 }
221
222 synchronized (connectQueue)
223 {
224 connectQueue.push(request);
225 }
226 selector.wakeup();
227
228 return request;
229 }
230
231 private synchronized void startupWorker() throws IOException
232 {
233 if (worker == null)
234 {
235 selector = Selector.open();
236 worker = new Worker();
237 executor.execute(new NamePreservingRunnable(worker));
238 }
239 }
240
241 private void registerNew()
242 {
243 if (connectQueue.isEmpty())
244 {
245 return;
246 }
247
248 for (; ;)
249 {
250 ConnectionRequest req;
251 synchronized (connectQueue)
252 {
253 req = (ConnectionRequest) connectQueue.pop();
254 }
255
256 if (req == null)
257 {
258 break;
259 }
260
261 SocketChannel ch = req.channel;
262 try
263 {
264 ch.register(selector, SelectionKey.OP_CONNECT, req);
265 }
266 catch (IOException e)
267 {
268 req.setException(e);
269 }
270 }
271 }
272
273 private void processSessions(Set keys)
274 {
275 Iterator it = keys.iterator();
276
277 while (it.hasNext())
278 {
279 SelectionKey key = (SelectionKey) it.next();
280
281 if (!key.isConnectable())
282 {
283 continue;
284 }
285
286 SocketChannel ch = (SocketChannel) key.channel();
287 ConnectionRequest entry = (ConnectionRequest) key.attachment();
288
289 boolean success = false;
290 try
291 {
292 ch.finishConnect();
293 newSession(ch, entry.handler, entry.config, entry);
294 success = true;
295 }
296 catch (Throwable e)
297 {
298 entry.setException(e);
299 }
300 finally
301 {
302 key.cancel();
303 if (!success)
304 {
305 try
306 {
307 ch.close();
308 }
309 catch (IOException e)
310 {
311 ExceptionMonitor.getInstance().exceptionCaught(e);
312 }
313 }
314 }
315 }
316
317 keys.clear();
318 }
319
320 private void processTimedOutSessions(Set keys)
321 {
322 long currentTime = System.currentTimeMillis();
323 Iterator it = keys.iterator();
324
325 while (it.hasNext())
326 {
327 SelectionKey key = (SelectionKey) it.next();
328
329 if (!key.isValid())
330 {
331 continue;
332 }
333
334 ConnectionRequest entry = (ConnectionRequest) key.attachment();
335
336 if (currentTime >= entry.deadline)
337 {
338 entry.setException(new ConnectException());
339 try
340 {
341 key.channel().close();
342 }
343 catch (IOException e)
344 {
345 ExceptionMonitor.getInstance().exceptionCaught(e);
346 }
347 finally
348 {
349 key.cancel();
350 }
351 }
352 }
353 }
354
355 private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
356 throws IOException
357 {
358 MultiThreadSocketSessionImpl session =
359 new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(),
360 config, ch, handler, ch.socket().getRemoteSocketAddress());
361
362 //new interface
363 // SocketSessionImpl session = new SocketSessionImpl(
364 // this, nextProcessor(), getListeners(),
365 // config, ch, handler, ch.socket().getRemoteSocketAddress() );
366 try
367 {
368 getFilterChainBuilder().buildFilterChain(session.getFilterChain());
369 config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
370 config.getThreadModel().buildFilterChain(session.getFilterChain());
371 }
372 catch (Throwable e)
373 {
374 throw (IOException) new IOException("Failed to create a session.").initCause(e);
375 }
376
377 // Set the ConnectFuture of the specified session, which will be
378 // removed and notified by AbstractIoFilterChain eventually.
379 session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
380
381 // Forward the remaining process to the SocketIoProcessor.
382 session.getIoProcessor().addNew(session);
383 }
384
385 private MultiThreadSocketIoProcessor nextProcessor()
386 {
387 return ioProcessors[processorDistributor++ % processorCount];
388 }
389
390 private class Worker implements Runnable
391 {
392 private long lastActive = System.currentTimeMillis();
393
394 public void run()
395 {
396 Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName);
397
398 for (; ;)
399 {
400 try
401 {
402 int nKeys = selector.select(1000);
403
404 registerNew();
405
406 if (nKeys > 0)
407 {
408 processSessions(selector.selectedKeys());
409 }
410
411 processTimedOutSessions(selector.keys());
412
413 if (selector.keys().isEmpty())
414 {
415 if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
416 {
417 synchronized (lock)
418 {
419 if (selector.keys().isEmpty() &&
420 connectQueue.isEmpty())
421 {
422 worker = null;
423 try
424 {
425 selector.close();
426 }
427 catch (IOException e)
428 {
429 ExceptionMonitor.getInstance().exceptionCaught(e);
430 }
431 finally
432 {
433 selector = null;
434 }
435 break;
436 }
437 }
438 }
439 }
440 else
441 {
442 lastActive = System.currentTimeMillis();
443 }
444 }
445 catch (IOException e)
446 {
447 ExceptionMonitor.getInstance().exceptionCaught(e);
448
449 try
450 {
451 Thread.sleep(1000);
452 }
453 catch (InterruptedException e1)
454 {
455 ExceptionMonitor.getInstance().exceptionCaught(e1);
456 }
457 }
458 }
459 }
460 }
461
462 private class ConnectionRequest extends DefaultConnectFuture
463 {
464 private final SocketChannel channel;
465 private final long deadline;
466 private final IoHandler handler;
467 private final IoServiceConfig config;
468
469 private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
470 {
471 this.channel = channel;
472 long timeout;
473 if (config instanceof IoConnectorConfig)
474 {
475 timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
476 }
477 else
478 {
479 timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
480 }
481 this.deadline = System.currentTimeMillis() + timeout;
482 this.handler = handler;
483 this.config = config;
484 }
485 }
486 }
|