MultiThreadSocketSessionImpl.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.
018  *
019  */
020 package org.apache.mina.transport.socket.nio;
021 
022 import org.apache.mina.common.IoFilter.WriteRequest;
023 import org.apache.mina.common.IoFilterChain;
024 import org.apache.mina.common.IoHandler;
025 import org.apache.mina.common.IoService;
026 import org.apache.mina.common.IoServiceConfig;
027 import org.apache.mina.common.IoSession;
028 import org.apache.mina.common.IoSessionConfig;
029 import org.apache.mina.common.RuntimeIOException;
030 import org.apache.mina.common.TransportType;
031 import org.apache.mina.common.support.BaseIoSessionConfig;
032 import org.apache.mina.common.support.IoServiceListenerSupport;
033 import org.apache.mina.util.Queue;
034 
035 import java.net.SocketAddress;
036 import java.net.SocketException;
037 import java.nio.channels.SelectionKey;
038 import java.nio.channels.SocketChannel;
039 import java.util.concurrent.CountDownLatch;
040 import java.util.concurrent.atomic.AtomicBoolean;
041 
042 /**
043  * An {@link IoSession} for socket transport (TCP/IP).
044  *
045  @author The Apache Directory Project (mina-dev@directory.apache.org)
046  @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
047  */
048 class MultiThreadSocketSessionImpl extends SocketSessionImpl
049 {
050     private final IoService manager;
051     private final IoServiceConfig serviceConfig;
052     private final SocketSessionConfig config = new SessionConfigImpl();
053     private final MultiThreadSocketIoProcessor ioProcessor;
054     private final MultiThreadSocketFilterChain filterChain;
055     private final SocketChannel ch;
056     private final Queue writeRequestQueue;
057     private final IoHandler handler;
058     private final SocketAddress remoteAddress;
059     private final SocketAddress localAddress;
060     private final SocketAddress serviceAddress;
061     private final IoServiceListenerSupport serviceListeners;
062     private SelectionKey readKey, writeKey;
063     private int readBufferSize;
064     private CountDownLatch registeredReadyLatch = new CountDownLatch(2);
065     private AtomicBoolean created = new AtomicBoolean(false);
066 
067     /**
068      * Creates a new instance.
069      */
070     MultiThreadSocketSessionImplIoService manager,
071                        SocketIoProcessor ioProcessor,
072                        IoServiceListenerSupport listeners,
073                        IoServiceConfig serviceConfig,
074                        SocketChannel ch,
075                        IoHandler defaultHandler,
076                        SocketAddress serviceAddress )
077     {
078         super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress);
079         this.manager = manager;
080         this.serviceListeners = listeners;
081         this.ioProcessor = (MultiThreadSocketIoProcessorioProcessor;
082         this.filterChain = new MultiThreadSocketFilterChain(this);
083         this.ch = ch;
084         this.writeRequestQueue = new Queue();
085         this.handler = defaultHandler;
086         this.remoteAddress = ch.socket().getRemoteSocketAddress();
087         this.localAddress = ch.socket().getLocalSocketAddress();
088         this.serviceAddress = serviceAddress;
089         this.serviceConfig = serviceConfig;
090 
091         // Apply the initial session settings
092         IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
093         ifsessionConfig instanceof SocketSessionConfig )
094         {
095             SocketSessionConfig cfg = SocketSessionConfig sessionConfig;
096             this.config.setKeepAlivecfg.isKeepAlive() );
097             this.config.setOobInlinecfg.isOobInline() );
098             this.config.setReceiveBufferSizecfg.getReceiveBufferSize() );
099             this.readBufferSize = cfg.getReceiveBufferSize();
100             this.config.setReuseAddresscfg.isReuseAddress() );
101             this.config.setSendBufferSizecfg.getSendBufferSize() );
102             this.config.setSoLingercfg.getSoLinger() );
103             this.config.setTcpNoDelaycfg.isTcpNoDelay() );
104 
105             ifthis.config.getTrafficClass() != cfg.getTrafficClass() )
106             {
107                 this.config.setTrafficClasscfg.getTrafficClass() );
108             }
109         }
110     }
111 
112     void awaitRegistration() throws InterruptedException
113     {
114         registeredReadyLatch.countDown();
115 
116         registeredReadyLatch.await();
117     }
118 
119     boolean created() throws InterruptedException
120     {
121         return created.get();
122     }
123 
124     void doneCreation()
125     {
126         created.getAndSet(true);
127     }
128 
129     public IoService getService()
130     {
131         return manager;
132     }
133 
134     public IoServiceConfig getServiceConfig()
135     {
136         return serviceConfig;
137     }
138 
139     public IoSessionConfig getConfig()
140     {
141         return config;
142     }
143 
144     SocketIoProcessor getIoProcessor()
145     {
146         return ioProcessor;
147     }
148 
149     public IoFilterChain getFilterChain()
150     {
151         return filterChain;
152     }
153 
154     SocketChannel getChannel()
155     {
156         return ch;
157     }
158 
159     IoServiceListenerSupport getServiceListeners()
160     {
161         return serviceListeners;
162     }
163 
164     SelectionKey getSelectionKey()
165     {
166         return readKey;
167     }
168 
169     SelectionKey getReadSelectionKey()
170     {
171         return readKey;
172     }
173 
174     SelectionKey getWriteSelectionKey()
175     {
176         return writeKey;
177     }
178 
179     void setSelectionKey(SelectionKey key)
180     {
181         this.readKey = key;
182     }
183 
184     void setWriteSelectionKey(SelectionKey key)
185     {
186         this.writeKey = key;
187     }
188 
189     public IoHandler getHandler()
190     {
191         return handler;
192     }
193 
194     protected void close0()
195     {
196         filterChain.fireFilterClosethis );
197     }
198 
199     Queue getWriteRequestQueue()
200     {
201         return writeRequestQueue;
202     }
203 
204     /**
205      @return int Number of write scheduled write requests
206      @deprecated
207      */
208     public int getScheduledWriteMessages()
209     {
210         return getScheduledWriteRequests();
211     }
212 
213     public int getScheduledWriteRequests()
214     {
215         synchronizedwriteRequestQueue )
216         {
217             return writeRequestQueue.size();
218         }
219     }
220 
221     public int getScheduledWriteBytes()
222     {
223         synchronizedwriteRequestQueue )
224         {
225             return writeRequestQueue.byteSize();
226         }
227     }
228 
229     protected void write0WriteRequest writeRequest )
230     {
231         filterChain.fireFilterWritethis, writeRequest );
232     }
233 
234     public TransportType getTransportType()
235     {
236         return TransportType.SOCKET;
237     }
238 
239     public SocketAddress getRemoteAddress()
240     {
241         //This is what I had previously
242 //        return ch.socket().getRemoteSocketAddress();
243         return remoteAddress;
244     }
245 
246     public SocketAddress getLocalAddress()
247     {
248         //This is what I had previously
249 //        return ch.socket().getLocalSocketAddress();
250         return localAddress;
251     }
252 
253     public SocketAddress getServiceAddress()
254     {
255         return serviceAddress;
256     }
257 
258     protected void updateTrafficMask()
259     {
260         this.ioProcessor.updateTrafficMaskthis );
261     }
262 
263     int getReadBufferSize()
264     {
265         return readBufferSize;
266     }
267 
268     private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
269     {
270         public boolean isKeepAlive()
271         {
272             try
273             {
274                 return ch.socket().getKeepAlive();
275             }
276             catchSocketException e )
277             {
278                 throw new RuntimeIOException);
279             }
280         }
281 
282         public void setKeepAliveboolean on )
283         {
284             try
285             {
286                 ch.socket().setKeepAliveon );
287             }
288             catchSocketException e )
289             {
290                 throw new RuntimeIOException);
291             }
292         }
293 
294         public boolean isOobInline()
295         {
296             try
297             {
298                 return ch.socket().getOOBInline();
299             }
300             catchSocketException e )
301             {
302                 throw new RuntimeIOException);
303             }
304         }
305 
306         public void setOobInlineboolean on )
307         {
308             try
309             {
310                 ch.socket().setOOBInlineon );
311             }
312             catchSocketException e )
313             {
314                 throw new RuntimeIOException);
315             }
316         }
317 
318         public boolean isReuseAddress()
319         {
320             try
321             {
322                 return ch.socket().getReuseAddress();
323             }
324             catchSocketException e )
325             {
326                 throw new RuntimeIOException);
327             }
328         }
329 
330         public void setReuseAddressboolean on )
331         {
332             try
333             {
334                 ch.socket().setReuseAddresson );
335             }
336             catchSocketException e )
337             {
338                 throw new RuntimeIOException);
339             }
340         }
341 
342         public int getSoLinger()
343         {
344             try
345             {
346                 return ch.socket().getSoLinger();
347             }
348             catchSocketException e )
349             {
350                 throw new RuntimeIOException);
351             }
352         }
353 
354         public void setSoLingerint linger )
355         {
356             try
357             {
358                 iflinger < )
359                 {
360                     ch.socket().setSoLingerfalse, );
361                 }
362                 else
363                 {
364                     ch.socket().setSoLingertrue, linger );
365                 }
366             }
367             catchSocketException e )
368             {
369                 throw new RuntimeIOException);
370             }
371         }
372 
373         public boolean isTcpNoDelay()
374         {
375             try
376             {
377                 return ch.socket().getTcpNoDelay();
378             }
379             catchSocketException e )
380             {
381                 throw new RuntimeIOException);
382             }
383         }
384 
385         public void setTcpNoDelayboolean on )
386         {
387             try
388             {
389                 ch.socket().setTcpNoDelayon );
390             }
391             catchSocketException e )
392             {
393                 throw new RuntimeIOException);
394             }
395         }
396 
397         public int getTrafficClass()
398         {
399             ifSocketSessionConfigImpl.isGetTrafficClassAvailable() )
400             {
401                 try
402                 {
403                     return ch.socket().getTrafficClass();
404                 }
405                 catchSocketException e )
406                 {
407                     // Throw an exception only when setTrafficClass is also available.
408                     ifSocketSessionConfigImpl.isSetTrafficClassAvailable() )
409                     {
410                         throw new RuntimeIOException);
411                     }
412                 }
413             }
414 
415             return 0;
416         }
417 
418         public void setTrafficClassint tc )
419         {
420             ifSocketSessionConfigImpl.isSetTrafficClassAvailable() )
421             {
422                 try
423                 {
424                     ch.socket().setTrafficClasstc );
425                 }
426                 catchSocketException e )
427                 {
428                     throw new RuntimeIOException);
429                 }
430             }
431         }
432 
433         public int getSendBufferSize()
434         {
435             try
436             {
437                 return ch.socket().getSendBufferSize();
438             }
439             catchSocketException e )
440             {
441                 throw new RuntimeIOException);
442             }
443         }
444 
445         public void setSendBufferSizeint size )
446         {
447             ifSocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
448             {
449                 try
450                 {
451                     ch.socket().setSendBufferSizesize );
452                 }
453                 catchSocketException e )
454                 {
455                     throw new RuntimeIOException);
456                 }
457             }
458         }
459 
460         public int getReceiveBufferSize()
461         {
462             try
463             {
464                 return ch.socket().getReceiveBufferSize();
465             }
466             catchSocketException e )
467             {
468                 throw new RuntimeIOException);
469             }
470         }
471 
472         public void setReceiveBufferSizeint size )
473         {
474             ifSocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
475             {
476                 try
477                 {
478                     ch.socket().setReceiveBufferSizesize );
479                     MultiThreadSocketSessionImpl.this.readBufferSize = size;
480                 }
481                 catchSocketException e )
482                 {
483                     throw new RuntimeIOException);
484                 }
485             }
486         }
487     }
488 }