001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 */
020 package org.apache.mina.transport.socket.nio;
021
022 import org.apache.mina.common.IoFilter.WriteRequest;
023 import org.apache.mina.common.IoFilterChain;
024 import org.apache.mina.common.IoHandler;
025 import org.apache.mina.common.IoService;
026 import org.apache.mina.common.IoServiceConfig;
027 import org.apache.mina.common.IoSession;
028 import org.apache.mina.common.IoSessionConfig;
029 import org.apache.mina.common.RuntimeIOException;
030 import org.apache.mina.common.TransportType;
031 import org.apache.mina.common.support.BaseIoSessionConfig;
032 import org.apache.mina.common.support.IoServiceListenerSupport;
033 import org.apache.mina.util.Queue;
034
035 import java.net.SocketAddress;
036 import java.net.SocketException;
037 import java.nio.channels.SelectionKey;
038 import java.nio.channels.SocketChannel;
039 import java.util.concurrent.CountDownLatch;
040 import java.util.concurrent.atomic.AtomicBoolean;
041
042 /**
043 * An {@link IoSession} for socket transport (TCP/IP).
044 *
045 * @author The Apache Directory Project (mina-dev@directory.apache.org)
046 * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
047 */
048 class MultiThreadSocketSessionImpl extends SocketSessionImpl
049 {
050 private final IoService manager;
051 private final IoServiceConfig serviceConfig;
052 private final SocketSessionConfig config = new SessionConfigImpl();
053 private final MultiThreadSocketIoProcessor ioProcessor;
054 private final MultiThreadSocketFilterChain filterChain;
055 private final SocketChannel ch;
056 private final Queue writeRequestQueue;
057 private final IoHandler handler;
058 private final SocketAddress remoteAddress;
059 private final SocketAddress localAddress;
060 private final SocketAddress serviceAddress;
061 private final IoServiceListenerSupport serviceListeners;
062 private SelectionKey readKey, writeKey;
063 private int readBufferSize;
064 private CountDownLatch registeredReadyLatch = new CountDownLatch(2);
065 private AtomicBoolean created = new AtomicBoolean(false);
066
067 /**
068 * Creates a new instance.
069 */
070 MultiThreadSocketSessionImpl( IoService manager,
071 SocketIoProcessor ioProcessor,
072 IoServiceListenerSupport listeners,
073 IoServiceConfig serviceConfig,
074 SocketChannel ch,
075 IoHandler defaultHandler,
076 SocketAddress serviceAddress )
077 {
078 super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress);
079 this.manager = manager;
080 this.serviceListeners = listeners;
081 this.ioProcessor = (MultiThreadSocketIoProcessor) ioProcessor;
082 this.filterChain = new MultiThreadSocketFilterChain(this);
083 this.ch = ch;
084 this.writeRequestQueue = new Queue();
085 this.handler = defaultHandler;
086 this.remoteAddress = ch.socket().getRemoteSocketAddress();
087 this.localAddress = ch.socket().getLocalSocketAddress();
088 this.serviceAddress = serviceAddress;
089 this.serviceConfig = serviceConfig;
090
091 // Apply the initial session settings
092 IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
093 if( sessionConfig instanceof SocketSessionConfig )
094 {
095 SocketSessionConfig cfg = ( SocketSessionConfig ) sessionConfig;
096 this.config.setKeepAlive( cfg.isKeepAlive() );
097 this.config.setOobInline( cfg.isOobInline() );
098 this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() );
099 this.readBufferSize = cfg.getReceiveBufferSize();
100 this.config.setReuseAddress( cfg.isReuseAddress() );
101 this.config.setSendBufferSize( cfg.getSendBufferSize() );
102 this.config.setSoLinger( cfg.getSoLinger() );
103 this.config.setTcpNoDelay( cfg.isTcpNoDelay() );
104
105 if( this.config.getTrafficClass() != cfg.getTrafficClass() )
106 {
107 this.config.setTrafficClass( cfg.getTrafficClass() );
108 }
109 }
110 }
111
112 void awaitRegistration() throws InterruptedException
113 {
114 registeredReadyLatch.countDown();
115
116 registeredReadyLatch.await();
117 }
118
119 boolean created() throws InterruptedException
120 {
121 return created.get();
122 }
123
124 void doneCreation()
125 {
126 created.getAndSet(true);
127 }
128
129 public IoService getService()
130 {
131 return manager;
132 }
133
134 public IoServiceConfig getServiceConfig()
135 {
136 return serviceConfig;
137 }
138
139 public IoSessionConfig getConfig()
140 {
141 return config;
142 }
143
144 SocketIoProcessor getIoProcessor()
145 {
146 return ioProcessor;
147 }
148
149 public IoFilterChain getFilterChain()
150 {
151 return filterChain;
152 }
153
154 SocketChannel getChannel()
155 {
156 return ch;
157 }
158
159 IoServiceListenerSupport getServiceListeners()
160 {
161 return serviceListeners;
162 }
163
164 SelectionKey getSelectionKey()
165 {
166 return readKey;
167 }
168
169 SelectionKey getReadSelectionKey()
170 {
171 return readKey;
172 }
173
174 SelectionKey getWriteSelectionKey()
175 {
176 return writeKey;
177 }
178
179 void setSelectionKey(SelectionKey key)
180 {
181 this.readKey = key;
182 }
183
184 void setWriteSelectionKey(SelectionKey key)
185 {
186 this.writeKey = key;
187 }
188
189 public IoHandler getHandler()
190 {
191 return handler;
192 }
193
194 protected void close0()
195 {
196 filterChain.fireFilterClose( this );
197 }
198
199 Queue getWriteRequestQueue()
200 {
201 return writeRequestQueue;
202 }
203
204 /**
205 @return int Number of write scheduled write requests
206 @deprecated
207 */
208 public int getScheduledWriteMessages()
209 {
210 return getScheduledWriteRequests();
211 }
212
213 public int getScheduledWriteRequests()
214 {
215 synchronized( writeRequestQueue )
216 {
217 return writeRequestQueue.size();
218 }
219 }
220
221 public int getScheduledWriteBytes()
222 {
223 synchronized( writeRequestQueue )
224 {
225 return writeRequestQueue.byteSize();
226 }
227 }
228
229 protected void write0( WriteRequest writeRequest )
230 {
231 filterChain.fireFilterWrite( this, writeRequest );
232 }
233
234 public TransportType getTransportType()
235 {
236 return TransportType.SOCKET;
237 }
238
239 public SocketAddress getRemoteAddress()
240 {
241 //This is what I had previously
242 // return ch.socket().getRemoteSocketAddress();
243 return remoteAddress;
244 }
245
246 public SocketAddress getLocalAddress()
247 {
248 //This is what I had previously
249 // return ch.socket().getLocalSocketAddress();
250 return localAddress;
251 }
252
253 public SocketAddress getServiceAddress()
254 {
255 return serviceAddress;
256 }
257
258 protected void updateTrafficMask()
259 {
260 this.ioProcessor.updateTrafficMask( this );
261 }
262
263 int getReadBufferSize()
264 {
265 return readBufferSize;
266 }
267
268 private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
269 {
270 public boolean isKeepAlive()
271 {
272 try
273 {
274 return ch.socket().getKeepAlive();
275 }
276 catch( SocketException e )
277 {
278 throw new RuntimeIOException( e );
279 }
280 }
281
282 public void setKeepAlive( boolean on )
283 {
284 try
285 {
286 ch.socket().setKeepAlive( on );
287 }
288 catch( SocketException e )
289 {
290 throw new RuntimeIOException( e );
291 }
292 }
293
294 public boolean isOobInline()
295 {
296 try
297 {
298 return ch.socket().getOOBInline();
299 }
300 catch( SocketException e )
301 {
302 throw new RuntimeIOException( e );
303 }
304 }
305
306 public void setOobInline( boolean on )
307 {
308 try
309 {
310 ch.socket().setOOBInline( on );
311 }
312 catch( SocketException e )
313 {
314 throw new RuntimeIOException( e );
315 }
316 }
317
318 public boolean isReuseAddress()
319 {
320 try
321 {
322 return ch.socket().getReuseAddress();
323 }
324 catch( SocketException e )
325 {
326 throw new RuntimeIOException( e );
327 }
328 }
329
330 public void setReuseAddress( boolean on )
331 {
332 try
333 {
334 ch.socket().setReuseAddress( on );
335 }
336 catch( SocketException e )
337 {
338 throw new RuntimeIOException( e );
339 }
340 }
341
342 public int getSoLinger()
343 {
344 try
345 {
346 return ch.socket().getSoLinger();
347 }
348 catch( SocketException e )
349 {
350 throw new RuntimeIOException( e );
351 }
352 }
353
354 public void setSoLinger( int linger )
355 {
356 try
357 {
358 if( linger < 0 )
359 {
360 ch.socket().setSoLinger( false, 0 );
361 }
362 else
363 {
364 ch.socket().setSoLinger( true, linger );
365 }
366 }
367 catch( SocketException e )
368 {
369 throw new RuntimeIOException( e );
370 }
371 }
372
373 public boolean isTcpNoDelay()
374 {
375 try
376 {
377 return ch.socket().getTcpNoDelay();
378 }
379 catch( SocketException e )
380 {
381 throw new RuntimeIOException( e );
382 }
383 }
384
385 public void setTcpNoDelay( boolean on )
386 {
387 try
388 {
389 ch.socket().setTcpNoDelay( on );
390 }
391 catch( SocketException e )
392 {
393 throw new RuntimeIOException( e );
394 }
395 }
396
397 public int getTrafficClass()
398 {
399 if( SocketSessionConfigImpl.isGetTrafficClassAvailable() )
400 {
401 try
402 {
403 return ch.socket().getTrafficClass();
404 }
405 catch( SocketException e )
406 {
407 // Throw an exception only when setTrafficClass is also available.
408 if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
409 {
410 throw new RuntimeIOException( e );
411 }
412 }
413 }
414
415 return 0;
416 }
417
418 public void setTrafficClass( int tc )
419 {
420 if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
421 {
422 try
423 {
424 ch.socket().setTrafficClass( tc );
425 }
426 catch( SocketException e )
427 {
428 throw new RuntimeIOException( e );
429 }
430 }
431 }
432
433 public int getSendBufferSize()
434 {
435 try
436 {
437 return ch.socket().getSendBufferSize();
438 }
439 catch( SocketException e )
440 {
441 throw new RuntimeIOException( e );
442 }
443 }
444
445 public void setSendBufferSize( int size )
446 {
447 if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
448 {
449 try
450 {
451 ch.socket().setSendBufferSize( size );
452 }
453 catch( SocketException e )
454 {
455 throw new RuntimeIOException( e );
456 }
457 }
458 }
459
460 public int getReceiveBufferSize()
461 {
462 try
463 {
464 return ch.socket().getReceiveBufferSize();
465 }
466 catch( SocketException e )
467 {
468 throw new RuntimeIOException( e );
469 }
470 }
471
472 public void setReceiveBufferSize( int size )
473 {
474 if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
475 {
476 try
477 {
478 ch.socket().setReceiveBufferSize( size );
479 MultiThreadSocketSessionImpl.this.readBufferSize = size;
480 }
481 catch( SocketException e )
482 {
483 throw new RuntimeIOException( e );
484 }
485 }
486 }
487 }
488 }
|