AMQConnection.java
0001 /*
0002 *
0003  * Licensed to the Apache Software Foundation (ASF) under one
0004  * or more contributor license agreements.  See the NOTICE file
0005  * distributed with this work for additional information
0006  * regarding copyright ownership.  The ASF licenses this file
0007  * to you under the Apache License, Version 2.0 (the
0008  * "License"); you may not use this file except in compliance
0009  * with the License.  You may obtain a copy of the License at
0010  *
0011  *   http://www.apache.org/licenses/LICENSE-2.0
0012  *
0013  * Unless required by applicable law or agreed to in writing,
0014  * software distributed under the License is distributed on an
0015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016  * KIND, either express or implied.  See the License for the
0017  * specific language governing permissions and limitations
0018  * under the License.
0019  *
0020  */
0021 package org.apache.qpid.client;
0022 
0023 import java.io.IOException;
0024 import java.lang.reflect.InvocationTargetException;
0025 import java.net.ConnectException;
0026 import java.net.UnknownHostException;
0027 import java.nio.channels.UnresolvedAddressException;
0028 import java.util.ArrayList;
0029 import java.util.Collection;
0030 import java.util.Iterator;
0031 import java.util.LinkedHashMap;
0032 import java.util.LinkedList;
0033 import java.util.List;
0034 import java.util.concurrent.ExecutorService;
0035 import java.util.concurrent.Executors;
0036 import java.util.concurrent.TimeUnit;
0037 import java.util.concurrent.atomic.AtomicInteger;
0038 
0039 import javax.jms.ConnectionConsumer;
0040 import javax.jms.ConnectionMetaData;
0041 import javax.jms.Destination;
0042 import javax.jms.ExceptionListener;
0043 import javax.jms.IllegalStateException;
0044 import javax.jms.JMSException;
0045 import javax.jms.Queue;
0046 import javax.jms.QueueConnection;
0047 import javax.jms.QueueSession;
0048 import javax.jms.ServerSessionPool;
0049 import javax.jms.Topic;
0050 import javax.jms.TopicConnection;
0051 import javax.jms.TopicSession;
0052 import javax.naming.NamingException;
0053 import javax.naming.Reference;
0054 import javax.naming.Referenceable;
0055 import javax.naming.StringRefAddr;
0056 
0057 import org.apache.qpid.AMQConnectionFailureException;
0058 import org.apache.qpid.AMQException;
0059 import org.apache.qpid.AMQProtocolException;
0060 import org.apache.qpid.AMQUnresolvedAddressException;
0061 import org.apache.qpid.client.configuration.ClientProperties;
0062 import org.apache.qpid.client.failover.FailoverException;
0063 import org.apache.qpid.client.failover.FailoverProtectedOperation;
0064 import org.apache.qpid.client.protocol.AMQProtocolHandler;
0065 import org.apache.qpid.exchange.ExchangeDefaults;
0066 import org.apache.qpid.framing.AMQShortString;
0067 import org.apache.qpid.framing.BasicQosBody;
0068 import org.apache.qpid.framing.BasicQosOkBody;
0069 import org.apache.qpid.framing.ChannelOpenBody;
0070 import org.apache.qpid.framing.ChannelOpenOkBody;
0071 import org.apache.qpid.framing.ProtocolVersion;
0072 import org.apache.qpid.framing.TxSelectBody;
0073 import org.apache.qpid.framing.TxSelectOkBody;
0074 import org.apache.qpid.jms.BrokerDetails;
0075 import org.apache.qpid.jms.Connection;
0076 import org.apache.qpid.jms.ConnectionListener;
0077 import org.apache.qpid.jms.ConnectionURL;
0078 import org.apache.qpid.jms.FailoverPolicy;
0079 import org.apache.qpid.protocol.AMQConstant;
0080 import org.apache.qpid.url.URLSyntaxException;
0081 import org.slf4j.Logger;
0082 import org.slf4j.LoggerFactory;
0083 
0084 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
0085 {
0086     public static final class ChannelToSessionMap
0087     {
0088         private final AMQSession[] _fastAccessSessions = new AMQSession[16];
0089         private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
0090         private int _size = 0;
0091         private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
0092 
0093         public AMQSession get(int channelId)
0094         {
0095             if ((channelId & FAST_CHANNEL_ACCESS_MASK== 0)
0096             {
0097                 return _fastAccessSessions[channelId];
0098             }
0099             else
0100             {
0101                 return _slowAccessSessions.get(channelId);
0102             }
0103         }
0104 
0105         public AMQSession put(int channelId, AMQSession session)
0106         {
0107             AMQSession oldVal;
0108             if ((channelId & FAST_CHANNEL_ACCESS_MASK== 0)
0109             {
0110                 oldVal = _fastAccessSessions[channelId];
0111                 _fastAccessSessions[channelId= session;
0112             }
0113             else
0114             {
0115                 oldVal = _slowAccessSessions.put(channelId, session);
0116             }
0117             if ((oldVal != null&& (session == null))
0118             {
0119                 _size--;
0120             }
0121             else if ((oldVal == null&& (session != null))
0122             {
0123                 _size++;
0124             }
0125 
0126             return session;
0127 
0128         }
0129 
0130         public AMQSession remove(int channelId)
0131         {
0132             AMQSession session;
0133             if ((channelId & FAST_CHANNEL_ACCESS_MASK== 0)
0134             {
0135                 session = _fastAccessSessions[channelId];
0136                 _fastAccessSessions[channelIdnull;
0137             }
0138             else
0139             {
0140                 session = _slowAccessSessions.remove(channelId);
0141             }
0142 
0143             if (session != null)
0144             {
0145                 _size--;
0146             }
0147             return session;
0148 
0149         }
0150 
0151         public Collection<AMQSession> values()
0152         {
0153             ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
0154 
0155             for (int i = 0; i < 16; i++)
0156             {
0157                 if (_fastAccessSessions[i!= null)
0158                 {
0159                     values.add(_fastAccessSessions[i]);
0160                 }
0161             }
0162             values.addAll(_slowAccessSessions.values());
0163 
0164             return values;
0165         }
0166 
0167         public int size()
0168         {
0169             return _size;
0170         }
0171 
0172         public void clear()
0173         {
0174             _size = 0;
0175             _slowAccessSessions.clear();
0176             for (int i = 0; i < 16; i++)
0177             {
0178                 _fastAccessSessions[inull;
0179             }
0180         }
0181     }
0182 
0183     private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
0184 
0185     protected AtomicInteger _idFactory = new AtomicInteger(0);
0186 
0187     /**
0188      * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
0189      * held by any child objects of this connection such as the session, producers and consumers.
0190      */
0191     private final Object _failoverMutex = new Object();
0192 
0193     private final Object _sessionCreationLock = new Object();
0194 
0195     /**
0196      * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
0197      * and we must prevent the client from opening too many. Zero means unlimited.
0198      */
0199     protected long _maximumChannelCount;
0200 
0201     /** The maximum size of frame supported by the server */
0202     private long _maximumFrameSize;
0203 
0204     /**
0205      * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
0206      * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
0207      * handler.
0208      */
0209     protected AMQProtocolHandler _protocolHandler;
0210 
0211     /** Maps from session id (Integer) to AMQSession instance */
0212     private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
0213 
0214     private String _clientName;
0215 
0216     /** The user name to use for authentication */
0217     private String _username;
0218 
0219     /** The password to use for authentication */
0220     private String _password;
0221 
0222     /** The virtual path to connect to on the AMQ server */
0223     private String _virtualHost;
0224 
0225     protected ExceptionListener _exceptionListener;
0226 
0227     private ConnectionListener _connectionListener;
0228 
0229     private ConnectionURL _connectionURL;
0230 
0231     /**
0232      * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
0233      * publication.
0234      */
0235     protected volatile boolean _started;
0236 
0237     /** Policy dictating how to failover */
0238     protected FailoverPolicy _failoverPolicy;
0239 
0240     /*
0241      * _Connected should be refactored with a suitable wait object.
0242      */
0243     protected boolean _connected;
0244 
0245     /*
0246      * The connection meta data
0247      */
0248     private QpidConnectionMetaData _connectionMetaData;
0249 
0250     /** Configuration info for SSL */
0251     private SSLConfiguration _sslConfiguration;
0252 
0253     private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
0254     private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
0255     private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
0256     private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
0257 
0258     /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
0259     private final ExecutorService _taskPool = Executors.newCachedThreadPool();
0260     private static final long DEFAULT_TIMEOUT = 1000 30;
0261     private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this
0262 
0263     protected AMQConnectionDelegate _delegate;
0264 
0265     // this connection maximum number of prefetched messages
0266     protected int _maxPrefetch;
0267 
0268     //Indicates whether persistent messages are synchronized
0269     private boolean _syncPersistence;
0270 
0271     /**
0272      @param broker      brokerdetails
0273      @param username    username
0274      @param password    password
0275      @param clientName  clientid
0276      @param virtualHost virtualhost
0277      *
0278      @throws AMQException
0279      @throws URLSyntaxException
0280      */
0281     public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
0282             throws AMQException, URLSyntaxException
0283     {
0284         this(new AMQConnectionURL(
0285                 ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
0286                 ((clientName == null"" : clientName"/" + virtualHost + "?brokerlist='"
0287                 + AMQBrokerDetails.checkTransport(broker"'")null);
0288     }
0289 
0290     /**
0291      @param broker      brokerdetails
0292      @param username    username
0293      @param password    password
0294      @param clientName  clientid
0295      @param virtualHost virtualhost
0296      *
0297      @throws AMQException
0298      @throws URLSyntaxException
0299      */
0300     public AMQConnection(String broker, String username, String password, String clientName, String virtualHost,
0301                          SSLConfiguration sslConfigthrows AMQException, URLSyntaxException
0302     {
0303         this(new AMQConnectionURL(
0304                 ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
0305                 ((clientName == null"" : clientName"/" + virtualHost + "?brokerlist='"
0306                 + AMQBrokerDetails.checkTransport(broker"'"), sslConfig);
0307     }
0308 
0309     public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
0310             throws AMQException, URLSyntaxException
0311     {
0312         this(host, port, false, username, password, clientName, virtualHost, null);
0313     }
0314 
0315     public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost,
0316                          SSLConfiguration sslConfigthrows AMQException, URLSyntaxException
0317     {
0318         this(host, port, false, username, password, clientName, virtualHost, sslConfig);
0319     }
0320 
0321     public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName,
0322                          String virtualHost, SSLConfiguration sslConfigthrows AMQException, URLSyntaxException
0323     {
0324         this(new AMQConnectionURL(
0325                 useSSL
0326                 (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
0327                    ((clientName == null"" : clientName+ virtualHost + "?brokerlist='tcp://" + host + ":" + port
0328                    "'" "," + ConnectionURL.OPTIONS_SSL + "='true'")
0329                 (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
0330                    ((clientName == null"" : clientName+ virtualHost + "?brokerlist='tcp://" + host + ":" + port
0331                    "'" "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
0332     }
0333 
0334     public AMQConnection(String connectionthrows AMQException, URLSyntaxException
0335     {
0336         this(new AMQConnectionURL(connection)null);
0337     }
0338 
0339     public AMQConnection(String connection, SSLConfiguration sslConfigthrows AMQException, URLSyntaxException
0340     {
0341         this(new AMQConnectionURL(connection), sslConfig);
0342     }
0343 
0344     /**
0345      * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
0346      * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
0347      */
0348     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfigthrows AMQException
0349     {
0350         // set this connection maxPrefetch
0351         if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH!= null)
0352         {
0353             _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
0354         }
0355         else
0356         {
0357             // use the defaul value set for all connections
0358             _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
0359                                                                                ClientProperties.MAX_PREFETCH_DEFAULT));
0360         }
0361 
0362         if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE!= null)
0363         {
0364             _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE));
0365         }
0366         else
0367         {
0368             // use the defaul value set for all connections
0369             _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
0370         }
0371         
0372         _failoverPolicy = new FailoverPolicy(connectionURL, this);
0373         BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
0374         if (brokerDetails.getTransport().equals(BrokerDetails.VM))
0375         {
0376             _delegate = new AMQConnectionDelegate_8_0(this);
0377         }
0378         else
0379         {
0380             _delegate = new AMQConnectionDelegate_0_10(this);
0381         }
0382 
0383         if (_logger.isInfoEnabled())
0384         {
0385             _logger.info("Connection:" + connectionURL);
0386         }
0387 
0388         _sslConfiguration = sslConfig;
0389         if (connectionURL == null)
0390         {
0391             throw new IllegalArgumentException("Connection must be specified");
0392         }
0393 
0394         _connectionURL = connectionURL;
0395 
0396         _clientName = connectionURL.getClientName();
0397         _username = connectionURL.getUsername();
0398         _password = connectionURL.getPassword();
0399 
0400         setVirtualHost(connectionURL.getVirtualHost());
0401 
0402         if (connectionURL.getDefaultQueueExchangeName() != null)
0403         {
0404             _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
0405         }
0406 
0407         if (connectionURL.getDefaultTopicExchangeName() != null)
0408         {
0409             _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
0410         }
0411 
0412         if (connectionURL.getTemporaryQueueExchangeName() != null)
0413         {
0414             _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
0415         }
0416 
0417         if (connectionURL.getTemporaryTopicExchangeName() != null)
0418         {
0419             _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
0420         }
0421 
0422         _protocolHandler = new AMQProtocolHandler(this);
0423 
0424         // We are not currently connected
0425         _connected = false;
0426 
0427         boolean retryAllowed = true;
0428         Exception connectionException = null;
0429         while (!_connected && retryAllowed)
0430         {
0431             ProtocolVersion pe = null;
0432             try
0433             {
0434                 pe = makeBrokerConnection(brokerDetails);
0435             }
0436             catch (Exception e)
0437             {
0438                 if (_logger.isInfoEnabled())
0439                 {
0440                     _logger.info("Unable to connect to broker at " +
0441                                  _failoverPolicy.getCurrentBrokerDetails(),
0442                                  e);
0443                 }
0444                 connectionException = e;
0445             }
0446 
0447             if (pe != null)
0448             {
0449                 // reset the delegate to the version returned by the
0450                 // broker
0451                 initDelegate(pe);
0452             }
0453             else if (!_connected)
0454             {
0455                 retryAllowed = _failoverPolicy.failoverAllowed()
0456                 brokerDetails = _failoverPolicy.getNextBrokerDetails();
0457             }
0458         }
0459 
0460         if (_logger.isDebugEnabled())
0461         {
0462             _logger.debug("Are we connected:" + _connected);
0463         }
0464 
0465         if (!_connected)
0466         {
0467             String message = null;
0468 
0469             if (connectionException != null)
0470             {
0471                 if (connectionException.getCause() != null)
0472                 {
0473                     message = connectionException.getCause().getMessage();
0474                     connectionException.getCause().printStackTrace();
0475                 }
0476                 else
0477                 {
0478                     message = connectionException.getMessage();
0479                 }
0480             }
0481 
0482             if ((message == null|| message.equals(""))
0483             {
0484                 if (message == null)
0485                 {
0486                     message = "Unable to Connect";
0487                 }
0488                 else // can only be "" if getMessage() returned it therfore lastException != null
0489                 {
0490                     message = "Unable to Connect:" + connectionException.getClass();
0491                 }
0492             }
0493 
0494             for (Throwable th = connectionException; th != null; th = th.getCause())
0495             {
0496                 if (th instanceof UnresolvedAddressException ||
0497                     th instanceof UnknownHostException)
0498                 {
0499                     throw new AMQUnresolvedAddressException
0500                         (message,
0501                          _failoverPolicy.getCurrentBrokerDetails().toString(),
0502                          connectionException);
0503                 }
0504             }
0505 
0506             throw new AMQConnectionFailureException(message, connectionException);
0507         }
0508         
0509         _connectionMetaData = new QpidConnectionMetaData(this);
0510     }
0511 
0512     protected boolean checkException(Throwable thrown)
0513     {
0514         Throwable cause = thrown.getCause();
0515 
0516         if (cause == null)
0517         {
0518             cause = thrown;
0519         }
0520 
0521         return ((cause instanceof ConnectException|| (cause instanceof UnresolvedAddressException));
0522     }
0523 
0524     private void initDelegate(ProtocolVersion pethrows AMQProtocolException
0525     {
0526         try
0527         {
0528             Class c = Class.forName(String.format
0529                                     ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
0530                                      pe.getMajorVersion(), pe.getMinorVersion()));
0531             Class partypes[] new Class[1];
0532             partypes[0= AMQConnection.class;
0533             _delegate = (AMQConnectionDelegatec.getConstructor(partypes).newInstance(this);
0534         }
0535         catch (ClassNotFoundException e)
0536         {
0537             throw new AMQProtocolException
0538                 (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
0539                  String.format("Protocol: %s.%s is rquired by the broker but is not " +
0540                                "currently supported by this client library implementation",
0541                                pe.getMajorVersion(), pe.getMinorVersion()),
0542                  e);
0543         }
0544         catch (NoSuchMethodException e)
0545         {
0546             throw new RuntimeException("unable to locate constructor for delegate", e);
0547         }
0548         catch (InstantiationException e)
0549         {
0550             throw new RuntimeException("error instantiating delegate", e);
0551         }
0552         catch (IllegalAccessException e)
0553         {
0554             throw new RuntimeException("error accessing delegate", e);
0555         }
0556         catch (InvocationTargetException e)
0557         {
0558             throw new RuntimeException("error invoking delegate", e);
0559         }
0560     }
0561 
0562     protected AMQConnection(String username, String password, String clientName, String virtualHost)
0563     {
0564         _clientName = clientName;
0565         _username = username;
0566         _password = password;
0567         setVirtualHost(virtualHost);
0568     }
0569 
0570     private void setVirtualHost(String virtualHost)
0571     {
0572         if (virtualHost != null && virtualHost.startsWith("/"))
0573         {
0574             virtualHost = virtualHost.substring(1);
0575         }
0576 
0577         _virtualHost = virtualHost;
0578     }
0579 
0580     public boolean attemptReconnection(String host, int port)
0581     {
0582         BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
0583 
0584         _failoverPolicy.setBroker(bd);
0585 
0586         try
0587         {
0588             makeBrokerConnection(bd);
0589 
0590             return true;
0591         }
0592         catch (Exception e)
0593         {
0594             if (_logger.isInfoEnabled())
0595             {
0596                 _logger.info("Unable to connect to broker at " + bd);
0597             }
0598 
0599             attemptReconnection();
0600         }
0601 
0602         return false;
0603     }
0604 
0605     public boolean attemptReconnection()
0606     {
0607         while (_failoverPolicy.failoverAllowed())
0608         {
0609             try
0610             {
0611                 makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
0612 
0613                 return true;
0614             }
0615             catch (Exception e)
0616             {
0617                 if (!(instanceof AMQException))
0618                 {
0619                     if (_logger.isInfoEnabled())
0620                     {
0621                         _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
0622                     }
0623                 }
0624                 else
0625                 {
0626                     if (_logger.isInfoEnabled())
0627                     {
0628                         _logger.info(e.getMessage() ":Unable to connect to broker at "
0629                                      + _failoverPolicy.getCurrentBrokerDetails());
0630                     }
0631                 }
0632             }
0633         }
0634 
0635         // connection unsuccessful
0636         return false;
0637     }
0638 
0639     public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetailthrows IOException, AMQException
0640     {
0641         return _delegate.makeBrokerConnection(brokerDetail);
0642     }
0643 
0644     public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operationthrows E
0645     {
0646         return _delegate.executeRetrySupport(operation);
0647     }
0648 
0649     /**
0650      * Get the details of the currently active broker
0651      *
0652      @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
0653      *         otherwise
0654      */
0655     public BrokerDetails getActiveBrokerDetails()
0656     {
0657         return _failoverPolicy.getCurrentBrokerDetails();
0658     }
0659 
0660     public boolean failoverAllowed()
0661     {
0662         if (!_connected)
0663         {
0664             return false;
0665         }
0666         else
0667         {
0668             return _failoverPolicy.failoverAllowed();
0669         }
0670     }
0671 
0672     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeModethrows JMSException
0673     {
0674         return createSession(transacted, acknowledgeMode, _maxPrefetch);
0675     }
0676 
0677     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
0678             throws JMSException
0679     {
0680         return createSession(transacted, acknowledgeMode, prefetch, prefetch);
0681     }
0682 
0683     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
0684                                                      final int prefetchHigh, final int prefetchLowthrows JMSException
0685     {
0686         synchronized (_sessionCreationLock)
0687         {
0688             checkNotClosed();
0689             return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
0690         }
0691     }
0692 
0693     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
0694             throws AMQException, FailoverException
0695     {
0696 
0697         ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
0698 
0699         // TODO: Be aware of possible changes to parameter order as versions change.
0700 
0701         _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
0702 
0703         BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
0704 
0705         // todo send low water mark when protocol allows.
0706         // todo Be aware of possible changes to parameter order as versions change.
0707         _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
0708 
0709         if (transacted)
0710         {
0711             if (_logger.isDebugEnabled())
0712             {
0713                 _logger.debug("Issuing TxSelect for " + channelId);
0714             }
0715 
0716             TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
0717 
0718             // TODO: Be aware of possible changes to parameter order as versions change.
0719             _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
0720         }
0721     }
0722 
0723     private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
0724             throws AMQException, FailoverException
0725     {
0726         try
0727         {
0728             createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
0729         }
0730         catch (AMQException e)
0731         {
0732             deregisterSession(channelId);
0733             throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
0734         }
0735     }
0736 
0737     public void setFailoverPolicy(FailoverPolicy policy)
0738     {
0739         _failoverPolicy = policy;
0740     }
0741 
0742     public FailoverPolicy getFailoverPolicy()
0743     {
0744         return _failoverPolicy;
0745     }
0746 
0747     /**
0748      * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
0749      * the JMS spec
0750      *
0751      @param transacted
0752      @param acknowledgeMode
0753      *
0754      @return QueueSession
0755      *
0756      @throws JMSException
0757      */
0758     public QueueSession createQueueSession(boolean transacted, int acknowledgeModethrows JMSException
0759     {
0760         return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode));
0761     }
0762 
0763     /**
0764      * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
0765      * the JMS spec
0766      *
0767      @param transacted
0768      @param acknowledgeMode
0769      *
0770      @return TopicSession
0771      *
0772      @throws JMSException
0773      */
0774     public TopicSession createTopicSession(boolean transacted, int acknowledgeModethrows JMSException
0775     {
0776         return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode));
0777     }
0778 
0779     public boolean channelLimitReached()
0780     {
0781         return (_maximumChannelCount != 0&& (_sessions.size() == _maximumChannelCount);
0782     }
0783 
0784     public String getClientID() throws JMSException
0785     {
0786         checkNotClosed();
0787 
0788         return _clientName;
0789     }
0790 
0791     public void setClientID(String clientIDthrows JMSException
0792     {
0793         checkNotClosed();
0794         // in AMQP it is not possible to change the client ID. If one is not specified
0795         // upon connection construction, an id is generated automatically. Therefore
0796         // we can always throw an exception.
0797         if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
0798         {
0799             throw new IllegalStateException("Client name cannot be changed after being set");
0800         }
0801         else
0802         {
0803             _logger.info("Operation setClientID is ignored using ID: " + getClientID());
0804         }
0805     }
0806 
0807     public ConnectionMetaData getMetaData() throws JMSException
0808     {
0809         checkNotClosed();
0810 
0811         return _connectionMetaData;
0812 
0813     }
0814 
0815     public ExceptionListener getExceptionListener() throws JMSException
0816     {
0817         checkNotClosed();
0818 
0819         return _exceptionListener;
0820     }
0821 
0822     public void setExceptionListener(ExceptionListener listenerthrows JMSException
0823     {
0824         checkNotClosed();
0825         _exceptionListener = listener;
0826     }
0827 
0828     /**
0829      * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
0830      * and is not thread safe (which is legal according to the JMS specification).
0831      *
0832      @throws JMSException
0833      */
0834     public void start() throws JMSException
0835     {
0836         checkNotClosed();
0837         if (!_started)
0838         {
0839             _started = true;
0840             final Iterator it = _sessions.values().iterator();
0841             while (it.hasNext())
0842             {
0843                 final AMQSession s = (AMQSession) (it.next());
0844                 try
0845                 {
0846                     s.start();
0847                 }
0848                 catch (AMQException e)
0849                 {
0850                     throw new JMSAMQException(e);
0851                 }
0852             }
0853 
0854         }
0855     }
0856 
0857     public void stop() throws JMSException
0858     {
0859         checkNotClosed();
0860         if (_started)
0861         {
0862             for (Iterator i = _sessions.values().iterator(); i.hasNext();)
0863             {
0864                 try
0865                 {
0866                     ((AMQSessioni.next()).stop();
0867                 }
0868                 catch (AMQException e)
0869                 {
0870                     throw new JMSAMQException(e);
0871                 }
0872             }
0873 
0874             _started = false;
0875         }
0876     }
0877 
0878     public void     close() throws JMSException
0879     {
0880         close(DEFAULT_TIMEOUT);
0881     }
0882 
0883     public void close(long timeoutthrows JMSException
0884     {
0885         close(new ArrayList<AMQSession>(_sessions.values()), timeout);
0886     }
0887 
0888     public void close(List<AMQSession> sessions, long timeoutthrows JMSException
0889     {
0890         if (!_closed.getAndSet(true))
0891         {
0892             doClose(sessions, timeout);
0893         }
0894     }
0895 
0896     private void doClose(List<AMQSession> sessions, long timeoutthrows JMSException
0897     {
0898         synchronized (_sessionCreationLock)
0899         {
0900             if (!sessions.isEmpty())
0901             {
0902                 AMQSession session = sessions.remove(0);
0903                 synchronized (session.getMessageDeliveryLock())
0904                 {
0905                     doClose(sessions, timeout);
0906                 }
0907             }
0908             else
0909             {
0910                 synchronized (getFailoverMutex())
0911                 {
0912                     try
0913                     {
0914                         long startCloseTime = System.currentTimeMillis();
0915 
0916                         closeAllSessions(null, timeout, startCloseTime);
0917 
0918                         //This MUST occur after we have successfully closed all Channels/Sessions
0919                         _taskPool.shutdown();
0920 
0921                         if (!_taskPool.isTerminated())
0922                         {
0923                             try
0924                             {
0925                                 // adjust timeout
0926                                 long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
0927 
0928                                 _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
0929                             }
0930                             catch (InterruptedException e)
0931                             {
0932                                 _logger.info("Interrupted while shutting down connection thread pool.");
0933                             }
0934                         }
0935 
0936                         // adjust timeout
0937                         timeout = adjustTimeout(timeout, startCloseTime);
0938                         _delegate.closeConnection(timeout);
0939 
0940                         //If the taskpool hasn't shutdown by now then give it shutdownNow.
0941                         // This will interupt any running tasks.
0942                         if (!_taskPool.isTerminated())
0943                         {
0944                             List<Runnable> tasks = _taskPool.shutdownNow();
0945                             for (Runnable r : tasks)
0946                             {
0947                                 _logger.warn("Connection close forced taskpool to prevent execution:" + r);
0948                             }
0949                         }
0950                     }
0951                     catch (AMQException e)
0952                     {
0953                         _logger.error("error:", e);
0954                         JMSException jmse = new JMSException("Error closing connection: " + e);
0955                         jmse.setLinkedException(e);
0956                         throw jmse;
0957                     }
0958                 }
0959             }
0960         }
0961     }
0962 
0963     private long adjustTimeout(long timeout, long startTime)
0964     {
0965         long now = System.currentTimeMillis();
0966         timeout -= now - startTime;
0967         if (timeout < 0)
0968         {
0969             timeout = 0;
0970         }
0971 
0972         return timeout;
0973     }
0974 
0975     /**
0976      * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
0977      * mark objects "visible" in userland as closed after failover or other significant event that impacts the
0978      * connection. <p/> The caller must hold the failover mutex before calling this method.
0979      */
0980     private void markAllSessionsClosed()
0981     {
0982         final LinkedList sessionCopy = new LinkedList(_sessions.values());
0983         final Iterator it = sessionCopy.iterator();
0984         while (it.hasNext())
0985         {
0986             final AMQSession session = (AMQSessionit.next();
0987 
0988             session.markClosed();
0989         }
0990 
0991         _sessions.clear();
0992     }
0993 
0994     /**
0995      * Close all the sessions, either due to normal connection closure or due to an error occurring.
0996      *
0997      @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
0998      *              before calling this method.
0999      */
1000     private void closeAllSessions(Throwable cause, long timeout, long starttimethrows JMSException
1001     {
1002         final LinkedList sessionCopy = new LinkedList(_sessions.values());
1003         final Iterator it = sessionCopy.iterator();
1004         JMSException sessionException = null;
1005         while (it.hasNext())
1006         {
1007             final AMQSession session = (AMQSessionit.next();
1008             if (cause != null)
1009             {
1010                 session.closed(cause);
1011             }
1012             else
1013             {
1014                 try
1015                 {
1016                     if (starttime != -1)
1017                     {
1018                         timeout = adjustTimeout(timeout, starttime);
1019                     }
1020 
1021                     session.close(timeout);
1022                 }
1023                 catch (JMSException e)
1024                 {
1025                     _logger.error("Error closing session: " + e);
1026                     sessionException = e;
1027                 }
1028             }
1029         }
1030 
1031         _sessions.clear();
1032         if (sessionException != null)
1033         {
1034             throw sessionException;
1035         }
1036     }
1037 
1038     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
1039                                                        ServerSessionPool sessionPool, int maxMessagesthrows JMSException
1040     {
1041         checkNotClosed();
1042 
1043         return null;
1044     }
1045 
1046     public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
1047                                                        int maxMessagesthrows JMSException
1048     {
1049         checkNotClosed();
1050 
1051         return null;
1052     }
1053 
1054     public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
1055                                                        int maxMessagesthrows JMSException
1056     {
1057         checkNotClosed();
1058 
1059         return null;
1060     }
1061 
1062     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
1063                                                               ServerSessionPool sessionPool, int maxMessagesthrows JMSException
1064     {
1065         // TODO Auto-generated method stub
1066         checkNotClosed();
1067 
1068         return null;
1069     }
1070 
1071     public long getMaximumChannelCount() throws JMSException
1072     {
1073         checkNotClosed();
1074 
1075         return _maximumChannelCount;
1076     }
1077 
1078     public void setConnectionListener(ConnectionListener listener)
1079     {
1080         _connectionListener = listener;
1081     }
1082 
1083     public ConnectionListener getConnectionListener()
1084     {
1085         return _connectionListener;
1086     }
1087 
1088     public void setMaximumChannelCount(long maximumChannelCount)
1089     {
1090         _maximumChannelCount = maximumChannelCount;
1091     }
1092 
1093     public void setMaximumFrameSize(long frameMax)
1094     {
1095         _maximumFrameSize = frameMax;
1096     }
1097 
1098     public long getMaximumFrameSize()
1099     {
1100         return _maximumFrameSize;
1101     }
1102 
1103     public ChannelToSessionMap getSessions()
1104     {
1105         return _sessions;
1106     }
1107 
1108     public String getUsername()
1109     {
1110         return _username;
1111     }
1112 
1113     public String getPassword()
1114     {
1115         return _password;
1116     }
1117 
1118     public String getVirtualHost()
1119     {
1120         return _virtualHost;
1121     }
1122 
1123     public AMQProtocolHandler getProtocolHandler()
1124     {
1125         return _protocolHandler;
1126     }
1127 
1128     public boolean started()
1129     {
1130         return _started;
1131     }
1132 
1133     public void bytesSent(long writtenBytes)
1134     {
1135         if (_connectionListener != null)
1136         {
1137             _connectionListener.bytesSent(writtenBytes);
1138         }
1139     }
1140 
1141     public void bytesReceived(long receivedBytes)
1142     {
1143         if (_connectionListener != null)
1144         {
1145             _connectionListener.bytesReceived(receivedBytes);
1146         }
1147     }
1148 
1149     /**
1150      * Fire the preFailover event to the registered connection listener (if any)
1151      *
1152      @param redirect true if this is the result of a redirect request rather than a connection error
1153      *
1154      @return true if no listener or listener does not veto change
1155      */
1156     public boolean firePreFailover(boolean redirect)
1157     {
1158         boolean proceed = true;
1159         if (_connectionListener != null)
1160         {
1161             proceed = _connectionListener.preFailover(redirect);
1162         }
1163 
1164         return proceed;
1165     }
1166 
1167     /**
1168      * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
1169      * resubscription then all the sessions are closed.
1170      *
1171      @return true if no listener or listener does not veto resubscription.
1172      *
1173      @throws JMSException
1174      */
1175     public boolean firePreResubscribe() throws JMSException
1176     {
1177         if (_connectionListener != null)
1178         {
1179             boolean resubscribe = _connectionListener.preResubscribe();
1180             if (!resubscribe)
1181             {
1182                 markAllSessionsClosed();
1183             }
1184 
1185             return resubscribe;
1186         }
1187         else
1188         {
1189             return true;
1190         }
1191     }
1192 
1193     /** Fires a failover complete event to the registered connection listener (if any). */
1194     public void fireFailoverComplete()
1195     {
1196         if (_connectionListener != null)
1197         {
1198             _connectionListener.failoverComplete();
1199         }
1200     }
1201 
1202     /**
1203      * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
1204      * "failover mutex" must be held when doing any operations that could be corrupted during failover.
1205      *
1206      @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
1207      */
1208     public final Object getFailoverMutex()
1209     {
1210         return _failoverMutex;
1211     }
1212 
1213     public void failoverPrep()
1214     {
1215         _delegate.failoverPrep();
1216     }
1217 
1218     public void resubscribeSessions() throws JMSException, AMQException, FailoverException
1219     {
1220         _delegate.resubscribeSessions();
1221     }
1222 
1223     /**
1224      * If failover is taking place this will block until it has completed. If failover is not taking place it will
1225      * return immediately.
1226      *
1227      @throws InterruptedException
1228      */
1229     public void blockUntilNotFailingOver() throws InterruptedException
1230     {
1231         _protocolHandler.blockUntilNotFailingOver();
1232     }
1233 
1234     /**
1235      * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
1236      * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
1237      * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
1238      *
1239      @param cause the exception
1240      */
1241     public void exceptionReceived(Throwable cause)
1242     {
1243 
1244         if (_logger.isDebugEnabled())
1245         {
1246             _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
1247         }
1248 
1249         final JMSException je;
1250         if (cause instanceof JMSException)
1251         {
1252             je = (JMSExceptioncause;
1253         }
1254         else
1255         {
1256             AMQConstant code = null;
1257 
1258             if (cause instanceof AMQException)
1259             {
1260                 code = ((AMQExceptioncause).getErrorCode();
1261             }
1262 
1263             if (code != null)
1264             {
1265                 je =
1266                         new JMSException(Integer.toString(code.getCode())"Exception thrown against " + toString() ": "
1267                                                                            + cause);
1268             }
1269             else
1270             {
1271                 //Should never get here as all AMQEs are required to have an ErrorCode!
1272                 je = new JMSException("Exception thrown against " + toString() ": " + cause);
1273             }
1274 
1275             if (cause instanceof Exception)
1276             {
1277                 je.setLinkedException((Exceptioncause);
1278             }
1279         }
1280 
1281         boolean closer = false;
1282 
1283         // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
1284         // so that any generic client code that tries to close the connection will not mess up this error
1285         // handling sequence
1286         if (cause instanceof IOException)
1287         {
1288             closer = !_closed.getAndSet(true);
1289 
1290             _protocolHandler.getProtocolSession().notifyError(je);
1291         }
1292 
1293         if (_exceptionListener != null)
1294         {
1295             _exceptionListener.onException(je);
1296         }
1297         else
1298         {
1299             _logger.error("Throwable Received but no listener set: " + cause.getMessage());
1300         }
1301 
1302         if (hardError(cause))
1303         {
1304             try
1305             {
1306                 if (_logger.isInfoEnabled())
1307                 {
1308                     _logger.info("Closing AMQConnection due to :" + cause.getMessage());
1309                 }
1310 
1311                 closer = (!_closed.getAndSet(true)) || closer;
1312                 if (closer)
1313                 {
1314                     closeAllSessions(cause, -1, -1)// FIXME: when doing this end up with RejectedExecutionException from executor.
1315                 }
1316             }
1317             catch (JMSException e)
1318             {
1319                 _logger.error("Error closing all sessions: " + e, e);
1320             }
1321 
1322         }
1323         else
1324         {
1325             _logger.info("Not a hard-error connection not closing: " + cause.getMessage());
1326         }
1327     }
1328 
1329     private boolean hardError(Throwable cause)
1330     {
1331         if (cause instanceof AMQException)
1332         {
1333             return ((AMQExceptioncause).isHardError();
1334         }
1335 
1336         return true;
1337     }
1338 
1339     void registerSession(int channelId, AMQSession session)
1340     {
1341         _sessions.put(channelId, session);
1342     }
1343 
1344     void deregisterSession(int channelId)
1345     {
1346         _sessions.remove(channelId);
1347     }
1348 
1349     public String toString()
1350     {
1351         StringBuffer buf = new StringBuffer("AMQConnection:\n");
1352         if (_failoverPolicy.getCurrentBrokerDetails() == null)
1353         {
1354             buf.append("No active broker connection");
1355         }
1356         else
1357         {
1358             BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails();
1359             buf.append("Host: ").append(String.valueOf(bd.getHost()));
1360             buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
1361         }
1362 
1363         buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
1364         buf.append("\nClient ID: ").append(String.valueOf(_clientName));
1365         buf.append("\nActive session count: ").append((_sessions == null: _sessions.size());
1366 
1367         return buf.toString();
1368     }
1369 
1370     public String toURL()
1371     {
1372         return _connectionURL.toString();
1373     }
1374 
1375     public Reference getReference() throws NamingException
1376     {
1377         return new Reference(AMQConnection.class.getName()new StringRefAddr(AMQConnection.class.getName(), toURL()),
1378                              AMQConnectionFactory.class.getName()null)// factory location
1379     }
1380 
1381     public SSLConfiguration getSSLConfiguration()
1382     {
1383         return _sslConfiguration;
1384     }
1385 
1386     public AMQShortString getDefaultTopicExchangeName()
1387     {
1388         return _defaultTopicExchangeName;
1389     }
1390 
1391     public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
1392     {
1393         _defaultTopicExchangeName = defaultTopicExchangeName;
1394     }
1395 
1396     public AMQShortString getDefaultQueueExchangeName()
1397     {
1398         return _defaultQueueExchangeName;
1399     }
1400 
1401     public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
1402     {
1403         _defaultQueueExchangeName = defaultQueueExchangeName;
1404     }
1405 
1406     public AMQShortString getTemporaryTopicExchangeName()
1407     {
1408         return _temporaryTopicExchangeName;
1409     }
1410 
1411     public AMQShortString getTemporaryQueueExchangeName()
1412     {
1413         return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates.
1414     }
1415 
1416     public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
1417     {
1418         _temporaryTopicExchangeName = temporaryTopicExchangeName;
1419     }
1420 
1421     public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName)
1422     {
1423         _temporaryQueueExchangeName = temporaryQueueExchangeName;
1424     }
1425 
1426     public void performConnectionTask(Runnable task)
1427     {
1428         _taskPool.execute(task);
1429     }
1430 
1431     public AMQSession getSession(int channelId)
1432     {
1433         return _sessions.get(channelId);
1434     }
1435 
1436     public ProtocolVersion getProtocolVersion()
1437     {
1438         return _protocolVersion;
1439     }
1440 
1441     public void setProtocolVersion(ProtocolVersion protocolVersion)
1442     {
1443         _protocolVersion = protocolVersion;
1444         _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
1445     }
1446 
1447     public boolean isFailingOver()
1448     {
1449         return (_protocolHandler.getFailoverLatch() != null);
1450     }
1451 
1452     /**
1453      * Get the maximum number of messages that this connection can pre-fetch.
1454      *
1455      @return The maximum number of messages that this connection can pre-fetch.
1456      */
1457     public long getMaxPrefetch()
1458     {
1459         return _maxPrefetch;
1460     }
1461 
1462     /**
1463      * Indicates whether persistent messages are synchronized
1464      *
1465      @return true if persistent messages are synchronized false otherwise
1466      */
1467     public boolean getSyncPersistence()
1468     {
1469         return _syncPersistence;
1470     }
1471     
1472     public void setIdleTimeout(long l)
1473     {
1474         _delegate.setIdleTimeout(l);
1475     }
1476 }