001 /* Licensed to the Apache Software Foundation (ASF) under one
002 * or more contributor license agreements. See the NOTICE file
003 * distributed with this work for additional information
004 * regarding copyright ownership. The ASF licenses this file
005 * to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance
007 * with the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing,
012 * software distributed under the License is distributed on an
013 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
014 * KIND, either express or implied. See the License for the
015 * specific language governing permissions and limitations
016 * under the License.
017 */
018 package org.apache.qpid.client;
019
020 import org.apache.qpid.framing.AMQShortString;
021 import org.apache.qpid.framing.FieldTable;
022 import org.apache.qpid.AMQException;
023 import org.apache.qpid.protocol.AMQConstant;
024 import org.apache.qpid.client.failover.FailoverException;
025 import org.apache.qpid.client.failover.FailoverNoopSupport;
026 import org.apache.qpid.client.failover.FailoverProtectedOperation;
027 import org.apache.qpid.client.protocol.AMQProtocolHandler;
028 import org.apache.qpid.client.message.MessageFactoryRegistry;
029 import org.apache.qpid.client.message.FiledTableSupport;
030 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
031 import org.apache.qpid.client.message.UnprocessedMessage_0_10;
032 import org.apache.qpid.util.Serial;
033 import org.apache.qpid.transport.ExecutionException;
034 import org.apache.qpid.transport.MessageAcceptMode;
035 import org.apache.qpid.transport.MessageAcquireMode;
036 import org.apache.qpid.transport.MessageCreditUnit;
037 import org.apache.qpid.transport.MessageFlowMode;
038 import org.apache.qpid.transport.MessageTransfer;
039 import org.apache.qpid.transport.RangeSet;
040 import org.apache.qpid.transport.Option;
041 import org.apache.qpid.transport.ExchangeBoundResult;
042 import org.apache.qpid.transport.Future;
043 import org.apache.qpid.transport.Range;
044 import org.apache.qpid.transport.Session;
045 import org.apache.qpid.transport.SessionException;
046 import org.apache.qpid.transport.SessionListener;
047 import org.slf4j.Logger;
048 import org.slf4j.LoggerFactory;
049
050 import static org.apache.qpid.transport.Option.*;
051
052 import javax.jms.*;
053 import javax.jms.IllegalStateException;
054
055 import java.util.Date;
056 import java.util.HashMap;
057 import java.util.UUID;
058 import java.util.Map;
059 import java.util.Timer;
060 import java.util.TimerTask;
061
062 /**
063 * This is a 0.10 Session
064 */
065 public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
066 implements SessionListener
067 {
068
069 /**
070 * This class logger
071 */
072 private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
073
074 private static Timer timer = new Timer("ack-flusher", true);
075
076
077 /**
078 * The underlying QpidSession
079 */
080 private Session _qpidSession;
081
082 /**
083 * The latest qpid Exception that has been reaised.
084 */
085 private Object _currentExceptionLock = new Object();
086 private SessionException _currentException;
087
088 // a ref on the qpid connection
089 protected org.apache.qpid.transport.Connection _qpidConnection;
090
091 private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
092 private TimerTask flushTask = null;
093 private RangeSet unacked = new RangeSet();
094 private int unackedCount = 0;
095
096 /**
097 * USed to store the range of in tx messages
098 */
099 private RangeSet _txRangeSet = new RangeSet();
100 private int _txSize = 0;
101 //--- constructors
102
103 /**
104 * Creates a new session on a connection.
105 *
106 * @param con The connection on which to create the session.
107 * @param channelId The unique identifier for the session.
108 * @param transacted Indicates whether or not the session is transactional.
109 * @param acknowledgeMode The acknoledgement mode for the session.
110 * @param messageFactoryRegistry The message factory factory for the session.
111 * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
112 * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
113 * @param qpidConnection The qpid connection
114 */
115 AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
116 boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
117 int defaultPrefetchHighMark, int defaultPrefetchLowMark)
118 {
119
120 super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
121 defaultPrefetchLowMark);
122 _qpidConnection = qpidConnection;
123 _qpidSession = _qpidConnection.createSession(1);
124 _qpidSession.setSessionListener(this);
125 if (_transacted)
126 {
127 _qpidSession.txSelect();
128 }
129
130 if (maxAckDelay > 0)
131 {
132 flushTask = new TimerTask()
133 {
134 public void run()
135 {
136 try
137 {
138 flushAcknowledgments();
139 }
140 catch (Throwable t)
141 {
142 _logger.error("error flushing acks", t);
143 }
144 }
145 };
146 timer.schedule(flushTask, new Date(), maxAckDelay);
147 }
148 }
149
150 /**
151 * Creates a new session on a connection with the default 0-10 message factory.
152 *
153 * @param con The connection on which to create the session.
154 * @param channelId The unique identifier for the session.
155 * @param transacted Indicates whether or not the session is transactional.
156 * @param acknowledgeMode The acknoledgement mode for the session.
157 * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
158 * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
159 * @param qpidConnection The connection
160 */
161 AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
162 boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
163 {
164
165 this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
166 defaultPrefetchHigh, defaultPrefetchLow);
167 }
168
169 private void addUnacked(int id)
170 {
171 synchronized (unacked)
172 {
173 unacked.add(id);
174 unackedCount++;
175 }
176 }
177
178 private void clearUnacked()
179 {
180 synchronized (unacked)
181 {
182 unacked.clear();
183 unackedCount = 0;
184 }
185 }
186
187 //------- overwritten methods of class AMQSession
188
189 void failoverPrep()
190 {
191 super.failoverPrep();
192 clearUnacked();
193 }
194
195 /**
196 * Acknowledge one or many messages.
197 *
198 * @param deliveryTag The tag of the last message to be acknowledged.
199 * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the
200 * delivery tag, <tt>false</tt> to just acknowledge that message.
201 */
202
203 public void acknowledgeMessage(long deliveryTag, boolean multiple)
204 {
205 if (_logger.isDebugEnabled())
206 {
207 _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
208 }
209 // acknowledge this message
210 if (multiple)
211 {
212 for (Long messageTag : _unacknowledgedMessageTags)
213 {
214 if( messageTag <= deliveryTag )
215 {
216 addUnacked(messageTag.intValue());
217 _unacknowledgedMessageTags.remove(messageTag);
218 }
219 }
220 //empty the list of unack messages
221
222 }
223 else
224 {
225 addUnacked((int) deliveryTag);
226 _unacknowledgedMessageTags.remove(deliveryTag);
227 }
228
229 long prefetch = getAMQConnection().getMaxPrefetch();
230
231 if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
232 {
233 flushAcknowledgments();
234 }
235 }
236
237 void flushAcknowledgments()
238 {
239 synchronized (unacked)
240 {
241 if (unackedCount > 0)
242 {
243 messageAcknowledge
244 (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
245 clearUnacked();
246 }
247 }
248 }
249
250 void messageAcknowledge(RangeSet ranges, boolean accept)
251 {
252 Session ssn = getQpidSession();
253 for (Range range : ranges)
254 {
255 ssn.processed(range);
256 }
257 ssn.flushProcessed(accept ? BATCH : NONE);
258 if (accept)
259 {
260 ssn.messageAccept(ranges, UNRELIABLE);
261 }
262 }
263
264 /**
265 * Bind a queue with an exchange.
266 *
267 * @param queueName Specifies the name of the queue to bind. If the queue name is empty,
268 * refers to the current
269 * queue for the session, which is the last declared queue.
270 * @param exchangeName The exchange name.
271 * @param routingKey Specifies the routing key for the binding.
272 * @param arguments 0_8 specific
273 */
274 public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
275 final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
276 throws AMQException, FailoverException
277 {
278 Map args = FiledTableSupport.convertToMap(arguments);
279 // this is there only becasue the broker may expect a value for x-match
280 if( ! args.containsKey("x-match") )
281 {
282 args.put("x-match", "any");
283 }
284
285 for (AMQShortString rk: destination.getBindingKeys())
286 {
287 _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
288 getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
289 }
290 // We need to sync so that we get notify of an error.
291 getQpidSession().sync();
292 getCurrentException();
293 }
294
295
296 /**
297 * Close this session.
298 *
299 * @param timeout no used / 0_8 specific
300 * @throws AMQException
301 * @throws FailoverException
302 */
303 public void sendClose(long timeout) throws AMQException, FailoverException
304 {
305 if (flushTask != null)
306 {
307 flushTask.cancel();
308 }
309 flushAcknowledgments();
310 getQpidSession().sync();
311 getQpidSession().close();
312 getCurrentException();
313 }
314
315
316 /**
317 * Commit the receipt and the delivery of all messages exchanged by this session resources.
318 */
319 public void sendCommit() throws AMQException, FailoverException
320 {
321 getQpidSession().setAutoSync(true);
322 try
323 {
324 getQpidSession().txCommit();
325 }
326 finally
327 {
328 getQpidSession().setAutoSync(false);
329 }
330 // We need to sync so that we get notify of an error.
331 getCurrentException();
332 }
333
334 /**
335 * Create a queue with a given name.
336 *
337 * @param name The queue name
338 * @param autoDelete If this field is set and the exclusive field is also set,
339 * then the queue is deleted when the connection closes.
340 * @param durable If set when creating a new queue,
341 * the queue will be marked as durable.
342 * @param exclusive Exclusive queues can only be used from one connection at a time.
343 * @param arguments Exclusive queues can only be used from one connection at a time.
344 * @throws AMQException
345 * @throws FailoverException
346 */
347 public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
348 final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
349 {
350 getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE,
351 autoDelete ? Option.AUTO_DELETE : Option.NONE,
352 exclusive ? Option.EXCLUSIVE : Option.NONE);
353 // We need to sync so that we get notify of an error.
354 getQpidSession().sync();
355 getCurrentException();
356 }
357
358 /**
359 * This method asks the broker to redeliver all unacknowledged messages
360 *
361 * @throws AMQException
362 * @throws FailoverException
363 */
364 public void sendRecover() throws AMQException, FailoverException
365 {
366 // release all unack messages
367 RangeSet ranges = new RangeSet();
368 while (true)
369 {
370 Long tag = _unacknowledgedMessageTags.poll();
371 if (tag == null)
372 {
373 break;
374 }
375 ranges.add((int) (long) tag);
376 }
377 getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
378 // We need to sync so that we get notify of an error.
379 getQpidSession().sync();
380 getCurrentException();
381 }
382
383 public void releaseForRollback()
384 {
385 if (_dispatcher != null)
386 {
387 _dispatcher.rollback();
388 }
389 getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
390 _txRangeSet.clear();
391 _txSize = 0;
392 }
393
394 /**
395 * Release (0_8 notion of Reject) an acquired message
396 *
397 * @param deliveryTag the message ID
398 * @param requeue always true
399 */
400 public void rejectMessage(long deliveryTag, boolean requeue)
401 {
402 // The value of requeue is always true
403 RangeSet ranges = new RangeSet();
404 ranges.add((int) deliveryTag);
405 getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
406 //I don't think we need to sync
407 }
408
409 /**
410 * Create an 0_10 message consumer
411 */
412 public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
413 final int prefetchLow, final boolean noLocal,
414 final boolean exclusive, String messageSelector,
415 final FieldTable ft, final boolean noConsume,
416 final boolean autoClose) throws JMSException
417 {
418
419 final AMQProtocolHandler protocolHandler = getProtocolHandler();
420 return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
421 _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
422 prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
423 }
424
425 /**
426 * Bind a queue with an exchange.
427 */
428
429 public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
430 throws JMSException
431 {
432 return isQueueBound(exchangeName,queueName,routingKey,null);
433 }
434
435 public boolean isQueueBound(final AMQDestination destination) throws JMSException
436 {
437 return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
438 }
439
440 public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
441 throws JMSException
442 {
443 String rk = "";
444 boolean res;
445 if (bindingKeys != null && bindingKeys.length>0)
446 {
447 rk = bindingKeys[0].toString();
448 }
449 else if (routingKey != null)
450 {
451 rk = routingKey.toString();
452 }
453
454 ExchangeBoundResult bindingQueryResult =
455 getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get();
456
457 if (rk == null)
458 {
459 res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
460 }
461 else
462 {
463 res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
464 .getQueueNotMatched());
465 }
466 return res;
467 }
468
469 /**
470 * This method is invoked when a consumer is creted
471 * Registers the consumer with the broker
472 */
473 public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
474 boolean nowait, String messageSelector, int tag)
475 throws AMQException, FailoverException
476 {
477 boolean preAcquire;
478 try
479 {
480 preAcquire = ( ! consumer.isNoConsume() &&
481 (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
482 || !(consumer.getDestination() instanceof AMQQueue);
483 getQpidSession().messageSubscribe
484 (queueName.toString(), String.valueOf(tag),
485 getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
486 preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, null,
487 consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
488 }
489 catch (JMSException e)
490 {
491 throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
492 }
493
494 String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
495
496 if (! prefetch())
497 {
498 getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
499 }
500 else
501 {
502 getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
503 }
504 getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
505 // We need to sync so that we get notify of an error.
506 // only if not immediat prefetch
507 if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
508 {
509 // set the flow
510 getQpidSession().messageFlow(consumerTag,
511 MessageCreditUnit.MESSAGE,
512 getAMQConnection().getMaxPrefetch());
513 }
514 getQpidSession().sync();
515 getCurrentException();
516 }
517
518 /**
519 * Create an 0_10 message producer
520 */
521 public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
522 final boolean immediate, final boolean waitUntilSent,
523 long producerId)
524 {
525 return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
526 getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
527
528 }
529
530 /**
531 * creates an exchange if it does not already exist
532 */
533 public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type,
534 final AMQProtocolHandler protocolHandler, final boolean nowait)
535 throws AMQException, FailoverException
536 {
537 getQpidSession().exchangeDeclare(name.toString(),
538 type.toString(),
539 null,
540 null,
541 name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
542 // We need to sync so that we get notify of an error.
543 getQpidSession().sync();
544 getCurrentException();
545 }
546
547 /**
548 * Declare a queue with the given queueName
549 */
550 public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
551 throws AMQException, FailoverException
552 {
553 // do nothing this is only used by 0_8
554 }
555
556 /**
557 * Declare a queue with the given queueName
558 */
559 public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
560 final boolean noLocal)
561 throws AMQException, FailoverException
562 {
563 AMQShortString res;
564 if (amqd.getAMQQueueName() == null)
565 {
566 // generate a name for this queue
567 res = new AMQShortString("TempQueue" + UUID.randomUUID());
568 }
569 else
570 {
571 res = amqd.getAMQQueueName();
572 }
573 Map<String,Object> arguments = null;
574 if (noLocal)
575 {
576 arguments = new HashMap<String,Object>();
577 arguments.put("no-local", true);
578 }
579 getQpidSession().queueDeclare(res.toString(), null, arguments,
580 amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
581 amqd.isDurable() ? Option.DURABLE : Option.NONE,
582 !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
583 // passive --> false
584 // We need to sync so that we get notify of an error.
585 getQpidSession().sync();
586 getCurrentException();
587 return res;
588 }
589
590 /**
591 * deletes a queue
592 */
593 public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
594 {
595 getQpidSession().queueDelete(queueName.toString());
596 // ifEmpty --> false
597 // ifUnused --> false
598 // We need to sync so that we get notify of an error.
599 getQpidSession().sync();
600 getCurrentException();
601 }
602
603 /**
604 * Activate/deactivate the message flow for all the consumers of this session.
605 */
606 public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
607 {
608 if (suspend)
609 {
610 for (BasicMessageConsumer consumer : _consumers.values())
611 {
612 getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
613 }
614 }
615 else
616 {
617 for (BasicMessageConsumer_0_10 consumer : _consumers.values())
618 {
619 String consumerTag = String.valueOf(consumer.getConsumerTag());
620 //only set if msg list is null
621 try
622 {
623 if (! prefetch())
624 {
625 if (consumer.getMessageListener() != null)
626 {
627 getQpidSession().messageFlow(consumerTag,
628 MessageCreditUnit.MESSAGE, 1);
629 }
630 }
631 else
632 {
633 getQpidSession()
634 .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
635 getAMQConnection().getMaxPrefetch());
636 }
637 getQpidSession()
638 .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
639 }
640 catch (Exception e)
641 {
642 throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", e);
643 }
644 }
645 }
646 // We need to sync so that we get notify of an error.
647 getQpidSession().sync();
648 getCurrentException();
649 }
650
651
652 public void sendRollback() throws AMQException, FailoverException
653 {
654 getQpidSession().txRollback();
655 // We need to sync so that we get notify of an error.
656 getQpidSession().sync();
657 getCurrentException();
658 }
659
660 //------ Private methods
661 /**
662 * Access to the underlying Qpid Session
663 *
664 * @return The associated Qpid Session.
665 */
666 protected Session getQpidSession()
667 {
668 return _qpidSession;
669 }
670
671
672 /**
673 * Get the latest thrown exception.
674 *
675 * @throws org.apache.qpid.AMQException get the latest thrown error.
676 */
677 public void getCurrentException() throws AMQException
678 {
679 synchronized (_currentExceptionLock)
680 {
681 if (_currentException != null)
682 {
683 SessionException se = _currentException;
684 _currentException = null;
685 ExecutionException ee = se.getException();
686 int code;
687 if (ee == null)
688 {
689 code = 0;
690 }
691 else
692 {
693 code = ee.getErrorCode().getValue();
694 }
695 throw new AMQException
696 (AMQConstant.getConstant(code), se.getMessage(), se);
697 }
698 }
699 }
700
701 public void opened(Session ssn) {}
702
703 public void message(Session ssn, MessageTransfer xfr)
704 {
705 messageReceived(new UnprocessedMessage_0_10(xfr));
706 }
707
708 public void exception(Session ssn, SessionException exc)
709 {
710 synchronized (_currentExceptionLock)
711 {
712 _currentException = exc;
713 }
714 }
715
716 public void closed(Session ssn) {}
717
718 protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
719 final boolean noLocal)
720 throws AMQException
721 {
722 /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
723 return new FailoverNoopSupport<AMQShortString, AMQException>(
724 new FailoverProtectedOperation<AMQShortString, AMQException>()
725 {
726 public AMQShortString execute() throws AMQException, FailoverException
727 {
728 // Generate the queue name if the destination indicates that a client generated name is to be used.
729 if (amqd.isNameRequired())
730 {
731 String binddingKey = "";
732 for(AMQShortString key : amqd.getBindingKeys())
733 {
734 binddingKey = binddingKey + "_" + key.toString();
735 }
736 amqd.setQueueName(new AMQShortString( binddingKey + "@"
737 + amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
738 }
739 return send0_10QueueDeclare(amqd, protocolHandler, noLocal);
740 }
741 }, _connection).execute();
742 }
743
744
745 void start() throws AMQException
746 {
747 super.start();
748 for(BasicMessageConsumer c: _consumers.values())
749 {
750 c.start();
751 }
752 }
753
754
755 void stop() throws AMQException
756 {
757 super.stop();
758 for(BasicMessageConsumer c: _consumers.values())
759 {
760 c.stop();
761 }
762 }
763
764
765
766
767 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
768 {
769
770 checkNotClosed();
771 AMQTopic origTopic=checkValidTopic(topic);
772 AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
773
774 TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
775 if (subscriber != null)
776 {
777 if (subscriber.getTopic().equals(topic))
778 {
779 throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
780 + name);
781 }
782 else
783 {
784 unsubscribe(name);
785 }
786 }
787 else
788 {
789 AMQShortString topicName;
790 if (topic instanceof AMQTopic)
791 {
792 topicName=((AMQTopic) topic).getBindingKeys()[0];
793 }
794 else
795 {
796 topicName=new AMQShortString(topic.getTopicName());
797 }
798
799 if (_strictAMQP)
800 {
801 if (_strictAMQPFATAL)
802 {
803 throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
804 }
805 else
806 {
807 _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
808 + "for creation durableSubscriber. Requesting queue deletion regardless.");
809 }
810
811 deleteQueue(dest.getAMQQueueName());
812 }
813 else
814 {
815 // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
816 // says we must trash the subscription.
817 if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
818 && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
819 {
820 deleteQueue(dest.getAMQQueueName());
821 }
822 }
823 }
824
825 subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
826
827 _subscriptions.put(name, subscriber);
828 _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
829
830 return subscriber;
831 }
832
833 protected Long requestQueueDepth(AMQDestination amqd)
834 {
835 return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
836 }
837
838
839 /**
840 * Store non committed messages for this session
841 * With 0.10 messages are consumed with window mode, we must send a completion
842 * before the window size is reached so credits don't dry up.
843 * @param id
844 */
845 @Override protected void addDeliveredMessage(long id)
846 {
847 _txRangeSet.add((int) id);
848 _txSize++;
849 // this is a heuristic, we may want to have that configurable
850 if (_connection.getMaxPrefetch() == 1 ||
851 _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
852 {
853 // send completed so consumer credits don't dry up
854 messageAcknowledge(_txRangeSet, false);
855 }
856 }
857
858 @Override public void commit() throws JMSException
859 {
860 checkTransacted();
861 try
862 {
863 if( _txSize > 0 )
864 {
865 messageAcknowledge(_txRangeSet, true);
866 _txRangeSet.clear();
867 _txSize = 0;
868 }
869 sendCommit();
870 }
871 catch (AMQException e)
872 {
873 throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
874 }
875 catch (FailoverException e)
876 {
877 throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
878 }
879 }
880
881 protected final boolean tagLE(long tag1, long tag2)
882 {
883 return Serial.le((int) tag1, (int) tag2);
884 }
885
886 protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
887 {
888 return Serial.lt((int) currentMark, (int) deliveryTag);
889 }
890
891 public AMQMessageDelegateFactory getMessageDelegateFactory()
892 {
893 return AMQMessageDelegateFactory.FACTORY_0_10;
894 }
895
896 }
|