FailoverExchangeMethod.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.jms.failover;
022 
023 import java.util.ArrayList;
024 import java.util.List;
025 
026 import javax.jms.JMSException;
027 import javax.jms.Message;
028 import javax.jms.MessageConsumer;
029 import javax.jms.MessageListener;
030 import javax.jms.Session;
031 
032 import org.apache.qpid.client.AMQAnyDestination;
033 import org.apache.qpid.client.AMQBrokerDetails;
034 import org.apache.qpid.client.AMQConnection;
035 import org.apache.qpid.framing.AMQShortString;
036 import org.apache.qpid.jms.BrokerDetails;
037 import org.apache.qpid.jms.ConnectionURL;
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040 
041 /**
042  * When using the Failover exchange a single broker is supplied in the URL.
043  * The connection will then connect to the cluster using the above broker details.
044  * Once connected, the membership details of the cluster will be obtained via 
045  * subscribing to a queue bound to the failover exchange.
046  
047  * The failover exchange will provide a list of broker URLs in the format "transport:ip:port"
048  * Out of this list we only select brokers that match the transport of the original 
049  * broker supplied in the connection URL.
050  
051  * Also properties defined for the original broker will be applied to all the brokers selected
052  * from the list.   
053  */
054 
055 public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener
056 {
057     private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class);
058    
059     /** This is not safe to use until attainConnection is called */
060     private AMQConnection _conn;
061     
062     /** Protects the broker list when modifications happens */
063     private Object _brokerListLock = new Object();
064     
065     /** The session used to subscribe to failover exchange */
066     private Session _ssn;
067     
068     private BrokerDetails _orginalBrokerDetail;
069     
070     public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn)
071     {
072         super(connectionDetails);        
073         _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0);
074         
075         // This is not safe to use until attainConnection is called, as this ref will not initialized fully.
076         // The reason being this constructor is called inside the AMWConnection constructor.
077         // It would be best if we find a way to pass this ref after AMQConnection is fully initialized.
078         _conn = conn; 
079     }
080 
081     private void subscribeForUpdates() throws JMSException
082     {
083         if (_ssn == null)
084         {
085             _ssn = _conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
086             MessageConsumer cons = _ssn.createConsumer(
087                                         new AMQAnyDestination(new AMQShortString("amq.failover"),
088                                                               new AMQShortString("amq.failover"),
089                                                               new AMQShortString(""),
090                                                               true,true,null,false,
091                                                               new AMQShortString[0]))
092             cons.setMessageListener(this);
093         }                               
094     }
095     
096     public void onMessage(Message m)
097     {
098         _logger.info("Failover exchange notified cluster membership change");
099         List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>();
100         try
101         {            
102             List<String> list = (List<String>)m.getObjectProperty("amq.failover");
103             for (String brokerEntry:list)
104             {                
105                 String[] urls = brokerEntry.substring(5.split(",");
106                 // Iterate until you find the correct transport
107                 // Need to reconsider the logic when the C++ broker supports
108                 // SSL URLs.
109                 for (String url:urls)
110                 {
111                     String[] tokens = url.split(":");
112                     if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport()))
113                     {
114                         BrokerDetails broker = new AMQBrokerDetails();
115                         broker.setTransport(tokens[0]);
116                         broker.setHost(tokens[1]);
117                         broker.setPort(Integer.parseInt(tokens[2]));
118                         broker.setProperties(_orginalBrokerDetail.getProperties());
119                         broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration());
120                         brokerList.add(broker);
121                         break;
122                     }
123                 }                
124             }
125         }
126         catch(JMSException e)
127         {
128             _logger.error("Error parsing the message sent by failover exchange",e);
129         }
130         
131         synchronized (_brokerListLock)
132         {
133             _connectionDetails.setBrokerDetails(brokerList);
134         }
135     }
136     
137     public void attainedConnection()
138     {
139         super.attainedConnection();
140         try
141         {
142             subscribeForUpdates();
143         }
144         catch (JMSException e)
145         {
146             throw new RuntimeException("Unable to subscribe for cluster membership updates",e);
147         }
148     }
149 
150     public BrokerDetails getCurrentBrokerDetails()
151     {
152         synchronized (_brokerListLock)
153         {
154             return super.getCurrentBrokerDetails();
155         }
156     }
157 
158     public BrokerDetails getNextBrokerDetails()
159     {
160         synchronized(_brokerListLock)
161         {
162             return super.getNextBrokerDetails();
163         }
164     }
165 
166     public String methodName()
167     {
168         return "Failover Exchange";
169     }
170 
171     public String toString()
172     {
173         StringBuffer sb = new StringBuffer();
174         sb.append("FailoverExchange:\n");
175         sb.append(super.toString());
176         return sb.toString();
177     }
178 }