AMQProtocolHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.protocol;
022 
023 import org.apache.mina.common.IdleStatus;
024 import org.apache.mina.common.IoFilterChain;
025 import org.apache.mina.common.IoHandlerAdapter;
026 import org.apache.mina.common.IoSession;
027 import org.apache.mina.filter.ReadThrottleFilterBuilder;
028 import org.apache.mina.filter.SSLFilter;
029 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
030 import org.apache.mina.filter.codec.ProtocolCodecException;
031 import org.apache.mina.filter.codec.ProtocolCodecFilter;
032 import org.apache.mina.filter.executor.ExecutorFilter;
033 import org.apache.qpid.AMQConnectionClosedException;
034 import org.apache.qpid.AMQDisconnectedException;
035 import org.apache.qpid.AMQException;
036 import org.apache.qpid.AMQTimeoutException;
037 import org.apache.qpid.client.AMQConnection;
038 import org.apache.qpid.client.AMQSession;
039 import org.apache.qpid.client.SSLConfiguration;
040 import org.apache.qpid.client.configuration.ClientProperties;
041 import org.apache.qpid.client.failover.FailoverException;
042 import org.apache.qpid.client.failover.FailoverHandler;
043 import org.apache.qpid.client.failover.FailoverState;
044 import org.apache.qpid.client.state.AMQState;
045 import org.apache.qpid.client.state.AMQStateManager;
046 import org.apache.qpid.client.state.StateWaiter;
047 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
048 import org.apache.qpid.codec.AMQCodecFactory;
049 import org.apache.qpid.framing.*;
050 import org.apache.qpid.jms.BrokerDetails;
051 import org.apache.qpid.pool.ReadWriteThreadModel;
052 import org.apache.qpid.protocol.AMQConstant;
053 import org.apache.qpid.protocol.AMQMethodEvent;
054 import org.apache.qpid.protocol.AMQMethodListener;
055 import org.apache.qpid.ssl.SSLContextFactory;
056 import org.apache.qpid.transport.network.io.IoTransport;
057 import org.slf4j.Logger;
058 import org.slf4j.LoggerFactory;
059 
060 import java.io.IOException;
061 import java.util.Iterator;
062 import java.util.Set;
063 import java.util.concurrent.CopyOnWriteArraySet;
064 import java.util.concurrent.CountDownLatch;
065 
066 /**
067  * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
068  * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
069  * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
070  * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
071  * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
072  * terms of "message received" and so on.
073  *
074  <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
075  * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
076  * API calls through which an individual connection can be manipulated. This protocol handler talks to the network
077  * through MINA, in a behind the scenes role; it is not an exposed part of the client API.
078  *
079  <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
080  * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
081  * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
082  * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
083  * in the event of failover. See below for more information about this.
084  *
085  <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
086  * attributes. A more convenient, type-safe, container for session data is provided in the form of
087  {@link AMQProtocolSession}.
088  *
089  <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
090  * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
091  * as described above). This event handler is different, because dealing with failover complicates things. To the
092  * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
093  * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot
094  * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
095  * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
096  * and the protocol session data is held outside of the MINA IOSession.
097  *
098  <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through.
099  * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
100  * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
101  * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
102  *
103  <p/><table id="crc"><caption>CRC Card</caption>
104  <tr><th> Responsibilities <th> Collaborations
105  <tr><td> Create the filter chain to filter this handlers events.
106  <td> {@link ProtocolCodecFilter}{@link SSLContextFactory}{@link SSLFilter}{@link ReadWriteThreadModel}.
107  *
108  <tr><td> Maintain fail-over state.
109  <tr><td>
110  </table>
111  *
112  * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
113  * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
114  * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
115  * filter before it mean not doing the read/write asynchronously but in the main filter thread?
116  * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
117  * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
118  * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
119  * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
120  * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
121  * that lifecycles of the fields match lifecycles of their containing objects.
122  */
123 public class AMQProtocolHandler extends IoHandlerAdapter
124 {
125     /** Used for debugging. */
126     private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
127     private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
128     private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level"!= null);
129 
130     /**
131      * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
132      * instances and protocol handler instances.
133      */
134     private AMQConnection _connection;
135 
136     /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
137     private volatile AMQProtocolSession _protocolSession;
138 
139     /** Holds the state of the protocol session. */
140     private AMQStateManager _stateManager = new AMQStateManager();
141 
142     /** Holds the method listeners, */
143     private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
144 
145     /**
146      * We create the failover handler when the session is created since it needs a reference to the IoSession in order
147      * to be able to send errors during failover back to the client application. The session won't be available in the
148      * case where we failing over due to a Connection.Redirect message from the broker.
149      */
150     private FailoverHandler _failoverHandler;
151 
152     /**
153      * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
154      * attempting failover where it is failing.
155      */
156     private FailoverState _failoverState = FailoverState.NOT_STARTED;
157 
158     /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
159     private CountDownLatch _failoverLatch;
160 
161     /** The last failover exception that occured */
162     private FailoverException _lastFailoverException;
163 
164     /** Defines the default timeout to use for synchronous protocol commands. */
165     private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout"1000 30);
166 
167     /** Object to lock on when changing the latch */
168     private Object _failoverLatchChange = new Object();
169 
170     /**
171      * Creates a new protocol handler, associated with the specified client connection instance.
172      *
173      @param con The client connection that this is the event handler for.
174      */
175     public AMQProtocolHandler(AMQConnection con)
176     {
177         _connection = con;
178     }
179 
180     /**
181      * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
182      * session, which filters the events handled by this handler. The filter chain consists of, handing off events
183      * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
184      *
185      @param session The MINA session.
186      *
187      @throws Exception Any underlying exceptions are allowed to fall through to MINA.
188      */
189     public void sessionCreated(IoSession sessionthrows Exception
190     {
191         _logger.debug("Protocol session created for session " + System.identityHashCode(session));
192         _failoverHandler = new FailoverHandler(this, session);
193 
194         final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
195 
196         if (Boolean.getBoolean("amqj.shared_read_write_pool"))
197         {
198             session.getFilterChain().addBefore("AsynchronousWriteFilter""protocolFilter", pcf);
199         }
200         else
201         {
202             session.getFilterChain().addLast("protocolFilter", pcf);
203         }
204         // we only add the SSL filter where we have an SSL connection
205         if (_connection.getSSLConfiguration() != null)
206         {
207             SSLConfiguration sslConfig = _connection.getSSLConfiguration();
208             SSLContextFactory sslFactory =
209                     new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
210             SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
211             sslFilter.setUseClientMode(true);
212             session.getFilterChain().addBefore("protocolFilter""ssl", sslFilter);
213         }
214 
215         try
216         {
217             ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
218             threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
219             threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
220         }
221         catch (RuntimeException e)
222         {
223             _logger.error(e.getMessage(), e);
224         }
225 
226         if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
227         {
228             try
229             {
230                 //Add IO Protection Filters
231                 IoFilterChain chain = session.getFilterChain();
232 
233                 session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder"new ExecutorFilter());
234 
235                 ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
236                 readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
237                         ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
238                 readfilter.attach(chain);
239 
240                 WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
241                 writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
242                         ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
243                 writefilter.attach(chain);
244                 session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
245 
246                 _logger.info("Using IO Read/Write Filter Protection");
247             }
248             catch (Exception e)
249             {
250                 _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
251             }
252         }
253         _protocolSession = new AMQProtocolSession(this, session, _connection);
254 
255         _stateManager.setProtocolSession(_protocolSession);
256 
257         _protocolSession.init();
258     }
259 
260     /**
261      * Called when we want to create a new IoTransport session
262      @param brokerDetail 
263      */
264     public void createIoTransportSession(BrokerDetails brokerDetail)
265     {
266         _protocolSession = new AMQProtocolSession(this, _connection);
267         _stateManager.setProtocolSession(_protocolSession);
268         IoTransport.connect_0_9(getProtocolSession(),
269                                 brokerDetail.getHost(),
270                                 brokerDetail.getPort(),
271                                 brokerDetail.useSSL());
272         _protocolSession.init();
273     }
274     
275     /**
276      * Called when the network connection is closed. This can happen, either because the client explicitly requested
277      * that the connection be closed, in which case nothing is done, or because the connection died. In the case
278      * where the connection died, an attempt to failover automatically to a new connection may be started. The failover
279      * process will be started, provided that it is the clients policy to allow failover, and provided that a failover
280      * has not already been started or failed.
281      *
282      <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught}
283      * may be called first followed by this method. This depends on whether the client was trying to send data at the
284      * time of the failure.
285      *
286      @param session The MINA session.
287      *
288      * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
289      * not otherwise? The above comment doesn't make that clear.
290      */
291     public void sessionClosed(IoSession session)
292     {
293         if (_connection.isClosed())
294         {
295             _logger.debug("Session closed called by client");
296         }
297         else
298         {
299             _logger.debug("Session closed called with failover state currently " + _failoverState);
300 
301             // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
302             // known through the policy settings.
303 
304             if ((_failoverState != FailoverState.IN_PROGRESS&& _connection.failoverAllowed())
305             {
306                 _logger.debug("FAILOVER STARTING");
307                 if (_failoverState == FailoverState.NOT_STARTED)
308                 {
309                     _failoverState = FailoverState.IN_PROGRESS;
310                     startFailoverThread();
311                 }
312                 else
313                 {
314                     _logger.debug("Not starting failover as state currently " + _failoverState);
315                 }
316             }
317             else
318             {
319                 _logger.debug("Failover not allowed by policy.")// or already in progress?
320 
321                 if (_logger.isDebugEnabled())
322                 {
323                     _logger.debug(_connection.getFailoverPolicy().toString());
324                 }
325 
326                 if (_failoverState != FailoverState.IN_PROGRESS)
327                 {
328                     _logger.debug("sessionClose() not allowed to failover");
329                     _connection.exceptionReceived(new AMQDisconnectedException(
330                             "Server closed connection and reconnection " "not permitted."null));
331                 }
332                 else
333                 {
334                     _logger.debug("sessionClose() failover in progress");
335                 }
336             }
337         }
338 
339         _logger.debug("Protocol Session [" this "] closed");
340     }
341 
342     /** See {@link FailoverHandler} to see rationale for separate thread. */
343     private void startFailoverThread()
344     {
345         Thread failoverThread = new Thread(_failoverHandler);
346         failoverThread.setName("Failover");
347         // Do not inherit daemon-ness from current thread as this can be a daemon
348         // thread such as a AnonymousIoService thread.
349         failoverThread.setDaemon(false);
350         failoverThread.start();
351     }
352 
353     public void sessionIdle(IoSession session, IdleStatus statusthrows Exception
354     {
355         _logger.debug("Protocol Session [" this ":" + session + "] idle: " + status);
356         if (IdleStatus.WRITER_IDLE.equals(status))
357         {
358             // write heartbeat frame:
359             _logger.debug("Sent heartbeat");
360             session.write(HeartbeatBody.FRAME);
361             HeartbeatDiagnostics.sent();
362         }
363         else if (IdleStatus.READER_IDLE.equals(status))
364         {
365             // failover:
366             HeartbeatDiagnostics.timeout();
367             _logger.warn("Timed out while waiting for heartbeat from peer.");
368             session.close();
369         }
370     }
371 
372     /**
373      * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
374      * IOException, MINA will close the connection automatically.
375      *
376      @param session The MINA session.
377      @param cause   The exception that triggered this event.
378      */
379     public void exceptionCaught(IoSession session, Throwable cause)
380     {
381         if (_failoverState == FailoverState.NOT_STARTED)
382         {
383             // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
384             if ((cause instanceof AMQConnectionClosedException|| cause instanceof IOException)
385             {
386                 _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
387                 // this will attemp failover
388 
389                 sessionClosed(session);
390             }
391             else
392             {
393 
394                 if (cause instanceof ProtocolCodecException)
395                 {
396                     _logger.info("Protocol Exception caught NOT going to attempt failover as " +
397                                  "cause isn't AMQConnectionClosedException: " + cause, cause);
398 
399                     AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
400                     propagateExceptionToAllWaiters(amqe);
401                 }
402                 _connection.exceptionReceived(cause);
403 
404             }
405 
406             // FIXME Need to correctly handle other exceptions. Things like ...
407             // if (cause instanceof AMQChannelClosedException)
408             // which will cause the JMSSession to end due to a channel close and so that Session needs
409             // to be removed from the map so we can correctly still call close without an exception when trying to close
410             // the server closed session.  See also CloseChannelMethodHandler as the sessionClose is never called on exception
411         }
412         // we reach this point if failover was attempted and failed therefore we need to let the calling app
413         // know since we cannot recover the situation
414         else if (_failoverState == FailoverState.FAILED)
415         {
416             _logger.error("Exception caught by protocol handler: " + cause, cause);
417 
418             // we notify the state manager of the error in case we have any clients waiting on a state
419             // change. Those "waiters" will be interrupted and can handle the exception
420             AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
421             propagateExceptionToAllWaiters(amqe);
422             _connection.exceptionReceived(cause);
423         }
424     }
425 
426     /**
427      * There are two cases where we have other threads potentially blocking for events to be handled by this class.
428      * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
429      * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
430      *
431      * This should be called only when the exception is fatal for the connection.
432      *
433      @param e the exception to propagate
434      *
435      @see #propagateExceptionToFrameListeners
436      @see #propagateExceptionToStateWaiters
437      */
438     public void propagateExceptionToAllWaiters(Exception e)
439     {
440         propagateExceptionToFrameListeners(e);
441         propagateExceptionToStateWaiters(e);
442     }
443 
444     /**
445      * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any
446      * protocol level waits.
447      *
448      * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
449      * stop waiting and relinquish the Failover lock {@see FailoverHandler}.
450      *
451      * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt
452      * their protocol request and so listen again for the correct frame.
453      *
454      @param e the exception to propagate
455      */
456     public void propagateExceptionToFrameListeners(Exception e)
457     {
458         synchronized (_frameListeners)
459         {
460             if (!_frameListeners.isEmpty())
461             {
462                 final Iterator it = _frameListeners.iterator();
463                 while (it.hasNext())
464                 {
465                     final AMQMethodListener ml = (AMQMethodListenerit.next();
466                     ml.error(e);
467                 }
468             }
469         }
470     }
471 
472     /**
473      * This caters for the case where we only need to propogate an exception to the the state manager to interupt any
474      * thing waiting for a state change.
475      *
476      * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement.
477      *
478      * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal
479      * cases {@link #propagateExceptionToAllWaiters} would be the correct choice.
480      *
481      @param e the exception to propagate
482      */
483     public void propagateExceptionToStateWaiters(Exception e)
484     {
485         getStateManager().error(e);
486     }
487 
488     public void notifyFailoverStarting()
489     {
490         // Set the last exception in the sync block to ensure the ordering with add.
491         // either this gets done and the add does the ml.error
492         // or the add completes first and the iterator below will do ml.error
493         synchronized (_frameListeners)
494         {
495             _lastFailoverException = new FailoverException("Failing over about to start");
496         }
497 
498         //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
499         // interupted unless failover cannot restore the state.
500         propagateExceptionToFrameListeners(_lastFailoverException);
501     }
502 
503     public void failoverInProgress()
504     {
505         _lastFailoverException = null;
506     }
507 
508     private static int _messageReceivedCount;
509 
510     public void messageReceived(IoSession session, Object messagethrows Exception
511     {
512         if (PROTOCOL_DEBUG)
513         {
514             _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
515         }
516 
517         if(message instanceof AMQFrame)
518         {
519             final boolean debug = _logger.isDebugEnabled();
520             final long msgNumber = ++_messageReceivedCount;
521 
522             if (debug && ((msgNumber % 1000== 0))
523             {
524                 _logger.debug("Received " + _messageReceivedCount + " protocol messages");
525             }
526 
527             AMQFrame frame = (AMQFramemessage;
528 
529             final AMQBody bodyFrame = frame.getBodyFrame();
530 
531             HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
532 
533             bodyFrame.handle(frame.getChannel(), _protocolSession);
534 
535             _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
536         }
537         else if (message instanceof ProtocolInitiation)
538         {
539             // We get here if the server sends a response to our initial protocol header
540             // suggesting an alternate ProtocolVersion; the server will then close the
541             // connection.
542             ProtocolInitiation protocolInit = (ProtocolInitiationmessage;
543             ProtocolVersion pv = protocolInit.checkVersion();
544             getConnection().setProtocolVersion(pv);
545 
546             // get round a bug in old versions of qpid whereby the connection is not closed
547             _stateManager.changeState(AMQState.CONNECTION_CLOSED);
548         }
549     }
550 
551     public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
552             throws AMQException
553     {
554 
555         if (_logger.isDebugEnabled())
556         {
557             _logger.debug("(" + System.identityHashCode(this")Method frame received: " + bodyFrame);
558         }
559 
560         final AMQMethodEvent<AMQMethodBody> evt =
561                 new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBodybodyFrame);
562 
563         try
564         {
565 
566             boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
567             synchronized (_frameListeners)
568             {
569                 if (!_frameListeners.isEmpty())
570                 {
571                     //This iterator is safe from the error state as the frame listeners always add before they send so their
572                     // will be ready and waiting for this response.
573                     Iterator it = _frameListeners.iterator();
574                     while (it.hasNext())
575                     {
576                         final AMQMethodListener listener = (AMQMethodListenerit.next();
577                         wasAnyoneInterested = listener.methodReceived(evt|| wasAnyoneInterested;
578                     }
579                 }
580             }
581             if (!wasAnyoneInterested)
582             {
583                 throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
584                                              + _frameListeners, null);
585             }
586         }
587         catch (AMQException e)
588         {
589             propagateExceptionToFrameListeners(e);
590 
591             exceptionCaught(session, e);
592         }
593 
594     }
595 
596     private static int _messagesOut;
597 
598     public void messageSent(IoSession session, Object messagethrows Exception
599     {
600         if (PROTOCOL_DEBUG)
601         {
602             _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
603         }
604         
605         final long sentMessages = _messagesOut++;
606 
607         final boolean debug = _logger.isDebugEnabled();
608 
609         if (debug && ((sentMessages % 1000== 0))
610         {
611             _logger.debug("Sent " + _messagesOut + " protocol messages");
612         }
613 
614         _connection.bytesSent(session.getWrittenBytes());
615     }
616 
617     public StateWaiter createWaiter(Set<AMQState> statesthrows AMQException
618     {
619         return getStateManager().createWaiter(states);
620     }
621 
622     /**
623      * Convenience method that writes a frame to the protocol session. Equivalent to calling
624      * getProtocolSession().write().
625      *
626      @param frame the frame to write
627      */
628     public void writeFrame(AMQDataBlock frame)
629     {
630         _protocolSession.writeFrame(frame);
631     }
632 
633     public void writeFrame(AMQDataBlock frame, boolean wait)
634     {
635         _protocolSession.writeFrame(frame, wait);
636     }
637 
638     /**
639      * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
640      * calling getProtocolSession().write() then waiting for the response.
641      *
642      @param frame
643      @param listener the blocking listener. Note the calling thread will block.
644      */
645     public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener)
646             throws AMQException, FailoverException
647     {
648         return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
649     }
650 
651     /**
652      * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
653      * calling getProtocolSession().write() then waiting for the response.
654      *
655      @param frame
656      @param listener the blocking listener. Note the calling thread will block.
657      */
658     public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener,
659                                                            long timeoutthrows AMQException, FailoverException
660     {
661         try
662         {
663             synchronized (_frameListeners)
664             {
665                 if (_lastFailoverException != null)
666                 {
667                     throw _lastFailoverException;
668                 }
669 
670                 if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED)
671                 {
672                     Exception e = _stateManager.getLastException();
673                     if (e != null)
674                     {
675                         if (instanceof AMQException)
676                         {
677                             AMQException amqe = (AMQExceptione;
678 
679                             throw amqe.cloneForCurrentThread();
680                         }
681                         else
682                         {
683                             throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
684                         }
685                     }
686                 }
687 
688                 _frameListeners.add(listener);
689                 //FIXME: At this point here we should check or before add we should check _stateManager is in an open
690                 // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 
691             }
692             _protocolSession.writeFrame(frame);
693 
694             return listener.blockForFrame(timeout);
695             // When control resumes before this line, a reply will have been received
696             // that matches the criteria defined in the blocking listener
697         }
698         finally
699         {
700             // If we don't removeKey the listener then no-one will
701             _frameListeners.remove(listener);
702         }
703 
704     }
705 
706     /** More convenient method to write a frame and wait for it's response. */
707     public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClassthrows AMQException, FailoverException
708     {
709         return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
710     }
711 
712     /** More convenient method to write a frame and wait for it's response. */
713     public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeoutthrows AMQException, FailoverException
714     {
715         return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
716                                                 timeout);
717     }
718 
719     public void closeSession(AMQSession sessionthrows AMQException
720     {
721         _protocolSession.closeSession(session);
722     }
723 
724     /**
725      * Closes the connection.
726      *
727      <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
728      * anyway.
729      *
730      @param timeout The timeout to wait for an acknowledgement to the close request.
731      *
732      @throws AMQException If the close fails for any reason.
733      */
734     public void closeConnection(long timeoutthrows AMQException
735     {
736         getStateManager().changeState(AMQState.CONNECTION_CLOSING);
737 
738         ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode()// replyCode
739                                                                                                   new AMQShortString("JMS client is closing the connection.")00);
740 
741         final AMQFrame frame = body.generateFrame(0);
742 
743         try
744         {
745             syncWrite(frame, ConnectionCloseOkBody.class, timeout);
746             _protocolSession.closeProtocolSession();
747         }
748         catch (AMQTimeoutException e)
749         {
750             _protocolSession.closeProtocolSession(false);
751         }
752         catch (FailoverException e)
753         {
754             _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
755         }
756     }
757 
758     /** @return the number of bytes read from this protocol session */
759     public long getReadBytes()
760     {
761         return _protocolSession.getIoSession().getReadBytes();
762     }
763 
764     /** @return the number of bytes written to this protocol session */
765     public long getWrittenBytes()
766     {
767         return _protocolSession.getIoSession().getWrittenBytes();
768     }
769 
770     public void failover(String host, int port)
771     {
772         _failoverHandler.setHost(host);
773         _failoverHandler.setPort(port);
774         // see javadoc for FailoverHandler to see rationale for separate thread
775         startFailoverThread();
776     }
777 
778     public void blockUntilNotFailingOver() throws InterruptedException
779     {
780         synchronized(_failoverLatchChange)
781         {
782             if (_failoverLatch != null)
783             {
784                 _failoverLatch.await();
785             }
786         }
787     }
788 
789     public AMQShortString generateQueueName()
790     {
791         return _protocolSession.generateQueueName();
792     }
793 
794     public CountDownLatch getFailoverLatch()
795     {
796         return _failoverLatch;
797     }
798 
799     public void setFailoverLatch(CountDownLatch failoverLatch)
800     {
801         synchronized (_failoverLatchChange)
802         {
803             _failoverLatch = failoverLatch;
804         }
805     }
806 
807     public AMQConnection getConnection()
808     {
809         return _connection;
810     }
811 
812     public AMQStateManager getStateManager()
813     {
814         return _stateManager;
815     }
816 
817     public void setStateManager(AMQStateManager stateManager)
818     {
819         _stateManager = stateManager;
820     }
821 
822     public AMQProtocolSession getProtocolSession()
823     {
824         return _protocolSession;
825     }
826 
827     FailoverState getFailoverState()
828     {
829         return _failoverState;
830     }
831 
832     public void setFailoverState(FailoverState failoverState)
833     {
834         _failoverState = failoverState;
835     }
836 
837     public byte getProtocolMajorVersion()
838     {
839         return _protocolSession.getProtocolMajorVersion();
840     }
841 
842     public byte getProtocolMinorVersion()
843     {
844         return _protocolSession.getProtocolMinorVersion();
845     }
846 
847     public MethodRegistry getMethodRegistry()
848     {
849         return _protocolSession.getMethodRegistry();
850     }
851 
852     public ProtocolVersion getProtocolVersion()
853     {
854         return _protocolSession.getProtocolVersion();
855     }
856 }