0001 /*
0002 * Licensed to the Apache Software Foundation (ASF) under one
0003 * or more contributor license agreements. See the NOTICE file
0004 * distributed with this work for additional information
0005 * regarding copyright ownership. The ASF licenses this file
0006 * to you under the Apache License, Version 2.0 (the
0007 * "License"); you may not use this file except in compliance
0008 * with the License. You may obtain a copy of the License at
0009 *
0010 * http://www.apache.org/licenses/LICENSE-2.0
0011 *
0012 * Unless required by applicable law or agreed to in writing,
0013 * software distributed under the License is distributed on an
0014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0015 * KIND, either express or implied. See the License for the
0016 * specific language governing permissions and limitations
0017 * under the License.
0018 *
0019 */
0020 package org.apache.mina.transport.socket.nio;
0021
0022 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
0023 import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
0024 import org.apache.mina.common.ByteBuffer;
0025 import org.apache.mina.common.ExceptionMonitor;
0026 import org.apache.mina.common.IdleStatus;
0027 import org.apache.mina.common.IoFilter.WriteRequest;
0028 import org.apache.mina.common.WriteTimeoutException;
0029 import org.apache.mina.util.IdentityHashSet;
0030 import org.apache.mina.util.NamePreservingRunnable;
0031 import org.apache.mina.util.Queue;
0032 import org.slf4j.Logger;
0033 import org.slf4j.LoggerFactory;
0034
0035 import java.io.IOException;
0036 import java.nio.channels.SelectionKey;
0037 import java.nio.channels.Selector;
0038 import java.nio.channels.SocketChannel;
0039 import java.util.Iterator;
0040 import java.util.Set;
0041 import java.util.concurrent.BlockingQueue;
0042 import java.util.concurrent.LinkedBlockingQueue;
0043
0044 /**
0045 * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
0046 *
0047 * @author The Apache Directory Project (mina-dev@directory.apache.org)
0048 * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $,
0049 */
0050 class MultiThreadSocketIoProcessor extends SocketIoProcessor
0051 {
0052 Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class);
0053 Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader");
0054 Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer");
0055
0056 private static final long SELECTOR_TIMEOUT = 1000L;
0057
0058 private int MAX_READ_BYTES_PER_SESSION = 524288; //512K
0059 private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K
0060
0061 private final Object readLock = new Object();
0062 private final Object writeLock = new Object();
0063
0064 private final String threadName;
0065 private final Executor executor;
0066
0067 private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
0068
0069 /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
0070 private volatile Selector selector, writeSelector;
0071
0072 private final Queue newSessions = new Queue();
0073 private final Queue removingSessions = new Queue();
0074 private final BlockingQueue flushingSessions = new LinkedBlockingQueue();
0075 private final IdentityHashSet flushingSessionsSet = new IdentityHashSet();
0076
0077 private final Queue trafficControllingSessions = new Queue();
0078
0079 private ReadWorker readWorker;
0080 private WriteWorker writeWorker;
0081 private long lastIdleReadCheckTime = System.currentTimeMillis();
0082 private long lastIdleWriteCheckTime = System.currentTimeMillis();
0083
0084 MultiThreadSocketIoProcessor(String threadName, Executor executor)
0085 {
0086 super(threadName, executor);
0087 this.threadName = threadName;
0088 this.executor = executor;
0089 }
0090
0091 void addNew(SocketSessionImpl session) throws IOException
0092 {
0093 synchronized (newSessions)
0094 {
0095 newSessions.push(session);
0096 }
0097
0098 startupWorker();
0099
0100 selector.wakeup();
0101 writeSelector.wakeup();
0102 }
0103
0104 void remove(SocketSessionImpl session) throws IOException
0105 {
0106 scheduleRemove(session);
0107 startupWorker();
0108 selector.wakeup();
0109 }
0110
0111 private void startupWorker() throws IOException
0112 {
0113 synchronized (readLock)
0114 {
0115 if (readWorker == null)
0116 {
0117 selector = Selector.open();
0118 readWorker = new ReadWorker();
0119 executor.execute(new NamePreservingRunnable(readWorker));
0120 }
0121 }
0122
0123 synchronized (writeLock)
0124 {
0125 if (writeWorker == null)
0126 {
0127 writeSelector = Selector.open();
0128 writeWorker = new WriteWorker();
0129 executor.execute(new NamePreservingRunnable(writeWorker));
0130 }
0131 }
0132
0133 }
0134
0135 void flush(SocketSessionImpl session)
0136 {
0137 scheduleFlush(session);
0138 Selector selector = this.writeSelector;
0139
0140 if (selector != null)
0141 {
0142 selector.wakeup();
0143 }
0144 }
0145
0146 void updateTrafficMask(SocketSessionImpl session)
0147 {
0148 scheduleTrafficControl(session);
0149 Selector selector = this.selector;
0150 if (selector != null)
0151 {
0152 selector.wakeup();
0153 }
0154 }
0155
0156 private void scheduleRemove(SocketSessionImpl session)
0157 {
0158 synchronized (removingSessions)
0159 {
0160 removingSessions.push(session);
0161 }
0162 }
0163
0164 private void scheduleFlush(SocketSessionImpl session)
0165 {
0166 synchronized (flushingSessionsSet)
0167 {
0168 //if flushingSessions grows to contain Integer.MAX_VALUE sessions
0169 // then this will fail.
0170 if (flushingSessionsSet.add(session))
0171 {
0172 flushingSessions.offer(session);
0173 }
0174 }
0175 }
0176
0177 private void scheduleTrafficControl(SocketSessionImpl session)
0178 {
0179 synchronized (trafficControllingSessions)
0180 {
0181 trafficControllingSessions.push(session);
0182 }
0183 }
0184
0185 private void doAddNewReader() throws InterruptedException
0186 {
0187 if (newSessions.isEmpty())
0188 {
0189 return;
0190 }
0191
0192 for (; ;)
0193 {
0194 MultiThreadSocketSessionImpl session;
0195
0196 synchronized (newSessions)
0197 {
0198 session = (MultiThreadSocketSessionImpl) newSessions.peek();
0199 }
0200
0201 if (session == null)
0202 {
0203 break;
0204 }
0205
0206 SocketChannel ch = session.getChannel();
0207
0208
0209 try
0210 {
0211
0212 ch.configureBlocking(false);
0213 session.setSelectionKey(ch.register(selector,
0214 SelectionKey.OP_READ,
0215 session));
0216
0217 //System.out.println("ReadDebug:"+"Awaiting Registration");
0218 session.awaitRegistration();
0219 sessionCreated(session);
0220 }
0221 catch (IOException e)
0222 {
0223 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
0224 // and call ConnectFuture.setException().
0225 session.getFilterChain().fireExceptionCaught(session, e);
0226 }
0227 }
0228 }
0229
0230
0231 private void doAddNewWrite() throws InterruptedException
0232 {
0233 if (newSessions.isEmpty())
0234 {
0235 return;
0236 }
0237
0238 for (; ;)
0239 {
0240 MultiThreadSocketSessionImpl session;
0241
0242 synchronized (newSessions)
0243 {
0244 session = (MultiThreadSocketSessionImpl) newSessions.peek();
0245 }
0246
0247 if (session == null)
0248 {
0249 break;
0250 }
0251
0252 SocketChannel ch = session.getChannel();
0253
0254 try
0255 {
0256 ch.configureBlocking(false);
0257 synchronized (flushingSessionsSet)
0258 {
0259 flushingSessionsSet.add(session);
0260 }
0261
0262 session.setWriteSelectionKey(ch.register(writeSelector,
0263 SelectionKey.OP_WRITE,
0264 session));
0265
0266 //System.out.println("WriteDebug:"+"Awaiting Registration");
0267 session.awaitRegistration();
0268 sessionCreated(session);
0269 }
0270 catch (IOException e)
0271 {
0272
0273 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
0274 // and call ConnectFuture.setException().
0275 session.getFilterChain().fireExceptionCaught(session, e);
0276 }
0277 }
0278 }
0279
0280
0281 private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
0282 {
0283 MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
0284 synchronized (newSessions)
0285 {
0286 if (!session.created())
0287 {
0288 _logger.debug("Popping new session");
0289 newSessions.pop();
0290
0291 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
0292 // in AbstractIoFilterChain.fireSessionOpened().
0293 session.getServiceListeners().fireSessionCreated(session);
0294
0295 session.doneCreation();
0296 }
0297 }
0298 }
0299
0300 private void doRemove()
0301 {
0302 if (removingSessions.isEmpty())
0303 {
0304 return;
0305 }
0306
0307 for (; ;)
0308 {
0309 MultiThreadSocketSessionImpl session;
0310
0311 synchronized (removingSessions)
0312 {
0313 session = (MultiThreadSocketSessionImpl) removingSessions.pop();
0314 }
0315
0316 if (session == null)
0317 {
0318 break;
0319 }
0320
0321 SocketChannel ch = session.getChannel();
0322 SelectionKey key = session.getReadSelectionKey();
0323 SelectionKey writeKey = session.getWriteSelectionKey();
0324
0325 // Retry later if session is not yet fully initialized.
0326 // (In case that Session.close() is called before addSession() is processed)
0327 if (key == null || writeKey == null)
0328 {
0329 scheduleRemove(session);
0330 break;
0331 }
0332 // skip if channel is already closed
0333 if (!key.isValid() || !writeKey.isValid())
0334 {
0335 continue;
0336 }
0337
0338 try
0339 {
0340 //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
0341 synchronized (readLock)
0342 {
0343 key.cancel();
0344 }
0345 synchronized (writeLock)
0346 {
0347 writeKey.cancel();
0348 }
0349 ch.close();
0350 }
0351 catch (IOException e)
0352 {
0353 session.getFilterChain().fireExceptionCaught(session, e);
0354 }
0355 finally
0356 {
0357 releaseWriteBuffers(session);
0358 session.getServiceListeners().fireSessionDestroyed(session);
0359 }
0360 }
0361 }
0362
0363 private void processRead(Set selectedKeys)
0364 {
0365 Iterator it = selectedKeys.iterator();
0366
0367 while (it.hasNext())
0368 {
0369 SelectionKey key = (SelectionKey) it.next();
0370 MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
0371
0372 synchronized (readLock)
0373 {
0374 if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
0375 {
0376 read(session);
0377 }
0378 }
0379
0380 }
0381
0382 selectedKeys.clear();
0383 }
0384
0385 private void processWrite(Set selectedKeys)
0386 {
0387 Iterator it = selectedKeys.iterator();
0388
0389 while (it.hasNext())
0390 {
0391 SelectionKey key = (SelectionKey) it.next();
0392 SocketSessionImpl session = (SocketSessionImpl) key.attachment();
0393
0394 synchronized (writeLock)
0395 {
0396 if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
0397 {
0398
0399 // Clear OP_WRITE
0400 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
0401
0402 synchronized (flushingSessionsSet)
0403 {
0404 flushingSessions.offer(session);
0405 }
0406 }
0407 }
0408 }
0409
0410 selectedKeys.clear();
0411 }
0412
0413 private void read(SocketSessionImpl session)
0414 {
0415
0416 //if (_loggerWrite.isDebugEnabled())
0417 {
0418 //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session));
0419 }
0420
0421 int totalReadBytes = 0;
0422
0423 while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
0424 {
0425 ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
0426 SocketChannel ch = session.getChannel();
0427
0428 try
0429 {
0430 buf.clear();
0431
0432 int readBytes = 0;
0433 int ret;
0434
0435 try
0436 {
0437 while ((ret = ch.read(buf.buf())) > 0)
0438 {
0439 readBytes += ret;
0440 totalReadBytes += ret;
0441 }
0442 }
0443 finally
0444 {
0445 buf.flip();
0446 }
0447
0448
0449 if (readBytes > 0)
0450 {
0451 session.increaseReadBytes(readBytes);
0452
0453 session.getFilterChain().fireMessageReceived(session, buf);
0454 buf = null;
0455 }
0456
0457 if (ret <= 0)
0458 {
0459 if (ret == 0)
0460 {
0461 if (readBytes == session.getReadBufferSize())
0462 {
0463 continue;
0464 }
0465 }
0466 else
0467 {
0468 scheduleRemove(session);
0469 }
0470
0471 break;
0472 }
0473 }
0474 catch (Throwable e)
0475 {
0476 if (e instanceof IOException)
0477 {
0478 scheduleRemove(session);
0479 }
0480 session.getFilterChain().fireExceptionCaught(session, e);
0481
0482 //Stop Reading this session.
0483 return;
0484 }
0485 finally
0486 {
0487 if (buf != null)
0488 {
0489 buf.release();
0490 }
0491 }
0492 }//for
0493
0494 // if (_loggerWrite.isDebugEnabled())
0495 {
0496 //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes);
0497 }
0498 }
0499
0500
0501 private void notifyReadIdleness()
0502 {
0503 // process idle sessions
0504 long currentTime = System.currentTimeMillis();
0505 if ((currentTime - lastIdleReadCheckTime) >= 1000)
0506 {
0507 lastIdleReadCheckTime = currentTime;
0508 Set keys = selector.keys();
0509 if (keys != null)
0510 {
0511 for (Iterator it = keys.iterator(); it.hasNext();)
0512 {
0513 SelectionKey key = (SelectionKey) it.next();
0514 SocketSessionImpl session = (SocketSessionImpl) key.attachment();
0515 notifyReadIdleness(session, currentTime);
0516 }
0517 }
0518 }
0519 }
0520
0521 private void notifyWriteIdleness()
0522 {
0523 // process idle sessions
0524 long currentTime = System.currentTimeMillis();
0525 if ((currentTime - lastIdleWriteCheckTime) >= 1000)
0526 {
0527 lastIdleWriteCheckTime = currentTime;
0528 Set keys = writeSelector.keys();
0529 if (keys != null)
0530 {
0531 for (Iterator it = keys.iterator(); it.hasNext();)
0532 {
0533 SelectionKey key = (SelectionKey) it.next();
0534 SocketSessionImpl session = (SocketSessionImpl) key.attachment();
0535 notifyWriteIdleness(session, currentTime);
0536 }
0537 }
0538 }
0539 }
0540
0541 private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
0542 {
0543 notifyIdleness0(
0544 session, currentTime,
0545 session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
0546 IdleStatus.BOTH_IDLE,
0547 Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
0548 notifyIdleness0(
0549 session, currentTime,
0550 session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
0551 IdleStatus.READER_IDLE,
0552 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
0553
0554 notifyWriteTimeout(session, currentTime, session
0555 .getWriteTimeoutInMillis(), session.getLastWriteTime());
0556 }
0557
0558 private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
0559 {
0560 notifyIdleness0(
0561 session, currentTime,
0562 session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
0563 IdleStatus.BOTH_IDLE,
0564 Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
0565 notifyIdleness0(
0566 session, currentTime,
0567 session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
0568 IdleStatus.WRITER_IDLE,
0569 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
0570
0571 notifyWriteTimeout(session, currentTime, session
0572 .getWriteTimeoutInMillis(), session.getLastWriteTime());
0573 }
0574
0575 private void notifyIdleness0(SocketSessionImpl session, long currentTime,
0576 long idleTime, IdleStatus status,
0577 long lastIoTime)
0578 {
0579 if (idleTime > 0 && lastIoTime != 0
0580 && (currentTime - lastIoTime) >= idleTime)
0581 {
0582 session.increaseIdleCount(status);
0583 session.getFilterChain().fireSessionIdle(session, status);
0584 }
0585 }
0586
0587 private void notifyWriteTimeout(SocketSessionImpl session,
0588 long currentTime,
0589 long writeTimeout, long lastIoTime)
0590 {
0591
0592 MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
0593 SelectionKey key = sesh.getWriteSelectionKey();
0594
0595 synchronized (writeLock)
0596 {
0597 if (writeTimeout > 0
0598 && (currentTime - lastIoTime) >= writeTimeout
0599 && key != null && key.isValid()
0600 && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
0601 {
0602 session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
0603 }
0604 }
0605 }
0606
0607 private SocketSessionImpl getNextFlushingSession()
0608 {
0609 return (SocketSessionImpl) flushingSessions.poll();
0610 }
0611
0612 private void releaseSession(SocketSessionImpl session)
0613 {
0614 synchronized (session.getWriteRequestQueue())
0615 {
0616 synchronized (flushingSessionsSet)
0617 {
0618 if (session.getScheduledWriteRequests() > 0)
0619 {
0620 if (_loggerWrite.isDebugEnabled())
0621 {
0622 //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session));
0623 }
0624 flushingSessions.offer(session);
0625 }
0626 else
0627 {
0628 if (_loggerWrite.isDebugEnabled())
0629 {
0630 //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session));
0631 }
0632 flushingSessionsSet.remove(session);
0633 }
0634 }
0635 }
0636 }
0637
0638 private void releaseWriteBuffers(SocketSessionImpl session)
0639 {
0640 Queue writeRequestQueue = session.getWriteRequestQueue();
0641 WriteRequest req;
0642
0643 //Should this be synchronized?
0644 synchronized (writeRequestQueue)
0645 {
0646 while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
0647 {
0648 try
0649 {
0650 ((ByteBuffer) req.getMessage()).release();
0651 }
0652 catch (IllegalStateException e)
0653 {
0654 session.getFilterChain().fireExceptionCaught(session, e);
0655 }
0656 finally
0657 {
0658 req.getFuture().setWritten(false);
0659 }
0660 }
0661 }
0662 }
0663
0664 private void doFlush()
0665 {
0666 MultiThreadSocketSessionImpl session;
0667
0668 while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
0669 {
0670 if (!session.isConnected())
0671 {
0672 releaseWriteBuffers(session);
0673 releaseSession(session);
0674 continue;
0675 }
0676
0677 SelectionKey key = session.getWriteSelectionKey();
0678 // Retry later if session is not yet fully initialized.
0679 // (In case that Session.write() is called before addSession() is processed)
0680 if (key == null)
0681 {
0682 scheduleFlush(session);
0683 releaseSession(session);
0684 continue;
0685 }
0686 // skip if channel is already closed
0687 if (!key.isValid())
0688 {
0689 releaseSession(session);
0690 continue;
0691 }
0692
0693 try
0694 {
0695 if (doFlush(session))
0696 {
0697 releaseSession(session);
0698 }
0699 }
0700 catch (IOException e)
0701 {
0702 releaseSession(session);
0703 scheduleRemove(session);
0704 session.getFilterChain().fireExceptionCaught(session, e);
0705 }
0706
0707 }
0708
0709 }
0710
0711 private boolean doFlush(SocketSessionImpl sessionParam) throws IOException
0712 {
0713 MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
0714 // Clear OP_WRITE
0715 SelectionKey key = session.getWriteSelectionKey();
0716 synchronized (writeLock)
0717 {
0718 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
0719 }
0720 SocketChannel ch = session.getChannel();
0721 Queue writeRequestQueue = session.getWriteRequestQueue();
0722
0723 long totalFlushedBytes = 0;
0724 while (true)
0725 {
0726 WriteRequest req;
0727
0728 synchronized (writeRequestQueue)
0729 {
0730 req = (WriteRequest) writeRequestQueue.first();
0731 }
0732
0733 if (req == null)
0734 {
0735 break;
0736 }
0737
0738 ByteBuffer buf = (ByteBuffer) req.getMessage();
0739 if (buf.remaining() == 0)
0740 {
0741 synchronized (writeRequestQueue)
0742 {
0743 writeRequestQueue.pop();
0744 }
0745
0746 session.increaseWrittenMessages();
0747
0748 buf.reset();
0749 session.getFilterChain().fireMessageSent(session, req);
0750 continue;
0751 }
0752
0753
0754 int writtenBytes = 0;
0755
0756 // Reported as DIRMINA-362
0757 //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
0758 if (key.isWritable())
0759 {
0760 writtenBytes = ch.write(buf.buf());
0761 totalFlushedBytes += writtenBytes;
0762 }
0763
0764 if (writtenBytes > 0)
0765 {
0766 session.increaseWrittenBytes(writtenBytes);
0767 }
0768
0769 if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
0770 {
0771 // Kernel buffer is full
0772 synchronized (writeLock)
0773 {
0774 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
0775 }
0776 if (_loggerWrite.isDebugEnabled())
0777 {
0778 //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
0779 }
0780 return false;
0781 }
0782 }
0783
0784 if (_loggerWrite.isDebugEnabled())
0785 {
0786 //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
0787 }
0788 return true;
0789 }
0790
0791 private void doUpdateTrafficMask()
0792 {
0793 if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked())
0794 {
0795 return;
0796 }
0797
0798 // Synchronize over entire operation as this method should be called
0799 // from both read and write thread and we don't want the order of the
0800 // updates to get changed.
0801 trafficMaskUpdateLock.lock();
0802 try
0803 {
0804 for (; ;)
0805 {
0806 MultiThreadSocketSessionImpl session;
0807
0808 session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop();
0809
0810 if (session == null)
0811 {
0812 break;
0813 }
0814
0815 SelectionKey key = session.getReadSelectionKey();
0816 // Retry later if session is not yet fully initialized.
0817 // (In case that Session.suspend??() or session.resume??() is
0818 // called before addSession() is processed)
0819 if (key == null)
0820 {
0821 scheduleTrafficControl(session);
0822 break;
0823 }
0824 // skip if channel is already closed
0825 if (!key.isValid())
0826 {
0827 continue;
0828 }
0829
0830 // The normal is OP_READ and, if there are write requests in the
0831 // session's write queue, set OP_WRITE to trigger flushing.
0832
0833 //Sset to Read and Write if there is nothing then the cost
0834 // is one loop through the flusher.
0835 int ops = SelectionKey.OP_READ;
0836
0837 // Now mask the preferred ops with the mask of the current session
0838 int mask = session.getTrafficMask().getInterestOps();
0839 synchronized (readLock)
0840 {
0841 key.interestOps(ops & mask);
0842 }
0843 //Change key to the WriteSelection Key
0844 key = session.getWriteSelectionKey();
0845 if (key != null && key.isValid())
0846 {
0847 Queue writeRequestQueue = session.getWriteRequestQueue();
0848 synchronized (writeRequestQueue)
0849 {
0850 if (!writeRequestQueue.isEmpty())
0851 {
0852 ops = SelectionKey.OP_WRITE;
0853 synchronized (writeLock)
0854 {
0855 key.interestOps(ops & mask);
0856 }
0857 }
0858 }
0859 }
0860 }
0861 }
0862 finally
0863 {
0864 trafficMaskUpdateLock.unlock();
0865 }
0866
0867 }
0868
0869 private class WriteWorker implements Runnable
0870 {
0871
0872 public void run()
0873 {
0874 Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer");
0875
0876 //System.out.println("WriteDebug:"+"Startup");
0877 for (; ;)
0878 {
0879 try
0880 {
0881 int nKeys = writeSelector.select(SELECTOR_TIMEOUT);
0882
0883 doAddNewWrite();
0884 doUpdateTrafficMask();
0885
0886 if (nKeys > 0)
0887 {
0888 //System.out.println("WriteDebug:"+nKeys + " keys from writeselector");
0889 processWrite(writeSelector.selectedKeys());
0890 }
0891 else
0892 {
0893 //System.out.println("WriteDebug:"+"No keys from writeselector");
0894 }
0895
0896 doRemove();
0897 notifyWriteIdleness();
0898
0899 if (flushingSessionsSet.size() > 0)
0900 {
0901 doFlush();
0902 }
0903
0904 if (writeSelector.keys().isEmpty())
0905 {
0906 synchronized (writeLock)
0907 {
0908
0909 if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
0910 {
0911 writeWorker = null;
0912 try
0913 {
0914 writeSelector.close();
0915 }
0916 catch (IOException e)
0917 {
0918 ExceptionMonitor.getInstance().exceptionCaught(e);
0919 }
0920 finally
0921 {
0922 writeSelector = null;
0923 }
0924
0925 break;
0926 }
0927 }
0928 }
0929
0930 }
0931 catch (Throwable t)
0932 {
0933 ExceptionMonitor.getInstance().exceptionCaught(t);
0934
0935 try
0936 {
0937 Thread.sleep(1000);
0938 }
0939 catch (InterruptedException e1)
0940 {
0941 ExceptionMonitor.getInstance().exceptionCaught(e1);
0942 }
0943 }
0944 }
0945 //System.out.println("WriteDebug:"+"Shutdown");
0946 }
0947
0948 }
0949
0950 private class ReadWorker implements Runnable
0951 {
0952
0953 public void run()
0954 {
0955 Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
0956
0957 //System.out.println("ReadDebug:"+"Startup");
0958 for (; ;)
0959 {
0960 try
0961 {
0962 int nKeys = selector.select(SELECTOR_TIMEOUT);
0963
0964 doAddNewReader();
0965 doUpdateTrafficMask();
0966
0967 if (nKeys > 0)
0968 {
0969 //System.out.println("ReadDebug:"+nKeys + " keys from selector");
0970
0971 processRead(selector.selectedKeys());
0972 }
0973 else
0974 {
0975 //System.out.println("ReadDebug:"+"No keys from selector");
0976 }
0977
0978
0979 doRemove();
0980 notifyReadIdleness();
0981
0982 if (selector.keys().isEmpty())
0983 {
0984
0985 synchronized (readLock)
0986 {
0987 if (selector.keys().isEmpty() && newSessions.isEmpty())
0988 {
0989 readWorker = null;
0990 try
0991 {
0992 selector.close();
0993 }
0994 catch (IOException e)
0995 {
0996 ExceptionMonitor.getInstance().exceptionCaught(e);
0997 }
0998 finally
0999 {
1000 selector = null;
1001 }
1002
1003 break;
1004 }
1005 }
1006 }
1007 }
1008 catch (Throwable t)
1009 {
1010 ExceptionMonitor.getInstance().exceptionCaught(t);
1011
1012 try
1013 {
1014 Thread.sleep(1000);
1015 }
1016 catch (InterruptedException e1)
1017 {
1018 ExceptionMonitor.getInstance().exceptionCaught(e1);
1019 }
1020 }
1021 }
1022 //System.out.println("ReadDebug:"+"Shutdown");
1023 }
1024
1025 }
1026 }
|