AMQProtocolSession.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.protocol;
022 
023 import org.apache.commons.lang.StringUtils;
024 import org.apache.mina.common.CloseFuture;
025 import org.apache.mina.common.IdleStatus;
026 import org.apache.mina.common.IoSession;
027 import org.apache.mina.common.WriteFuture;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030 
031 import javax.jms.JMSException;
032 import javax.security.sasl.SaslClient;
033 import java.util.concurrent.ConcurrentHashMap;
034 import java.util.concurrent.ConcurrentMap;
035 
036 import org.apache.qpid.AMQException;
037 import org.apache.qpid.client.AMQConnection;
038 import org.apache.qpid.client.AMQSession;
039 import org.apache.qpid.client.ConnectionTuneParameters;
040 import org.apache.qpid.client.message.UnprocessedMessage;
041 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
042 import org.apache.qpid.client.state.AMQStateManager;
043 import org.apache.qpid.client.state.AMQState;
044 import org.apache.qpid.framing.*;
045 import org.apache.qpid.protocol.AMQConstant;
046 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
047 import org.apache.qpid.transport.Sender;
048 import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
049 
050 /**
051  * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
052  * session is still available but clients should not use it to obtain session attributes.
053  */
054 public class AMQProtocolSession implements AMQVersionAwareProtocolSession
055 {
056     protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 60 2;
057 
058     protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class);
059 
060     public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
061 
062     protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
063 
064     protected static final String AMQ_CONNECTION = "AMQConnection";
065 
066     protected static final String SASL_CLIENT = "SASLClient";
067 
068     protected final IoSession _minaProtocolSession;
069 
070     protected WriteFuture _lastWriteFuture;
071 
072     /**
073      * The handler from which this session was created and which is used to handle protocol events. We send failover
074      * events to the handler.
075      */
076     protected final AMQProtocolHandler _protocolHandler;
077 
078     /** Maps from the channel id to the AMQSession that it represents. */
079     protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
080 
081     protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
082 
083     /**
084      * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
085      * first) with the subsequent content header and content bodies.
086      */
087     private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
088     private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
089 
090     /** Counter to ensure unique queue names */
091     protected int _queueId = 1;
092     protected final Object _queueIdLock = new Object();
093 
094     private ProtocolVersion _protocolVersion;
095 //    private VersionSpecificRegistry _registry =
096 //        MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
097 
098     private MethodRegistry _methodRegistry =
099             MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
100 
101     private MethodDispatcher _methodDispatcher;
102 
103     protected final AMQConnection _connection;
104 
105     private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
106 
107     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
108     {
109         _protocolHandler = protocolHandler;
110         _minaProtocolSession = protocolSession;
111         _minaProtocolSession.setAttachment(this);
112         // properties of the connection are made available to the event handlers
113         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
114         // fixme - real value needed
115         _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
116         _protocolVersion = connection.getProtocolVersion();
117         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
118                                                                            this);
119         _connection = connection;
120 
121     }
122 
123     public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
124     {
125         _protocolHandler = protocolHandler;
126         _minaProtocolSession = null;
127         _protocolVersion = connection.getProtocolVersion();
128         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
129                                                                            this);
130         _connection = connection;
131     }
132 
133     public void init()
134     {
135         // start the process of setting up the connection. This is the first place that
136         // data is written to the server.
137         _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
138     }
139 
140     public String getClientID()
141     {
142         try
143         {
144             return getAMQConnection().getClientID();
145         }
146         catch (JMSException e)
147         {
148             // we never throw a JMSException here
149             return null;
150         }
151     }
152 
153     public void setClientID(String clientIDthrows JMSException
154     {
155         getAMQConnection().setClientID(clientID);
156     }
157 
158     public AMQStateManager getStateManager()
159     {
160         return _protocolHandler.getStateManager();
161     }
162 
163     public String getVirtualHost()
164     {
165         return getAMQConnection().getVirtualHost();
166     }
167 
168     public String getUsername()
169     {
170         return getAMQConnection().getUsername();
171     }
172 
173     public String getPassword()
174     {
175         return getAMQConnection().getPassword();
176     }
177 
178     public IoSession getIoSession()
179     {
180         return _minaProtocolSession;
181     }
182 
183     public SaslClient getSaslClient()
184     {
185         return (SaslClient_minaProtocolSession.getAttribute(SASL_CLIENT);    
186     }
187 
188     /**
189      * Store the SASL client currently being used for the authentication handshake
190      *
191      @param client if non-null, stores this in the session. if null clears any existing client being stored
192      */
193     public void setSaslClient(SaslClient client)
194     {
195         if (client == null)
196         {
197             _minaProtocolSession.removeAttribute(SASL_CLIENT);
198         }
199         else
200         {
201             _minaProtocolSession.setAttribute(SASL_CLIENT, client);
202         }
203     }
204 
205     public ConnectionTuneParameters getConnectionTuneParameters()
206     {
207         return (ConnectionTuneParameters_minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
208     }
209 
210     public void setConnectionTuneParameters(ConnectionTuneParameters params)
211     {
212         _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
213         AMQConnection con = getAMQConnection();
214         con.setMaximumChannelCount(params.getChannelMax());
215         con.setMaximumFrameSize(params.getFrameMax());
216         initHeartbeats((intparams.getHeartbeat());
217     }
218 
219     /**
220      * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
221      * dispatcher thread.
222      *
223      @param message
224      *
225      @throws AMQException if this was not expected
226      */
227     public void unprocessedMessageReceived(final int channelId, UnprocessedMessage messagethrows AMQException
228     {        
229         if ((channelId & FAST_CHANNEL_ACCESS_MASK== 0)
230         {
231             _channelId2UnprocessedMsgArray[channelId= message;
232         }
233         else
234         {
235             _channelId2UnprocessedMsgMap.put(channelId, message);
236         }
237     }
238 
239     public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeaderthrows AMQException
240     {
241         final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK== ? _channelId2UnprocessedMsgArray[channelId]
242                                                : _channelId2UnprocessedMsgMap.get(channelId));
243 
244         if (msg == null)
245         {
246             throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null);
247         }
248 
249         if (msg.getContentHeader() != null)
250         {
251             throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null);
252         }
253 
254         msg.setContentHeader(contentHeader);
255         if (contentHeader.bodySize == 0)
256         {
257             deliverMessageToAMQSession(channelId, msg);
258         }
259     }
260 
261     public void contentBodyReceived(final int channelId, ContentBody contentBodythrows AMQException
262     {
263         UnprocessedMessage_0_8 msg;
264         final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK== 0;
265         if (fastAccess)
266         {
267             msg = (UnprocessedMessage_0_8_channelId2UnprocessedMsgArray[channelId];
268         }
269         else
270         {
271             msg = (UnprocessedMessage_0_8_channelId2UnprocessedMsgMap.get(channelId);
272         }
273 
274         if (msg == null)
275         {
276             throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first"null);
277         }
278 
279         if (msg.getContentHeader() == null)
280         {
281             if (fastAccess)
282             {
283                 _channelId2UnprocessedMsgArray[channelIdnull;
284             }
285             else
286             {
287                 _channelId2UnprocessedMsgMap.remove(channelId);
288             }
289             throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first"null);
290         }
291 
292         msg.receiveBody(contentBody);
293 
294         if (msg.isAllBodyDataReceived())
295         {
296             deliverMessageToAMQSession(channelId, msg);
297         }
298     }
299 
300     public void heartbeatBodyReceived(int channelId, HeartbeatBody bodythrows AMQException
301     {
302 
303     }
304 
305     /**
306      * Deliver a message to the appropriate session, removing the unprocessed message from our map
307      *
308      @param channelId the channel id the message should be delivered to
309      @param msg       the message
310      */
311     private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
312     {
313         AMQSession session = getSession(channelId);
314         session.messageReceived(msg);
315         if ((channelId & FAST_CHANNEL_ACCESS_MASK== 0)
316         {
317             _channelId2UnprocessedMsgArray[channelIdnull;
318         }
319         else
320         {
321             _channelId2UnprocessedMsgMap.remove(channelId);
322         }
323     }
324 
325     protected AMQSession getSession(int channelId)
326     {
327         return _connection.getSession(channelId);
328     }
329 
330     /**
331      * Convenience method that writes a frame to the protocol session. Equivalent to calling
332      * getProtocolSession().write().
333      *
334      @param frame the frame to write
335      */
336     public void writeFrame(AMQDataBlock frame)
337     {
338         writeFrame(frame, false);
339     }
340 
341     public void writeFrame(AMQDataBlock frame, boolean wait)
342     {
343         WriteFuture f = _minaProtocolSession.write(frame);
344         if (wait)
345         {
346             // fixme -- time out?
347             f.join();
348         }
349         else
350         {
351             _lastWriteFuture = f;
352         }
353     }
354 
355     /**
356      * Starts the process of closing a session
357      *
358      @param session the AMQSession being closed
359      */
360     public void closeSession(AMQSession session)
361     {
362         _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
363         final int channelId = session.getChannelId();
364         if (channelId <= 0)
365         {
366             throw new IllegalArgumentException("Attempt to close a channel with id < 0");
367         }
368         // we need to know when a channel is closing so that we can respond
369         // with a channel.close frame when we receive any other type of frame
370         // on that channel
371         _closingChannels.putIfAbsent(channelId, session);
372     }
373 
374     /**
375      * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
376      * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
377      * appropriate.
378      *
379      @param channelId the id of the channel (session)
380      *
381      @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
382      *         the channel close is just the server responding to the client's earlier request to close the channel.
383      */
384     public boolean channelClosed(int channelId, AMQConstant code, String textthrows AMQException
385     {
386 
387         // if this is not a response to an earlier request to close the channel
388         if (_closingChannels.remove(channelId== null)
389         {
390             final AMQSession session = getSession(channelId);
391             try
392             {
393                 session.closed(new AMQException(code, text, null));
394             }
395             catch (JMSException e)
396             {
397                 throw new AMQException(null, "JMSException received while closing session", e);
398             }
399 
400             return true;
401         }
402         else
403         {
404             return false;
405         }
406     }
407 
408     public AMQConnection getAMQConnection()
409     {
410         return (AMQConnection_minaProtocolSession.getAttribute(AMQ_CONNECTION);
411     }
412 
413     public void closeProtocolSession() throws AMQException
414     {
415         closeProtocolSession(true);
416     }
417 
418     public void closeProtocolSession(boolean waitLastthrows AMQException
419     {
420         _logger.debug("Waiting for last write to join.");
421         if (waitLast && (_lastWriteFuture != null))
422         {
423             _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
424         }
425 
426         _logger.debug("Closing protocol session");
427         
428         final CloseFuture future = _minaProtocolSession.close();
429 
430         // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
431         // then wait for the connection to close.
432         // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
433         // error now shouldn't matter.
434 
435         _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
436         future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
437     }
438 
439     public void failover(String host, int port)
440     {
441         _protocolHandler.failover(host, port);
442     }
443 
444     protected AMQShortString generateQueueName()
445     {
446         int id;
447         synchronized (_queueIdLock)
448         {
449             id = _queueId++;
450         }
451         // get rid of / and : and ; from address for spec conformance
452         String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString()"/;:""");
453 
454         return new AMQShortString("tmp_" + localAddress + "_" + id);
455     }
456 
457     /** @param delay delay in seconds (not ms) */
458     void initHeartbeats(int delay)
459     {
460         if (delay > 0)
461         {
462             _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
463             _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
464             HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
465         }
466     }
467 
468     public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
469     {
470         final AMQSession session = getSession(channelId);
471 
472         session.confirmConsumerCancelled(consumerTag.toIntValue());
473     }
474 
475     public void setProtocolVersion(final ProtocolVersion pv)
476     {
477         _protocolVersion = pv;
478         _methodRegistry = MethodRegistry.getMethodRegistry(pv);
479         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
480 
481         //  _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
482     }
483 
484     public byte getProtocolMinorVersion()
485     {
486         return _protocolVersion.getMinorVersion();
487     }
488 
489     public byte getProtocolMajorVersion()
490     {
491         return _protocolVersion.getMajorVersion();
492     }
493 
494     public ProtocolVersion getProtocolVersion()
495     {
496         return _protocolVersion;
497     }
498 
499 //    public VersionSpecificRegistry getRegistry()
500 //    {
501 //        return _registry;
502 //    }
503 
504     public MethodRegistry getMethodRegistry()
505     {
506         return _methodRegistry;
507     }
508 
509     public MethodDispatcher getMethodDispatcher()
510     {
511         return _methodDispatcher;
512     }
513 
514     public void setTicket(int ticket, int channelId)
515     {
516         final AMQSession session = getSession(channelId);
517         session.setTicket(ticket);
518     }
519 
520     public void setMethodDispatcher(MethodDispatcher methodDispatcher)
521     {
522         _methodDispatcher = methodDispatcher;
523     }
524 
525     public void setFlowControl(final int channelId, final boolean active)
526     {
527         final AMQSession session = getSession(channelId);
528         session.setFlowControl(active);
529     }
530 
531     public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBodythrows AMQException
532     {
533         _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
534     }
535 
536     public void notifyError(Exception error)
537     {
538         _protocolHandler.propagateExceptionToAllWaiters(error);
539     }
540 
541     public void setSender(Sender<java.nio.ByteBuffer> sender)
542     {
543         // No-op, interface munging
544     }
545 }