MultiThreadSocketAcceptor.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.
018  *
019  */
020 package org.apache.mina.transport.socket.nio;
021 
022 import java.io.IOException;
023 import java.net.InetSocketAddress;
024 import java.net.SocketAddress;
025 import java.nio.channels.SelectionKey;
026 import java.nio.channels.Selector;
027 import java.nio.channels.ServerSocketChannel;
028 import java.nio.channels.SocketChannel;
029 import java.util.ArrayList;
030 import java.util.HashMap;
031 import java.util.Iterator;
032 import java.util.List;
033 import java.util.Map;
034 import java.util.Set;
035 
036 import org.apache.mina.common.ExceptionMonitor;
037 import org.apache.mina.common.IoAcceptor;
038 import org.apache.mina.common.IoHandler;
039 import org.apache.mina.common.IoServiceConfig;
040 import org.apache.mina.common.support.BaseIoAcceptor;
041 import org.apache.mina.util.Queue;
042 import org.apache.mina.util.NewThreadExecutor;
043 import org.apache.mina.util.NamePreservingRunnable;
044 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
045 
046 /**
047  {@link IoAcceptor} for socket transport (TCP/IP).
048  *
049  @author The Apache Directory Project (mina-dev@directory.apache.org)
050  @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
051  */
052 public class MultiThreadSocketAcceptor extends SocketAcceptor
053 {
054     /**
055      * @noinspection StaticNonFinalField
056      */
057     private static volatile int nextId = 0;
058 
059     private final Executor executor;
060     private final Object lock = new Object();
061     private final int id = nextId ++;
062     private final String threadName = "SocketAcceptor-" + id;
063     private final Map channels = new HashMap();
064 
065     private final Queue registerQueue = new Queue();
066     private final Queue cancelQueue = new Queue();
067 
068     private final MultiThreadSocketIoProcessor[] ioProcessors;
069     private final int processorCount;
070 
071     /**
072      * @noinspection FieldAccessedSynchronizedAndUnsynchronized
073      */
074     private Selector selector;
075     private Worker worker;
076     private int processorDistributor = 0;
077 
078     /**
079      * Create an acceptor with a single processing thread using a NewThreadExecutor
080      */
081     public MultiThreadSocketAcceptor()
082     {
083         this1new NewThreadExecutor() );
084     }
085 
086     /**
087      * Create an acceptor with the desired number of processing threads
088      *
089      @param processorCount Number of processing threads
090      @param executor Executor to use for launching threads
091      */
092     public MultiThreadSocketAcceptorint processorCount, Executor executor )
093     {
094         ifprocessorCount < )
095         {
096             throw new IllegalArgumentException"Must have at least one processor" );
097         }
098 
099         this.executor = executor;
100         this.processorCount = processorCount;
101         ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
102 
103         forint i = 0; i < processorCount; i++ )
104         {
105             ioProcessors[inew MultiThreadSocketIoProcessor"SocketAcceptorIoProcessor-" + id + "." + i, executor );
106         }
107     }
108 
109 
110     /**
111      * Binds to the specified <code>address</code> and handles incoming connections with the specified
112      <code>handler</code>.  Backlog value is configured to the value of <code>backlog</code> property.
113      *
114      @throws IOException if failed to bind
115      */
116     public void bindSocketAddress address, IoHandler handler, IoServiceConfig config throws IOException
117     {
118         ifhandler == null )
119         {
120             throw new NullPointerException"handler" );
121         }
122 
123         ifaddress != null && !address instanceof InetSocketAddress ) )
124         {
125             throw new IllegalArgumentException"Unexpected address type: " + address.getClass() );
126         }
127 
128         ifconfig == null )
129         {
130             config = getDefaultConfig();
131         }
132 
133         RegistrationRequest request = new RegistrationRequestaddress, handler, config );
134 
135         synchronizedregisterQueue )
136         {
137             registerQueue.pushrequest );
138         }
139 
140         startupWorker();
141 
142         selector.wakeup();
143 
144         synchronizedrequest )
145         {
146             while!request.done )
147             {
148                 try
149                 {
150                     request.wait();
151                 }
152                 catchInterruptedException e )
153                 {
154                     ExceptionMonitor.getInstance().exceptionCaught);
155                 }
156             }
157         }
158 
159         ifrequest.exception != null )
160         {
161             throw request.exception;
162         }
163     }
164 
165 
166     private synchronized void startupWorker() throws IOException
167     {
168         synchronizedlock )
169         {
170             ifworker == null )
171             {
172                 selector = Selector.open();
173                 worker = new Worker();
174 
175                 executor.executenew NamePreservingRunnableworker ) );
176             }
177         }
178     }
179 
180     public void unbindSocketAddress address )
181     {
182         ifaddress == null )
183         {
184             throw new NullPointerException"address" );
185         }
186 
187         CancellationRequest request = new CancellationRequestaddress );
188 
189         try
190         {
191             startupWorker();
192         }
193         catchIOException e )
194         {
195             // IOException is thrown only when Worker thread is not
196             // running and failed to open a selector.  We simply throw
197             // IllegalArgumentException here because we can simply
198             // conclude that nothing is bound to the selector.
199             throw new IllegalArgumentException"Address not bound: " + address );
200         }
201 
202         synchronizedcancelQueue )
203         {
204             cancelQueue.pushrequest );
205         }
206 
207         selector.wakeup();
208 
209         synchronizedrequest )
210         {
211             while!request.done )
212             {
213                 try
214                 {
215                     request.wait();
216                 }
217                 catchInterruptedException e )
218                 {
219                     ExceptionMonitor.getInstance().exceptionCaught);
220                 }
221             }
222         }
223 
224         ifrequest.exception != null )
225         {
226             request.exception.fillInStackTrace();
227 
228             throw request.exception;
229         }
230     }
231 
232 
233     private class Worker implements Runnable
234     {
235         public void run()
236         {
237             Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName );
238 
239             for; ; )
240             {
241                 try
242                 {
243                     int nKeys = selector.select();
244 
245                     registerNew();
246 
247                     ifnKeys > )
248                     {
249                         processSessionsselector.selectedKeys() );
250                     }
251 
252                     cancelKeys();
253 
254                     ifselector.keys().isEmpty() )
255                     {
256                         synchronizedlock )
257                         {
258                             ifselector.keys().isEmpty() &&
259                                 registerQueue.isEmpty() &&
260                                 cancelQueue.isEmpty() )
261                             {
262                                 worker = null;
263                                 try
264                                 {
265                                     selector.close();
266                                 }
267                                 catchIOException e )
268                                 {
269                                     ExceptionMonitor.getInstance().exceptionCaught);
270                                 }
271                                 finally
272                                 {
273                                     selector = null;
274                                 }
275                                 break;
276                             }
277                         }
278                     }
279                 }
280                 catchIOException e )
281                 {
282                     ExceptionMonitor.getInstance().exceptionCaught);
283 
284                     try
285                     {
286                         Thread.sleep1000 );
287                     }
288                     catchInterruptedException e1 )
289                     {
290                         ExceptionMonitor.getInstance().exceptionCaughte1 );
291                     }
292                 }
293             }
294         }
295 
296         private void processSessionsSet keys throws IOException
297         {
298             Iterator it = keys.iterator();
299             whileit.hasNext() )
300             {
301                 SelectionKey key = SelectionKey it.next();
302 
303                 it.remove();
304 
305                 if!key.isAcceptable() )
306                 {
307                     continue;
308                 }
309 
310                 ServerSocketChannel ssc = ServerSocketChannel key.channel();
311 
312                 SocketChannel ch = ssc.accept();
313 
314                 ifch == null )
315                 {
316                     continue;
317                 }
318 
319                 boolean success = false;
320                 try
321                 {
322 
323                     RegistrationRequest req = RegistrationRequest key.attachment();
324 
325                     MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl(
326                             MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(),
327                             req.config, ch, req.handler, req.address );
328 
329                     // New Interface
330 //                    SocketSessionImpl session = new SocketSessionImpl(
331 //                            SocketAcceptor.this, nextProcessor(), getListeners(),
332 //                            req.config, ch, req.handler, req.address );
333 
334 
335                     getFilterChainBuilder().buildFilterChainsession.getFilterChain() );
336                     req.config.getFilterChainBuilder().buildFilterChainsession.getFilterChain() );
337                     req.config.getThreadModel().buildFilterChainsession.getFilterChain() );
338                     session.getIoProcessor().addNewsession );
339                     success = true;
340                 }
341                 catchThrowable t )
342                 {
343                     ExceptionMonitor.getInstance().exceptionCaught);
344                 }
345                 finally
346                 {
347                     if!success )
348                     {
349                         ch.close();
350                     }
351                 }
352             }
353         }
354     }
355 
356     private MultiThreadSocketIoProcessor nextProcessor()
357     {
358         return ioProcessors[processorDistributor++ % processorCount];
359     }
360 
361 
362     private void registerNew()
363     {
364         ifregisterQueue.isEmpty() )
365         {
366             return;
367         }
368 
369         for; ; )
370         {
371             RegistrationRequest req;
372 
373             synchronizedregisterQueue )
374             {
375                 req = RegistrationRequest registerQueue.pop();
376             }
377 
378             ifreq == null )
379             {
380                 break;
381             }
382 
383             ServerSocketChannel ssc = null;
384 
385             try
386             {
387                 ssc = ServerSocketChannel.open();
388                 ssc.configureBlockingfalse );
389 
390                 // Configure the server socket,
391                 SocketAcceptorConfig cfg;
392                 ifreq.config instanceof SocketAcceptorConfig )
393                 {
394                     cfg = SocketAcceptorConfig req.config;
395                 }
396                 else
397                 {
398                     cfg = SocketAcceptorConfig getDefaultConfig();
399                 }
400 
401                 ssc.socket().setReuseAddresscfg.isReuseAddress() );
402                 ssc.socket().setReceiveBufferSize(
403                     ( ( SocketSessionConfig cfg.getSessionConfig() ).getReceiveBufferSize() );
404 
405                 // and bind.
406                 ssc.socket().bindreq.address, cfg.getBacklog() );
407                 ifreq.address == null || req.address.getPort() == )
408                 {
409                     req.address = InetSocketAddress ssc.socket().getLocalSocketAddress();
410                 }
411                 ssc.registerselector, SelectionKey.OP_ACCEPT, req );
412 
413                 synchronizedchannels )
414                 {
415                     channels.putreq.address, ssc );
416                 }
417 
418                 getListeners().fireServiceActivated(
419                         this, req.address, req.handler, req.config );
420             }
421             catchIOException e )
422             {
423                 req.exception = e;
424             }
425             finally
426             {
427                 synchronizedreq )
428                 {
429                     req.done = true;
430 
431                     req.notifyAll();
432                 }
433 
434                 ifssc != null && req.exception != null )
435                 {
436                     try
437                     {
438                         ssc.close();
439                     }
440                     catchIOException e )
441                     {
442                         ExceptionMonitor.getInstance().exceptionCaught);
443                     }
444                 }
445             }
446         }
447     }
448 
449 
450     private void cancelKeys()
451     {
452         ifcancelQueue.isEmpty() )
453         {
454             return;
455         }
456 
457         for; ; )
458         {
459             CancellationRequest request;
460 
461             synchronizedcancelQueue )
462             {
463                 request = CancellationRequest cancelQueue.pop();
464             }
465 
466             ifrequest == null )
467             {
468                 break;
469             }
470 
471             ServerSocketChannel ssc;
472             synchronizedchannels )
473             {
474                 ssc = ServerSocketChannel channels.removerequest.address );
475             }
476 
477             // close the channel
478             try
479             {
480                 ifssc == null )
481                 {
482                     request.exception = new IllegalArgumentException"Address not bound: " + request.address );
483                 }
484                 else
485                 {
486                     SelectionKey key = ssc.keyForselector );
487                     request.registrationRequest = RegistrationRequest key.attachment();
488                     key.cancel();
489 
490                     selector.wakeup()// wake up again to trigger thread death
491 
492                     ssc.close();
493                 }
494             }
495             catchIOException e )
496             {
497                 ExceptionMonitor.getInstance().exceptionCaught);
498             }
499             finally
500             {
501                 synchronizedrequest )
502                 {
503                     request.done = true;
504                     request.notifyAll();
505                 }
506 
507                 ifrequest.exception == null )
508                 {
509                     getListeners().fireServiceDeactivated(
510                             this, request.address,
511                             request.registrationRequest.handler,
512                             request.registrationRequest.config );
513                 }
514             }
515         }
516     }
517 
518     private static class RegistrationRequest
519     {
520         private InetSocketAddress address;
521         private final IoHandler handler;
522         private final IoServiceConfig config;
523         private IOException exception;
524         private boolean done;
525 
526         private RegistrationRequestSocketAddress address, IoHandler handler, IoServiceConfig config )
527         {
528             this.address = InetSocketAddress address;
529             this.handler = handler;
530             this.config = config;
531         }
532     }
533 
534 
535     private static class CancellationRequest
536     {
537         private final SocketAddress address;
538         private boolean done;
539         private RegistrationRequest registrationRequest;
540         private RuntimeException exception;
541 
542         private CancellationRequestSocketAddress address )
543         {
544             this.address = address;
545         }
546     }
547 }