001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client;
022
023
024 import javax.jms.*;
025 import javax.jms.IllegalStateException;
026
027 import org.apache.qpid.AMQException;
028 import org.apache.qpid.AMQUndeliveredException;
029 import org.apache.qpid.client.failover.FailoverException;
030 import org.apache.qpid.client.failover.FailoverProtectedOperation;
031 import org.apache.qpid.client.failover.FailoverRetrySupport;
032 import org.apache.qpid.client.message.*;
033 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
034 import org.apache.qpid.client.protocol.AMQProtocolHandler;
035 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
036 import org.apache.qpid.common.AMQPFilterTypes;
037 import org.apache.qpid.framing.*;
038 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
039 import org.apache.qpid.jms.Session;
040 import org.apache.qpid.protocol.AMQConstant;
041 import org.apache.qpid.protocol.AMQMethodEvent;
042 import org.slf4j.Logger;
043 import org.slf4j.LoggerFactory;
044
045 import java.util.Map;
046
047 public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
048 {
049
050 /** Used for debugging. */
051 private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
052
/**
 * Creates a new session on a connection, delegating all arguments unchanged to the superclass constructor.
 *
 * @param con                     The connection on which to create the session.
 * @param channelId               The unique identifier for the session.
 * @param transacted              Indicates whether or not the session is transactional.
 * @param acknowledgeMode         The acknowledgement mode for the session.
 * @param messageFactoryRegistry  The message factory registry for the session.
 * @param defaultPrefetchHighMark The maximum number of messages to prefetch before suspending the session.
 * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
 */
AMQSession_0_8(AMQConnection con,
               int channelId,
               boolean transacted,
               int acknowledgeMode,
               MessageFactoryRegistry messageFactoryRegistry,
               int defaultPrefetchHighMark,
               int defaultPrefetchLowMark)
{
    super(con,
          channelId,
          transacted,
          acknowledgeMode,
          messageFactoryRegistry,
          defaultPrefetchHighMark,
          defaultPrefetchLowMark);
}
070
/**
 * Creates a new session on a connection, using the registry returned by
 * {@code MessageFactoryRegistry.newDefaultRegistry()} as the message factory registry.
 *
 * @param con                 The connection on which to create the session.
 * @param channelId           The unique identifier for the session.
 * @param transacted          Indicates whether or not the session is transactional.
 * @param acknowledgeMode     The acknowledgement mode for the session.
 * @param defaultPrefetchHigh The maximum number of messages to prefetch before suspending the session.
 * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
 */
AMQSession_0_8(AMQConnection con,
               int channelId,
               boolean transacted,
               int acknowledgeMode,
               int defaultPrefetchHigh,
               int defaultPrefetchLow)
{
    this(con,
         channelId,
         transacted,
         acknowledgeMode,
         MessageFactoryRegistry.newDefaultRegistry(),
         defaultPrefetchHigh,
         defaultPrefetchLow);
}
087
088 private ProtocolVersion getProtocolVersion()
089 {
090 return getProtocolHandler().getProtocolVersion();
091 }
092
093 public void acknowledgeMessage(long deliveryTag, boolean multiple)
094 {
095 BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
096
097 final AMQFrame ackFrame = body.generateFrame(_channelId);
098
099 if (_logger.isDebugEnabled())
100 {
101 _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
102 }
103
104 getProtocolHandler().writeFrame(ackFrame);
105 _unacknowledgedMessageTags.remove(deliveryTag);
106 }
107
108 public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
109 final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
110 {
111 getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
112 (getTicket(),queueName,exchangeName,routingKey,false,arguments).
113 generateFrame(_channelId), QueueBindOkBody.class);
114 }
115
116 public void sendClose(long timeout) throws AMQException, FailoverException
117 {
118 getProtocolHandler().closeSession(this);
119 getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
120 new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
121 ChannelCloseOkBody.class, timeout);
122 // When control resumes at this point, a reply will have been received that
123 // indicates the broker has closed the channel successfully.
124 }
125
126 public void sendCommit() throws AMQException, FailoverException
127 {
128 final AMQProtocolHandler handler = getProtocolHandler();
129
130 handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
131 }
132
133 public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
134 FailoverException
135 {
136 FieldTable table = null;
137 if(arguments != null && !arguments.isEmpty())
138 {
139 table = new FieldTable();
140 for(Map.Entry<String, Object> entry : arguments.entrySet())
141 {
142 table.setObject(entry.getKey(), entry.getValue());
143 }
144 }
145 QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
146 AMQFrame queueDeclare = body.generateFrame(_channelId);
147 getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
148 }
149
150 public void sendRecover() throws AMQException, FailoverException
151 {
152 _unacknowledgedMessageTags.clear();
153
154 if (isStrictAMQP())
155 {
156 // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
157
158 BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
159 _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
160 _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
161 }
162 else
163 {
164 // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
165 // in 0-9 we used the cleaner addition of a new sync recover method with its own ok
166 if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
167 {
168 BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
169 _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
170 }
171 else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
172 {
173 BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
174 _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
175 }
176 else
177 {
178 throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
179 }
180 }
181 }
182
183 public void releaseForRollback()
184 {
185 while (true)
186 {
187 Long tag = _deliveredMessageTags.poll();
188 if (tag == null)
189 {
190 break;
191 }
192
193 rejectMessage(tag, true);
194 }
195
196 if (_dispatcher != null)
197 {
198 _dispatcher.rollback();
199 }
200 }
201
202 public void rejectMessage(long deliveryTag, boolean requeue)
203 {
204 if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
205 {
206 if (_logger.isDebugEnabled())
207 {
208 _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode());
209 }
210
211 BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
212 AMQFrame frame = body.generateFrame(_channelId);
213
214 _connection.getProtocolHandler().writeFrame(frame);
215 }
216 }
217
218 public boolean isQueueBound(final AMQDestination destination) throws JMSException
219 {
220 return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
221 }
222
223
224 public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
225 throws JMSException
226 {
227 try
228 {
229 AMQMethodEvent response = new FailoverRetrySupport<AMQMethodEvent, AMQException>(
230 new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
231 {
232 public AMQMethodEvent execute() throws AMQException, FailoverException
233 {
234 AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
235 (exchangeName, routingKey, queueName).generateFrame(_channelId);
236
237 return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
238
239 }
240 }, _connection).execute();
241
242 // Extract and return the response code from the query.
243 ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
244
245 return (responseBody.getReplyCode() == 0);
246 }
247 catch (AMQException e)
248 {
249 throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
250 }
251 }
252
253 @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
254 AMQShortString queueName,
255 AMQProtocolHandler protocolHandler,
256 boolean nowait,
257 String messageSelector,
258 int tag) throws AMQException, FailoverException
259 {
260 FieldTable arguments = FieldTableFactory.newFieldTable();
261 if ((messageSelector != null) && !messageSelector.equals(""))
262 {
263 arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
264 }
265
266 if (consumer.isAutoClose())
267 {
268 arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
269 }
270
271 if (consumer.isNoConsume())
272 {
273 arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
274 }
275
276 BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
277 queueName,
278 new AMQShortString(String.valueOf(tag)),
279 consumer.isNoLocal(),
280 consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
281 consumer.isExclusive(),
282 nowait,
283 arguments);
284
285
286 AMQFrame jmsConsume = body.generateFrame(_channelId);
287
288 if (nowait)
289 {
290 protocolHandler.writeFrame(jmsConsume);
291 }
292 else
293 {
294 protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
295 }
296 }
297
298 public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
299 final boolean nowait) throws AMQException, FailoverException
300 {
301 ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
302 name.toString().startsWith("amq."),
303 false,false,false,nowait,null);
304 AMQFrame exchangeDeclare = body.generateFrame(_channelId);
305
306 protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
307 }
308
309 public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
310 {
311 QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
312
313 AMQFrame queueDeclare = body.generateFrame(_channelId);
314
315 protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
316 }
317
318 public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
319 {
320 QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(),
321 queueName,
322 false,
323 false,
324 true);
325 AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
326
327 getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
328 }
329
330 public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
331 {
332 ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend);
333 AMQFrame channelFlowFrame = body.generateFrame(_channelId);
334 _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
335 }
336
337 public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
338 final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
339 final boolean noConsume, final boolean autoClose) throws JMSException
340 {
341
342 final AMQProtocolHandler protocolHandler = getProtocolHandler();
343 return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
344 _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
345 exclusive, _acknowledgeMode, noConsume, autoClose);
346 }
347
348
349 public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
350 final boolean immediate, final boolean waitUntilSent, long producerId)
351 {
352
353 return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
354 this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
355 }
356
357
358 @Override public void messageReceived(UnprocessedMessage message)
359 {
360
361 if (message instanceof ReturnMessage)
362 {
363 // Return of the bounced message.
364 returnBouncedMessage((ReturnMessage) message);
365 }
366 else
367 {
368 super.messageReceived(message);
369 }
370 }
371
372 private void returnBouncedMessage(final ReturnMessage msg)
373 {
374 _connection.performConnectionTask(new Runnable()
375 {
376 public void run()
377 {
378 try
379 {
380 // Bounced message is processed here, away from the mina thread
381 AbstractJMSMessage bouncedMessage =
382 _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
383 msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
384 AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
385 AMQShortString reason = msg.getReplyText();
386 _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
387
388 // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
389 if (errorCode == AMQConstant.NO_CONSUMERS)
390 {
391 _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
392 }
393 else if (errorCode == AMQConstant.NO_ROUTE)
394 {
395 _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
396 }
397 else
398 {
399 _connection.exceptionReceived(
400 new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
401 }
402
403 }
404 catch (Exception e)
405 {
406 _logger.error(
407 "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
408 e);
409 }
410 }
411 });
412 }
413
414
415
416
417 public void sendRollback() throws AMQException, FailoverException
418 {
419 TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
420 AMQFrame frame = body.generateFrame(getChannelId());
421 getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
422 }
423
424 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
425 {
426
427 checkNotClosed();
428 AMQTopic origTopic = checkValidTopic(topic);
429 AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
430 TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
431 if (subscriber != null)
432 {
433 if (subscriber.getTopic().equals(topic))
434 {
435 throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
436 + name);
437 }
438 else
439 {
440 unsubscribe(name);
441 }
442 }
443 else
444 {
445 AMQShortString topicName;
446 if (topic instanceof AMQTopic)
447 {
448 topicName = ((AMQTopic) topic).getRoutingKey();
449 }
450 else
451 {
452 topicName = new AMQShortString(topic.getTopicName());
453 }
454
455 if (_strictAMQP)
456 {
457 if (_strictAMQPFATAL)
458 {
459 throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
460 }
461 else
462 {
463 _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
464 + "for creation durableSubscriber. Requesting queue deletion regardless.");
465 }
466
467 deleteQueue(dest.getAMQQueueName());
468 }
469 else
470 {
471 // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
472 // says we must trash the subscription.
473 if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
474 && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
475 {
476 deleteQueue(dest.getAMQQueueName());
477 }
478 }
479 }
480
481 subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
482
483 _subscriptions.put(name, subscriber);
484 _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
485
486 return subscriber;
487 }
488
489
490
491
492 public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
493 {
494 new FailoverRetrySupport<Object, AMQException>(
495 new FailoverProtectedOperation<Object, AMQException>()
496 {
497 public Object execute() throws AMQException, FailoverException
498 {
499
500 BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
501
502 // todo send low water mark when protocol allows.
503 // todo Be aware of possible changes to parameter order as versions change.
504 getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
505
506 return null;
507 }
508 }, _connection).execute();
509 }
510
511 class QueueDeclareOkHandler extends SpecificMethodFrameListener
512 {
513
514 private long _messageCount;
515 private long _consumerCount;
516
517 public QueueDeclareOkHandler()
518 {
519 super(getChannelId(), QueueDeclareOkBody.class);
520 }
521
522 public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
523 {
524 boolean matches = super.processMethod(channelId, frame);
525 if (matches)
526 {
527 QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
528 _messageCount = declareOk.getMessageCount();
529 _consumerCount = declareOk.getConsumerCount();
530 }
531 return matches;
532 }
533
534 }
535
536 protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
537 {
538 AMQFrame queueDeclare =
539 getMethodRegistry().createQueueDeclareBody(getTicket(),
540 amqd.getAMQQueueName(),
541 true,
542 amqd.isDurable(),
543 amqd.isExclusive(),
544 amqd.isAutoDelete(),
545 false,
546 null).generateFrame(_channelId);
547 QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
548 getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
549 return okHandler._messageCount;
550 }
551
552 protected final boolean tagLE(long tag1, long tag2)
553 {
554 return tag1 <= tag2;
555 }
556
557 protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
558 {
559 return false;
560 }
561
562 public AMQMessageDelegateFactory getMessageDelegateFactory()
563 {
564 return AMQMessageDelegateFactory.FACTORY_0_8;
565 }
566
567 }
|