AMQConnectionDelegate_0_10.java
001 package org.apache.qpid.client;
002 /*
003  
004  * Licensed to the Apache Software Foundation (ASF) under one
005  * or more contributor license agreements.  See the NOTICE file
006  * distributed with this work for additional information
007  * regarding copyright ownership.  The ASF licenses this file
008  * to you under the Apache License, Version 2.0 (the
009  * "License"); you may not use this file except in compliance
010  * with the License.  You may obtain a copy of the License at
011  
012  *   http://www.apache.org/licenses/LICENSE-2.0
013  
014  * Unless required by applicable law or agreed to in writing,
015  * software distributed under the License is distributed on an
016  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017  * KIND, either express or implied.  See the License for the
018  * specific language governing permissions and limitations
019  * under the License.
020  
021  */
022 
023 
024 import java.io.IOException;
025 import java.util.ArrayList;
026 import java.util.List;
027 
028 import javax.jms.ExceptionListener;
029 import javax.jms.JMSException;
030 import javax.jms.XASession;
031 
032 import org.apache.qpid.AMQException;
033 import org.apache.qpid.client.configuration.ClientProperties;
034 import org.apache.qpid.client.failover.FailoverException;
035 import org.apache.qpid.client.failover.FailoverProtectedOperation;
036 import org.apache.qpid.framing.ProtocolVersion;
037 import org.apache.qpid.jms.BrokerDetails;
038 import org.apache.qpid.jms.Session;
039 import org.apache.qpid.protocol.AMQConstant;
040 import org.apache.qpid.transport.Connection;
041 import org.apache.qpid.transport.ConnectionClose;
042 import org.apache.qpid.transport.ConnectionException;
043 import org.apache.qpid.transport.ConnectionListener;
044 import org.apache.qpid.transport.ProtocolVersionException;
045 import org.apache.qpid.transport.TransportException;
046 import org.slf4j.Logger;
047 import org.slf4j.LoggerFactory;
048 
049 public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
050 {
051     /**
052      * This class logger.
053      */
054     private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
055 
056     /**
057      * The AMQ Connection.
058      */
059     private AMQConnection _conn;
060 
061     /**
062      * The QpidConeection instance that is mapped with thie JMS connection.
063      */
064     org.apache.qpid.transport.Connection _qpidConnection;
065     private ConnectionException exception = null;
066 
067     //--- constructor
068     public AMQConnectionDelegate_0_10(AMQConnection conn)
069     {
070         _conn = conn;
071         _qpidConnection = new Connection();
072         _qpidConnection.setConnectionListener(this);
073     }
074 
075     /**
076      * create a Session and start it if required.
077      */
078     public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow)
079             throws JMSException
080     {
081         _conn.checkNotClosed();
082         int channelId = _conn._idFactory.incrementAndGet();
083         AMQSession session;
084         try
085         {
086             session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
087                                           prefetchLow);
088             _conn.registerSession(channelId, session);
089             if (_conn._started)
090             {
091                 session.start();
092             }
093         }
094         catch (Exception e)
095         {
096             _logger.error("exception creating session:", e);
097             throw new JMSAMQException("cannot create session", e);
098         }
099         return session;
100     }
101 
102     /**
103      * create an XA Session and start it if required.
104      */
105     public XASession createXASession(int prefetchHigh, int prefetchLowthrows JMSException
106     {
107         _conn.checkNotClosed();
108         int channelId = _conn._idFactory.incrementAndGet();
109         XASessionImpl session;
110         try
111         {
112             session = new XASessionImpl(_qpidConnection, _conn, channelId, prefetchHigh, prefetchLow);
113             _conn.registerSession(channelId, session);
114             if (_conn._started)
115             {
116                 session.start();
117             }
118         }
119         catch (Exception e)
120         {
121             throw new JMSAMQException("cannot create session", e);
122         }
123         return session;
124     }
125 
126 
127     /**
128      * Make a connection with the broker
129      *
130      @param brokerDetail The detail of the broker to connect to.
131      @throws IOException
132      @throws AMQException
133      */
134     public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetailthrows IOException, AMQException
135     {
136         try
137         {
138             if (_logger.isDebugEnabled())
139             {
140                 _logger.debug("connecting to host: " + brokerDetail.getHost() +
141                               " port: " + brokerDetail.getPort() +
142                               " vhost: " + _conn.getVirtualHost() +
143                               " username: " + _conn.getUsername() +
144                               " password: " + _conn.getPassword());
145             }
146             
147             if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT!= null)
148             {
149                 this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
150             }
151             else
152             {
153                 // use the default value set for all connections
154                 this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0));
155             }
156             
157             String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null?
158                                brokerDetail.getProperty("sasl_mechs"):
159                                System.getProperty("qpid.sasl_mechs","PLAIN");
160             
161             _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
162                                     _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs);
163             _conn._connected = true;
164             _conn._failoverPolicy.attainedConnection();
165         }
166         catch(ProtocolVersionException pe)
167         {
168             return new ProtocolVersion(pe.getMajor(), pe.getMinor());
169         }
170         catch (ConnectionException e)
171         {            
172             throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
173         }
174 
175         return null;
176     }
177 
178     public void failoverPrep()
179     {
180         List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
181         for (AMQSession s : sessions)
182         {
183             s.failoverPrep();
184         }
185     }
186 
187     public void resubscribeSessions() throws JMSException, AMQException, FailoverException
188     {
189         List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
190         _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
191         for (AMQSession s : sessions)
192         {
193             ((AMQSession_0_10s)._qpidConnection = _qpidConnection;
194             s.resubscribe();
195         }
196     }
197 
198 
199     public void closeConnection(long timeoutthrows JMSException, AMQException
200     {
201         try
202         {
203             _qpidConnection.close();
204         }
205         catch (TransportException e)
206         {
207             throw new AMQException(e.getMessage(), e);
208         }
209     }
210 
211     public void opened(Connection conn) {}
212 
213     public void exception(Connection conn, ConnectionException exc)
214     {
215         if (exception != null)
216         {
217             _logger.error("previous exception", exception);
218         }
219 
220         exception = exc;
221     }
222 
223     public void closed(Connection conn)
224     {
225         ConnectionException exc = exception;
226         exception = null;
227 
228         if (exc == null)
229         {
230             return;
231         }
232 
233         ConnectionClose close = exc.getClose();
234         if (close == null)
235         {
236             try
237             {
238                 if (_conn.firePreFailover(false&& _conn.attemptReconnection())
239                 {
240                     _conn.failoverPrep();
241                     _qpidConnection.resume();
242 
243                     if (_conn.firePreResubscribe())
244                     {
245                         _conn.resubscribeSessions();
246                     }
247 
248                     _conn.fireFailoverComplete();
249                     return;
250                 }
251             }
252             catch (Exception e)
253             {
254                 _logger.error("error during failover", e);
255             }
256         }
257 
258         ExceptionListener listener = _conn._exceptionListener;
259         if (listener == null)
260         {
261             _logger.error("connection exception: " + conn, exc);
262         }
263         else
264         {
265             String code = null;
266             if (close != null)
267             {
268                 code = close.getReplyCode().toString();
269             }
270 
271             JMSException ex = new JMSException(exc.getMessage(), code);
272             ex.initCause(exc);
273             listener.onException(ex);
274         }
275     }
276 
277     public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operationthrows E
278     {
279         try
280         {
281             return operation.execute();
282         }
283         catch (FailoverException e)
284         {
285             throw new RuntimeException(e);
286         }
287     }
288 
289     public void setIdleTimeout(long l)
290     {
291         _qpidConnection.setIdleTimeout(l);
292     }
293 }