001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.jms.failover;
022
023 import org.apache.qpid.jms.BrokerDetails;
024 import org.apache.qpid.jms.ConnectionURL;
025 import org.slf4j.Logger;
026 import org.slf4j.LoggerFactory;
027
028 public class FailoverRoundRobinServers implements FailoverMethod
029 {
030 private static final Logger _logger = LoggerFactory.getLogger(FailoverRoundRobinServers.class);
031
032 /** The default number of times to cycle through all servers */
033 public static final int DEFAULT_CYCLE_RETRIES = 0;
034 /** The default number of times to retry each server */
035 public static final int DEFAULT_SERVER_RETRIES = 0;
036
037 /** The index into the hostDetails array of the broker to which we are connected */
038 private int _currentBrokerIndex = -1;
039
040 /** The number of times to retry connecting for each server */
041 private int _serverRetries;
042
043 /** The current number of retry attempts made */
044 private int _currentServerRetry;
045
046 /** The number of times to cycle through the servers */
047 private int _cycleRetries;
048
049 /** The current number of cycles performed. */
050 private int _currentCycleRetries;
051
052 /** Array of BrokerDetail used to make connections. */
053 protected ConnectionURL _connectionDetails;
054
055 public FailoverRoundRobinServers(ConnectionURL connectionDetails)
056 {
057 if (!(connectionDetails.getBrokerCount() > 0))
058 {
059 throw new IllegalArgumentException("At least one broker details must be specified.");
060 }
061
062 _connectionDetails = connectionDetails;
063
064 // There is no current broker at startup so set it to -1.
065 _currentBrokerIndex = -1;
066
067 String cycleRetries = _connectionDetails.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE);
068
069 if (cycleRetries != null)
070 {
071 try
072 {
073 _cycleRetries = Integer.parseInt(cycleRetries);
074 }
075 catch (NumberFormatException nfe)
076 {
077 _cycleRetries = DEFAULT_CYCLE_RETRIES;
078 }
079 }
080
081 _currentCycleRetries = 0;
082
083 _serverRetries = 0;
084 _currentServerRetry = -1;
085 }
086
087 public void reset()
088 {
089 _currentBrokerIndex = 0;
090 _currentCycleRetries = 0;
091 _currentServerRetry = -1;
092 }
093
094 public boolean failoverAllowed()
095 {
096 return ((_currentCycleRetries < _cycleRetries) || (_currentServerRetry < _serverRetries)
097 || (_currentBrokerIndex < (_connectionDetails.getBrokerCount() - 1)));
098 }
099
100 public void attainedConnection()
101 {
102 _currentCycleRetries = 0;
103 _currentServerRetry = -1;
104 }
105
106 public BrokerDetails getCurrentBrokerDetails()
107 {
108 if (_currentBrokerIndex == -1)
109 {
110 return null;
111 }
112
113 return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
114 }
115
116 public BrokerDetails getNextBrokerDetails()
117 {
118 boolean doDelay = false;
119
120 if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1))
121 {
122 if (_currentServerRetry < _serverRetries)
123 {
124 if (_currentBrokerIndex == -1)
125 {
126 _currentBrokerIndex = 0;
127
128 setBroker(_connectionDetails.getBrokerDetails(_currentBrokerIndex));
129
130 _logger.info("First run using " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
131 }
132 else
133 {
134 _logger.info("Retrying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
135 doDelay=true;
136 }
137
138 _currentServerRetry++;
139 }
140 else
141 {
142 _currentCycleRetries++;
143 // failed to connect to first broker
144 _currentBrokerIndex = 0;
145
146 setBroker(_connectionDetails.getBrokerDetails(_currentBrokerIndex));
147
148 // This is zero rather than -1 as we are already retrieving the details.
149 _currentServerRetry = 0;
150 }
151 // else - should force client to stop as max retries has been reached.
152 }
153 else
154 {
155 if (_currentServerRetry < _serverRetries)
156 {
157 if (_currentBrokerIndex == -1)
158 {
159 _currentBrokerIndex = 0;
160
161 setBroker(_connectionDetails.getBrokerDetails(_currentBrokerIndex));
162
163 _logger.info("First run using " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
164 }
165 else
166 {
167 _logger.info("Retrying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
168 doDelay=true;
169 }
170
171 _currentServerRetry++;
172 }
173 else
174 {
175 _currentBrokerIndex++;
176
177 setBroker(_connectionDetails.getBrokerDetails(_currentBrokerIndex));
178 // This is zero rather than -1 as we are already retrieving the details.
179 _currentServerRetry = 0;
180 }
181 }
182
183 BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
184
185 String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
186 if (delayStr != null && doDelay)
187 {
188 Long delay = Long.parseLong(delayStr);
189 _logger.info("Delay between connect retries:" + delay);
190 try
191 {
192 Thread.sleep(delay);
193 }
194 catch (InterruptedException ie)
195 {
196 return null;
197 }
198 }
199 else
200 {
201 _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
202 }
203
204 return broker;
205 }
206
207 public void setBroker(BrokerDetails broker)
208 {
209
210 _connectionDetails.addBrokerDetails(broker);
211
212 int index = _connectionDetails.getAllBrokerDetails().indexOf(broker);
213
214 String serverRetries = broker.getProperty(BrokerDetails.OPTIONS_RETRY);
215
216 if (serverRetries != null)
217 {
218 try
219 {
220 _serverRetries = Integer.parseInt(serverRetries);
221 }
222 catch (NumberFormatException nfe)
223 {
224 _serverRetries = DEFAULT_SERVER_RETRIES;
225 }
226 }
227
228 _currentServerRetry = -1;
229 _currentBrokerIndex = index;
230 }
231
232 public void setRetries(int maxRetries)
233 {
234 _cycleRetries = maxRetries;
235 }
236
237 public String methodName()
238 {
239 return "Cycle Servers";
240 }
241
242 public String toString()
243 {
244 StringBuffer sb = new StringBuffer();
245
246 sb.append("Cycle Servers:\n");
247
248 sb.append("Cycle Retries:");
249 sb.append(_cycleRetries);
250 sb.append("\nCurrent Cycle:");
251 sb.append(_currentCycleRetries);
252 sb.append("\nServer Retries:");
253 sb.append(_serverRetries);
254 sb.append("\nCurrent Retry:");
255 sb.append(_currentServerRetry);
256 sb.append("\nCurrent Broker:");
257 sb.append(_currentBrokerIndex);
258 sb.append("\n");
259
260 for (int i = 0; i < _connectionDetails.getBrokerCount(); i++)
261 {
262 if (i == _currentBrokerIndex)
263 {
264 sb.append(">");
265 }
266
267 sb.append(_connectionDetails.getBrokerDetails(i));
268 sb.append("\n");
269 }
270
271 return sb.toString();
272 }
273 }
|