001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.protocol;
022
023 import org.apache.log4j.Logger;
024
025 import org.apache.mina.common.IdleStatus;
026 import org.apache.mina.common.IoServiceConfig;
027 import org.apache.mina.common.IoSession;
028 import org.apache.mina.common.CloseFuture;
029 import org.apache.mina.transport.vmpipe.VmPipeAddress;
030
031 import org.apache.qpid.AMQChannelException;
032 import org.apache.qpid.AMQConnectionException;
033 import org.apache.qpid.AMQException;
034 import org.apache.qpid.codec.AMQCodecFactory;
035 import org.apache.qpid.codec.AMQDecoder;
036 import org.apache.qpid.common.ClientProperties;
037 import org.apache.qpid.framing.*;
038 import org.apache.qpid.pool.ReadWriteThreadModel;
039 import org.apache.qpid.protocol.AMQConstant;
040 import org.apache.qpid.protocol.AMQMethodEvent;
041 import org.apache.qpid.protocol.AMQMethodListener;
042 import org.apache.qpid.server.AMQChannel;
043 import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
044 import org.apache.qpid.server.management.Managable;
045 import org.apache.qpid.server.management.ManagedObject;
046 import org.apache.qpid.server.output.ProtocolOutputConverter;
047 import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
048 import org.apache.qpid.server.registry.ApplicationRegistry;
049 import org.apache.qpid.server.state.AMQState;
050 import org.apache.qpid.server.state.AMQStateManager;
051 import org.apache.qpid.server.virtualhost.VirtualHost;
052 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
053 import org.apache.qpid.transport.Sender;
054
055 import javax.management.JMException;
056 import javax.security.sasl.SaslServer;
057
058 import java.net.InetSocketAddress;
059 import java.net.SocketAddress;
060 import java.security.Principal;
061 import java.util.ArrayList;
062 import java.util.HashMap;
063 import java.util.List;
064 import java.util.Map;
065 import java.util.concurrent.CopyOnWriteArrayList;
066 import java.util.concurrent.CopyOnWriteArraySet;
067
068 public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
069 {
070 private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
071
072 private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
073
074 // to save boxing the channelId and looking up in a map... cache in an array the low numbered
075 // channels. This value must be of the form 2^x - 1.
076 private static final int CHANNEL_CACHE_SIZE = 0xff;
077
078 private final IoSession _minaProtocolSession;
079
080 private AMQShortString _contextKey;
081
082 private AMQShortString _clientVersion = null;
083
084 private VirtualHost _virtualHost;
085
086 private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
087
088 private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
089
090 private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
091
092 private final AMQStateManager _stateManager;
093
094 private AMQCodecFactory _codecFactory;
095
096 private AMQProtocolSessionMBean _managedObject;
097
098 private SaslServer _saslServer;
099
100 private Object _lastReceived;
101
102 private Object _lastSent;
103
104 protected boolean _closed;
105 // maximum number of channels this session should have
106 private long _maxNoOfChannels = 1000;
107
108 /* AMQP Version for this session */
109 private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
110
111 private FieldTable _clientProperties;
112 private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
113
114 private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
115 private ProtocolOutputConverter _protocolOutputConverter;
116 private Principal _authorizedID;
117 private MethodDispatcher _dispatcher;
118 private ProtocolSessionIdentifier _sessionIdentifier;
119
120 private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
121 private org.apache.mina.common.WriteFuture _lastWriteFuture;
122
123 public ManagedObject getManagedObject()
124 {
125 return _managedObject;
126 }
127
128 public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
129 throws AMQException
130 {
131 _stateManager = new AMQStateManager(virtualHostRegistry, this);
132 _minaProtocolSession = session;
133 session.setAttachment(this);
134
135 _codecFactory = codecFactory;
136
137 try
138 {
139 IoServiceConfig config = session.getServiceConfig();
140 ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
141 threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
142 threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
143 }
144 catch (RuntimeException e)
145 {
146 e.printStackTrace();
147 throw e;
148
149 }
150 }
151
152 public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
153 AMQStateManager stateManager) throws AMQException
154 {
155 _stateManager = stateManager;
156 _minaProtocolSession = session;
157 session.setAttachment(this);
158
159 _codecFactory = codecFactory;
160
161 }
162
163 private AMQProtocolSessionMBean createMBean() throws AMQException
164 {
165 try
166 {
167 return new AMQProtocolSessionMBean(this);
168 }
169 catch (JMException ex)
170 {
171 _logger.error("AMQProtocolSession MBean creation has failed ", ex);
172 throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
173 }
174 }
175
176 public IoSession getIOSession()
177 {
178 return _minaProtocolSession;
179 }
180
181 public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
182 {
183 return (AMQProtocolSession) minaProtocolSession.getAttachment();
184 }
185
186 public void dataBlockReceived(AMQDataBlock message) throws Exception
187 {
188 _lastReceived = message;
189 if (message instanceof ProtocolInitiation)
190 {
191 protocolInitiationReceived((ProtocolInitiation) message);
192
193 }
194 else if (message instanceof AMQFrame)
195 {
196 AMQFrame frame = (AMQFrame) message;
197 frameReceived(frame);
198
199 }
200 else
201 {
202 throw new UnknnownMessageTypeException(message);
203 }
204 }
205
206 private void frameReceived(AMQFrame frame) throws AMQException
207 {
208 int channelId = frame.getChannel();
209 AMQBody body = frame.getBodyFrame();
210
211 if (_logger.isDebugEnabled())
212 {
213 _logger.debug("Frame Received: " + frame);
214 }
215
216 // Check that this channel is not closing
217 if (channelAwaitingClosure(channelId))
218 {
219 if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
220 {
221 if (_logger.isInfoEnabled())
222 {
223 _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
224 }
225 }
226 else
227 {
228 if (_logger.isInfoEnabled())
229 {
230 _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
231 }
232
233 closeProtocolSession();
234 return;
235 }
236 }
237
238 try
239 {
240 body.handle(channelId, this);
241 }
242 catch (AMQException e)
243 {
244 closeChannel(channelId);
245 throw e;
246 }
247
248 }
249
250 private void protocolInitiationReceived(ProtocolInitiation pi)
251 {
252 // this ensures the codec never checks for a PI message again
253 ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
254 try
255 {
256 ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
257
258 // This sets the protocol version (and hence framing classes) for this session.
259 setProtocolVersion(pv);
260
261 String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
262
263 String locales = "en_US";
264
265 AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
266 (short) getProtocolMinorVersion(),
267 null,
268 mechanisms.getBytes(),
269 locales.getBytes());
270 _minaProtocolSession.write(responseBody.generateFrame(0));
271
272 }
273 catch (AMQException e)
274 {
275 _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
276
277 _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
278
279 // TODO: Close connection (but how to wait until message is sent?)
280 // ritchiem 2006-12-04 will this not do?
281 // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
282 // future.join();
283 // close connection
284
285 }
286 }
287
288 public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
289 {
290
291 final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
292
293 try
294 {
295 try
296 {
297
298 boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
299
300 if (!_frameListeners.isEmpty())
301 {
302 for (AMQMethodListener listener : _frameListeners)
303 {
304 wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
305 }
306 }
307
308 if (!wasAnyoneInterested)
309 {
310 throw new AMQNoMethodHandlerException(evt);
311 }
312 }
313 catch (AMQChannelException e)
314 {
315 if (getChannel(channelId) != null)
316 {
317 if (_logger.isInfoEnabled())
318 {
319 _logger.info("Closing channel due to: " + e.getMessage());
320 }
321
322 writeFrame(e.getCloseFrame(channelId));
323 closeChannel(channelId);
324 }
325 else
326 {
327 if (_logger.isDebugEnabled())
328 {
329 _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
330 }
331
332 if (_logger.isInfoEnabled())
333 {
334 _logger.info("Closing connection due to: " + e.getMessage());
335 }
336
337 AMQConnectionException ce =
338 evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
339 AMQConstant.CHANNEL_ERROR.getName().toString());
340
341 closeConnection(channelId, ce, false);
342 }
343 }
344 catch (AMQConnectionException e)
345 {
346 closeConnection(channelId, e, false);
347 }
348 }
349 catch (Exception e)
350 {
351
352 for (AMQMethodListener listener : _frameListeners)
353 {
354 listener.error(e);
355 }
356
357 _logger.error("Unexpected exception while processing frame. Closing connection.", e);
358
359 closeProtocolSession();
360 }
361 }
362
363 public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
364 {
365
366 AMQChannel channel = getAndAssertChannel(channelId);
367
368 channel.publishContentHeader(body);
369
370 }
371
372 public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
373 {
374 AMQChannel channel = getAndAssertChannel(channelId);
375
376 channel.publishContentBody(body);
377 }
378
379 public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
380 {
381 // NO - OP
382 }
383
384 /**
385 * Convenience method that writes a frame to the protocol session. Equivalent to calling
386 * getProtocolSession().write().
387 *
388 * @param frame the frame to write
389 */
390 public void writeFrame(AMQDataBlock frame)
391 {
392 _lastSent = frame;
393
394 _lastWriteFuture = _minaProtocolSession.write(frame);
395 }
396
397 public AMQShortString getContextKey()
398 {
399 return _contextKey;
400 }
401
402 public void setContextKey(AMQShortString contextKey)
403 {
404 _contextKey = contextKey;
405 }
406
407 public List<AMQChannel> getChannels()
408 {
409 return new ArrayList<AMQChannel>(_channelMap.values());
410 }
411
412 public AMQChannel getAndAssertChannel(int channelId) throws AMQException
413 {
414 AMQChannel channel = getChannel(channelId);
415 if (channel == null)
416 {
417 throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
418 }
419
420 return channel;
421 }
422
423 public AMQChannel getChannel(int channelId) throws AMQException
424 {
425 final AMQChannel channel =
426 ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
427 if ((channel == null) || channel.isClosing())
428 {
429 return null;
430 }
431 else
432 {
433 return channel;
434 }
435 }
436
437 public boolean channelAwaitingClosure(int channelId)
438 {
439 return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
440 }
441
442 public void addChannel(AMQChannel channel) throws AMQException
443 {
444 if (_closed)
445 {
446 throw new AMQException("Session is closed");
447 }
448
449 final int channelId = channel.getChannelId();
450
451 if (_closingChannelsList.contains(channelId))
452 {
453 throw new AMQException("Session is marked awaiting channel close");
454 }
455
456 if (_channelMap.size() == _maxNoOfChannels)
457 {
458 String errorMessage =
459 toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
460 + "); can't create channel";
461 _logger.error(errorMessage);
462 throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
463 }
464 else
465 {
466 _channelMap.put(channel.getChannelId(), channel);
467 }
468
469 if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
470 {
471 _cachedChannels[channelId] = channel;
472 }
473
474 checkForNotification();
475 }
476
477 private void checkForNotification()
478 {
479 int channelsCount = _channelMap.size();
480 if (channelsCount >= _maxNoOfChannels)
481 {
482 _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
483 }
484 }
485
486 public Long getMaximumNumberOfChannels()
487 {
488 return _maxNoOfChannels;
489 }
490
491 public void setMaximumNumberOfChannels(Long value)
492 {
493 _maxNoOfChannels = value;
494 }
495
496 public void commitTransactions(AMQChannel channel) throws AMQException
497 {
498 if ((channel != null) && channel.isTransactional())
499 {
500 channel.commit();
501 }
502 }
503
504 public void rollbackTransactions(AMQChannel channel) throws AMQException
505 {
506 if ((channel != null) && channel.isTransactional())
507 {
508 channel.rollback();
509 }
510 }
511
512 /**
513 * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
514 * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
515 *
516 * @param channelId id of the channel to close
517 *
518 * @throws AMQException if an error occurs closing the channel
519 * @throws IllegalArgumentException if the channel id is not valid
520 */
521 public void closeChannel(int channelId) throws AMQException
522 {
523 final AMQChannel channel = getChannel(channelId);
524 if (channel == null)
525 {
526 throw new IllegalArgumentException("Unknown channel id");
527 }
528 else
529 {
530 try
531 {
532 channel.close();
533 markChannelAwaitingCloseOk(channelId);
534 }
535 finally
536 {
537 removeChannel(channelId);
538 }
539 }
540 }
541
542 public void closeChannelOk(int channelId)
543 {
544 // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler.
545 // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory.
546 // We do it from the Close Handler as we are sending the OK back to the client.
547 // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
548 // will send a close-ok.. Where we should call removeChannel.
549 // However, due to the poor exception handling on the client. The client-user will be notified of the
550 // InvalidArgument and if they then decide to close the session/connection then the there will be time
551 // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed.
552 //removeChannel(channelId);
553 _closingChannelsList.remove(new Integer(channelId));
554 }
555
556 private void markChannelAwaitingCloseOk(int channelId)
557 {
558 _closingChannelsList.add(channelId);
559 }
560
561 /**
562 * In our current implementation this is used by the clustering code.
563 *
564 * @param channelId The channel to remove
565 */
566 public void removeChannel(int channelId)
567 {
568 _channelMap.remove(channelId);
569 if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
570 {
571 _cachedChannels[channelId] = null;
572 }
573 }
574
575 /**
576 * Initialise heartbeats on the session.
577 *
578 * @param delay delay in seconds (not ms)
579 */
580 public void initHeartbeats(int delay)
581 {
582 if (delay > 0)
583 {
584 _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
585 _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
586 }
587 }
588
589 /**
590 * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
591 *
592 * @throws AMQException if an error occurs while closing any channel
593 */
594 private void closeAllChannels() throws AMQException
595 {
596 for (AMQChannel channel : _channelMap.values())
597 {
598 channel.close();
599 }
600
601 _channelMap.clear();
602 for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
603 {
604 _cachedChannels[i] = null;
605 }
606 }
607
608 /** This must be called when the session is _closed in order to free up any resources managed by the session. */
609 public void closeSession() throws AMQException
610 {
611 if (!_closed)
612 {
613 _closed = true;
614
615 if (_virtualHost != null)
616 {
617 _virtualHost.getConnectionRegistry().deregisterConnection(this);
618 }
619
620 closeAllChannels();
621 if (_managedObject != null)
622 {
623 _managedObject.unregister();
624 }
625
626 for (Task task : _taskList)
627 {
628 task.doTask(this);
629 }
630 }
631 }
632
633 public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
634 {
635 if (_logger.isInfoEnabled())
636 {
637 _logger.info("Closing connection due to: " + e.getMessage());
638 }
639
640 markChannelAwaitingCloseOk(channelId);
641 closeSession();
642 _stateManager.changeState(AMQState.CONNECTION_CLOSING);
643 writeFrame(e.getCloseFrame(channelId));
644
645 if (closeProtocolSession)
646 {
647 closeProtocolSession();
648 }
649 }
650
651 public void closeProtocolSession()
652 {
653 closeProtocolSession(true);
654 }
655
656 public void closeProtocolSession(boolean waitLast)
657 {
658 if (waitLast && (_lastWriteFuture != null))
659 {
660 _logger.debug("Waiting for last write to join.");
661 _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
662 }
663
664 _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
665 final CloseFuture future = _minaProtocolSession.close();
666 future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
667
668 try
669 {
670 _stateManager.changeState(AMQState.CONNECTION_CLOSED);
671 }
672 catch (AMQException e)
673 {
674 _logger.info(e.getMessage());
675 }
676 }
677
678 public String toString()
679 {
680 return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
681 }
682
683 public String dump()
684 {
685 return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
686 }
687
688 /** @return an object that can be used to identity */
689 public Object getKey()
690 {
691 return _minaProtocolSession.getRemoteAddress();
692 }
693
694 /**
695 * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
696 * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
697 *
698 * @return a String FQDN
699 */
700 public String getLocalFQDN()
701 {
702 SocketAddress address = _minaProtocolSession.getLocalAddress();
703 // we use the vmpipe address in some tests hence the need for this rather ugly test. The host
704 // information is used by SASL primary.
705 if (address instanceof InetSocketAddress)
706 {
707 return ((InetSocketAddress) address).getHostName();
708 }
709 else if (address instanceof VmPipeAddress)
710 {
711 return "vmpipe:" + ((VmPipeAddress) address).getPort();
712 }
713 else
714 {
715 throw new IllegalArgumentException("Unsupported socket address class: " + address);
716 }
717 }
718
719 public SaslServer getSaslServer()
720 {
721 return _saslServer;
722 }
723
724 public void setSaslServer(SaslServer saslServer)
725 {
726 _saslServer = saslServer;
727 }
728
729 public FieldTable getClientProperties()
730 {
731 return _clientProperties;
732 }
733
734 public void setClientProperties(FieldTable clientProperties)
735 {
736 _clientProperties = clientProperties;
737 if (_clientProperties != null)
738 {
739 if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)
740 {
741 setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
742 }
743
744 if (_clientProperties.getString(ClientProperties.version.toString()) != null)
745 {
746 _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
747 }
748 }
749 _sessionIdentifier = new ProtocolSessionIdentifier(this);
750 }
751
752 private void setProtocolVersion(ProtocolVersion pv)
753 {
754 _protocolVersion = pv;
755
756 _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
757 _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
758 }
759
760 public byte getProtocolMajorVersion()
761 {
762 return _protocolVersion.getMajorVersion();
763 }
764
765 public ProtocolVersion getProtocolVersion()
766 {
767 return _protocolVersion;
768 }
769
770 public byte getProtocolMinorVersion()
771 {
772 return _protocolVersion.getMinorVersion();
773 }
774
775 public boolean isProtocolVersion(byte major, byte minor)
776 {
777 return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
778 }
779
780 public MethodRegistry getRegistry()
781 {
782 return getMethodRegistry();
783 }
784
785 public Object getClientIdentifier()
786 {
787 return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null;
788 }
789
790 public VirtualHost getVirtualHost()
791 {
792 return _virtualHost;
793 }
794
795 public void setVirtualHost(VirtualHost virtualHost) throws AMQException
796 {
797 _virtualHost = virtualHost;
798
799 _virtualHost.getConnectionRegistry().registerConnection(this);
800
801 _managedObject = createMBean();
802 _managedObject.register();
803 }
804
805 public void addSessionCloseTask(Task task)
806 {
807 _taskList.add(task);
808 }
809
810 public void removeSessionCloseTask(Task task)
811 {
812 _taskList.remove(task);
813 }
814
815 public ProtocolOutputConverter getProtocolOutputConverter()
816 {
817 return _protocolOutputConverter;
818 }
819
820 public void setAuthorizedID(Principal authorizedID)
821 {
822 _authorizedID = authorizedID;
823 }
824
825 public Principal getAuthorizedID()
826 {
827 return _authorizedID;
828 }
829
830 public MethodRegistry getMethodRegistry()
831 {
832 return MethodRegistry.getMethodRegistry(getProtocolVersion());
833 }
834
835 public MethodDispatcher getMethodDispatcher()
836 {
837 return _dispatcher;
838 }
839
840 public ProtocolSessionIdentifier getSessionIdentifier()
841 {
842 return _sessionIdentifier;
843 }
844
845 public String getClientVersion()
846 {
847 return (_clientVersion == null) ? null : _clientVersion.toString();
848 }
849
850 public void setSender(Sender<java.nio.ByteBuffer> sender)
851 {
852 // No-op, interface munging between this and AMQProtocolSession
853 }
854
855 public void init()
856 {
857 // No-op, interface munging between this and AMQProtocolSession
858 }
859 }
|