001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.jms;
022
023 import org.apache.qpid.client.AMQConnection;
024 import org.apache.qpid.jms.failover.FailoverExchangeMethod;
025 import org.apache.qpid.jms.failover.FailoverMethod;
026 import org.apache.qpid.jms.failover.FailoverRoundRobinServers;
027 import org.apache.qpid.jms.failover.FailoverSingleServer;
028
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031
032 public class FailoverPolicy
033 {
034 private static final Logger _logger = LoggerFactory.getLogger(FailoverPolicy.class);
035
036 private static final long MINUTE = 60000L;
037
038 private static final long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE;
039
040 private FailoverMethod[] _methods = new FailoverMethod[1];
041
042 private int _currentMethod;
043
044 private int _methodsRetries;
045
046 private int _currentRetry;
047
048 private boolean _timing;
049
050 private long _lastMethodTime;
051 private long _lastFailTime;
052
053 public FailoverPolicy(ConnectionURL connectionDetails, AMQConnection conn)
054 {
055 FailoverMethod method;
056
057 // todo This should be integrated in to the connection url when it supports
058 // multiple strategies.
059
060 _methodsRetries = 0;
061
062 if (connectionDetails.getFailoverMethod() == null)
063 {
064 if (connectionDetails.getBrokerCount() > 1)
065 {
066 method = new FailoverRoundRobinServers(connectionDetails);
067 }
068 else
069 {
070 method = new FailoverSingleServer(connectionDetails);
071 }
072 }
073 else
074 {
075 String failoverMethod = connectionDetails.getFailoverMethod();
076
077 /*
078 if (failoverMethod.equals(FailoverMethod.RANDOM))
079 {
080 //todo write a random connection Failover
081 }
082 */
083 if (failoverMethod.equals(FailoverMethod.SINGLE_BROKER))
084 {
085 method = new FailoverRoundRobinServers(connectionDetails);
086 }
087 else
088 {
089 if (failoverMethod.equals(FailoverMethod.ROUND_ROBIN))
090 {
091 method = new FailoverRoundRobinServers(connectionDetails);
092 }
093 else if (failoverMethod.equals(FailoverMethod.FAILOVER_EXCHANGE))
094 {
095 method = new FailoverExchangeMethod(connectionDetails, conn);
096 }
097 else
098 {
099 try
100 {
101 Class[] constructorSpec = { ConnectionURL.class };
102 Object[] params = { connectionDetails };
103
104 method =
105 (FailoverMethod) ClassLoader.getSystemClassLoader().loadClass(failoverMethod)
106 .getConstructor(constructorSpec).newInstance(params);
107 }
108 catch (Exception cnfe)
109 {
110 throw new IllegalArgumentException("Unknown failover method:" + failoverMethod, cnfe);
111 }
112 }
113 }
114 }
115
116 if (method == null)
117 {
118 throw new IllegalArgumentException("Unknown failover method specified.");
119 }
120
121 reset();
122
123 _methods[_currentMethod] = method;
124 }
125
126 public FailoverPolicy(FailoverMethod method)
127 {
128 this(method, 0);
129 }
130
131 public FailoverPolicy(FailoverMethod method, int retries)
132 {
133 _methodsRetries = retries;
134
135 reset();
136
137 _methods[_currentMethod] = method;
138 }
139
140 private void reset()
141 {
142 _currentMethod = 0;
143 _currentRetry = 0;
144 _timing = false;
145
146 }
147
148 public boolean failoverAllowed()
149 {
150 boolean failoverAllowed;
151
152 if (_timing)
153 {
154 long now = System.currentTimeMillis();
155
156 if ((now - _lastMethodTime) >= DEFAULT_METHOD_TIMEOUT)
157 {
158 _logger.info("Failover method timeout");
159 _lastMethodTime = now;
160
161 if (!nextMethod())
162 {
163 return false;
164 }
165
166 }
167 else
168 {
169 _lastMethodTime = now;
170 }
171 }
172 else
173 {
174 _timing = true;
175 _lastMethodTime = System.currentTimeMillis();
176 _lastFailTime = _lastMethodTime;
177 }
178
179 if (_methods[_currentMethod].failoverAllowed())
180 {
181 failoverAllowed = true;
182 }
183 else
184 {
185 if (_currentMethod < (_methods.length - 1))
186 {
187 nextMethod();
188 _logger.info("Changing method to " + _methods[_currentMethod].methodName());
189
190 return failoverAllowed();
191 }
192 else
193 {
194 return cycleMethods();
195 }
196 }
197
198 return failoverAllowed;
199 }
200
201 private boolean nextMethod()
202 {
203 if (_currentMethod < (_methods.length - 1))
204 {
205 _currentMethod++;
206 _methods[_currentMethod].reset();
207
208 return true;
209 }
210 else
211 {
212 return cycleMethods();
213 }
214 }
215
216 private boolean cycleMethods()
217 {
218 if (_currentRetry < _methodsRetries)
219 {
220 _currentRetry++;
221
222 _currentMethod = 0;
223
224 _logger.info("Retrying methods starting with " + _methods[_currentMethod].methodName());
225 _methods[_currentMethod].reset();
226
227 return failoverAllowed();
228 }
229 else
230 {
231 _logger.debug("All failover methods exhausted");
232
233 return false;
234 }
235 }
236
237 /**
238 * Notification that connection was successful.
239 */
240 public void attainedConnection()
241 {
242 _currentRetry = 0;
243
244 _methods[_currentMethod].attainedConnection();
245
246 _timing = false;
247 }
248
249 public BrokerDetails getCurrentBrokerDetails()
250 {
251 return _methods[_currentMethod].getCurrentBrokerDetails();
252 }
253
254 public BrokerDetails getNextBrokerDetails()
255 {
256 return _methods[_currentMethod].getNextBrokerDetails();
257 }
258
259 public void setBroker(BrokerDetails broker)
260 {
261 _methods[_currentMethod].setBroker(broker);
262 }
263
264 public void addMethod(FailoverMethod method)
265 {
266 int len = _methods.length + 1;
267 FailoverMethod[] newMethods = new FailoverMethod[len];
268 System.arraycopy(_methods, 0, newMethods, 0, _methods.length);
269 int index = len - 1;
270 newMethods[index] = method;
271 _methods = newMethods;
272 }
273
274 public void setMethodRetries(int retries)
275 {
276 _methodsRetries = retries;
277 }
278
279 public FailoverMethod getCurrentMethod()
280 {
281 if ((_currentMethod >= 0) && (_currentMethod < (_methods.length)))
282 {
283 return _methods[_currentMethod];
284 }
285 else
286 {
287 return null;
288 }
289 }
290
291 public String toString()
292 {
293 StringBuffer sb = new StringBuffer();
294
295 sb.append("Failover Policy:\n");
296
297 if (failoverAllowed())
298 {
299 sb.append("Failover allowed\n");
300 }
301 else
302 {
303 sb.append("Failover not allowed\n");
304 }
305
306 sb.append("Failover policy methods\n");
307 for (int i = 0; i < _methods.length; i++)
308 {
309
310 if (i == _currentMethod)
311 {
312 sb.append(">");
313 }
314
315 sb.append(_methods[i].toString());
316 }
317
318 return sb.toString();
319 }
320 }
|