// AMQMinaProtocolSession.java
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.protocol;

import org.apache.log4j.Logger;

import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.transport.vmpipe.VmPipeAddress;

import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;

import javax.management.JMException;
import javax.security.sasl.SaslServer;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
067 
068 public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
069 {
070     private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
071 
072     private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
073 
074     // to save boxing the channelId and looking up in a map... cache in an array the low numbered
075     // channels.  This value must be of the form 2^x - 1.
076     private static final int CHANNEL_CACHE_SIZE = 0xff;
077 
078     private final IoSession _minaProtocolSession;
079 
080     private AMQShortString _contextKey;
081 
082     private AMQShortString _clientVersion = null;
083 
084     private VirtualHost _virtualHost;
085 
086     private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
087 
088     private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
089 
090     private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
091 
092     private final AMQStateManager _stateManager;
093 
094     private AMQCodecFactory _codecFactory;
095 
096     private AMQProtocolSessionMBean _managedObject;
097 
098     private SaslServer _saslServer;
099 
100     private Object _lastReceived;
101 
102     private Object _lastSent;
103 
104     protected boolean _closed;
105     // maximum number of channels this session should have
106     private long _maxNoOfChannels = 1000;
107 
108     /* AMQP Version for this session */
109     private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
110 
111     private FieldTable _clientProperties;
112     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
113 
114     private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
115     private ProtocolOutputConverter _protocolOutputConverter;
116     private Principal _authorizedID;
117     private MethodDispatcher _dispatcher;
118     private ProtocolSessionIdentifier _sessionIdentifier;
119 
120     private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
121     private org.apache.mina.common.WriteFuture _lastWriteFuture;
122 
123     public ManagedObject getManagedObject()
124     {
125         return _managedObject;
126     }
127 
128     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
129             throws AMQException
130     {
131         _stateManager = new AMQStateManager(virtualHostRegistry, this);
132         _minaProtocolSession = session;
133         session.setAttachment(this);
134 
135         _codecFactory = codecFactory;
136 
137         try
138         {
139             IoServiceConfig config = session.getServiceConfig();
140             ReadWriteThreadModel threadModel = (ReadWriteThreadModelconfig.getThreadModel();
141             threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
142             threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
143         }
144         catch (RuntimeException e)
145         {
146             e.printStackTrace();
147             throw e;
148 
149         }
150     }
151 
152     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
153                                   AMQStateManager stateManagerthrows AMQException
154     {
155         _stateManager = stateManager;
156         _minaProtocolSession = session;
157         session.setAttachment(this);
158 
159         _codecFactory = codecFactory;
160 
161     }
162 
163     private AMQProtocolSessionMBean createMBean() throws AMQException
164     {
165         try
166         {
167             return new AMQProtocolSessionMBean(this);
168         }
169         catch (JMException ex)
170         {
171             _logger.error("AMQProtocolSession MBean creation has failed ", ex);
172             throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
173         }
174     }
175 
176     public IoSession getIOSession()
177     {
178         return _minaProtocolSession;
179     }
180 
181     public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
182     {
183         return (AMQProtocolSessionminaProtocolSession.getAttachment();
184     }
185 
186     public void dataBlockReceived(AMQDataBlock messagethrows Exception
187     {
188         _lastReceived = message;
189         if (message instanceof ProtocolInitiation)
190         {
191             protocolInitiationReceived((ProtocolInitiationmessage);
192 
193         }
194         else if (message instanceof AMQFrame)
195         {
196             AMQFrame frame = (AMQFramemessage;
197             frameReceived(frame);
198 
199         }
200         else
201         {
202             throw new UnknnownMessageTypeException(message);
203         }
204     }
205 
206     private void frameReceived(AMQFrame framethrows AMQException
207     {
208         int channelId = frame.getChannel();
209         AMQBody body = frame.getBodyFrame();
210 
211         if (_logger.isDebugEnabled())
212         {
213             _logger.debug("Frame Received: " + frame);
214         }
215 
216         // Check that this channel is not closing
217         if (channelAwaitingClosure(channelId))
218         {
219             if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
220             {
221                 if (_logger.isInfoEnabled())
222                 {
223                     _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
224                 }
225             }
226             else
227             {
228                 if (_logger.isInfoEnabled())
229                 {
230                     _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
231                 }
232 
233                 closeProtocolSession();
234                 return;
235             }
236         }
237 
238         try
239         {
240             body.handle(channelId, this);
241         }
242         catch (AMQException e)
243         {
244             closeChannel(channelId);
245             throw e;
246         }
247 
248     }
249 
250     private void protocolInitiationReceived(ProtocolInitiation pi)
251     {
252         // this ensures the codec never checks for a PI message again
253         ((AMQDecoder_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
254         try
255         {
256             ProtocolVersion pv = pi.checkVersion()// Fails if not correct
257 
258             // This sets the protocol version (and hence framing classes) for this session.
259             setProtocolVersion(pv);
260 
261             String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
262 
263             String locales = "en_US";
264 
265             AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((shortgetProtocolMajorVersion(),
266                                                                                        (shortgetProtocolMinorVersion(),
267                                                                                        null,
268                                                                                        mechanisms.getBytes(),
269                                                                                        locales.getBytes());
270             _minaProtocolSession.write(responseBody.generateFrame(0));
271 
272         }
273         catch (AMQException e)
274         {
275             _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
276 
277             _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
278 
279             // TODO: Close connection (but how to wait until message is sent?)
280             // ritchiem 2006-12-04 will this not do?
281             // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
282             // future.join();
283             // close connection
284 
285         }
286     }
287 
288     public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
289     {
290 
291         final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
292 
293         try
294         {
295             try
296             {
297 
298                 boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
299 
300                 if (!_frameListeners.isEmpty())
301                 {
302                     for (AMQMethodListener listener : _frameListeners)
303                     {
304                         wasAnyoneInterested = listener.methodReceived(evt|| wasAnyoneInterested;
305                     }
306                 }
307 
308                 if (!wasAnyoneInterested)
309                 {
310                     throw new AMQNoMethodHandlerException(evt);
311                 }
312             }
313             catch (AMQChannelException e)
314             {
315                 if (getChannel(channelId!= null)
316                 {
317                     if (_logger.isInfoEnabled())
318                     {
319                         _logger.info("Closing channel due to: " + e.getMessage());
320                     }
321 
322                     writeFrame(e.getCloseFrame(channelId));
323                     closeChannel(channelId);
324                 }
325                 else
326                 {
327                     if (_logger.isDebugEnabled())
328                     {
329                         _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
330                     }
331 
332                     if (_logger.isInfoEnabled())
333                     {
334                         _logger.info("Closing connection due to: " + e.getMessage());
335                     }
336 
337                     AMQConnectionException ce =
338                             evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
339                                                                    AMQConstant.CHANNEL_ERROR.getName().toString());
340 
341                     closeConnection(channelId, ce, false);
342                 }
343             }
344             catch (AMQConnectionException e)
345             {
346                 closeConnection(channelId, e, false);
347             }
348         }
349         catch (Exception e)
350         {
351 
352             for (AMQMethodListener listener : _frameListeners)
353             {
354                 listener.error(e);
355             }
356 
357             _logger.error("Unexpected exception while processing frame.  Closing connection.", e);
358 
359             closeProtocolSession();
360         }
361     }
362 
363     public void contentHeaderReceived(int channelId, ContentHeaderBody bodythrows AMQException
364     {
365 
366         AMQChannel channel = getAndAssertChannel(channelId);
367 
368         channel.publishContentHeader(body);
369 
370     }
371 
372     public void contentBodyReceived(int channelId, ContentBody bodythrows AMQException
373     {
374         AMQChannel channel = getAndAssertChannel(channelId);
375 
376         channel.publishContentBody(body);
377     }
378 
379     public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
380     {
381         // NO - OP
382     }
383 
384     /**
385      * Convenience method that writes a frame to the protocol session. Equivalent to calling
386      * getProtocolSession().write().
387      *
388      @param frame the frame to write
389      */
390     public void writeFrame(AMQDataBlock frame)
391     {
392         _lastSent = frame;
393 
394         _lastWriteFuture = _minaProtocolSession.write(frame);
395     }
396 
397     public AMQShortString getContextKey()
398     {
399         return _contextKey;
400     }
401 
402     public void setContextKey(AMQShortString contextKey)
403     {
404         _contextKey = contextKey;
405     }
406 
407     public List<AMQChannel> getChannels()
408     {
409         return new ArrayList<AMQChannel>(_channelMap.values());
410     }
411 
412     public AMQChannel getAndAssertChannel(int channelIdthrows AMQException
413     {
414         AMQChannel channel = getChannel(channelId);
415         if (channel == null)
416         {
417             throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
418         }
419 
420         return channel;
421     }
422 
423     public AMQChannel getChannel(int channelIdthrows AMQException
424     {
425         final AMQChannel channel =
426                 ((channelId & CHANNEL_CACHE_SIZE== channelId? _cachedChannels[channelId: _channelMap.get(channelId);
427         if ((channel == null|| channel.isClosing())
428         {
429             return null;
430         }
431         else
432         {
433             return channel;
434         }
435     }
436 
437     public boolean channelAwaitingClosure(int channelId)
438     {
439         return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
440     }
441 
442     public void addChannel(AMQChannel channelthrows AMQException
443     {
444         if (_closed)
445         {
446             throw new AMQException("Session is closed");
447         }
448 
449         final int channelId = channel.getChannelId();
450 
451         if (_closingChannelsList.contains(channelId))
452         {
453             throw new AMQException("Session is marked awaiting channel close");
454         }
455 
456         if (_channelMap.size() == _maxNoOfChannels)
457         {
458             String errorMessage =
459                     toString() ": maximum number of channels has been reached (" + _maxNoOfChannels
460                     "); can't create channel";
461             _logger.error(errorMessage);
462             throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
463         }
464         else
465         {
466             _channelMap.put(channel.getChannelId(), channel);
467         }
468 
469         if (((channelId & CHANNEL_CACHE_SIZE== channelId))
470         {
471             _cachedChannels[channelId= channel;
472         }
473 
474         checkForNotification();
475     }
476 
477     private void checkForNotification()
478     {
479         int channelsCount = _channelMap.size();
480         if (channelsCount >= _maxNoOfChannels)
481         {
482             _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
483         }
484     }
485 
486     public Long getMaximumNumberOfChannels()
487     {
488         return _maxNoOfChannels;
489     }
490 
491     public void setMaximumNumberOfChannels(Long value)
492     {
493         _maxNoOfChannels = value;
494     }
495 
496     public void commitTransactions(AMQChannel channelthrows AMQException
497     {
498         if ((channel != null&& channel.isTransactional())
499         {
500             channel.commit();
501         }
502     }
503 
504     public void rollbackTransactions(AMQChannel channelthrows AMQException
505     {
506         if ((channel != null&& channel.isTransactional())
507         {
508             channel.rollback();
509         }
510     }
511 
512     /**
513      * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
514      * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
515      *
516      @param channelId id of the channel to close
517      *
518      @throws AMQException             if an error occurs closing the channel
519      @throws IllegalArgumentException if the channel id is not valid
520      */
521     public void closeChannel(int channelIdthrows AMQException
522     {
523         final AMQChannel channel = getChannel(channelId);
524         if (channel == null)
525         {
526             throw new IllegalArgumentException("Unknown channel id");
527         }
528         else
529         {
530             try
531             {
532                 channel.close();
533                 markChannelAwaitingCloseOk(channelId);
534             }
535             finally
536             {
537                 removeChannel(channelId);
538             }
539         }
540     }
541 
542     public void closeChannelOk(int channelId)
543     {
544         // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler.
545         // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory.
546         // We do it from the Close Handler as we are sending the OK back to the client.
547         // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
548         // will send a close-ok.. Where we should call removeChannel.
549         // However, due to the poor exception handling on the client. The client-user will be notified of the
550         // InvalidArgument and if they then decide to close the session/connection then the there will be time
551         // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed.
552         //removeChannel(channelId);
553         _closingChannelsList.remove(new Integer(channelId));
554     }
555 
556     private void markChannelAwaitingCloseOk(int channelId)
557     {
558         _closingChannelsList.add(channelId);
559     }
560 
561     /**
562      * In our current implementation this is used by the clustering code.
563      *
564      @param channelId The channel to remove
565      */
566     public void removeChannel(int channelId)
567     {
568         _channelMap.remove(channelId);
569         if ((channelId & CHANNEL_CACHE_SIZE== channelId)
570         {
571             _cachedChannels[channelIdnull;
572         }
573     }
574 
575     /**
576      * Initialise heartbeats on the session.
577      *
578      @param delay delay in seconds (not ms)
579      */
580     public void initHeartbeats(int delay)
581     {
582         if (delay > 0)
583         {
584             _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
585             _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
586         }
587     }
588 
589     /**
590      * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
591      *
592      @throws AMQException if an error occurs while closing any channel
593      */
594     private void closeAllChannels() throws AMQException
595     {
596         for (AMQChannel channel : _channelMap.values())
597         {
598             channel.close();
599         }
600 
601         _channelMap.clear();
602         for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
603         {
604             _cachedChannels[inull;
605         }
606     }
607 
608     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
609     public void closeSession() throws AMQException
610     {
611         if (!_closed)
612         {
613             _closed = true;
614 
615             if (_virtualHost != null)
616             {
617                 _virtualHost.getConnectionRegistry().deregisterConnection(this);
618             }
619 
620             closeAllChannels();
621             if (_managedObject != null)
622             {
623                 _managedObject.unregister();
624             }
625 
626             for (Task task : _taskList)
627             {
628                 task.doTask(this);
629             }
630         }
631     }
632 
633     public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSessionthrows AMQException
634     {
635         if (_logger.isInfoEnabled())
636         {
637             _logger.info("Closing connection due to: " + e.getMessage());
638         }
639 
640         markChannelAwaitingCloseOk(channelId);
641         closeSession();
642         _stateManager.changeState(AMQState.CONNECTION_CLOSING);
643         writeFrame(e.getCloseFrame(channelId));
644 
645         if (closeProtocolSession)
646         {
647             closeProtocolSession();
648         }
649     }
650 
651     public void closeProtocolSession()
652     {
653         closeProtocolSession(true);
654     }
655 
656     public void closeProtocolSession(boolean waitLast)
657     {
658         if (waitLast && (_lastWriteFuture != null))
659         {
660             _logger.debug("Waiting for last write to join.");
661             _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
662         }
663 
664         _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
665         final CloseFuture future = _minaProtocolSession.close();
666         future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
667 
668         try
669         {
670             _stateManager.changeState(AMQState.CONNECTION_CLOSED);
671         }
672         catch (AMQException e)
673         {
674             _logger.info(e.getMessage());
675         }
676     }
677 
678     public String toString()
679     {
680         return _minaProtocolSession.getRemoteAddress() "(" (getAuthorizedID() == null "?" : getAuthorizedID().getName() ")");
681     }
682 
683     public String dump()
684     {
685         return this " last_sent=" + _lastSent + " last_received=" + _lastReceived;
686     }
687 
688     /** @return an object that can be used to identity */
689     public Object getKey()
690     {
691         return _minaProtocolSession.getRemoteAddress();
692     }
693 
694     /**
695      * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
696      * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
697      *
698      @return a String FQDN
699      */
700     public String getLocalFQDN()
701     {
702         SocketAddress address = _minaProtocolSession.getLocalAddress();
703         // we use the vmpipe address in some tests hence the need for this rather ugly test. The host
704         // information is used by SASL primary.
705         if (address instanceof InetSocketAddress)
706         {
707             return ((InetSocketAddressaddress).getHostName();
708         }
709         else if (address instanceof VmPipeAddress)
710         {
711             return "vmpipe:" ((VmPipeAddressaddress).getPort();
712         }
713         else
714         {
715             throw new IllegalArgumentException("Unsupported socket address class: " + address);
716         }
717     }
718 
719     public SaslServer getSaslServer()
720     {
721         return _saslServer;
722     }
723 
724     public void setSaslServer(SaslServer saslServer)
725     {
726         _saslServer = saslServer;
727     }
728 
729     public FieldTable getClientProperties()
730     {
731         return _clientProperties;
732     }
733 
734     public void setClientProperties(FieldTable clientProperties)
735     {
736         _clientProperties = clientProperties;
737         if (_clientProperties != null)
738         {
739             if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE!= null)
740             {
741                 setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
742             }
743 
744             if (_clientProperties.getString(ClientProperties.version.toString()) != null)
745             {
746                 _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
747             }
748         }
749         _sessionIdentifier = new ProtocolSessionIdentifier(this);
750     }
751 
752     private void setProtocolVersion(ProtocolVersion pv)
753     {
754         _protocolVersion = pv;
755 
756         _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
757         _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
758     }
759 
760     public byte getProtocolMajorVersion()
761     {
762         return _protocolVersion.getMajorVersion();
763     }
764 
765     public ProtocolVersion getProtocolVersion()
766     {
767         return _protocolVersion;
768     }
769 
770     public byte getProtocolMinorVersion()
771     {
772         return _protocolVersion.getMinorVersion();
773     }
774 
775     public boolean isProtocolVersion(byte major, byte minor)
776     {
777         return (getProtocolMajorVersion() == major&& (getProtocolMinorVersion() == minor);
778     }
779 
780     public MethodRegistry getRegistry()
781     {
782         return getMethodRegistry();
783     }
784 
785     public Object getClientIdentifier()
786     {
787         return (_minaProtocolSession != null? _minaProtocolSession.getRemoteAddress() null;
788     }
789 
790     public VirtualHost getVirtualHost()
791     {
792         return _virtualHost;
793     }
794 
795     public void setVirtualHost(VirtualHost virtualHostthrows AMQException
796     {
797         _virtualHost = virtualHost;
798 
799         _virtualHost.getConnectionRegistry().registerConnection(this);
800 
801         _managedObject = createMBean();
802         _managedObject.register();
803     }
804 
805     public void addSessionCloseTask(Task task)
806     {
807         _taskList.add(task);
808     }
809 
810     public void removeSessionCloseTask(Task task)
811     {
812         _taskList.remove(task);
813     }
814 
815     public ProtocolOutputConverter getProtocolOutputConverter()
816     {
817         return _protocolOutputConverter;
818     }
819 
820     public void setAuthorizedID(Principal authorizedID)
821     {
822         _authorizedID = authorizedID;
823     }
824 
825     public Principal getAuthorizedID()
826     {
827         return _authorizedID;
828     }
829 
830     public MethodRegistry getMethodRegistry()
831     {
832         return MethodRegistry.getMethodRegistry(getProtocolVersion());
833     }
834 
835     public MethodDispatcher getMethodDispatcher()
836     {
837         return _dispatcher;
838     }
839 
840     public ProtocolSessionIdentifier getSessionIdentifier()
841     {
842         return _sessionIdentifier;
843     }
844 
845     public String getClientVersion()
846     {
847         return (_clientVersion == nullnull : _clientVersion.toString();
848     }
849 
850     public void setSender(Sender<java.nio.ByteBuffer> sender)
851     {
852        // No-op, interface munging between this and AMQProtocolSession
853     }
854 
855     public void init()
856     {
857        // No-op, interface munging between this and AMQProtocolSession
858     }
859 }