001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.jms.failover;
022
023 import java.util.ArrayList;
024 import java.util.List;
025
026 import javax.jms.JMSException;
027 import javax.jms.Message;
028 import javax.jms.MessageConsumer;
029 import javax.jms.MessageListener;
030 import javax.jms.Session;
031
032 import org.apache.qpid.client.AMQAnyDestination;
033 import org.apache.qpid.client.AMQBrokerDetails;
034 import org.apache.qpid.client.AMQConnection;
035 import org.apache.qpid.framing.AMQShortString;
036 import org.apache.qpid.jms.BrokerDetails;
037 import org.apache.qpid.jms.ConnectionURL;
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040
041 /**
042 * When using the Failover exchange a single broker is supplied in the URL.
043 * The connection will then connect to the cluster using the above broker details.
044 * Once connected, the membership details of the cluster will be obtained via
045 * subscribing to a queue bound to the failover exchange.
046 *
047 * The failover exchange will provide a list of broker URLs in the format "transport:ip:port"
048 * Out of this list we only select brokers that match the transport of the original
049 * broker supplied in the connection URL.
050 *
051 * Also properties defined for the original broker will be applied to all the brokers selected
052 * from the list.
053 */
054
055 public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener
056 {
057 private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class);
058
059 /** This is not safe to use until attainConnection is called */
060 private AMQConnection _conn;
061
062 /** Protects the broker list when modifications happens */
063 private Object _brokerListLock = new Object();
064
065 /** The session used to subscribe to failover exchange */
066 private Session _ssn;
067
068 private BrokerDetails _orginalBrokerDetail;
069
070 public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn)
071 {
072 super(connectionDetails);
073 _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0);
074
075 // This is not safe to use until attainConnection is called, as this ref will not initialized fully.
076 // The reason being this constructor is called inside the AMWConnection constructor.
077 // It would be best if we find a way to pass this ref after AMQConnection is fully initialized.
078 _conn = conn;
079 }
080
081 private void subscribeForUpdates() throws JMSException
082 {
083 if (_ssn == null)
084 {
085 _ssn = _conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
086 MessageConsumer cons = _ssn.createConsumer(
087 new AMQAnyDestination(new AMQShortString("amq.failover"),
088 new AMQShortString("amq.failover"),
089 new AMQShortString(""),
090 true,true,null,false,
091 new AMQShortString[0]));
092 cons.setMessageListener(this);
093 }
094 }
095
096 public void onMessage(Message m)
097 {
098 _logger.info("Failover exchange notified cluster membership change");
099 List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>();
100 try
101 {
102 List<String> list = (List<String>)m.getObjectProperty("amq.failover");
103 for (String brokerEntry:list)
104 {
105 String[] urls = brokerEntry.substring(5) .split(",");
106 // Iterate until you find the correct transport
107 // Need to reconsider the logic when the C++ broker supports
108 // SSL URLs.
109 for (String url:urls)
110 {
111 String[] tokens = url.split(":");
112 if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport()))
113 {
114 BrokerDetails broker = new AMQBrokerDetails();
115 broker.setTransport(tokens[0]);
116 broker.setHost(tokens[1]);
117 broker.setPort(Integer.parseInt(tokens[2]));
118 broker.setProperties(_orginalBrokerDetail.getProperties());
119 broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration());
120 brokerList.add(broker);
121 break;
122 }
123 }
124 }
125 }
126 catch(JMSException e)
127 {
128 _logger.error("Error parsing the message sent by failover exchange",e);
129 }
130
131 synchronized (_brokerListLock)
132 {
133 _connectionDetails.setBrokerDetails(brokerList);
134 }
135 }
136
137 public void attainedConnection()
138 {
139 super.attainedConnection();
140 try
141 {
142 subscribeForUpdates();
143 }
144 catch (JMSException e)
145 {
146 throw new RuntimeException("Unable to subscribe for cluster membership updates",e);
147 }
148 }
149
150 public BrokerDetails getCurrentBrokerDetails()
151 {
152 synchronized (_brokerListLock)
153 {
154 return super.getCurrentBrokerDetails();
155 }
156 }
157
158 public BrokerDetails getNextBrokerDetails()
159 {
160 synchronized(_brokerListLock)
161 {
162 return super.getNextBrokerDetails();
163 }
164 }
165
166 public String methodName()
167 {
168 return "Failover Exchange";
169 }
170
171 public String toString()
172 {
173 StringBuffer sb = new StringBuffer();
174 sb.append("FailoverExchange:\n");
175 sb.append(super.toString());
176 return sb.toString();
177 }
178 }
|