FailoverHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.failover;
022 
023 import org.apache.mina.common.IoSession;
024 
025 import org.apache.qpid.AMQDisconnectedException;
026 import org.apache.qpid.client.protocol.AMQProtocolHandler;
027 import org.apache.qpid.client.state.AMQStateManager;
028 
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031 
032 import java.util.concurrent.CountDownLatch;
033 
034 /**
035  * FailoverHandler is a continuation that performs the failover procedure on a protocol session. As described in the
036  * class level comment for {@link AMQProtocolHandler}, a protocol connection can span many physical transport
037  * connections, failing over to a new connection if the transport connection fails. The procedure to establish a new
038  * connection is expressed as a continuation, in order that it may be run in a seperate thread to the i/o thread that
039  * detected the failure and is used to handle the communication to establish a new connection.
040  *
041  </p>The reason this needs to be a separate thread is because this work cannot be done inside the i/o processor
042  * thread. The significant task is the connection setup which involves a protocol exchange until a particular state
043  * is achieved. This procedure waits until the state is achieved which would prevent the i/o thread doing the work
044  * it needs to do to achieve the new state.
045  *
046  <p/>The failover procedure does the following:
047  *
048  <ol>
049  <li>Sets the failing over condition to true.</li>
050  <li>Creates a {@link FailoverException} and gets the protocol connection handler to propagate this event to all
051  *     interested parties.</li>
052  <li>Takes the failover mutex on the protocol connection handler.</li>
053  <li>Abandons the fail over if any of the interested parties vetoes it. The mutex is released and the condition
054  *     reset.</li>
055  <li>Creates a new {@link AMQStateManager} and re-established the connection through it.</li>
056  <li>Informs the AMQConnection if the connection cannot be re-established.</li>
057  <li>Recreates all sessions from the old connection to the new.</li>
058  <li>Resets the failing over condition and releases the mutex.</li>
059  </ol>
060  *
061  <p/><table id="crc"><caption>CRC Card</caption>
062  <tr><th> Responsibilities <th> Collaborations
063  <tr><td> Update fail-over state <td> {@link AMQProtocolHandler}
064  </table>
065  *
066  * @todo The failover latch and mutex are used like a lock and condition. If the retrotranlator supports lock/condition
067  *       then could change over to using them. 1.4 support still needed.
068  *
069  * @todo If the condition is set to null on a vetoes fail-over and there are already other threads waiting on the
070  *       condition, they will never be released. It might be an idea to reset the condition in a finally block.
071  *
072  * @todo Creates a {@link AMQDisconnectedException} and passes it to the AMQConnection. No need to use an
073  *       exception-as-argument here, could just as easily call a specific method for this purpose on AMQConnection.
074  *
075  * @todo Creates a {@link FailoverException} and propagates it to the MethodHandlers. No need to use an
076  *       exception-as-argument here, could just as easily call a specific method for this purpose on
077  *       {@link org.apache.qpid.protocol.AMQMethodListener}.
078  */
079 public class FailoverHandler implements Runnable
080 {
081     /** Used for debugging. */
082     private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class);
083 
084     /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
085     private final IoSession _session;
086 
087     /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */
088     private AMQProtocolHandler _amqProtocolHandler;
089 
090     /** Used to hold the host to fail over to. This is optional and if not set a reconnect to the previous host is tried. */
091     private String _host;
092 
093     /** Used to hold the port to fail over to. */
094     private int _port;
095 
096     /**
097      * Creates a failover handler on a protocol session, for a particular MINA session (network connection).
098      *
099      @param amqProtocolHandler The protocol handler that spans the failover.
100      @param session            The MINA session, for the failing connection.
101      */
102     public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
103     {
104         _amqProtocolHandler = amqProtocolHandler;
105         _session = session;
106     }
107 
108     /**
109      * Performs the failover procedure. See the class level comment, {@link FailoverHandler}, for a description of the
110      * failover procedure.
111      */
112     public void run()
113     {
114         if (Thread.currentThread().isDaemon())
115         {
116             throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
117         }
118 
119         // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of
120         // the fail over.
121         _amqProtocolHandler.setFailoverLatch(new CountDownLatch(1));
122 
123         // We wake up listeners. If they can handle failover, they will extend the
124         // FailoverRetrySupport class and will in turn block on the latch until failover
125         // has completed before retrying the operation.
126         _amqProtocolHandler.notifyFailoverStarting();
127 
128         // Since failover impacts several structures we protect them all with a single mutex. These structures
129         // are also in child objects of the connection. This allows us to manipulate them without affecting
130         // client code which runs in a separate thread.
131         synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
132         {
133             //Clear the exception now that we have the failover mutex there can be no one else waiting for a frame so
134             // we can clear the exception.
135             _amqProtocolHandler.failoverInProgress();
136 
137             // We switch in a new state manager temporarily so that the interaction to get to the "connection open"
138             // state works, without us having to terminate any existing "state waiters". We could theoretically
139             // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
140             // a slightly more complex state model therefore I felt it was worthwhile doing this.
141             AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
142 
143             _amqProtocolHandler.setStateManager(new AMQStateManager());
144 
145 
146             if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
147             {
148                 _logger.info("Failover process veto-ed by client");
149 
150                 //Restore Existing State Manager
151                 _amqProtocolHandler.setStateManager(existingStateManager);
152 
153                 //todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that
154                 // prompted the failover event.
155                 if (_host != null)
156                 {
157                     _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client"null));
158                 }
159                 else
160                 {
161                     _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client"null));
162                 }
163 
164                 _amqProtocolHandler.getFailoverLatch().countDown();
165                 _amqProtocolHandler.setFailoverLatch(null);
166 
167                 return;
168             }
169 
170             _logger.info("Starting failover process");
171 
172             boolean failoverSucceeded;
173             // when host is non null we have a specified failover host otherwise we all the client to cycle through
174             // all specified hosts
175 
176             // if _host has value then we are performing a redirect.
177             if (_host != null)
178             {
179                 failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port);
180             }
181             else
182             {
183                 failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection();
184             }
185 
186             if (!failoverSucceeded)
187             {
188                 //Restore Existing State Manager
189                 _amqProtocolHandler.setStateManager(existingStateManager);
190 
191                 _amqProtocolHandler.getConnection().exceptionReceived(
192                         new AMQDisconnectedException("Server closed connection and no failover " +
193                                 "was successful"null));
194             }
195             else
196             {
197                 // Set the new Protocol Session in the StateManager.               
198                 existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession());
199 
200                 //Restore Existing State Manager
201                 _amqProtocolHandler.setStateManager(existingStateManager);
202                 try
203                 {
204                     if (_amqProtocolHandler.getConnection().firePreResubscribe())
205                     {
206                         _logger.info("Resubscribing on new connection");
207                         _amqProtocolHandler.getConnection().resubscribeSessions();
208                     }
209                     else
210                     {
211                         _logger.info("Client vetoed automatic resubscription");
212                     }
213 
214                     _amqProtocolHandler.getConnection().fireFailoverComplete();
215                     _amqProtocolHandler.setFailoverState(FailoverState.NOT_STARTED);
216                     _logger.info("Connection failover completed successfully");
217                 }
218                 catch (Exception e)
219                 {
220                     _logger.info("Failover process failed - exception being propagated by protocol handler");
221                     _amqProtocolHandler.setFailoverState(FailoverState.FAILED);
222                     /*try
223                     {*/
224                     _amqProtocolHandler.exceptionCaught(_session, e);
225                     /*}
226                     catch (Exception ex)
227                     {
228                         _logger.error("Error notifying protocol session of error: " + ex, ex);
229                     }*/
230                 }
231             }
232         }
233 
234         _amqProtocolHandler.getFailoverLatch().countDown();
235     }
236 
237     /**
238      * Sets the host name to fail over to. This is optional and if not set a reconnect to the previous host is tried.
239      *
240      @param host The host name to fail over to.
241      */
242     public void setHost(String host)
243     {
244         _host = host;
245     }
246 
247     /**
248      * Sets the port to fail over to.
249      *
250      @param port The port to fail over to.
251      */
252     public void setPort(int port)
253     {
254         _port = port;
255     }
256 }