001 package org.apache.qpid.client;
002 /*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 *
021 */
022
023
024 import java.io.IOException;
025 import java.util.ArrayList;
026 import java.util.List;
027
028 import javax.jms.ExceptionListener;
029 import javax.jms.JMSException;
030 import javax.jms.XASession;
031
032 import org.apache.qpid.AMQException;
033 import org.apache.qpid.client.configuration.ClientProperties;
034 import org.apache.qpid.client.failover.FailoverException;
035 import org.apache.qpid.client.failover.FailoverProtectedOperation;
036 import org.apache.qpid.framing.ProtocolVersion;
037 import org.apache.qpid.jms.BrokerDetails;
038 import org.apache.qpid.jms.Session;
039 import org.apache.qpid.protocol.AMQConstant;
040 import org.apache.qpid.transport.Connection;
041 import org.apache.qpid.transport.ConnectionClose;
042 import org.apache.qpid.transport.ConnectionException;
043 import org.apache.qpid.transport.ConnectionListener;
044 import org.apache.qpid.transport.ProtocolVersionException;
045 import org.apache.qpid.transport.TransportException;
046 import org.slf4j.Logger;
047 import org.slf4j.LoggerFactory;
048
049 public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
050 {
051 /**
052 * This class logger.
053 */
054 private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
055
056 /**
057 * The AMQ Connection.
058 */
059 private AMQConnection _conn;
060
061 /**
062 * The QpidConeection instance that is mapped with thie JMS connection.
063 */
064 org.apache.qpid.transport.Connection _qpidConnection;
065 private ConnectionException exception = null;
066
067 //--- constructor
068 public AMQConnectionDelegate_0_10(AMQConnection conn)
069 {
070 _conn = conn;
071 _qpidConnection = new Connection();
072 _qpidConnection.setConnectionListener(this);
073 }
074
075 /**
076 * create a Session and start it if required.
077 */
078 public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow)
079 throws JMSException
080 {
081 _conn.checkNotClosed();
082 int channelId = _conn._idFactory.incrementAndGet();
083 AMQSession session;
084 try
085 {
086 session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
087 prefetchLow);
088 _conn.registerSession(channelId, session);
089 if (_conn._started)
090 {
091 session.start();
092 }
093 }
094 catch (Exception e)
095 {
096 _logger.error("exception creating session:", e);
097 throw new JMSAMQException("cannot create session", e);
098 }
099 return session;
100 }
101
102 /**
103 * create an XA Session and start it if required.
104 */
105 public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
106 {
107 _conn.checkNotClosed();
108 int channelId = _conn._idFactory.incrementAndGet();
109 XASessionImpl session;
110 try
111 {
112 session = new XASessionImpl(_qpidConnection, _conn, channelId, prefetchHigh, prefetchLow);
113 _conn.registerSession(channelId, session);
114 if (_conn._started)
115 {
116 session.start();
117 }
118 }
119 catch (Exception e)
120 {
121 throw new JMSAMQException("cannot create session", e);
122 }
123 return session;
124 }
125
126
127 /**
128 * Make a connection with the broker
129 *
130 * @param brokerDetail The detail of the broker to connect to.
131 * @throws IOException
132 * @throws AMQException
133 */
134 public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
135 {
136 try
137 {
138 if (_logger.isDebugEnabled())
139 {
140 _logger.debug("connecting to host: " + brokerDetail.getHost() +
141 " port: " + brokerDetail.getPort() +
142 " vhost: " + _conn.getVirtualHost() +
143 " username: " + _conn.getUsername() +
144 " password: " + _conn.getPassword());
145 }
146
147 if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
148 {
149 this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
150 }
151 else
152 {
153 // use the default value set for all connections
154 this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0));
155 }
156
157 String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null?
158 brokerDetail.getProperty("sasl_mechs"):
159 System.getProperty("qpid.sasl_mechs","PLAIN");
160
161 _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
162 _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs);
163 _conn._connected = true;
164 _conn._failoverPolicy.attainedConnection();
165 }
166 catch(ProtocolVersionException pe)
167 {
168 return new ProtocolVersion(pe.getMajor(), pe.getMinor());
169 }
170 catch (ConnectionException e)
171 {
172 throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
173 }
174
175 return null;
176 }
177
178 public void failoverPrep()
179 {
180 List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
181 for (AMQSession s : sessions)
182 {
183 s.failoverPrep();
184 }
185 }
186
187 public void resubscribeSessions() throws JMSException, AMQException, FailoverException
188 {
189 List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
190 _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
191 for (AMQSession s : sessions)
192 {
193 ((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
194 s.resubscribe();
195 }
196 }
197
198
199 public void closeConnection(long timeout) throws JMSException, AMQException
200 {
201 try
202 {
203 _qpidConnection.close();
204 }
205 catch (TransportException e)
206 {
207 throw new AMQException(e.getMessage(), e);
208 }
209 }
210
211 public void opened(Connection conn) {}
212
213 public void exception(Connection conn, ConnectionException exc)
214 {
215 if (exception != null)
216 {
217 _logger.error("previous exception", exception);
218 }
219
220 exception = exc;
221 }
222
223 public void closed(Connection conn)
224 {
225 ConnectionException exc = exception;
226 exception = null;
227
228 if (exc == null)
229 {
230 return;
231 }
232
233 ConnectionClose close = exc.getClose();
234 if (close == null)
235 {
236 try
237 {
238 if (_conn.firePreFailover(false) && _conn.attemptReconnection())
239 {
240 _conn.failoverPrep();
241 _qpidConnection.resume();
242
243 if (_conn.firePreResubscribe())
244 {
245 _conn.resubscribeSessions();
246 }
247
248 _conn.fireFailoverComplete();
249 return;
250 }
251 }
252 catch (Exception e)
253 {
254 _logger.error("error during failover", e);
255 }
256 }
257
258 ExceptionListener listener = _conn._exceptionListener;
259 if (listener == null)
260 {
261 _logger.error("connection exception: " + conn, exc);
262 }
263 else
264 {
265 String code = null;
266 if (close != null)
267 {
268 code = close.getReplyCode().toString();
269 }
270
271 JMSException ex = new JMSException(exc.getMessage(), code);
272 ex.initCause(exc);
273 listener.onException(ex);
274 }
275 }
276
277 public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
278 {
279 try
280 {
281 return operation.execute();
282 }
283 catch (FailoverException e)
284 {
285 throw new RuntimeException(e);
286 }
287 }
288
289 public void setIdleTimeout(long l)
290 {
291 _qpidConnection.setIdleTimeout(l);
292 }
293 }
|