001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.protocol;
022
023 import org.apache.commons.lang.StringUtils;
024 import org.apache.mina.common.CloseFuture;
025 import org.apache.mina.common.IdleStatus;
026 import org.apache.mina.common.IoSession;
027 import org.apache.mina.common.WriteFuture;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 import javax.jms.JMSException;
032 import javax.security.sasl.SaslClient;
033 import java.util.concurrent.ConcurrentHashMap;
034 import java.util.concurrent.ConcurrentMap;
035
036 import org.apache.qpid.AMQException;
037 import org.apache.qpid.client.AMQConnection;
038 import org.apache.qpid.client.AMQSession;
039 import org.apache.qpid.client.ConnectionTuneParameters;
040 import org.apache.qpid.client.message.UnprocessedMessage;
041 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
042 import org.apache.qpid.client.state.AMQStateManager;
043 import org.apache.qpid.client.state.AMQState;
044 import org.apache.qpid.framing.*;
045 import org.apache.qpid.protocol.AMQConstant;
046 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
047 import org.apache.qpid.transport.Sender;
048 import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
049
050 /**
051 * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
052 * session is still available but clients should not use it to obtain session attributes.
053 */
054 public class AMQProtocolSession implements AMQVersionAwareProtocolSession
055 {
056 protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
057
058 protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class);
059
060 public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
061
062 protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
063
064 protected static final String AMQ_CONNECTION = "AMQConnection";
065
066 protected static final String SASL_CLIENT = "SASLClient";
067
068 protected final IoSession _minaProtocolSession;
069
070 protected WriteFuture _lastWriteFuture;
071
072 /**
073 * The handler from which this session was created and which is used to handle protocol events. We send failover
074 * events to the handler.
075 */
076 protected final AMQProtocolHandler _protocolHandler;
077
078 /** Maps from the channel id to the AMQSession that it represents. */
079 protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
080
081 protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
082
083 /**
084 * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
085 * first) with the subsequent content header and content bodies.
086 */
087 private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
088 private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
089
090 /** Counter to ensure unique queue names */
091 protected int _queueId = 1;
092 protected final Object _queueIdLock = new Object();
093
094 private ProtocolVersion _protocolVersion;
095 // private VersionSpecificRegistry _registry =
096 // MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
097
098 private MethodRegistry _methodRegistry =
099 MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
100
101 private MethodDispatcher _methodDispatcher;
102
103 protected final AMQConnection _connection;
104
105 private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
106
107 public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
108 {
109 _protocolHandler = protocolHandler;
110 _minaProtocolSession = protocolSession;
111 _minaProtocolSession.setAttachment(this);
112 // properties of the connection are made available to the event handlers
113 _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
114 // fixme - real value needed
115 _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
116 _protocolVersion = connection.getProtocolVersion();
117 _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
118 this);
119 _connection = connection;
120
121 }
122
123 public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
124 {
125 _protocolHandler = protocolHandler;
126 _minaProtocolSession = null;
127 _protocolVersion = connection.getProtocolVersion();
128 _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
129 this);
130 _connection = connection;
131 }
132
133 public void init()
134 {
135 // start the process of setting up the connection. This is the first place that
136 // data is written to the server.
137 _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
138 }
139
140 public String getClientID()
141 {
142 try
143 {
144 return getAMQConnection().getClientID();
145 }
146 catch (JMSException e)
147 {
148 // we never throw a JMSException here
149 return null;
150 }
151 }
152
153 public void setClientID(String clientID) throws JMSException
154 {
155 getAMQConnection().setClientID(clientID);
156 }
157
158 public AMQStateManager getStateManager()
159 {
160 return _protocolHandler.getStateManager();
161 }
162
163 public String getVirtualHost()
164 {
165 return getAMQConnection().getVirtualHost();
166 }
167
168 public String getUsername()
169 {
170 return getAMQConnection().getUsername();
171 }
172
173 public String getPassword()
174 {
175 return getAMQConnection().getPassword();
176 }
177
178 public IoSession getIoSession()
179 {
180 return _minaProtocolSession;
181 }
182
183 public SaslClient getSaslClient()
184 {
185 return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
186 }
187
188 /**
189 * Store the SASL client currently being used for the authentication handshake
190 *
191 * @param client if non-null, stores this in the session. if null clears any existing client being stored
192 */
193 public void setSaslClient(SaslClient client)
194 {
195 if (client == null)
196 {
197 _minaProtocolSession.removeAttribute(SASL_CLIENT);
198 }
199 else
200 {
201 _minaProtocolSession.setAttribute(SASL_CLIENT, client);
202 }
203 }
204
205 public ConnectionTuneParameters getConnectionTuneParameters()
206 {
207 return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
208 }
209
210 public void setConnectionTuneParameters(ConnectionTuneParameters params)
211 {
212 _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
213 AMQConnection con = getAMQConnection();
214 con.setMaximumChannelCount(params.getChannelMax());
215 con.setMaximumFrameSize(params.getFrameMax());
216 initHeartbeats((int) params.getHeartbeat());
217 }
218
219 /**
220 * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
221 * dispatcher thread.
222 *
223 * @param message
224 *
225 * @throws AMQException if this was not expected
226 */
227 public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
228 {
229 if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
230 {
231 _channelId2UnprocessedMsgArray[channelId] = message;
232 }
233 else
234 {
235 _channelId2UnprocessedMsgMap.put(channelId, message);
236 }
237 }
238
239 public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
240 {
241 final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
242 : _channelId2UnprocessedMsgMap.get(channelId));
243
244 if (msg == null)
245 {
246 throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null);
247 }
248
249 if (msg.getContentHeader() != null)
250 {
251 throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null);
252 }
253
254 msg.setContentHeader(contentHeader);
255 if (contentHeader.bodySize == 0)
256 {
257 deliverMessageToAMQSession(channelId, msg);
258 }
259 }
260
261 public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
262 {
263 UnprocessedMessage_0_8 msg;
264 final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
265 if (fastAccess)
266 {
267 msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
268 }
269 else
270 {
271 msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
272 }
273
274 if (msg == null)
275 {
276 throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null);
277 }
278
279 if (msg.getContentHeader() == null)
280 {
281 if (fastAccess)
282 {
283 _channelId2UnprocessedMsgArray[channelId] = null;
284 }
285 else
286 {
287 _channelId2UnprocessedMsgMap.remove(channelId);
288 }
289 throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
290 }
291
292 msg.receiveBody(contentBody);
293
294 if (msg.isAllBodyDataReceived())
295 {
296 deliverMessageToAMQSession(channelId, msg);
297 }
298 }
299
300 public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
301 {
302
303 }
304
305 /**
306 * Deliver a message to the appropriate session, removing the unprocessed message from our map
307 *
308 * @param channelId the channel id the message should be delivered to
309 * @param msg the message
310 */
311 private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
312 {
313 AMQSession session = getSession(channelId);
314 session.messageReceived(msg);
315 if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
316 {
317 _channelId2UnprocessedMsgArray[channelId] = null;
318 }
319 else
320 {
321 _channelId2UnprocessedMsgMap.remove(channelId);
322 }
323 }
324
325 protected AMQSession getSession(int channelId)
326 {
327 return _connection.getSession(channelId);
328 }
329
330 /**
331 * Convenience method that writes a frame to the protocol session. Equivalent to calling
332 * getProtocolSession().write().
333 *
334 * @param frame the frame to write
335 */
336 public void writeFrame(AMQDataBlock frame)
337 {
338 writeFrame(frame, false);
339 }
340
341 public void writeFrame(AMQDataBlock frame, boolean wait)
342 {
343 WriteFuture f = _minaProtocolSession.write(frame);
344 if (wait)
345 {
346 // fixme -- time out?
347 f.join();
348 }
349 else
350 {
351 _lastWriteFuture = f;
352 }
353 }
354
355 /**
356 * Starts the process of closing a session
357 *
358 * @param session the AMQSession being closed
359 */
360 public void closeSession(AMQSession session)
361 {
362 _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
363 final int channelId = session.getChannelId();
364 if (channelId <= 0)
365 {
366 throw new IllegalArgumentException("Attempt to close a channel with id < 0");
367 }
368 // we need to know when a channel is closing so that we can respond
369 // with a channel.close frame when we receive any other type of frame
370 // on that channel
371 _closingChannels.putIfAbsent(channelId, session);
372 }
373
374 /**
375 * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
376 * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
377 * appropriate.
378 *
379 * @param channelId the id of the channel (session)
380 *
381 * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
382 * the channel close is just the server responding to the client's earlier request to close the channel.
383 */
384 public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
385 {
386
387 // if this is not a response to an earlier request to close the channel
388 if (_closingChannels.remove(channelId) == null)
389 {
390 final AMQSession session = getSession(channelId);
391 try
392 {
393 session.closed(new AMQException(code, text, null));
394 }
395 catch (JMSException e)
396 {
397 throw new AMQException(null, "JMSException received while closing session", e);
398 }
399
400 return true;
401 }
402 else
403 {
404 return false;
405 }
406 }
407
408 public AMQConnection getAMQConnection()
409 {
410 return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
411 }
412
413 public void closeProtocolSession() throws AMQException
414 {
415 closeProtocolSession(true);
416 }
417
418 public void closeProtocolSession(boolean waitLast) throws AMQException
419 {
420 _logger.debug("Waiting for last write to join.");
421 if (waitLast && (_lastWriteFuture != null))
422 {
423 _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
424 }
425
426 _logger.debug("Closing protocol session");
427
428 final CloseFuture future = _minaProtocolSession.close();
429
430 // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
431 // then wait for the connection to close.
432 // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
433 // error now shouldn't matter.
434
435 _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
436 future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
437 }
438
439 public void failover(String host, int port)
440 {
441 _protocolHandler.failover(host, port);
442 }
443
444 protected AMQShortString generateQueueName()
445 {
446 int id;
447 synchronized (_queueIdLock)
448 {
449 id = _queueId++;
450 }
451 // get rid of / and : and ; from address for spec conformance
452 String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
453
454 return new AMQShortString("tmp_" + localAddress + "_" + id);
455 }
456
457 /** @param delay delay in seconds (not ms) */
458 void initHeartbeats(int delay)
459 {
460 if (delay > 0)
461 {
462 _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
463 _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
464 HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
465 }
466 }
467
468 public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
469 {
470 final AMQSession session = getSession(channelId);
471
472 session.confirmConsumerCancelled(consumerTag.toIntValue());
473 }
474
475 public void setProtocolVersion(final ProtocolVersion pv)
476 {
477 _protocolVersion = pv;
478 _methodRegistry = MethodRegistry.getMethodRegistry(pv);
479 _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
480
481 // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
482 }
483
484 public byte getProtocolMinorVersion()
485 {
486 return _protocolVersion.getMinorVersion();
487 }
488
489 public byte getProtocolMajorVersion()
490 {
491 return _protocolVersion.getMajorVersion();
492 }
493
494 public ProtocolVersion getProtocolVersion()
495 {
496 return _protocolVersion;
497 }
498
499 // public VersionSpecificRegistry getRegistry()
500 // {
501 // return _registry;
502 // }
503
504 public MethodRegistry getMethodRegistry()
505 {
506 return _methodRegistry;
507 }
508
509 public MethodDispatcher getMethodDispatcher()
510 {
511 return _methodDispatcher;
512 }
513
514 public void setTicket(int ticket, int channelId)
515 {
516 final AMQSession session = getSession(channelId);
517 session.setTicket(ticket);
518 }
519
520 public void setMethodDispatcher(MethodDispatcher methodDispatcher)
521 {
522 _methodDispatcher = methodDispatcher;
523 }
524
525 public void setFlowControl(final int channelId, final boolean active)
526 {
527 final AMQSession session = getSession(channelId);
528 session.setFlowControl(active);
529 }
530
531 public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
532 {
533 _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
534 }
535
536 public void notifyError(Exception error)
537 {
538 _protocolHandler.propagateExceptionToAllWaiters(error);
539 }
540
541 public void setSender(Sender<java.nio.ByteBuffer> sender)
542 {
543 // No-op, interface munging
544 }
545 }
|