ExistingSocketConnector.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.
018  *
019  */
020 package org.apache.mina.transport.socket.nio;
021 
022 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
023 import org.apache.mina.common.ConnectFuture;
024 import org.apache.mina.common.ExceptionMonitor;
025 import org.apache.mina.common.IoConnector;
026 import org.apache.mina.common.IoConnectorConfig;
027 import org.apache.mina.common.IoHandler;
028 import org.apache.mina.common.IoServiceConfig;
029 import org.apache.mina.common.support.BaseIoConnector;
030 import org.apache.mina.common.support.DefaultConnectFuture;
031 import org.apache.mina.util.NamePreservingRunnable;
032 import org.apache.mina.util.NewThreadExecutor;
033 import org.apache.mina.util.Queue;
034 
035 import java.io.IOException;
036 import java.net.ConnectException;
037 import java.net.Socket;
038 import java.net.SocketAddress;
039 import java.nio.channels.SelectionKey;
040 import java.nio.channels.Selector;
041 import java.nio.channels.SocketChannel;
042 import java.util.Iterator;
043 import java.util.Set;
044 
045 /**
046  {@link IoConnector} for socket transport (TCP/IP).
047  *
048  @author The Apache Directory Project (mina-dev@directory.apache.org)
049  @version $Rev: 627427 $, $Date: 2008-02-13 14:39:10 +0000 (Wed, 13 Feb 2008) $
050  */
051 public class ExistingSocketConnector extends BaseIoConnector
052 {
053     /** @noinspection StaticNonFinalField */
054     private static volatile int nextId = 0;
055 
056     private final Object lock = new Object();
057     private final int id = nextId++;
058     private final String threadName = "SocketConnector-" + id;
059     private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
060     private final Queue connectQueue = new Queue();
061     private final SocketIoProcessor[] ioProcessors;
062     private final int processorCount;
063     private final Executor executor;
064 
065     /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
066     private Selector selector;
067     private Worker worker;
068     private int processorDistributor = 0;
069     private int workerTimeout = 60;  // 1 min.
070     private Socket _openSocket = null;
071 
072     /** Create a connector with a single processing thread using a NewThreadExecutor */
073     public ExistingSocketConnector()
074     {
075         this(1new NewThreadExecutor());
076     }
077 
078     /**
079      * Create a connector with the desired number of processing threads
080      *
081      @param processorCount Number of processing threads
082      @param executor       Executor to use for launching threads
083      */
084     public ExistingSocketConnector(int processorCount, Executor executor)
085     {
086         if (processorCount < 1)
087         {
088             throw new IllegalArgumentException("Must have at least one processor");
089         }
090 
091         this.executor = executor;
092         this.processorCount = processorCount;
093         ioProcessors = new SocketIoProcessor[processorCount];
094 
095         for (int i = 0; i < processorCount; i++)
096         {
097             ioProcessors[inew SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
098         }
099     }
100 
101     /**
102      * How many seconds to keep the connection thread alive between connection requests
103      *
104      @return Number of seconds to keep connection thread alive
105      */
106     public int getWorkerTimeout()
107     {
108         return workerTimeout;
109     }
110 
111     /**
112      * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
113      *
114      @param workerTimeout Number of seconds to keep thread alive. Must be >=0
115      */
116     public void setWorkerTimeout(int workerTimeout)
117     {
118         if (workerTimeout < 0)
119         {
120             throw new IllegalArgumentException("Must be >= 0");
121         }
122         this.workerTimeout = workerTimeout;
123     }
124 
125     public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
126     {
127         return connect(address, null, handler, config);
128     }
129 
130     public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
131                                  IoHandler handler, IoServiceConfig config)
132     {
133         /** Changes here from the Mina OpenSocketConnector.
134          * Ignoreing all address as they are not needed */
135 
136         if (handler == null)
137         {
138             throw new NullPointerException("handler");
139         }
140 
141 
142         if (config == null)
143         {
144             config = getDefaultConfig();
145         }
146 
147         if (_openSocket == null)
148         {
149             throw new IllegalArgumentException("Specifed Socket not active");
150         }
151 
152         boolean success = false;
153 
154         try
155         {
156             DefaultConnectFuture future = new DefaultConnectFuture();
157             newSession(_openSocket, handler, config, future);
158             success = true;
159             return future;
160         }
161         catch (IOException e)
162         {
163             return DefaultConnectFuture.newFailedFuture(e);
164         }
165         finally
166         {
167             if (!success && _openSocket != null)
168             {
169                 try
170                 {
171                     _openSocket.close();
172                 }
173                 catch (IOException e)
174                 {
175                     ExceptionMonitor.getInstance().exceptionCaught(e);
176                 }
177             }
178         }
179     }
180 
181     public IoServiceConfig getDefaultConfig()
182     {
183         return defaultConfig;
184     }
185 
186     /**
187      * Sets the config this connector will use by default.
188      *
189      @param defaultConfig the default config.
190      *
191      @throws NullPointerException if the specified value is <code>null</code>.
192      */
193     public void setDefaultConfig(SocketConnectorConfig defaultConfig)
194     {
195         if (defaultConfig == null)
196         {
197             throw new NullPointerException("defaultConfig");
198         }
199         this.defaultConfig = defaultConfig;
200     }
201 
202     private synchronized void startupWorker() throws IOException
203     {
204         if (worker == null)
205         {
206             selector = Selector.open();
207             worker = new Worker();
208             executor.execute(new NamePreservingRunnable(worker));
209         }
210     }
211 
212     private void registerNew()
213     {
214         if (connectQueue.isEmpty())
215         {
216             return;
217         }
218 
219         for (; ;)
220         {
221             ConnectionRequest req;
222             synchronized (connectQueue)
223             {
224                 req = (ConnectionRequestconnectQueue.pop();
225             }
226 
227             if (req == null)
228             {
229                 break;
230             }
231 
232             SocketChannel ch = req.channel;
233             try
234             {
235                 ch.register(selector, SelectionKey.OP_CONNECT, req);
236             }
237             catch (IOException e)
238             {
239                 req.setException(e);
240             }
241         }
242     }
243 
244     private void processSessions(Set keys)
245     {
246         Iterator it = keys.iterator();
247 
248         while (it.hasNext())
249         {
250             SelectionKey key = (SelectionKeyit.next();
251 
252             if (!key.isConnectable())
253             {
254                 continue;
255             }
256 
257             SocketChannel ch = (SocketChannelkey.channel();
258             ConnectionRequest entry = (ConnectionRequestkey.attachment();
259 
260             boolean success = false;
261             try
262             {
263                 ch.finishConnect();
264                 newSession(ch, entry.handler, entry.config, entry);
265                 success = true;
266             }
267             catch (Throwable e)
268             {
269                 entry.setException(e);
270             }
271             finally
272             {
273                 key.cancel();
274                 if (!success)
275                 {
276                     try
277                     {
278                         ch.close();
279                     }
280                     catch (IOException e)
281                     {
282                         ExceptionMonitor.getInstance().exceptionCaught(e);
283                     }
284                 }
285             }
286         }
287 
288         keys.clear();
289     }
290 
291     private void processTimedOutSessions(Set keys)
292     {
293         long currentTime = System.currentTimeMillis();
294         Iterator it = keys.iterator();
295 
296         while (it.hasNext())
297         {
298             SelectionKey key = (SelectionKeyit.next();
299 
300             if (!key.isValid())
301             {
302                 continue;
303             }
304 
305             ConnectionRequest entry = (ConnectionRequestkey.attachment();
306 
307             if (currentTime >= entry.deadline)
308             {
309                 entry.setException(new ConnectException());
310                 try
311                 {
312                     key.channel().close();
313                 }
314                 catch (IOException e)
315                 {
316                     ExceptionMonitor.getInstance().exceptionCaught(e);
317                 }
318                 finally
319                 {
320                     key.cancel();
321                 }
322             }
323         }
324     }
325 
326     private void newSession(Socket socket, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
327             throws IOException
328     {
329         SocketSessionImpl session = new SocketSessionImpl(this,
330                                                           nextProcessor(),
331                                                           getListeners(),
332                                                           config,
333                                                           socket.getChannel(),
334                                                           handler,
335                                                           socket.getRemoteSocketAddress());
336 
337         newSession(session, config, connectFuture);
338     }
339 
340     private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
341             throws IOException
342 
343     {
344         SocketSessionImpl session = new SocketSessionImpl(this,
345                                                           nextProcessor(),
346                                                           getListeners(),
347                                                           config,
348                                                           ch,
349                                                           handler,
350                                                           ch.socket().getRemoteSocketAddress());
351 
352         newSession(session, config, connectFuture);
353     }
354 
355     private void newSession(SocketSessionImpl session, IoServiceConfig config, ConnectFuture connectFuture)
356             throws IOException
357     {
358         try
359         {
360             getFilterChainBuilder().buildFilterChain(session.getFilterChain());
361             config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
362             config.getThreadModel().buildFilterChain(session.getFilterChain());
363         }
364         catch (Throwable e)
365         {
366             throw (IOExceptionnew IOException("Failed to create a session.").initCause(e);
367         }
368         session.getIoProcessor().addNew(session);
369         connectFuture.setSession(session);
370     }
371 
372     private SocketIoProcessor nextProcessor()
373     {
374         return ioProcessors[processorDistributor++ % processorCount];
375     }
376 
377     public void setOpenSocket(Socket openSocket)
378     {
379         _openSocket = openSocket;
380     }
381 
382     private class Worker implements Runnable
383     {
384         private long lastActive = System.currentTimeMillis();
385 
386         public void run()
387         {
388             Thread.currentThread().setName(ExistingSocketConnector.this.threadName);
389 
390             for (; ;)
391             {
392                 try
393                 {
394                     int nKeys = selector.select(1000);
395 
396                     registerNew();
397 
398                     if (nKeys > 0)
399                     {
400                         processSessions(selector.selectedKeys());
401                     }
402 
403                     processTimedOutSessions(selector.keys());
404 
405                     if (selector.keys().isEmpty())
406                     {
407                         if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
408                         {
409                             synchronized (lock)
410                             {
411                                 if (selector.keys().isEmpty() &&
412                                     connectQueue.isEmpty())
413                                 {
414                                     worker = null;
415                                     try
416                                     {
417                                         selector.close();
418                                     }
419                                     catch (IOException e)
420                                     {
421                                         ExceptionMonitor.getInstance().exceptionCaught(e);
422                                     }
423                                     finally
424                                     {
425                                         selector = null;
426                                     }
427                                     break;
428                                 }
429                             }
430                         }
431                     }
432                     else
433                     {
434                         lastActive = System.currentTimeMillis();
435                     }
436                 }
437                 catch (IOException e)
438                 {
439                     ExceptionMonitor.getInstance().exceptionCaught(e);
440 
441                     try
442                     {
443                         Thread.sleep(1000);
444                     }
445                     catch (InterruptedException e1)
446                     {
447                         ExceptionMonitor.getInstance().exceptionCaught(e1);
448                     }
449                 }
450             }
451         }
452     }
453 
454     private class ConnectionRequest extends DefaultConnectFuture
455     {
456         private final SocketChannel channel;
457         private final long deadline;
458         private final IoHandler handler;
459         private final IoServiceConfig config;
460 
461         private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
462         {
463             this.channel = channel;
464             long timeout;
465             if (config instanceof IoConnectorConfig)
466             {
467                 timeout = ((IoConnectorConfigconfig).getConnectTimeoutMillis();
468             }
469             else
470             {
471                 timeout = ((IoConnectorConfiggetDefaultConfig()).getConnectTimeoutMillis();
472             }
473             this.deadline = System.currentTimeMillis() + timeout;
474             this.handler = handler;
475             this.config = config;
476         }
477     }
478 }