001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.protocol;
022
023 import org.apache.mina.common.IdleStatus;
024 import org.apache.mina.common.IoFilterChain;
025 import org.apache.mina.common.IoHandlerAdapter;
026 import org.apache.mina.common.IoSession;
027 import org.apache.mina.filter.ReadThrottleFilterBuilder;
028 import org.apache.mina.filter.SSLFilter;
029 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
030 import org.apache.mina.filter.codec.ProtocolCodecException;
031 import org.apache.mina.filter.codec.ProtocolCodecFilter;
032 import org.apache.mina.filter.executor.ExecutorFilter;
033 import org.apache.qpid.AMQConnectionClosedException;
034 import org.apache.qpid.AMQDisconnectedException;
035 import org.apache.qpid.AMQException;
036 import org.apache.qpid.AMQTimeoutException;
037 import org.apache.qpid.client.AMQConnection;
038 import org.apache.qpid.client.AMQSession;
039 import org.apache.qpid.client.SSLConfiguration;
040 import org.apache.qpid.client.configuration.ClientProperties;
041 import org.apache.qpid.client.failover.FailoverException;
042 import org.apache.qpid.client.failover.FailoverHandler;
043 import org.apache.qpid.client.failover.FailoverState;
044 import org.apache.qpid.client.state.AMQState;
045 import org.apache.qpid.client.state.AMQStateManager;
046 import org.apache.qpid.client.state.StateWaiter;
047 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
048 import org.apache.qpid.codec.AMQCodecFactory;
049 import org.apache.qpid.framing.*;
050 import org.apache.qpid.jms.BrokerDetails;
051 import org.apache.qpid.pool.ReadWriteThreadModel;
052 import org.apache.qpid.protocol.AMQConstant;
053 import org.apache.qpid.protocol.AMQMethodEvent;
054 import org.apache.qpid.protocol.AMQMethodListener;
055 import org.apache.qpid.ssl.SSLContextFactory;
056 import org.apache.qpid.transport.network.io.IoTransport;
057 import org.slf4j.Logger;
058 import org.slf4j.LoggerFactory;
059
060 import java.io.IOException;
061 import java.util.Iterator;
062 import java.util.Set;
063 import java.util.concurrent.CopyOnWriteArraySet;
064 import java.util.concurrent.CountDownLatch;
065
066 /**
067 * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
068 * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
069 * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
070 * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
071 * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
072 * terms of "message received" and so on.
073 *
074 * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
075 * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
076 * API calls through which an individual connection can be manipulated. This protocol handler talks to the network
077 * through MINA, in a behind the scenes role; it is not an exposed part of the client API.
078 *
079 * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
080 * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
081 * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
082 * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
083 * in the event of failover. See below for more information about this.
084 *
085 * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
086 * attributes. A more convenient, type-safe, container for session data is provided in the form of
087 * {@link AMQProtocolSession}.
088 *
089 * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
090 * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
091 * as described above). This event handler is different, because dealing with failover complicates things. To the
092 * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
093 * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot
094 * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
095 * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
096 * and the protocol session data is held outside of the MINA IOSession.
097 *
 * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through.
 * The filter chain is set up as a stack of event handlers that perform the following functions (working upwards from
100 * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
101 * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
102 *
103 * <p/><table id="crc"><caption>CRC Card</caption>
104 * <tr><th> Responsibilities <th> Collaborations
105 * <tr><td> Create the filter chain to filter this handlers events.
106 * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
107 *
108 * <tr><td> Maintain fail-over state.
109 * <tr><td>
110 * </table>
111 *
112 * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
113 * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
114 * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
115 * filter before it mean not doing the read/write asynchronously but in the main filter thread?
116 * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
117 * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
 * AMQProtocolSession and AMQConnection will be the same, so if there is high cohesion between them, they could
 * be merged, although there is sense in keeping the session model separate. Will clarify things by having data
 * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
121 * that lifecycles of the fields match lifecycles of their containing objects.
122 */
123 public class AMQProtocolHandler extends IoHandlerAdapter
124 {
125 /** Used for debugging. */
126 private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
127 private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
128 private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
129
130 /**
131 * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
132 * instances and protocol handler instances.
133 */
134 private AMQConnection _connection;
135
136 /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
137 private volatile AMQProtocolSession _protocolSession;
138
139 /** Holds the state of the protocol session. */
140 private AMQStateManager _stateManager = new AMQStateManager();
141
142 /** Holds the method listeners, */
143 private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
144
145 /**
146 * We create the failover handler when the session is created since it needs a reference to the IoSession in order
147 * to be able to send errors during failover back to the client application. The session won't be available in the
148 * case where we failing over due to a Connection.Redirect message from the broker.
149 */
150 private FailoverHandler _failoverHandler;
151
152 /**
153 * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
154 * attempting failover where it is failing.
155 */
156 private FailoverState _failoverState = FailoverState.NOT_STARTED;
157
158 /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
159 private CountDownLatch _failoverLatch;
160
161 /** The last failover exception that occured */
162 private FailoverException _lastFailoverException;
163
164 /** Defines the default timeout to use for synchronous protocol commands. */
165 private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
166
167 /** Object to lock on when changing the latch */
168 private Object _failoverLatchChange = new Object();
169
/**
 * Builds a protocol handler bound to a single client connection.
 *
 * @param con The client connection whose network events this handler will process.
 */
public AMQProtocolHandler(AMQConnection con)
{
    this._connection = con;
}
179
180 /**
181 * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
182 * session, which filters the events handled by this handler. The filter chain consists of, handing off events
183 * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
184 *
185 * @param session The MINA session.
186 *
187 * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
188 */
189 public void sessionCreated(IoSession session) throws Exception
190 {
191 _logger.debug("Protocol session created for session " + System.identityHashCode(session));
192 _failoverHandler = new FailoverHandler(this, session);
193
194 final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
195
196 if (Boolean.getBoolean("amqj.shared_read_write_pool"))
197 {
198 session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
199 }
200 else
201 {
202 session.getFilterChain().addLast("protocolFilter", pcf);
203 }
204 // we only add the SSL filter where we have an SSL connection
205 if (_connection.getSSLConfiguration() != null)
206 {
207 SSLConfiguration sslConfig = _connection.getSSLConfiguration();
208 SSLContextFactory sslFactory =
209 new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
210 SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
211 sslFilter.setUseClientMode(true);
212 session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
213 }
214
215 try
216 {
217 ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
218 threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
219 threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
220 }
221 catch (RuntimeException e)
222 {
223 _logger.error(e.getMessage(), e);
224 }
225
226 if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
227 {
228 try
229 {
230 //Add IO Protection Filters
231 IoFilterChain chain = session.getFilterChain();
232
233 session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
234
235 ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
236 readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
237 ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
238 readfilter.attach(chain);
239
240 WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
241 writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
242 ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
243 writefilter.attach(chain);
244 session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
245
246 _logger.info("Using IO Read/Write Filter Protection");
247 }
248 catch (Exception e)
249 {
250 _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
251 }
252 }
253 _protocolSession = new AMQProtocolSession(this, session, _connection);
254
255 _stateManager.setProtocolSession(_protocolSession);
256
257 _protocolSession.init();
258 }
259
260 /**
261 * Called when we want to create a new IoTransport session
262 * @param brokerDetail
263 */
264 public void createIoTransportSession(BrokerDetails brokerDetail)
265 {
266 _protocolSession = new AMQProtocolSession(this, _connection);
267 _stateManager.setProtocolSession(_protocolSession);
268 IoTransport.connect_0_9(getProtocolSession(),
269 brokerDetail.getHost(),
270 brokerDetail.getPort(),
271 brokerDetail.useSSL());
272 _protocolSession.init();
273 }
274
275 /**
276 * Called when the network connection is closed. This can happen, either because the client explicitly requested
277 * that the connection be closed, in which case nothing is done, or because the connection died. In the case
278 * where the connection died, an attempt to failover automatically to a new connection may be started. The failover
279 * process will be started, provided that it is the clients policy to allow failover, and provided that a failover
280 * has not already been started or failed.
281 *
282 * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught}
283 * may be called first followed by this method. This depends on whether the client was trying to send data at the
284 * time of the failure.
285 *
286 * @param session The MINA session.
287 *
288 * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
289 * not otherwise? The above comment doesn't make that clear.
290 */
291 public void sessionClosed(IoSession session)
292 {
293 if (_connection.isClosed())
294 {
295 _logger.debug("Session closed called by client");
296 }
297 else
298 {
299 _logger.debug("Session closed called with failover state currently " + _failoverState);
300
301 // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
302 // known through the policy settings.
303
304 if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
305 {
306 _logger.debug("FAILOVER STARTING");
307 if (_failoverState == FailoverState.NOT_STARTED)
308 {
309 _failoverState = FailoverState.IN_PROGRESS;
310 startFailoverThread();
311 }
312 else
313 {
314 _logger.debug("Not starting failover as state currently " + _failoverState);
315 }
316 }
317 else
318 {
319 _logger.debug("Failover not allowed by policy."); // or already in progress?
320
321 if (_logger.isDebugEnabled())
322 {
323 _logger.debug(_connection.getFailoverPolicy().toString());
324 }
325
326 if (_failoverState != FailoverState.IN_PROGRESS)
327 {
328 _logger.debug("sessionClose() not allowed to failover");
329 _connection.exceptionReceived(new AMQDisconnectedException(
330 "Server closed connection and reconnection " + "not permitted.", null));
331 }
332 else
333 {
334 _logger.debug("sessionClose() failover in progress");
335 }
336 }
337 }
338
339 _logger.debug("Protocol Session [" + this + "] closed");
340 }
341
342 /** See {@link FailoverHandler} to see rationale for separate thread. */
343 private void startFailoverThread()
344 {
345 Thread failoverThread = new Thread(_failoverHandler);
346 failoverThread.setName("Failover");
347 // Do not inherit daemon-ness from current thread as this can be a daemon
348 // thread such as a AnonymousIoService thread.
349 failoverThread.setDaemon(false);
350 failoverThread.start();
351 }
352
353 public void sessionIdle(IoSession session, IdleStatus status) throws Exception
354 {
355 _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
356 if (IdleStatus.WRITER_IDLE.equals(status))
357 {
358 // write heartbeat frame:
359 _logger.debug("Sent heartbeat");
360 session.write(HeartbeatBody.FRAME);
361 HeartbeatDiagnostics.sent();
362 }
363 else if (IdleStatus.READER_IDLE.equals(status))
364 {
365 // failover:
366 HeartbeatDiagnostics.timeout();
367 _logger.warn("Timed out while waiting for heartbeat from peer.");
368 session.close();
369 }
370 }
371
372 /**
373 * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
374 * IOException, MINA will close the connection automatically.
375 *
376 * @param session The MINA session.
377 * @param cause The exception that triggered this event.
378 */
379 public void exceptionCaught(IoSession session, Throwable cause)
380 {
381 if (_failoverState == FailoverState.NOT_STARTED)
382 {
383 // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
384 if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
385 {
386 _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
387 // this will attemp failover
388
389 sessionClosed(session);
390 }
391 else
392 {
393
394 if (cause instanceof ProtocolCodecException)
395 {
396 _logger.info("Protocol Exception caught NOT going to attempt failover as " +
397 "cause isn't AMQConnectionClosedException: " + cause, cause);
398
399 AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
400 propagateExceptionToAllWaiters(amqe);
401 }
402 _connection.exceptionReceived(cause);
403
404 }
405
406 // FIXME Need to correctly handle other exceptions. Things like ...
407 // if (cause instanceof AMQChannelClosedException)
408 // which will cause the JMSSession to end due to a channel close and so that Session needs
409 // to be removed from the map so we can correctly still call close without an exception when trying to close
410 // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception
411 }
412 // we reach this point if failover was attempted and failed therefore we need to let the calling app
413 // know since we cannot recover the situation
414 else if (_failoverState == FailoverState.FAILED)
415 {
416 _logger.error("Exception caught by protocol handler: " + cause, cause);
417
418 // we notify the state manager of the error in case we have any clients waiting on a state
419 // change. Those "waiters" will be interrupted and can handle the exception
420 AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
421 propagateExceptionToAllWaiters(amqe);
422 _connection.exceptionReceived(cause);
423 }
424 }
425
426 /**
427 * There are two cases where we have other threads potentially blocking for events to be handled by this class.
428 * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
429 * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
430 *
431 * This should be called only when the exception is fatal for the connection.
432 *
433 * @param e the exception to propagate
434 *
435 * @see #propagateExceptionToFrameListeners
436 * @see #propagateExceptionToStateWaiters
437 */
438 public void propagateExceptionToAllWaiters(Exception e)
439 {
440 propagateExceptionToFrameListeners(e);
441 propagateExceptionToStateWaiters(e);
442 }
443
444 /**
445 * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any
446 * protocol level waits.
447 *
448 * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
449 * stop waiting and relinquish the Failover lock {@see FailoverHandler}.
450 *
451 * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt
452 * their protocol request and so listen again for the correct frame.
453 *
454 * @param e the exception to propagate
455 */
456 public void propagateExceptionToFrameListeners(Exception e)
457 {
458 synchronized (_frameListeners)
459 {
460 if (!_frameListeners.isEmpty())
461 {
462 final Iterator it = _frameListeners.iterator();
463 while (it.hasNext())
464 {
465 final AMQMethodListener ml = (AMQMethodListener) it.next();
466 ml.error(e);
467 }
468 }
469 }
470 }
471
472 /**
473 * This caters for the case where we only need to propogate an exception to the the state manager to interupt any
474 * thing waiting for a state change.
475 *
476 * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement.
477 *
478 * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal
479 * cases {@link #propagateExceptionToAllWaiters} would be the correct choice.
480 *
481 * @param e the exception to propagate
482 */
483 public void propagateExceptionToStateWaiters(Exception e)
484 {
485 getStateManager().error(e);
486 }
487
488 public void notifyFailoverStarting()
489 {
490 // Set the last exception in the sync block to ensure the ordering with add.
491 // either this gets done and the add does the ml.error
492 // or the add completes first and the iterator below will do ml.error
493 synchronized (_frameListeners)
494 {
495 _lastFailoverException = new FailoverException("Failing over about to start");
496 }
497
498 //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
499 // interupted unless failover cannot restore the state.
500 propagateExceptionToFrameListeners(_lastFailoverException);
501 }
502
503 public void failoverInProgress()
504 {
505 _lastFailoverException = null;
506 }
507
508 private static int _messageReceivedCount;
509
510 public void messageReceived(IoSession session, Object message) throws Exception
511 {
512 if (PROTOCOL_DEBUG)
513 {
514 _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
515 }
516
517 if(message instanceof AMQFrame)
518 {
519 final boolean debug = _logger.isDebugEnabled();
520 final long msgNumber = ++_messageReceivedCount;
521
522 if (debug && ((msgNumber % 1000) == 0))
523 {
524 _logger.debug("Received " + _messageReceivedCount + " protocol messages");
525 }
526
527 AMQFrame frame = (AMQFrame) message;
528
529 final AMQBody bodyFrame = frame.getBodyFrame();
530
531 HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
532
533 bodyFrame.handle(frame.getChannel(), _protocolSession);
534
535 _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
536 }
537 else if (message instanceof ProtocolInitiation)
538 {
539 // We get here if the server sends a response to our initial protocol header
540 // suggesting an alternate ProtocolVersion; the server will then close the
541 // connection.
542 ProtocolInitiation protocolInit = (ProtocolInitiation) message;
543 ProtocolVersion pv = protocolInit.checkVersion();
544 getConnection().setProtocolVersion(pv);
545
546 // get round a bug in old versions of qpid whereby the connection is not closed
547 _stateManager.changeState(AMQState.CONNECTION_CLOSED);
548 }
549 }
550
551 public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
552 throws AMQException
553 {
554
555 if (_logger.isDebugEnabled())
556 {
557 _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
558 }
559
560 final AMQMethodEvent<AMQMethodBody> evt =
561 new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
562
563 try
564 {
565
566 boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
567 synchronized (_frameListeners)
568 {
569 if (!_frameListeners.isEmpty())
570 {
571 //This iterator is safe from the error state as the frame listeners always add before they send so their
572 // will be ready and waiting for this response.
573 Iterator it = _frameListeners.iterator();
574 while (it.hasNext())
575 {
576 final AMQMethodListener listener = (AMQMethodListener) it.next();
577 wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
578 }
579 }
580 }
581 if (!wasAnyoneInterested)
582 {
583 throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
584 + _frameListeners, null);
585 }
586 }
587 catch (AMQException e)
588 {
589 propagateExceptionToFrameListeners(e);
590
591 exceptionCaught(session, e);
592 }
593
594 }
595
596 private static int _messagesOut;
597
598 public void messageSent(IoSession session, Object message) throws Exception
599 {
600 if (PROTOCOL_DEBUG)
601 {
602 _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
603 }
604
605 final long sentMessages = _messagesOut++;
606
607 final boolean debug = _logger.isDebugEnabled();
608
609 if (debug && ((sentMessages % 1000) == 0))
610 {
611 _logger.debug("Sent " + _messagesOut + " protocol messages");
612 }
613
614 _connection.bytesSent(session.getWrittenBytes());
615 }
616
617 public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
618 {
619 return getStateManager().createWaiter(states);
620 }
621
622 /**
623 * Convenience method that writes a frame to the protocol session. Equivalent to calling
624 * getProtocolSession().write().
625 *
626 * @param frame the frame to write
627 */
628 public void writeFrame(AMQDataBlock frame)
629 {
630 _protocolSession.writeFrame(frame);
631 }
632
633 public void writeFrame(AMQDataBlock frame, boolean wait)
634 {
635 _protocolSession.writeFrame(frame, wait);
636 }
637
638 /**
639 * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
640 * calling getProtocolSession().write() then waiting for the response.
641 *
642 * @param frame
643 * @param listener the blocking listener. Note the calling thread will block.
644 */
645 public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener)
646 throws AMQException, FailoverException
647 {
648 return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
649 }
650
651 /**
652 * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
653 * calling getProtocolSession().write() then waiting for the response.
654 *
655 * @param frame
656 * @param listener the blocking listener. Note the calling thread will block.
657 */
658 public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener,
659 long timeout) throws AMQException, FailoverException
660 {
661 try
662 {
663 synchronized (_frameListeners)
664 {
665 if (_lastFailoverException != null)
666 {
667 throw _lastFailoverException;
668 }
669
670 if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED)
671 {
672 Exception e = _stateManager.getLastException();
673 if (e != null)
674 {
675 if (e instanceof AMQException)
676 {
677 AMQException amqe = (AMQException) e;
678
679 throw amqe.cloneForCurrentThread();
680 }
681 else
682 {
683 throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
684 }
685 }
686 }
687
688 _frameListeners.add(listener);
689 //FIXME: At this point here we should check or before add we should check _stateManager is in an open
690 // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
691 }
692 _protocolSession.writeFrame(frame);
693
694 return listener.blockForFrame(timeout);
695 // When control resumes before this line, a reply will have been received
696 // that matches the criteria defined in the blocking listener
697 }
698 finally
699 {
700 // If we don't removeKey the listener then no-one will
701 _frameListeners.remove(listener);
702 }
703
704 }
705
706 /** More convenient method to write a frame and wait for it's response. */
707 public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException
708 {
709 return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
710 }
711
712 /** More convenient method to write a frame and wait for it's response. */
713 public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException
714 {
715 return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
716 timeout);
717 }
718
719 public void closeSession(AMQSession session) throws AMQException
720 {
721 _protocolSession.closeSession(session);
722 }
723
724 /**
725 * Closes the connection.
726 *
727 * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
728 * anyway.
729 *
730 * @param timeout The timeout to wait for an acknowledgement to the close request.
731 *
732 * @throws AMQException If the close fails for any reason.
733 */
734 public void closeConnection(long timeout) throws AMQException
735 {
736 getStateManager().changeState(AMQState.CONNECTION_CLOSING);
737
738 ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
739 new AMQShortString("JMS client is closing the connection."), 0, 0);
740
741 final AMQFrame frame = body.generateFrame(0);
742
743 try
744 {
745 syncWrite(frame, ConnectionCloseOkBody.class, timeout);
746 _protocolSession.closeProtocolSession();
747 }
748 catch (AMQTimeoutException e)
749 {
750 _protocolSession.closeProtocolSession(false);
751 }
752 catch (FailoverException e)
753 {
754 _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
755 }
756 }
757
758 /** @return the number of bytes read from this protocol session */
759 public long getReadBytes()
760 {
761 return _protocolSession.getIoSession().getReadBytes();
762 }
763
764 /** @return the number of bytes written to this protocol session */
765 public long getWrittenBytes()
766 {
767 return _protocolSession.getIoSession().getWrittenBytes();
768 }
769
770 public void failover(String host, int port)
771 {
772 _failoverHandler.setHost(host);
773 _failoverHandler.setPort(port);
774 // see javadoc for FailoverHandler to see rationale for separate thread
775 startFailoverThread();
776 }
777
778 public void blockUntilNotFailingOver() throws InterruptedException
779 {
780 synchronized(_failoverLatchChange)
781 {
782 if (_failoverLatch != null)
783 {
784 _failoverLatch.await();
785 }
786 }
787 }
788
789 public AMQShortString generateQueueName()
790 {
791 return _protocolSession.generateQueueName();
792 }
793
794 public CountDownLatch getFailoverLatch()
795 {
796 return _failoverLatch;
797 }
798
799 public void setFailoverLatch(CountDownLatch failoverLatch)
800 {
801 synchronized (_failoverLatchChange)
802 {
803 _failoverLatch = failoverLatch;
804 }
805 }
806
807 public AMQConnection getConnection()
808 {
809 return _connection;
810 }
811
812 public AMQStateManager getStateManager()
813 {
814 return _stateManager;
815 }
816
817 public void setStateManager(AMQStateManager stateManager)
818 {
819 _stateManager = stateManager;
820 }
821
822 public AMQProtocolSession getProtocolSession()
823 {
824 return _protocolSession;
825 }
826
827 FailoverState getFailoverState()
828 {
829 return _failoverState;
830 }
831
832 public void setFailoverState(FailoverState failoverState)
833 {
834 _failoverState = failoverState;
835 }
836
837 public byte getProtocolMajorVersion()
838 {
839 return _protocolSession.getProtocolMajorVersion();
840 }
841
842 public byte getProtocolMinorVersion()
843 {
844 return _protocolSession.getProtocolMinorVersion();
845 }
846
847 public MethodRegistry getMethodRegistry()
848 {
849 return _protocolSession.getMethodRegistry();
850 }
851
852 public ProtocolVersion getProtocolVersion()
853 {
854 return _protocolSession.getProtocolVersion();
855 }
856 }
|