001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 */
020 package org.apache.mina.transport.vmpipe;
021
022 import java.io.IOException;
023 import java.net.SocketAddress;
024
025 import org.apache.mina.common.ConnectFuture;
026 import org.apache.mina.common.ExceptionMonitor;
027 import org.apache.mina.common.IoFilterChain;
028 import org.apache.mina.common.IoHandler;
029 import org.apache.mina.common.IoServiceConfig;
030 import org.apache.mina.common.IoSessionConfig;
031 import org.apache.mina.common.support.AbstractIoFilterChain;
032 import org.apache.mina.common.support.BaseIoConnector;
033 import org.apache.mina.common.support.BaseIoConnectorConfig;
034 import org.apache.mina.common.support.BaseIoSessionConfig;
035 import org.apache.mina.common.support.DefaultConnectFuture;
036 import org.apache.mina.transport.vmpipe.support.VmPipe;
037 import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker;
038 import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl;
039 import org.apache.mina.util.AnonymousSocketAddress;
040
041 /**
042 * Connects to {@link IoHandler}s which is bound on the specified
043 * {@link VmPipeAddress}.
044 *
045 * @author The Apache Directory Project (mina-dev@directory.apache.org)
046 * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
047 */
048 public class QpidVmPipeConnector extends VmPipeConnector
049 {
050 private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
051 private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig()
052 {
053 public IoSessionConfig getSessionConfig()
054 {
055 return CONFIG;
056 }
057 };
058
059 /**
060 * Creates a new instance.
061 */
062 public QpidVmPipeConnector()
063 {
064 }
065
066 public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config )
067 {
068 return connect( address, null, handler, config );
069 }
070
071 public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config )
072 {
073 if( address == null )
074 throw new NullPointerException( "address" );
075 if( handler == null )
076 throw new NullPointerException( "handler" );
077 if( ! ( address instanceof VmPipeAddress ) )
078 throw new IllegalArgumentException(
079 "address must be VmPipeAddress." );
080
081 if( config == null )
082 {
083 config = getDefaultConfig();
084 }
085
086 VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address );
087 if( entry == null )
088 {
089 return DefaultConnectFuture.newFailedFuture(
090 new IOException( "Endpoint unavailable: " + address ) );
091 }
092
093 DefaultConnectFuture future = new DefaultConnectFuture();
094 VmPipeSessionImpl localSession =
095 new VmPipeSessionImpl(
096 this,
097 config,
098 getListeners(),
099 new Object(), // lock
100 new AnonymousSocketAddress(),
101 handler,
102 entry );
103
104 // initialize acceptor session
105 VmPipeSessionImpl remoteSession = localSession.getRemoteSession();
106 try
107 {
108 IoFilterChain filterChain = remoteSession.getFilterChain();
109 entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain );
110 entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain );
111 entry.getConfig().getThreadModel().buildFilterChain( filterChain );
112
113 // The following sentences don't throw any exceptions.
114 entry.getListeners().fireSessionCreated( remoteSession );
115 VmPipeIdleStatusChecker.getInstance().addSession( remoteSession );
116 }
117 catch( Throwable t )
118 {
119 ExceptionMonitor.getInstance().exceptionCaught( t );
120 remoteSession.close();
121 }
122
123
124 // initialize connector session
125 try
126 {
127 IoFilterChain filterChain = localSession.getFilterChain();
128 this.getFilterChainBuilder().buildFilterChain( filterChain );
129 config.getFilterChainBuilder().buildFilterChain( filterChain );
130 config.getThreadModel().buildFilterChain( filterChain );
131
132 // The following sentences don't throw any exceptions.
133 localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future );
134 getListeners().fireSessionCreated( localSession );
135 VmPipeIdleStatusChecker.getInstance().addSession( localSession);
136 }
137 catch( Throwable t )
138 {
139 future.setException( t );
140 }
141
142
143
144 return future;
145 }
146
147 public IoServiceConfig getDefaultConfig()
148 {
149 return defaultConfig;
150 }
151 }
|