0001 /*
0002 *
0003 * Licensed to the Apache Software Foundation (ASF) under one
0004 * or more contributor license agreements. See the NOTICE file
0005 * distributed with this work for additional information
0006 * regarding copyright ownership. The ASF licenses this file
0007 * to you under the Apache License, Version 2.0 (the
0008 * "License"); you may not use this file except in compliance
0009 * with the License. You may obtain a copy of the License at
0010 *
0011 * http://www.apache.org/licenses/LICENSE-2.0
0012 *
0013 * Unless required by applicable law or agreed to in writing,
0014 * software distributed under the License is distributed on an
0015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016 * KIND, either express or implied. See the License for the
0017 * specific language governing permissions and limitations
0018 * under the License.
0019 *
0020 */
0021 package org.apache.qpid.client;
0022
0023 import java.io.IOException;
0024 import java.lang.reflect.InvocationTargetException;
0025 import java.net.ConnectException;
0026 import java.net.UnknownHostException;
0027 import java.nio.channels.UnresolvedAddressException;
0028 import java.util.ArrayList;
0029 import java.util.Collection;
0030 import java.util.Iterator;
0031 import java.util.LinkedHashMap;
0032 import java.util.LinkedList;
0033 import java.util.List;
0034 import java.util.concurrent.ExecutorService;
0035 import java.util.concurrent.Executors;
0036 import java.util.concurrent.TimeUnit;
0037 import java.util.concurrent.atomic.AtomicInteger;
0038
0039 import javax.jms.ConnectionConsumer;
0040 import javax.jms.ConnectionMetaData;
0041 import javax.jms.Destination;
0042 import javax.jms.ExceptionListener;
0043 import javax.jms.IllegalStateException;
0044 import javax.jms.JMSException;
0045 import javax.jms.Queue;
0046 import javax.jms.QueueConnection;
0047 import javax.jms.QueueSession;
0048 import javax.jms.ServerSessionPool;
0049 import javax.jms.Topic;
0050 import javax.jms.TopicConnection;
0051 import javax.jms.TopicSession;
0052 import javax.naming.NamingException;
0053 import javax.naming.Reference;
0054 import javax.naming.Referenceable;
0055 import javax.naming.StringRefAddr;
0056
0057 import org.apache.qpid.AMQConnectionFailureException;
0058 import org.apache.qpid.AMQException;
0059 import org.apache.qpid.AMQProtocolException;
0060 import org.apache.qpid.AMQUnresolvedAddressException;
0061 import org.apache.qpid.client.configuration.ClientProperties;
0062 import org.apache.qpid.client.failover.FailoverException;
0063 import org.apache.qpid.client.failover.FailoverProtectedOperation;
0064 import org.apache.qpid.client.protocol.AMQProtocolHandler;
0065 import org.apache.qpid.exchange.ExchangeDefaults;
0066 import org.apache.qpid.framing.AMQShortString;
0067 import org.apache.qpid.framing.BasicQosBody;
0068 import org.apache.qpid.framing.BasicQosOkBody;
0069 import org.apache.qpid.framing.ChannelOpenBody;
0070 import org.apache.qpid.framing.ChannelOpenOkBody;
0071 import org.apache.qpid.framing.ProtocolVersion;
0072 import org.apache.qpid.framing.TxSelectBody;
0073 import org.apache.qpid.framing.TxSelectOkBody;
0074 import org.apache.qpid.jms.BrokerDetails;
0075 import org.apache.qpid.jms.Connection;
0076 import org.apache.qpid.jms.ConnectionListener;
0077 import org.apache.qpid.jms.ConnectionURL;
0078 import org.apache.qpid.jms.FailoverPolicy;
0079 import org.apache.qpid.protocol.AMQConstant;
0080 import org.apache.qpid.url.URLSyntaxException;
0081 import org.slf4j.Logger;
0082 import org.slf4j.LoggerFactory;
0083
0084 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
0085 {
0086 public static final class ChannelToSessionMap
0087 {
0088 private final AMQSession[] _fastAccessSessions = new AMQSession[16];
0089 private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
0090 private int _size = 0;
0091 private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
0092
0093 public AMQSession get(int channelId)
0094 {
0095 if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
0096 {
0097 return _fastAccessSessions[channelId];
0098 }
0099 else
0100 {
0101 return _slowAccessSessions.get(channelId);
0102 }
0103 }
0104
0105 public AMQSession put(int channelId, AMQSession session)
0106 {
0107 AMQSession oldVal;
0108 if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
0109 {
0110 oldVal = _fastAccessSessions[channelId];
0111 _fastAccessSessions[channelId] = session;
0112 }
0113 else
0114 {
0115 oldVal = _slowAccessSessions.put(channelId, session);
0116 }
0117 if ((oldVal != null) && (session == null))
0118 {
0119 _size--;
0120 }
0121 else if ((oldVal == null) && (session != null))
0122 {
0123 _size++;
0124 }
0125
0126 return session;
0127
0128 }
0129
0130 public AMQSession remove(int channelId)
0131 {
0132 AMQSession session;
0133 if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
0134 {
0135 session = _fastAccessSessions[channelId];
0136 _fastAccessSessions[channelId] = null;
0137 }
0138 else
0139 {
0140 session = _slowAccessSessions.remove(channelId);
0141 }
0142
0143 if (session != null)
0144 {
0145 _size--;
0146 }
0147 return session;
0148
0149 }
0150
0151 public Collection<AMQSession> values()
0152 {
0153 ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
0154
0155 for (int i = 0; i < 16; i++)
0156 {
0157 if (_fastAccessSessions[i] != null)
0158 {
0159 values.add(_fastAccessSessions[i]);
0160 }
0161 }
0162 values.addAll(_slowAccessSessions.values());
0163
0164 return values;
0165 }
0166
0167 public int size()
0168 {
0169 return _size;
0170 }
0171
0172 public void clear()
0173 {
0174 _size = 0;
0175 _slowAccessSessions.clear();
0176 for (int i = 0; i < 16; i++)
0177 {
0178 _fastAccessSessions[i] = null;
0179 }
0180 }
0181 }
0182
0183 private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
0184
0185 protected AtomicInteger _idFactory = new AtomicInteger(0);
0186
0187 /**
0188 * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
0189 * held by any child objects of this connection such as the session, producers and consumers.
0190 */
0191 private final Object _failoverMutex = new Object();
0192
0193 private final Object _sessionCreationLock = new Object();
0194
0195 /**
0196 * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
0197 * and we must prevent the client from opening too many. Zero means unlimited.
0198 */
0199 protected long _maximumChannelCount;
0200
0201 /** The maximum size of frame supported by the server */
0202 private long _maximumFrameSize;
0203
0204 /**
0205 * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
0206 * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
0207 * handler.
0208 */
0209 protected AMQProtocolHandler _protocolHandler;
0210
0211 /** Maps from session id (Integer) to AMQSession instance */
0212 private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
0213
0214 private String _clientName;
0215
0216 /** The user name to use for authentication */
0217 private String _username;
0218
0219 /** The password to use for authentication */
0220 private String _password;
0221
0222 /** The virtual path to connect to on the AMQ server */
0223 private String _virtualHost;
0224
0225 protected ExceptionListener _exceptionListener;
0226
0227 private ConnectionListener _connectionListener;
0228
0229 private ConnectionURL _connectionURL;
0230
0231 /**
0232 * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
0233 * publication.
0234 */
0235 protected volatile boolean _started;
0236
0237 /** Policy dictating how to failover */
0238 protected FailoverPolicy _failoverPolicy;
0239
0240 /*
0241 * _Connected should be refactored with a suitable wait object.
0242 */
0243 protected boolean _connected;
0244
0245 /*
0246 * The connection meta data
0247 */
0248 private QpidConnectionMetaData _connectionMetaData;
0249
0250 /** Configuration info for SSL */
0251 private SSLConfiguration _sslConfiguration;
0252
0253 private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
0254 private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
0255 private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
0256 private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
0257
0258 /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
0259 private final ExecutorService _taskPool = Executors.newCachedThreadPool();
0260 private static final long DEFAULT_TIMEOUT = 1000 * 30;
0261 private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this
0262
0263 protected AMQConnectionDelegate _delegate;
0264
0265 // this connection maximum number of prefetched messages
0266 protected int _maxPrefetch;
0267
0268 //Indicates whether persistent messages are synchronized
0269 private boolean _syncPersistence;
0270
0271 /**
0272 * @param broker brokerdetails
0273 * @param username username
0274 * @param password password
0275 * @param clientName clientid
0276 * @param virtualHost virtualhost
0277 *
0278 * @throws AMQException
0279 * @throws URLSyntaxException
0280 */
0281 public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
0282 throws AMQException, URLSyntaxException
0283 {
0284 this(new AMQConnectionURL(
0285 ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
0286 + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
0287 + AMQBrokerDetails.checkTransport(broker) + "'"), null);
0288 }
0289
0290 /**
0291 * @param broker brokerdetails
0292 * @param username username
0293 * @param password password
0294 * @param clientName clientid
0295 * @param virtualHost virtualhost
0296 *
0297 * @throws AMQException
0298 * @throws URLSyntaxException
0299 */
0300 public AMQConnection(String broker, String username, String password, String clientName, String virtualHost,
0301 SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
0302 {
0303 this(new AMQConnectionURL(
0304 ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
0305 + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
0306 + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
0307 }
0308
0309 public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
0310 throws AMQException, URLSyntaxException
0311 {
0312 this(host, port, false, username, password, clientName, virtualHost, null);
0313 }
0314
0315 public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost,
0316 SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
0317 {
0318 this(host, port, false, username, password, clientName, virtualHost, sslConfig);
0319 }
0320
0321 public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName,
0322 String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
0323 {
0324 this(new AMQConnectionURL(
0325 useSSL
0326 ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
0327 + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
0328 + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
0329 : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
0330 + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
0331 + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
0332 }
0333
0334 public AMQConnection(String connection) throws AMQException, URLSyntaxException
0335 {
0336 this(new AMQConnectionURL(connection), null);
0337 }
0338
0339 public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
0340 {
0341 this(new AMQConnectionURL(connection), sslConfig);
0342 }
0343
0344 /**
0345 * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
0346 * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
0347 */
0348 public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
0349 {
0350 // set this connection maxPrefetch
0351 if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
0352 {
0353 _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
0354 }
0355 else
0356 {
0357 // use the defaul value set for all connections
0358 _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
0359 ClientProperties.MAX_PREFETCH_DEFAULT));
0360 }
0361
0362 if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
0363 {
0364 _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE));
0365 }
0366 else
0367 {
0368 // use the defaul value set for all connections
0369 _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
0370 }
0371
0372 _failoverPolicy = new FailoverPolicy(connectionURL, this);
0373 BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
0374 if (brokerDetails.getTransport().equals(BrokerDetails.VM))
0375 {
0376 _delegate = new AMQConnectionDelegate_8_0(this);
0377 }
0378 else
0379 {
0380 _delegate = new AMQConnectionDelegate_0_10(this);
0381 }
0382
0383 if (_logger.isInfoEnabled())
0384 {
0385 _logger.info("Connection:" + connectionURL);
0386 }
0387
0388 _sslConfiguration = sslConfig;
0389 if (connectionURL == null)
0390 {
0391 throw new IllegalArgumentException("Connection must be specified");
0392 }
0393
0394 _connectionURL = connectionURL;
0395
0396 _clientName = connectionURL.getClientName();
0397 _username = connectionURL.getUsername();
0398 _password = connectionURL.getPassword();
0399
0400 setVirtualHost(connectionURL.getVirtualHost());
0401
0402 if (connectionURL.getDefaultQueueExchangeName() != null)
0403 {
0404 _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
0405 }
0406
0407 if (connectionURL.getDefaultTopicExchangeName() != null)
0408 {
0409 _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
0410 }
0411
0412 if (connectionURL.getTemporaryQueueExchangeName() != null)
0413 {
0414 _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
0415 }
0416
0417 if (connectionURL.getTemporaryTopicExchangeName() != null)
0418 {
0419 _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
0420 }
0421
0422 _protocolHandler = new AMQProtocolHandler(this);
0423
0424 // We are not currently connected
0425 _connected = false;
0426
0427 boolean retryAllowed = true;
0428 Exception connectionException = null;
0429 while (!_connected && retryAllowed)
0430 {
0431 ProtocolVersion pe = null;
0432 try
0433 {
0434 pe = makeBrokerConnection(brokerDetails);
0435 }
0436 catch (Exception e)
0437 {
0438 if (_logger.isInfoEnabled())
0439 {
0440 _logger.info("Unable to connect to broker at " +
0441 _failoverPolicy.getCurrentBrokerDetails(),
0442 e);
0443 }
0444 connectionException = e;
0445 }
0446
0447 if (pe != null)
0448 {
0449 // reset the delegate to the version returned by the
0450 // broker
0451 initDelegate(pe);
0452 }
0453 else if (!_connected)
0454 {
0455 retryAllowed = _failoverPolicy.failoverAllowed();
0456 brokerDetails = _failoverPolicy.getNextBrokerDetails();
0457 }
0458 }
0459
0460 if (_logger.isDebugEnabled())
0461 {
0462 _logger.debug("Are we connected:" + _connected);
0463 }
0464
0465 if (!_connected)
0466 {
0467 String message = null;
0468
0469 if (connectionException != null)
0470 {
0471 if (connectionException.getCause() != null)
0472 {
0473 message = connectionException.getCause().getMessage();
0474 connectionException.getCause().printStackTrace();
0475 }
0476 else
0477 {
0478 message = connectionException.getMessage();
0479 }
0480 }
0481
0482 if ((message == null) || message.equals(""))
0483 {
0484 if (message == null)
0485 {
0486 message = "Unable to Connect";
0487 }
0488 else // can only be "" if getMessage() returned it therfore lastException != null
0489 {
0490 message = "Unable to Connect:" + connectionException.getClass();
0491 }
0492 }
0493
0494 for (Throwable th = connectionException; th != null; th = th.getCause())
0495 {
0496 if (th instanceof UnresolvedAddressException ||
0497 th instanceof UnknownHostException)
0498 {
0499 throw new AMQUnresolvedAddressException
0500 (message,
0501 _failoverPolicy.getCurrentBrokerDetails().toString(),
0502 connectionException);
0503 }
0504 }
0505
0506 throw new AMQConnectionFailureException(message, connectionException);
0507 }
0508
0509 _connectionMetaData = new QpidConnectionMetaData(this);
0510 }
0511
0512 protected boolean checkException(Throwable thrown)
0513 {
0514 Throwable cause = thrown.getCause();
0515
0516 if (cause == null)
0517 {
0518 cause = thrown;
0519 }
0520
0521 return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
0522 }
0523
0524 private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
0525 {
0526 try
0527 {
0528 Class c = Class.forName(String.format
0529 ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
0530 pe.getMajorVersion(), pe.getMinorVersion()));
0531 Class partypes[] = new Class[1];
0532 partypes[0] = AMQConnection.class;
0533 _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
0534 }
0535 catch (ClassNotFoundException e)
0536 {
0537 throw new AMQProtocolException
0538 (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
0539 String.format("Protocol: %s.%s is rquired by the broker but is not " +
0540 "currently supported by this client library implementation",
0541 pe.getMajorVersion(), pe.getMinorVersion()),
0542 e);
0543 }
0544 catch (NoSuchMethodException e)
0545 {
0546 throw new RuntimeException("unable to locate constructor for delegate", e);
0547 }
0548 catch (InstantiationException e)
0549 {
0550 throw new RuntimeException("error instantiating delegate", e);
0551 }
0552 catch (IllegalAccessException e)
0553 {
0554 throw new RuntimeException("error accessing delegate", e);
0555 }
0556 catch (InvocationTargetException e)
0557 {
0558 throw new RuntimeException("error invoking delegate", e);
0559 }
0560 }
0561
0562 protected AMQConnection(String username, String password, String clientName, String virtualHost)
0563 {
0564 _clientName = clientName;
0565 _username = username;
0566 _password = password;
0567 setVirtualHost(virtualHost);
0568 }
0569
0570 private void setVirtualHost(String virtualHost)
0571 {
0572 if (virtualHost != null && virtualHost.startsWith("/"))
0573 {
0574 virtualHost = virtualHost.substring(1);
0575 }
0576
0577 _virtualHost = virtualHost;
0578 }
0579
0580 public boolean attemptReconnection(String host, int port)
0581 {
0582 BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
0583
0584 _failoverPolicy.setBroker(bd);
0585
0586 try
0587 {
0588 makeBrokerConnection(bd);
0589
0590 return true;
0591 }
0592 catch (Exception e)
0593 {
0594 if (_logger.isInfoEnabled())
0595 {
0596 _logger.info("Unable to connect to broker at " + bd);
0597 }
0598
0599 attemptReconnection();
0600 }
0601
0602 return false;
0603 }
0604
0605 public boolean attemptReconnection()
0606 {
0607 while (_failoverPolicy.failoverAllowed())
0608 {
0609 try
0610 {
0611 makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
0612
0613 return true;
0614 }
0615 catch (Exception e)
0616 {
0617 if (!(e instanceof AMQException))
0618 {
0619 if (_logger.isInfoEnabled())
0620 {
0621 _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
0622 }
0623 }
0624 else
0625 {
0626 if (_logger.isInfoEnabled())
0627 {
0628 _logger.info(e.getMessage() + ":Unable to connect to broker at "
0629 + _failoverPolicy.getCurrentBrokerDetails());
0630 }
0631 }
0632 }
0633 }
0634
0635 // connection unsuccessful
0636 return false;
0637 }
0638
0639 public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
0640 {
0641 return _delegate.makeBrokerConnection(brokerDetail);
0642 }
0643
0644 public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
0645 {
0646 return _delegate.executeRetrySupport(operation);
0647 }
0648
0649 /**
0650 * Get the details of the currently active broker
0651 *
0652 * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
0653 * otherwise
0654 */
0655 public BrokerDetails getActiveBrokerDetails()
0656 {
0657 return _failoverPolicy.getCurrentBrokerDetails();
0658 }
0659
0660 public boolean failoverAllowed()
0661 {
0662 if (!_connected)
0663 {
0664 return false;
0665 }
0666 else
0667 {
0668 return _failoverPolicy.failoverAllowed();
0669 }
0670 }
0671
0672 public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
0673 {
0674 return createSession(transacted, acknowledgeMode, _maxPrefetch);
0675 }
0676
0677 public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
0678 throws JMSException
0679 {
0680 return createSession(transacted, acknowledgeMode, prefetch, prefetch);
0681 }
0682
0683 public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
0684 final int prefetchHigh, final int prefetchLow) throws JMSException
0685 {
0686 synchronized (_sessionCreationLock)
0687 {
0688 checkNotClosed();
0689 return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
0690 }
0691 }
0692
0693 private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
0694 throws AMQException, FailoverException
0695 {
0696
0697 ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
0698
0699 // TODO: Be aware of possible changes to parameter order as versions change.
0700
0701 _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
0702
0703 BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
0704
0705 // todo send low water mark when protocol allows.
0706 // todo Be aware of possible changes to parameter order as versions change.
0707 _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
0708
0709 if (transacted)
0710 {
0711 if (_logger.isDebugEnabled())
0712 {
0713 _logger.debug("Issuing TxSelect for " + channelId);
0714 }
0715
0716 TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
0717
0718 // TODO: Be aware of possible changes to parameter order as versions change.
0719 _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
0720 }
0721 }
0722
0723 private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
0724 throws AMQException, FailoverException
0725 {
0726 try
0727 {
0728 createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
0729 }
0730 catch (AMQException e)
0731 {
0732 deregisterSession(channelId);
0733 throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
0734 }
0735 }
0736
0737 public void setFailoverPolicy(FailoverPolicy policy)
0738 {
0739 _failoverPolicy = policy;
0740 }
0741
0742 public FailoverPolicy getFailoverPolicy()
0743 {
0744 return _failoverPolicy;
0745 }
0746
0747 /**
0748 * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
0749 * the JMS spec
0750 *
0751 * @param transacted
0752 * @param acknowledgeMode
0753 *
0754 * @return QueueSession
0755 *
0756 * @throws JMSException
0757 */
0758 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
0759 {
0760 return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode));
0761 }
0762
0763 /**
0764 * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
0765 * the JMS spec
0766 *
0767 * @param transacted
0768 * @param acknowledgeMode
0769 *
0770 * @return TopicSession
0771 *
0772 * @throws JMSException
0773 */
0774 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
0775 {
0776 return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode));
0777 }
0778
0779 public boolean channelLimitReached()
0780 {
0781 return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
0782 }
0783
0784 public String getClientID() throws JMSException
0785 {
0786 checkNotClosed();
0787
0788 return _clientName;
0789 }
0790
0791 public void setClientID(String clientID) throws JMSException
0792 {
0793 checkNotClosed();
0794 // in AMQP it is not possible to change the client ID. If one is not specified
0795 // upon connection construction, an id is generated automatically. Therefore
0796 // we can always throw an exception.
0797 if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
0798 {
0799 throw new IllegalStateException("Client name cannot be changed after being set");
0800 }
0801 else
0802 {
0803 _logger.info("Operation setClientID is ignored using ID: " + getClientID());
0804 }
0805 }
0806
0807 public ConnectionMetaData getMetaData() throws JMSException
0808 {
0809 checkNotClosed();
0810
0811 return _connectionMetaData;
0812
0813 }
0814
0815 public ExceptionListener getExceptionListener() throws JMSException
0816 {
0817 checkNotClosed();
0818
0819 return _exceptionListener;
0820 }
0821
0822 public void setExceptionListener(ExceptionListener listener) throws JMSException
0823 {
0824 checkNotClosed();
0825 _exceptionListener = listener;
0826 }
0827
0828 /**
0829 * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
0830 * and is not thread safe (which is legal according to the JMS specification).
0831 *
0832 * @throws JMSException
0833 */
0834 public void start() throws JMSException
0835 {
0836 checkNotClosed();
0837 if (!_started)
0838 {
0839 _started = true;
0840 final Iterator it = _sessions.values().iterator();
0841 while (it.hasNext())
0842 {
0843 final AMQSession s = (AMQSession) (it.next());
0844 try
0845 {
0846 s.start();
0847 }
0848 catch (AMQException e)
0849 {
0850 throw new JMSAMQException(e);
0851 }
0852 }
0853
0854 }
0855 }
0856
0857 public void stop() throws JMSException
0858 {
0859 checkNotClosed();
0860 if (_started)
0861 {
0862 for (Iterator i = _sessions.values().iterator(); i.hasNext();)
0863 {
0864 try
0865 {
0866 ((AMQSession) i.next()).stop();
0867 }
0868 catch (AMQException e)
0869 {
0870 throw new JMSAMQException(e);
0871 }
0872 }
0873
0874 _started = false;
0875 }
0876 }
0877
0878 public void close() throws JMSException
0879 {
0880 close(DEFAULT_TIMEOUT);
0881 }
0882
0883 public void close(long timeout) throws JMSException
0884 {
0885 close(new ArrayList<AMQSession>(_sessions.values()), timeout);
0886 }
0887
0888 public void close(List<AMQSession> sessions, long timeout) throws JMSException
0889 {
0890 if (!_closed.getAndSet(true))
0891 {
0892 doClose(sessions, timeout);
0893 }
0894 }
0895
0896 private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
0897 {
0898 synchronized (_sessionCreationLock)
0899 {
0900 if (!sessions.isEmpty())
0901 {
0902 AMQSession session = sessions.remove(0);
0903 synchronized (session.getMessageDeliveryLock())
0904 {
0905 doClose(sessions, timeout);
0906 }
0907 }
0908 else
0909 {
0910 synchronized (getFailoverMutex())
0911 {
0912 try
0913 {
0914 long startCloseTime = System.currentTimeMillis();
0915
0916 closeAllSessions(null, timeout, startCloseTime);
0917
0918 //This MUST occur after we have successfully closed all Channels/Sessions
0919 _taskPool.shutdown();
0920
0921 if (!_taskPool.isTerminated())
0922 {
0923 try
0924 {
0925 // adjust timeout
0926 long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
0927
0928 _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
0929 }
0930 catch (InterruptedException e)
0931 {
0932 _logger.info("Interrupted while shutting down connection thread pool.");
0933 }
0934 }
0935
0936 // adjust timeout
0937 timeout = adjustTimeout(timeout, startCloseTime);
0938 _delegate.closeConnection(timeout);
0939
0940 //If the taskpool hasn't shutdown by now then give it shutdownNow.
0941 // This will interupt any running tasks.
0942 if (!_taskPool.isTerminated())
0943 {
0944 List<Runnable> tasks = _taskPool.shutdownNow();
0945 for (Runnable r : tasks)
0946 {
0947 _logger.warn("Connection close forced taskpool to prevent execution:" + r);
0948 }
0949 }
0950 }
0951 catch (AMQException e)
0952 {
0953 _logger.error("error:", e);
0954 JMSException jmse = new JMSException("Error closing connection: " + e);
0955 jmse.setLinkedException(e);
0956 throw jmse;
0957 }
0958 }
0959 }
0960 }
0961 }
0962
0963 private long adjustTimeout(long timeout, long startTime)
0964 {
0965 long now = System.currentTimeMillis();
0966 timeout -= now - startTime;
0967 if (timeout < 0)
0968 {
0969 timeout = 0;
0970 }
0971
0972 return timeout;
0973 }
0974
0975 /**
0976 * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
0977 * mark objects "visible" in userland as closed after failover or other significant event that impacts the
0978 * connection. <p/> The caller must hold the failover mutex before calling this method.
0979 */
0980 private void markAllSessionsClosed()
0981 {
0982 final LinkedList sessionCopy = new LinkedList(_sessions.values());
0983 final Iterator it = sessionCopy.iterator();
0984 while (it.hasNext())
0985 {
0986 final AMQSession session = (AMQSession) it.next();
0987
0988 session.markClosed();
0989 }
0990
0991 _sessions.clear();
0992 }
0993
0994 /**
0995 * Close all the sessions, either due to normal connection closure or due to an error occurring.
0996 *
0997 * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
0998 * before calling this method.
0999 */
1000 private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException
1001 {
1002 final LinkedList sessionCopy = new LinkedList(_sessions.values());
1003 final Iterator it = sessionCopy.iterator();
1004 JMSException sessionException = null;
1005 while (it.hasNext())
1006 {
1007 final AMQSession session = (AMQSession) it.next();
1008 if (cause != null)
1009 {
1010 session.closed(cause);
1011 }
1012 else
1013 {
1014 try
1015 {
1016 if (starttime != -1)
1017 {
1018 timeout = adjustTimeout(timeout, starttime);
1019 }
1020
1021 session.close(timeout);
1022 }
1023 catch (JMSException e)
1024 {
1025 _logger.error("Error closing session: " + e);
1026 sessionException = e;
1027 }
1028 }
1029 }
1030
1031 _sessions.clear();
1032 if (sessionException != null)
1033 {
1034 throw sessionException;
1035 }
1036 }
1037
1038 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
1039 ServerSessionPool sessionPool, int maxMessages) throws JMSException
1040 {
1041 checkNotClosed();
1042
1043 return null;
1044 }
1045
1046 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
1047 int maxMessages) throws JMSException
1048 {
1049 checkNotClosed();
1050
1051 return null;
1052 }
1053
1054 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
1055 int maxMessages) throws JMSException
1056 {
1057 checkNotClosed();
1058
1059 return null;
1060 }
1061
1062 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
1063 ServerSessionPool sessionPool, int maxMessages) throws JMSException
1064 {
1065 // TODO Auto-generated method stub
1066 checkNotClosed();
1067
1068 return null;
1069 }
1070
1071 public long getMaximumChannelCount() throws JMSException
1072 {
1073 checkNotClosed();
1074
1075 return _maximumChannelCount;
1076 }
1077
1078 public void setConnectionListener(ConnectionListener listener)
1079 {
1080 _connectionListener = listener;
1081 }
1082
1083 public ConnectionListener getConnectionListener()
1084 {
1085 return _connectionListener;
1086 }
1087
1088 public void setMaximumChannelCount(long maximumChannelCount)
1089 {
1090 _maximumChannelCount = maximumChannelCount;
1091 }
1092
1093 public void setMaximumFrameSize(long frameMax)
1094 {
1095 _maximumFrameSize = frameMax;
1096 }
1097
1098 public long getMaximumFrameSize()
1099 {
1100 return _maximumFrameSize;
1101 }
1102
1103 public ChannelToSessionMap getSessions()
1104 {
1105 return _sessions;
1106 }
1107
1108 public String getUsername()
1109 {
1110 return _username;
1111 }
1112
1113 public String getPassword()
1114 {
1115 return _password;
1116 }
1117
1118 public String getVirtualHost()
1119 {
1120 return _virtualHost;
1121 }
1122
1123 public AMQProtocolHandler getProtocolHandler()
1124 {
1125 return _protocolHandler;
1126 }
1127
1128 public boolean started()
1129 {
1130 return _started;
1131 }
1132
1133 public void bytesSent(long writtenBytes)
1134 {
1135 if (_connectionListener != null)
1136 {
1137 _connectionListener.bytesSent(writtenBytes);
1138 }
1139 }
1140
1141 public void bytesReceived(long receivedBytes)
1142 {
1143 if (_connectionListener != null)
1144 {
1145 _connectionListener.bytesReceived(receivedBytes);
1146 }
1147 }
1148
1149 /**
1150 * Fire the preFailover event to the registered connection listener (if any)
1151 *
1152 * @param redirect true if this is the result of a redirect request rather than a connection error
1153 *
1154 * @return true if no listener or listener does not veto change
1155 */
1156 public boolean firePreFailover(boolean redirect)
1157 {
1158 boolean proceed = true;
1159 if (_connectionListener != null)
1160 {
1161 proceed = _connectionListener.preFailover(redirect);
1162 }
1163
1164 return proceed;
1165 }
1166
1167 /**
1168 * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
1169 * resubscription then all the sessions are closed.
1170 *
1171 * @return true if no listener or listener does not veto resubscription.
1172 *
1173 * @throws JMSException
1174 */
1175 public boolean firePreResubscribe() throws JMSException
1176 {
1177 if (_connectionListener != null)
1178 {
1179 boolean resubscribe = _connectionListener.preResubscribe();
1180 if (!resubscribe)
1181 {
1182 markAllSessionsClosed();
1183 }
1184
1185 return resubscribe;
1186 }
1187 else
1188 {
1189 return true;
1190 }
1191 }
1192
1193 /** Fires a failover complete event to the registered connection listener (if any). */
1194 public void fireFailoverComplete()
1195 {
1196 if (_connectionListener != null)
1197 {
1198 _connectionListener.failoverComplete();
1199 }
1200 }
1201
1202 /**
1203 * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
1204 * "failover mutex" must be held when doing any operations that could be corrupted during failover.
1205 *
1206 * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
1207 */
1208 public final Object getFailoverMutex()
1209 {
1210 return _failoverMutex;
1211 }
1212
1213 public void failoverPrep()
1214 {
1215 _delegate.failoverPrep();
1216 }
1217
1218 public void resubscribeSessions() throws JMSException, AMQException, FailoverException
1219 {
1220 _delegate.resubscribeSessions();
1221 }
1222
1223 /**
1224 * If failover is taking place this will block until it has completed. If failover is not taking place it will
1225 * return immediately.
1226 *
1227 * @throws InterruptedException
1228 */
1229 public void blockUntilNotFailingOver() throws InterruptedException
1230 {
1231 _protocolHandler.blockUntilNotFailingOver();
1232 }
1233
1234 /**
1235 * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
1236 * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
1237 * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
1238 *
1239 * @param cause the exception
1240 */
1241 public void exceptionReceived(Throwable cause)
1242 {
1243
1244 if (_logger.isDebugEnabled())
1245 {
1246 _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
1247 }
1248
1249 final JMSException je;
1250 if (cause instanceof JMSException)
1251 {
1252 je = (JMSException) cause;
1253 }
1254 else
1255 {
1256 AMQConstant code = null;
1257
1258 if (cause instanceof AMQException)
1259 {
1260 code = ((AMQException) cause).getErrorCode();
1261 }
1262
1263 if (code != null)
1264 {
1265 je =
1266 new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": "
1267 + cause);
1268 }
1269 else
1270 {
1271 //Should never get here as all AMQEs are required to have an ErrorCode!
1272 je = new JMSException("Exception thrown against " + toString() + ": " + cause);
1273 }
1274
1275 if (cause instanceof Exception)
1276 {
1277 je.setLinkedException((Exception) cause);
1278 }
1279 }
1280
1281 boolean closer = false;
1282
1283 // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
1284 // so that any generic client code that tries to close the connection will not mess up this error
1285 // handling sequence
1286 if (cause instanceof IOException)
1287 {
1288 closer = !_closed.getAndSet(true);
1289
1290 _protocolHandler.getProtocolSession().notifyError(je);
1291 }
1292
1293 if (_exceptionListener != null)
1294 {
1295 _exceptionListener.onException(je);
1296 }
1297 else
1298 {
1299 _logger.error("Throwable Received but no listener set: " + cause.getMessage());
1300 }
1301
1302 if (hardError(cause))
1303 {
1304 try
1305 {
1306 if (_logger.isInfoEnabled())
1307 {
1308 _logger.info("Closing AMQConnection due to :" + cause.getMessage());
1309 }
1310
1311 closer = (!_closed.getAndSet(true)) || closer;
1312 if (closer)
1313 {
1314 closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
1315 }
1316 }
1317 catch (JMSException e)
1318 {
1319 _logger.error("Error closing all sessions: " + e, e);
1320 }
1321
1322 }
1323 else
1324 {
1325 _logger.info("Not a hard-error connection not closing: " + cause.getMessage());
1326 }
1327 }
1328
1329 private boolean hardError(Throwable cause)
1330 {
1331 if (cause instanceof AMQException)
1332 {
1333 return ((AMQException) cause).isHardError();
1334 }
1335
1336 return true;
1337 }
1338
1339 void registerSession(int channelId, AMQSession session)
1340 {
1341 _sessions.put(channelId, session);
1342 }
1343
1344 void deregisterSession(int channelId)
1345 {
1346 _sessions.remove(channelId);
1347 }
1348
1349 public String toString()
1350 {
1351 StringBuffer buf = new StringBuffer("AMQConnection:\n");
1352 if (_failoverPolicy.getCurrentBrokerDetails() == null)
1353 {
1354 buf.append("No active broker connection");
1355 }
1356 else
1357 {
1358 BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails();
1359 buf.append("Host: ").append(String.valueOf(bd.getHost()));
1360 buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
1361 }
1362
1363 buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
1364 buf.append("\nClient ID: ").append(String.valueOf(_clientName));
1365 buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size());
1366
1367 return buf.toString();
1368 }
1369
1370 public String toURL()
1371 {
1372 return _connectionURL.toString();
1373 }
1374
1375 public Reference getReference() throws NamingException
1376 {
1377 return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()),
1378 AMQConnectionFactory.class.getName(), null); // factory location
1379 }
1380
1381 public SSLConfiguration getSSLConfiguration()
1382 {
1383 return _sslConfiguration;
1384 }
1385
1386 public AMQShortString getDefaultTopicExchangeName()
1387 {
1388 return _defaultTopicExchangeName;
1389 }
1390
1391 public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
1392 {
1393 _defaultTopicExchangeName = defaultTopicExchangeName;
1394 }
1395
1396 public AMQShortString getDefaultQueueExchangeName()
1397 {
1398 return _defaultQueueExchangeName;
1399 }
1400
1401 public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
1402 {
1403 _defaultQueueExchangeName = defaultQueueExchangeName;
1404 }
1405
1406 public AMQShortString getTemporaryTopicExchangeName()
1407 {
1408 return _temporaryTopicExchangeName;
1409 }
1410
1411 public AMQShortString getTemporaryQueueExchangeName()
1412 {
1413 return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates.
1414 }
1415
1416 public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
1417 {
1418 _temporaryTopicExchangeName = temporaryTopicExchangeName;
1419 }
1420
1421 public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName)
1422 {
1423 _temporaryQueueExchangeName = temporaryQueueExchangeName;
1424 }
1425
1426 public void performConnectionTask(Runnable task)
1427 {
1428 _taskPool.execute(task);
1429 }
1430
1431 public AMQSession getSession(int channelId)
1432 {
1433 return _sessions.get(channelId);
1434 }
1435
1436 public ProtocolVersion getProtocolVersion()
1437 {
1438 return _protocolVersion;
1439 }
1440
1441 public void setProtocolVersion(ProtocolVersion protocolVersion)
1442 {
1443 _protocolVersion = protocolVersion;
1444 _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
1445 }
1446
1447 public boolean isFailingOver()
1448 {
1449 return (_protocolHandler.getFailoverLatch() != null);
1450 }
1451
1452 /**
1453 * Get the maximum number of messages that this connection can pre-fetch.
1454 *
1455 * @return The maximum number of messages that this connection can pre-fetch.
1456 */
1457 public long getMaxPrefetch()
1458 {
1459 return _maxPrefetch;
1460 }
1461
1462 /**
1463 * Indicates whether persistent messages are synchronized
1464 *
1465 * @return true if persistent messages are synchronized false otherwise
1466 */
1467 public boolean getSyncPersistence()
1468 {
1469 return _syncPersistence;
1470 }
1471
1472 public void setIdleTimeout(long l)
1473 {
1474 _delegate.setIdleTimeout(l);
1475 }
1476 }
|