FailoverPolicy.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.jms;
022 
023 import org.apache.qpid.client.AMQConnection;
024 import org.apache.qpid.jms.failover.FailoverExchangeMethod;
025 import org.apache.qpid.jms.failover.FailoverMethod;
026 import org.apache.qpid.jms.failover.FailoverRoundRobinServers;
027 import org.apache.qpid.jms.failover.FailoverSingleServer;
028 
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031 
032 public class FailoverPolicy
033 {
034     private static final Logger _logger = LoggerFactory.getLogger(FailoverPolicy.class);
035 
036     private static final long MINUTE = 60000L;
037 
038     private static final long DEFAULT_METHOD_TIMEOUT = * MINUTE;
039 
040     private FailoverMethod[] _methods = new FailoverMethod[1];
041 
042     private int _currentMethod;
043 
044     private int _methodsRetries;
045 
046     private int _currentRetry;
047 
048     private boolean _timing;
049 
050     private long _lastMethodTime;
051     private long _lastFailTime;
052 
053     public FailoverPolicy(ConnectionURL connectionDetails, AMQConnection conn)
054     {
055         FailoverMethod method;
056 
057         // todo This should be integrated in to the connection url when it supports
058         // multiple strategies.
059 
060         _methodsRetries = 0;
061 
062         if (connectionDetails.getFailoverMethod() == null)
063         {
064             if (connectionDetails.getBrokerCount() 1)
065             {
066                 method = new FailoverRoundRobinServers(connectionDetails);
067             }
068             else
069             {
070                 method = new FailoverSingleServer(connectionDetails);
071             }
072         }
073         else
074         {
075             String failoverMethod = connectionDetails.getFailoverMethod();
076 
077             /*
078                         if (failoverMethod.equals(FailoverMethod.RANDOM))
079                         {
080                             //todo write a random connection Failover
081                         }
082              */
083             if (failoverMethod.equals(FailoverMethod.SINGLE_BROKER))
084             {
085                 method = new FailoverRoundRobinServers(connectionDetails);
086             }
087             else
088             {
089                 if (failoverMethod.equals(FailoverMethod.ROUND_ROBIN))
090                 {
091                     method = new FailoverRoundRobinServers(connectionDetails);
092                 }
093                 else if (failoverMethod.equals(FailoverMethod.FAILOVER_EXCHANGE))
094                 {
095                     method = new FailoverExchangeMethod(connectionDetails, conn);
096                 }
097                 else
098                 {
099                     try
100                     {
101                         Class[] constructorSpec = ConnectionURL.class };
102                         Object[] params = connectionDetails };
103 
104                         method =
105                             (FailoverMethodClassLoader.getSystemClassLoader().loadClass(failoverMethod)
106                                                         .getConstructor(constructorSpec).newInstance(params);
107                     }
108                     catch (Exception cnfe)
109                     {
110                         throw new IllegalArgumentException("Unknown failover method:" + failoverMethod, cnfe);
111                     }
112                 }
113             }
114         }
115 
116         if (method == null)
117         {
118             throw new IllegalArgumentException("Unknown failover method specified.");
119         }
120 
121         reset();
122 
123         _methods[_currentMethod= method;
124     }
125 
126     public FailoverPolicy(FailoverMethod method)
127     {
128         this(method, 0);
129     }
130 
131     public FailoverPolicy(FailoverMethod method, int retries)
132     {
133         _methodsRetries = retries;
134 
135         reset();
136 
137         _methods[_currentMethod= method;
138     }
139 
140     private void reset()
141     {
142         _currentMethod = 0;
143         _currentRetry = 0;
144         _timing = false;
145 
146     }
147 
148     public boolean failoverAllowed()
149     {
150         boolean failoverAllowed;
151 
152         if (_timing)
153         {
154             long now = System.currentTimeMillis();
155 
156             if ((now - _lastMethodTime>= DEFAULT_METHOD_TIMEOUT)
157             {
158                 _logger.info("Failover method timeout");
159                 _lastMethodTime = now;
160 
161                 if (!nextMethod())
162                 {
163                     return false;
164                 }
165 
166             }
167             else
168             {
169                 _lastMethodTime = now;
170             }
171         }
172         else
173         {
174             _timing = true;
175             _lastMethodTime = System.currentTimeMillis();
176             _lastFailTime = _lastMethodTime;
177         }
178 
179         if (_methods[_currentMethod].failoverAllowed())
180         {
181             failoverAllowed = true;
182         }
183         else
184         {
185             if (_currentMethod < (_methods.length - 1))
186             {
187                 nextMethod();
188                 _logger.info("Changing method to " + _methods[_currentMethod].methodName());
189 
190                 return failoverAllowed();
191             }
192             else
193             {
194                 return cycleMethods();
195             }
196         }
197 
198         return failoverAllowed;
199     }
200 
201     private boolean nextMethod()
202     {
203         if (_currentMethod < (_methods.length - 1))
204         {
205             _currentMethod++;
206             _methods[_currentMethod].reset();
207 
208             return true;
209         }
210         else
211         {
212             return cycleMethods();
213         }
214     }
215 
216     private boolean cycleMethods()
217     {
218         if (_currentRetry < _methodsRetries)
219         {
220             _currentRetry++;
221 
222             _currentMethod = 0;
223 
224             _logger.info("Retrying methods starting with " + _methods[_currentMethod].methodName());
225             _methods[_currentMethod].reset();
226 
227             return failoverAllowed();
228         }
229         else
230         {
231             _logger.debug("All failover methods exhausted");
232 
233             return false;
234         }
235     }
236 
237     /**
238      * Notification that connection was successful.
239      */
240     public void attainedConnection()
241     {
242         _currentRetry = 0;
243 
244         _methods[_currentMethod].attainedConnection();
245 
246         _timing = false;
247     }
248 
249     public BrokerDetails getCurrentBrokerDetails()
250     {
251         return _methods[_currentMethod].getCurrentBrokerDetails();
252     }
253 
254     public BrokerDetails getNextBrokerDetails()
255     {
256         return _methods[_currentMethod].getNextBrokerDetails();
257     }
258 
259     public void setBroker(BrokerDetails broker)
260     {
261         _methods[_currentMethod].setBroker(broker);
262     }
263 
264     public void addMethod(FailoverMethod method)
265     {
266         int len = _methods.length + 1;
267         FailoverMethod[] newMethods = new FailoverMethod[len];
268         System.arraycopy(_methods, 0, newMethods, 0, _methods.length);
269         int index = len - 1;
270         newMethods[index= method;
271         _methods = newMethods;
272     }
273 
274     public void setMethodRetries(int retries)
275     {
276         _methodsRetries = retries;
277     }
278 
279     public FailoverMethod getCurrentMethod()
280     {
281         if ((_currentMethod >= 0&& (_currentMethod < (_methods.length)))
282         {
283             return _methods[_currentMethod];
284         }
285         else
286         {
287             return null;
288         }
289     }
290 
291     public String toString()
292     {
293         StringBuffer sb = new StringBuffer();
294 
295         sb.append("Failover Policy:\n");
296 
297         if (failoverAllowed())
298         {
299             sb.append("Failover allowed\n");
300         }
301         else
302         {
303             sb.append("Failover not allowed\n");
304         }
305 
306         sb.append("Failover policy methods\n");
307         for (int i = 0; i < _methods.length; i++)
308         {
309 
310             if (i == _currentMethod)
311             {
312                 sb.append(">");
313             }
314 
315             sb.append(_methods[i].toString());
316         }
317 
318         return sb.toString();
319     }
320 }