MultiThreadSocketIoProcessor.java
0001 /*
0002  *  Licensed to the Apache Software Foundation (ASF) under one
0003  *  or more contributor license agreements.  See the NOTICE file
0004  *  distributed with this work for additional information
0005  *  regarding copyright ownership.  The ASF licenses this file
0006  *  to you under the Apache License, Version 2.0 (the
0007  *  "License"); you may not use this file except in compliance
0008  *  with the License.  You may obtain a copy of the License at
0009  *
0010  *    http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  *  Unless required by applicable law or agreed to in writing,
0013  *  software distributed under the License is distributed on an
0014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0015  *  KIND, either express or implied.  See the License for the
0016  *  specific language governing permissions and limitations
0017  *  under the License.
0018  *
0019  */
0020 package org.apache.mina.transport.socket.nio;
0021 
0022 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
0023 import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
0024 import org.apache.mina.common.ByteBuffer;
0025 import org.apache.mina.common.ExceptionMonitor;
0026 import org.apache.mina.common.IdleStatus;
0027 import org.apache.mina.common.IoFilter.WriteRequest;
0028 import org.apache.mina.common.WriteTimeoutException;
0029 import org.apache.mina.util.IdentityHashSet;
0030 import org.apache.mina.util.NamePreservingRunnable;
0031 import org.apache.mina.util.Queue;
0032 import org.slf4j.Logger;
0033 import org.slf4j.LoggerFactory;
0034 
0035 import java.io.IOException;
0036 import java.nio.channels.SelectionKey;
0037 import java.nio.channels.Selector;
0038 import java.nio.channels.SocketChannel;
0039 import java.util.Iterator;
0040 import java.util.Set;
0041 import java.util.concurrent.BlockingQueue;
0042 import java.util.concurrent.LinkedBlockingQueue;
0043 
0044 /**
0045  * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
0046  *
0047  @author The Apache Directory Project (mina-dev@directory.apache.org)
0048  @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $,
0049  */
0050 class MultiThreadSocketIoProcessor extends SocketIoProcessor
0051 {
0052     Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class);
0053     Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class ".Reader");
0054     Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class ".Writer");
0055 
0056     private static final long SELECTOR_TIMEOUT = 1000L;
0057 
0058     private int MAX_READ_BYTES_PER_SESSION = 524288//512K
0059     private int MAX_FLUSH_BYTES_PER_SESSION = 524288//512K
0060 
0061     private final Object readLock = new Object();
0062     private final Object writeLock = new Object();
0063 
0064     private final String threadName;
0065     private final Executor executor;
0066 
0067     private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
0068 
0069     /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
0070     private volatile Selector selector, writeSelector;
0071 
0072     private final Queue newSessions = new Queue();
0073     private final Queue removingSessions = new Queue();
0074     private final BlockingQueue flushingSessions = new LinkedBlockingQueue();
0075     private final IdentityHashSet flushingSessionsSet = new IdentityHashSet();
0076 
0077     private final Queue trafficControllingSessions = new Queue();
0078 
0079     private ReadWorker readWorker;
0080     private WriteWorker writeWorker;
0081     private long lastIdleReadCheckTime = System.currentTimeMillis();
0082     private long lastIdleWriteCheckTime = System.currentTimeMillis();
0083 
0084     MultiThreadSocketIoProcessor(String threadName, Executor executor)
0085     {
0086         super(threadName, executor);
0087         this.threadName = threadName;
0088         this.executor = executor;
0089     }
0090 
0091     void addNew(SocketSessionImpl sessionthrows IOException
0092     {
0093         synchronized (newSessions)
0094         {
0095             newSessions.push(session);
0096         }
0097 
0098         startupWorker();
0099 
0100         selector.wakeup();
0101         writeSelector.wakeup();
0102     }
0103 
0104     void remove(SocketSessionImpl sessionthrows IOException
0105     {
0106         scheduleRemove(session);
0107         startupWorker();
0108         selector.wakeup();
0109     }
0110 
0111     private void startupWorker() throws IOException
0112     {
0113         synchronized (readLock)
0114         {
0115             if (readWorker == null)
0116             {
0117                 selector = Selector.open();
0118                 readWorker = new ReadWorker();
0119                 executor.execute(new NamePreservingRunnable(readWorker));
0120             }
0121         }
0122 
0123         synchronized (writeLock)
0124         {
0125             if (writeWorker == null)
0126             {
0127                 writeSelector = Selector.open();
0128                 writeWorker = new WriteWorker();
0129                 executor.execute(new NamePreservingRunnable(writeWorker));
0130             }
0131         }
0132 
0133     }
0134 
0135     void flush(SocketSessionImpl session)
0136     {
0137         scheduleFlush(session);
0138         Selector selector = this.writeSelector;
0139 
0140         if (selector != null)
0141         {
0142             selector.wakeup();
0143         }
0144     }
0145 
0146     void updateTrafficMask(SocketSessionImpl session)
0147     {
0148         scheduleTrafficControl(session);
0149         Selector selector = this.selector;
0150         if (selector != null)
0151         {
0152             selector.wakeup();
0153         }
0154     }
0155 
0156     private void scheduleRemove(SocketSessionImpl session)
0157     {
0158         synchronized (removingSessions)
0159         {
0160             removingSessions.push(session);
0161         }
0162     }
0163 
0164     private void scheduleFlush(SocketSessionImpl session)
0165     {
0166         synchronized (flushingSessionsSet)
0167         {
0168             //if flushingSessions grows to contain Integer.MAX_VALUE sessions
0169             // then this will fail.
0170             if (flushingSessionsSet.add(session))
0171             {
0172                 flushingSessions.offer(session);
0173             }
0174         }
0175     }
0176 
0177     private void scheduleTrafficControl(SocketSessionImpl session)
0178     {
0179         synchronized (trafficControllingSessions)
0180         {
0181             trafficControllingSessions.push(session);
0182         }
0183     }
0184 
0185     private void doAddNewReader() throws InterruptedException
0186     {
0187         if (newSessions.isEmpty())
0188         {
0189             return;
0190         }
0191 
0192         for (; ;)
0193         {
0194             MultiThreadSocketSessionImpl session;
0195 
0196             synchronized (newSessions)
0197             {
0198                 session = (MultiThreadSocketSessionImplnewSessions.peek();
0199             }
0200 
0201             if (session == null)
0202             {
0203                 break;
0204             }
0205 
0206             SocketChannel ch = session.getChannel();
0207 
0208 
0209             try
0210             {
0211 
0212                 ch.configureBlocking(false);
0213                 session.setSelectionKey(ch.register(selector,
0214                                                     SelectionKey.OP_READ,
0215                                                     session));
0216 
0217                 //System.out.println("ReadDebug:"+"Awaiting Registration");
0218                 session.awaitRegistration();
0219                 sessionCreated(session);
0220             }
0221             catch (IOException e)
0222             {
0223                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
0224                 // and call ConnectFuture.setException().
0225                 session.getFilterChain().fireExceptionCaught(session, e);
0226             }
0227         }
0228     }
0229 
0230 
0231     private void doAddNewWrite() throws InterruptedException
0232     {
0233         if (newSessions.isEmpty())
0234         {
0235             return;
0236         }
0237 
0238         for (; ;)
0239         {
0240             MultiThreadSocketSessionImpl session;
0241 
0242             synchronized (newSessions)
0243             {
0244                 session = (MultiThreadSocketSessionImplnewSessions.peek();
0245             }
0246 
0247             if (session == null)
0248             {
0249                 break;
0250             }
0251 
0252             SocketChannel ch = session.getChannel();
0253 
0254             try
0255             {
0256                 ch.configureBlocking(false);
0257                 synchronized (flushingSessionsSet)
0258                 {
0259                     flushingSessionsSet.add(session);
0260                 }
0261 
0262                 session.setWriteSelectionKey(ch.register(writeSelector,
0263                                                          SelectionKey.OP_WRITE,
0264                                                          session));
0265 
0266                 //System.out.println("WriteDebug:"+"Awaiting Registration");
0267                 session.awaitRegistration();
0268                 sessionCreated(session);
0269             }
0270             catch (IOException e)
0271             {
0272 
0273                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
0274                 // and call ConnectFuture.setException().
0275                 session.getFilterChain().fireExceptionCaught(session, e);
0276             }
0277         }
0278     }
0279 
0280 
0281     private void sessionCreated(SocketSessionImpl sessionParamthrows InterruptedException
0282     {
0283         MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImplsessionParam;
0284         synchronized (newSessions)
0285         {
0286             if (!session.created())
0287             {
0288                 _logger.debug("Popping new session");
0289                 newSessions.pop();
0290 
0291                 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
0292                 // in AbstractIoFilterChain.fireSessionOpened().
0293                 session.getServiceListeners().fireSessionCreated(session);
0294 
0295                 session.doneCreation();
0296             }
0297         }
0298     }
0299 
0300     private void doRemove()
0301     {
0302         if (removingSessions.isEmpty())
0303         {
0304             return;
0305         }
0306 
0307         for (; ;)
0308         {
0309             MultiThreadSocketSessionImpl session;
0310 
0311             synchronized (removingSessions)
0312             {
0313                 session = (MultiThreadSocketSessionImplremovingSessions.pop();
0314             }
0315 
0316             if (session == null)
0317             {
0318                 break;
0319             }
0320 
0321             SocketChannel ch = session.getChannel();
0322             SelectionKey key = session.getReadSelectionKey();
0323             SelectionKey writeKey = session.getWriteSelectionKey();
0324 
0325             // Retry later if session is not yet fully initialized.
0326             // (In case that Session.close() is called before addSession() is processed)
0327             if (key == null || writeKey == null)
0328             {
0329                 scheduleRemove(session);
0330                 break;
0331             }
0332             // skip if channel is already closed
0333             if (!key.isValid() || !writeKey.isValid())
0334             {
0335                 continue;
0336             }
0337 
0338             try
0339             {
0340                 //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
0341                 synchronized (readLock)
0342                 {
0343                     key.cancel();
0344                 }
0345                 synchronized (writeLock)
0346                 {
0347                     writeKey.cancel();
0348                 }
0349                 ch.close();
0350             }
0351             catch (IOException e)
0352             {
0353                 session.getFilterChain().fireExceptionCaught(session, e);
0354             }
0355             finally
0356             {
0357                 releaseWriteBuffers(session);
0358                 session.getServiceListeners().fireSessionDestroyed(session);
0359             }
0360         }
0361     }
0362 
0363     private void processRead(Set selectedKeys)
0364     {
0365         Iterator it = selectedKeys.iterator();
0366 
0367         while (it.hasNext())
0368         {
0369             SelectionKey key = (SelectionKeyit.next();
0370             MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImplkey.attachment();
0371 
0372             synchronized (readLock)
0373             {
0374                 if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
0375                 {
0376                     read(session);
0377                 }
0378             }
0379 
0380         }
0381 
0382         selectedKeys.clear();
0383     }
0384 
0385     private void processWrite(Set selectedKeys)
0386     {
0387         Iterator it = selectedKeys.iterator();
0388 
0389         while (it.hasNext())
0390         {
0391             SelectionKey key = (SelectionKeyit.next();
0392             SocketSessionImpl session = (SocketSessionImplkey.attachment();
0393 
0394             synchronized (writeLock)
0395             {
0396                 if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
0397                 {
0398 
0399                     // Clear OP_WRITE
0400                     key.interestOps(key.interestOps() (~SelectionKey.OP_WRITE));
0401 
0402                     synchronized (flushingSessionsSet)
0403                     {
0404                         flushingSessions.offer(session);
0405                     }
0406                 }
0407             }
0408         }
0409 
0410         selectedKeys.clear();
0411     }
0412 
0413     private void read(SocketSessionImpl session)
0414     {
0415 
0416         //if (_loggerWrite.isDebugEnabled())
0417         {
0418             //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session));
0419         }
0420 
0421         int totalReadBytes = 0;
0422 
0423         while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
0424         {
0425             ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
0426             SocketChannel ch = session.getChannel();
0427 
0428             try
0429             {
0430                 buf.clear();
0431 
0432                 int readBytes = 0;
0433                 int ret;
0434 
0435                 try
0436                 {
0437                     while ((ret = ch.read(buf.buf())) 0)
0438                     {
0439                         readBytes += ret;
0440                         totalReadBytes += ret;
0441                     }
0442                 }
0443                 finally
0444                 {
0445                     buf.flip();
0446                 }
0447 
0448 
0449                 if (readBytes > 0)
0450                 {
0451                     session.increaseReadBytes(readBytes);
0452 
0453                     session.getFilterChain().fireMessageReceived(session, buf);
0454                     buf = null;
0455                 }
0456 
0457                 if (ret <= 0)
0458                 {
0459                     if (ret == 0)
0460                     {
0461                         if (readBytes == session.getReadBufferSize())
0462                         {
0463                             continue;
0464                         }
0465                     }
0466                     else
0467                     {
0468                         scheduleRemove(session);
0469                     }
0470 
0471                     break;
0472                 }
0473             }
0474             catch (Throwable e)
0475             {
0476                 if (instanceof IOException)
0477                 {
0478                     scheduleRemove(session);
0479                 }
0480                 session.getFilterChain().fireExceptionCaught(session, e);
0481 
0482                 //Stop Reading this session.
0483                 return;
0484             }
0485             finally
0486             {
0487                 if (buf != null)
0488                 {
0489                     buf.release();
0490                 }
0491             }
0492         }//for
0493 
0494         // if (_loggerWrite.isDebugEnabled())
0495         {
0496             //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes);
0497         }
0498     }
0499 
0500 
0501     private void notifyReadIdleness()
0502     {
0503         // process idle sessions
0504         long currentTime = System.currentTimeMillis();
0505         if ((currentTime - lastIdleReadCheckTime>= 1000)
0506         {
0507             lastIdleReadCheckTime = currentTime;
0508             Set keys = selector.keys();
0509             if (keys != null)
0510             {
0511                 for (Iterator it = keys.iterator(); it.hasNext();)
0512                 {
0513                     SelectionKey key = (SelectionKeyit.next();
0514                     SocketSessionImpl session = (SocketSessionImplkey.attachment();
0515                     notifyReadIdleness(session, currentTime);
0516                 }
0517             }
0518         }
0519     }
0520 
0521     private void notifyWriteIdleness()
0522     {
0523         // process idle sessions
0524         long currentTime = System.currentTimeMillis();
0525         if ((currentTime - lastIdleWriteCheckTime>= 1000)
0526         {
0527             lastIdleWriteCheckTime = currentTime;
0528             Set keys = writeSelector.keys();
0529             if (keys != null)
0530             {
0531                 for (Iterator it = keys.iterator(); it.hasNext();)
0532                 {
0533                     SelectionKey key = (SelectionKeyit.next();
0534                     SocketSessionImpl session = (SocketSessionImplkey.attachment();
0535                     notifyWriteIdleness(session, currentTime);
0536                 }
0537             }
0538         }
0539     }
0540 
0541     private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
0542     {
0543         notifyIdleness0(
0544                 session, currentTime,
0545                 session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
0546                 IdleStatus.BOTH_IDLE,
0547                 Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
0548         notifyIdleness0(
0549                 session, currentTime,
0550                 session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
0551                 IdleStatus.READER_IDLE,
0552                 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
0553 
0554         notifyWriteTimeout(session, currentTime, session
0555                 .getWriteTimeoutInMillis(), session.getLastWriteTime());
0556     }
0557 
0558     private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
0559     {
0560         notifyIdleness0(
0561                 session, currentTime,
0562                 session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
0563                 IdleStatus.BOTH_IDLE,
0564                 Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
0565         notifyIdleness0(
0566                 session, currentTime,
0567                 session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
0568                 IdleStatus.WRITER_IDLE,
0569                 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
0570 
0571         notifyWriteTimeout(session, currentTime, session
0572                 .getWriteTimeoutInMillis(), session.getLastWriteTime());
0573     }
0574 
0575     private void notifyIdleness0(SocketSessionImpl session, long currentTime,
0576                                  long idleTime, IdleStatus status,
0577                                  long lastIoTime)
0578     {
0579         if (idleTime > && lastIoTime != 0
0580             && (currentTime - lastIoTime>= idleTime)
0581         {
0582             session.increaseIdleCount(status);
0583             session.getFilterChain().fireSessionIdle(session, status);
0584         }
0585     }
0586 
0587     private void notifyWriteTimeout(SocketSessionImpl session,
0588                                     long currentTime,
0589                                     long writeTimeout, long lastIoTime)
0590     {
0591 
0592         MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImplsession;
0593         SelectionKey key = sesh.getWriteSelectionKey();
0594 
0595         synchronized (writeLock)
0596         {
0597             if (writeTimeout > 0
0598                 && (currentTime - lastIoTime>= writeTimeout
0599                 && key != null && key.isValid()
0600                 && (key.interestOps() & SelectionKey.OP_WRITE!= 0)
0601             {
0602                 session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
0603             }
0604         }
0605     }
0606 
0607     private SocketSessionImpl getNextFlushingSession()
0608     {
0609         return (SocketSessionImplflushingSessions.poll();
0610     }
0611 
0612     private void releaseSession(SocketSessionImpl session)
0613     {
0614         synchronized (session.getWriteRequestQueue())
0615         {
0616             synchronized (flushingSessionsSet)
0617             {
0618                 if (session.getScheduledWriteRequests() 0)
0619                 {
0620                     if (_loggerWrite.isDebugEnabled())
0621                     {
0622                         //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session));
0623                     }
0624                     flushingSessions.offer(session);
0625                 }
0626                 else
0627                 {
0628                     if (_loggerWrite.isDebugEnabled())
0629                     {
0630                         //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session));
0631                     }
0632                     flushingSessionsSet.remove(session);
0633                 }
0634             }
0635         }
0636     }
0637 
0638     private void releaseWriteBuffers(SocketSessionImpl session)
0639     {
0640         Queue writeRequestQueue = session.getWriteRequestQueue();
0641         WriteRequest req;
0642 
0643         //Should this be synchronized?
0644         synchronized (writeRequestQueue)
0645         {
0646             while ((req = (WriteRequestwriteRequestQueue.pop()) != null)
0647             {
0648                 try
0649                 {
0650                     ((ByteBufferreq.getMessage()).release();
0651                 }
0652                 catch (IllegalStateException e)
0653                 {
0654                     session.getFilterChain().fireExceptionCaught(session, e);
0655                 }
0656                 finally
0657                 {
0658                     req.getFuture().setWritten(false);
0659                 }
0660             }
0661         }
0662     }
0663 
0664     private void doFlush()
0665     {
0666         MultiThreadSocketSessionImpl session;
0667 
0668         while ((session = (MultiThreadSocketSessionImplgetNextFlushingSession()) != null)
0669         {
0670             if (!session.isConnected())
0671             {
0672                 releaseWriteBuffers(session);
0673                 releaseSession(session);
0674                 continue;
0675             }
0676 
0677             SelectionKey key = session.getWriteSelectionKey();
0678             // Retry later if session is not yet fully initialized.
0679             // (In case that Session.write() is called before addSession() is processed)
0680             if (key == null)
0681             {
0682                 scheduleFlush(session);
0683                 releaseSession(session);
0684                 continue;
0685             }
0686             // skip if channel is already closed
0687             if (!key.isValid())
0688             {
0689                 releaseSession(session);
0690                 continue;
0691             }
0692 
0693             try
0694             {
0695                 if (doFlush(session))
0696                 {
0697                     releaseSession(session);
0698                 }
0699             }
0700             catch (IOException e)
0701             {
0702                 releaseSession(session);
0703                 scheduleRemove(session);
0704                 session.getFilterChain().fireExceptionCaught(session, e);
0705             }
0706 
0707         }
0708 
0709     }
0710 
0711     private boolean doFlush(SocketSessionImpl sessionParamthrows IOException
0712     {
0713         MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImplsessionParam;
0714         // Clear OP_WRITE
0715         SelectionKey key = session.getWriteSelectionKey();
0716         synchronized (writeLock)
0717         {
0718             key.interestOps(key.interestOps() (~SelectionKey.OP_WRITE));
0719         }
0720         SocketChannel ch = session.getChannel();
0721         Queue writeRequestQueue = session.getWriteRequestQueue();
0722 
0723         long totalFlushedBytes = 0;
0724         while (true)
0725         {
0726             WriteRequest req;
0727 
0728             synchronized (writeRequestQueue)
0729             {
0730                 req = (WriteRequestwriteRequestQueue.first();
0731             }
0732 
0733             if (req == null)
0734             {
0735                 break;
0736             }
0737 
0738             ByteBuffer buf = (ByteBufferreq.getMessage();
0739             if (buf.remaining() == 0)
0740             {
0741                 synchronized (writeRequestQueue)
0742                 {
0743                     writeRequestQueue.pop();
0744                 }
0745 
0746                 session.increaseWrittenMessages();
0747 
0748                 buf.reset();
0749                 session.getFilterChain().fireMessageSent(session, req);
0750                 continue;
0751             }
0752 
0753 
0754             int writtenBytes = 0;
0755 
0756             // Reported as DIRMINA-362
0757             //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
0758             if (key.isWritable())
0759             {
0760                 writtenBytes = ch.write(buf.buf());
0761                 totalFlushedBytes += writtenBytes;
0762             }
0763 
0764             if (writtenBytes > 0)
0765             {
0766                 session.increaseWrittenBytes(writtenBytes);
0767             }
0768 
0769             if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
0770             {
0771                 // Kernel buffer is full
0772                 synchronized (writeLock)
0773                 {
0774                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
0775                 }
0776                 if (_loggerWrite.isDebugEnabled())
0777                 {
0778                     //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
0779                 }
0780                 return false;
0781             }
0782         }
0783 
0784         if (_loggerWrite.isDebugEnabled())
0785         {
0786             //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
0787         }
0788         return true;
0789     }
0790 
0791     private void doUpdateTrafficMask()
0792     {
0793         if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked())
0794         {
0795             return;
0796         }
0797 
0798         // Synchronize over entire operation as this method should be called
0799         // from both read and write thread and we don't want the order of the
0800         //  updates to get changed.
0801         trafficMaskUpdateLock.lock();
0802         try
0803         {
0804             for (; ;)
0805             {
0806                 MultiThreadSocketSessionImpl session;
0807 
0808                 session = (MultiThreadSocketSessionImpltrafficControllingSessions.pop();
0809 
0810                 if (session == null)
0811                 {
0812                     break;
0813                 }
0814 
0815                 SelectionKey key = session.getReadSelectionKey();
0816                 // Retry later if session is not yet fully initialized.
0817                 // (In case that Session.suspend??() or session.resume??() is
0818                 // called before addSession() is processed)
0819                 if (key == null)
0820                 {
0821                     scheduleTrafficControl(session);
0822                     break;
0823                 }
0824                 // skip if channel is already closed
0825                 if (!key.isValid())
0826                 {
0827                     continue;
0828                 }
0829 
0830                 // The normal is OP_READ and, if there are write requests in the
0831                 // session's write queue, set OP_WRITE to trigger flushing.
0832 
0833                 //Sset to Read and Write if there is nothing then the cost
0834                 // is one loop through the flusher.
0835                 int ops = SelectionKey.OP_READ;
0836 
0837                 // Now mask the preferred ops with the mask of the current session
0838                 int mask = session.getTrafficMask().getInterestOps();
0839                 synchronized (readLock)
0840                 {
0841                     key.interestOps(ops & mask);
0842                 }
0843                 //Change key to the WriteSelection Key
0844                 key = session.getWriteSelectionKey();
0845                 if (key != null && key.isValid())
0846                 {
0847                     Queue writeRequestQueue = session.getWriteRequestQueue();
0848                     synchronized (writeRequestQueue)
0849                     {
0850                         if (!writeRequestQueue.isEmpty())
0851                         {
0852                             ops = SelectionKey.OP_WRITE;
0853                             synchronized (writeLock)
0854                             {
0855                                 key.interestOps(ops & mask);
0856                             }
0857                         }
0858                     }
0859                 }
0860             }
0861         }
0862         finally
0863         {
0864             trafficMaskUpdateLock.unlock();
0865         }
0866 
0867     }
0868 
0869     private class WriteWorker implements Runnable
0870     {
0871 
0872         public void run()
0873         {
0874             Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer");
0875 
0876             //System.out.println("WriteDebug:"+"Startup");
0877             for (; ;)
0878             {
0879                 try
0880                 {
0881                     int nKeys = writeSelector.select(SELECTOR_TIMEOUT);
0882 
0883                     doAddNewWrite();
0884                     doUpdateTrafficMask();
0885 
0886                     if (nKeys > 0)
0887                     {
0888                         //System.out.println("WriteDebug:"+nKeys + " keys from writeselector");
0889                         processWrite(writeSelector.selectedKeys());
0890                     }
0891                     else
0892                     {
0893                         //System.out.println("WriteDebug:"+"No keys from writeselector");
0894                     }
0895 
0896                     doRemove();
0897                     notifyWriteIdleness();
0898 
0899                     if (flushingSessionsSet.size() 0)
0900                     {
0901                         doFlush();
0902                     }
0903 
0904                     if (writeSelector.keys().isEmpty())
0905                     {
0906                         synchronized (writeLock)
0907                         {
0908 
0909                             if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
0910                             {
0911                                 writeWorker = null;
0912                                 try
0913                                 {
0914                                     writeSelector.close();
0915                                 }
0916                                 catch (IOException e)
0917                                 {
0918                                     ExceptionMonitor.getInstance().exceptionCaught(e);
0919                                 }
0920                                 finally
0921                                 {
0922                                     writeSelector = null;
0923                                 }
0924 
0925                                 break;
0926                             }
0927                         }
0928                     }
0929 
0930                 }
0931                 catch (Throwable t)
0932                 {
0933                     ExceptionMonitor.getInstance().exceptionCaught(t);
0934 
0935                     try
0936                     {
0937                         Thread.sleep(1000);
0938                     }
0939                     catch (InterruptedException e1)
0940                     {
0941                         ExceptionMonitor.getInstance().exceptionCaught(e1);
0942                     }
0943                 }
0944             }
0945             //System.out.println("WriteDebug:"+"Shutdown");
0946         }
0947 
0948     }
0949 
0950     private class ReadWorker implements Runnable
0951     {
0952 
0953         public void run()
0954         {
0955             Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
0956 
0957             //System.out.println("ReadDebug:"+"Startup");
0958             for (; ;)
0959             {
0960                 try
0961                 {
0962                     int nKeys = selector.select(SELECTOR_TIMEOUT);
0963 
0964                     doAddNewReader();
0965                     doUpdateTrafficMask();
0966 
0967                     if (nKeys > 0)
0968                     {
0969                         //System.out.println("ReadDebug:"+nKeys + " keys from selector");
0970 
0971                         processRead(selector.selectedKeys());
0972                     }
0973                     else
0974                     {
0975                         //System.out.println("ReadDebug:"+"No keys from selector");
0976                     }
0977 
0978 
0979                     doRemove();
0980                     notifyReadIdleness();
0981 
0982                     if (selector.keys().isEmpty())
0983                     {
0984 
0985                         synchronized (readLock)
0986                         {
0987                             if (selector.keys().isEmpty() && newSessions.isEmpty())
0988                             {
0989                                 readWorker = null;
0990                                 try
0991                                 {
0992                                     selector.close();
0993                                 }
0994                                 catch (IOException e)
0995                                 {
0996                                     ExceptionMonitor.getInstance().exceptionCaught(e);
0997                                 }
0998                                 finally
0999                                 {
1000                                     selector = null;
1001                                 }
1002 
1003                                 break;
1004                             }
1005                         }
1006                     }
1007                 }
1008                 catch (Throwable t)
1009                 {
1010                     ExceptionMonitor.getInstance().exceptionCaught(t);
1011 
1012                     try
1013                     {
1014                         Thread.sleep(1000);
1015                     }
1016                     catch (InterruptedException e1)
1017                     {
1018                         ExceptionMonitor.getInstance().exceptionCaught(e1);
1019                     }
1020                 }
1021             }
1022             //System.out.println("ReadDebug:"+"Shutdown");
1023         }
1024 
1025     }
1026 }