AMQSession_0_10.java
001 /* Licensed to the Apache Software Foundation (ASF) under one
002  * or more contributor license agreements.  See the NOTICE file
003  * distributed with this work for additional information
004  * regarding copyright ownership.  The ASF licenses this file
005  * to you under the Apache License, Version 2.0 (the
006  * "License"); you may not use this file except in compliance
007  * with the License.  You may obtain a copy of the License at
008  *
009  *   http://www.apache.org/licenses/LICENSE-2.0
010  *
011  * Unless required by applicable law or agreed to in writing,
012  * software distributed under the License is distributed on an
013  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
014  * KIND, either express or implied.  See the License for the
015  * specific language governing permissions and limitations
016  * under the License.
017  */
018 package org.apache.qpid.client;
019 
020 import org.apache.qpid.framing.AMQShortString;
021 import org.apache.qpid.framing.FieldTable;
022 import org.apache.qpid.AMQException;
023 import org.apache.qpid.protocol.AMQConstant;
024 import org.apache.qpid.client.failover.FailoverException;
025 import org.apache.qpid.client.failover.FailoverNoopSupport;
026 import org.apache.qpid.client.failover.FailoverProtectedOperation;
027 import org.apache.qpid.client.protocol.AMQProtocolHandler;
028 import org.apache.qpid.client.message.MessageFactoryRegistry;
029 import org.apache.qpid.client.message.FiledTableSupport;
030 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
031 import org.apache.qpid.client.message.UnprocessedMessage_0_10;
032 import org.apache.qpid.util.Serial;
033 import org.apache.qpid.transport.ExecutionException;
034 import org.apache.qpid.transport.MessageAcceptMode;
035 import org.apache.qpid.transport.MessageAcquireMode;
036 import org.apache.qpid.transport.MessageCreditUnit;
037 import org.apache.qpid.transport.MessageFlowMode;
038 import org.apache.qpid.transport.MessageTransfer;
039 import org.apache.qpid.transport.RangeSet;
040 import org.apache.qpid.transport.Option;
041 import org.apache.qpid.transport.ExchangeBoundResult;
042 import org.apache.qpid.transport.Future;
043 import org.apache.qpid.transport.Range;
044 import org.apache.qpid.transport.Session;
045 import org.apache.qpid.transport.SessionException;
046 import org.apache.qpid.transport.SessionListener;
047 import org.slf4j.Logger;
048 import org.slf4j.LoggerFactory;
049 
050 import static org.apache.qpid.transport.Option.*;
051 
052 import javax.jms.*;
053 import javax.jms.IllegalStateException;
054 
055 import java.util.Date;
056 import java.util.HashMap;
057 import java.util.UUID;
058 import java.util.Map;
059 import java.util.Timer;
060 import java.util.TimerTask;
061 
062 /**
063  * This is a 0.10 Session
064  */
065 public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
066     implements SessionListener
067 {
068 
069     /**
070      * This class logger
071      */
072     private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
073 
074     private static Timer timer = new Timer("ack-flusher"true);
075 
076 
077     /**
078      * The underlying QpidSession
079      */
080     private Session _qpidSession;
081 
082     /**
083      * The latest qpid Exception that has been reaised.
084      */
085     private Object _currentExceptionLock = new Object();
086     private SessionException _currentException;
087 
088     // a ref on the qpid connection
089     protected org.apache.qpid.transport.Connection _qpidConnection;
090 
091     private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay"1000);
092     private TimerTask flushTask = null;
093     private RangeSet unacked = new RangeSet();
094     private int unackedCount = 0;
095 
096     /**
097      * USed to store the range of in tx messages
098      */
099     private RangeSet _txRangeSet = new RangeSet();
100     private int _txSize = 0;
101     //--- constructors
102 
103     /**
104      * Creates a new session on a connection.
105      *
106      @param con                     The connection on which to create the session.
107      @param channelId               The unique identifier for the session.
108      @param transacted              Indicates whether or not the session is transactional.
109      @param acknowledgeMode         The acknoledgement mode for the session.
110      @param messageFactoryRegistry  The message factory factory for the session.
111      @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
112      @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
113      @param qpidConnection          The qpid connection
114      */
115     AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
116                     boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
117                     int defaultPrefetchHighMark, int defaultPrefetchLowMark)
118     {
119 
120         super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
121               defaultPrefetchLowMark);
122         _qpidConnection = qpidConnection;
123         _qpidSession = _qpidConnection.createSession(1);
124         _qpidSession.setSessionListener(this);
125         if (_transacted)
126         {
127             _qpidSession.txSelect();
128         }
129 
130         if (maxAckDelay > 0)
131         {
132             flushTask = new TimerTask()
133             {
134                 public void run()
135                 {
136                     try
137                     {
138                         flushAcknowledgments();
139                     }
140                     catch (Throwable t)
141                     {
142                         _logger.error("error flushing acks", t);
143                     }
144                 }
145             };
146             timer.schedule(flushTask, new Date(), maxAckDelay);
147         }
148     }
149 
150     /**
151      * Creates a new session on a connection with the default 0-10 message factory.
152      *
153      @param con                 The connection on which to create the session.
154      @param channelId           The unique identifier for the session.
155      @param transacted          Indicates whether or not the session is transactional.
156      @param acknowledgeMode     The acknoledgement mode for the session.
157      @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
158      @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
159      @param qpidConnection      The connection
160      */
161     AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
162                     boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
163     {
164 
165         this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
166              defaultPrefetchHigh, defaultPrefetchLow);
167     }
168 
169     private void addUnacked(int id)
170     {
171         synchronized (unacked)
172         {
173             unacked.add(id);
174             unackedCount++;
175         }
176     }
177 
178     private void clearUnacked()
179     {
180         synchronized (unacked)
181         {
182             unacked.clear();
183             unackedCount = 0;
184         }
185     }
186 
187     //------- overwritten methods of class AMQSession
188 
189     void failoverPrep()
190     {
191         super.failoverPrep();
192         clearUnacked();
193     }
194 
195     /**
196      * Acknowledge one or many messages.
197      *
198      @param deliveryTag The tag of the last message to be acknowledged.
199      @param multiple    <tt>true</tt> to acknowledge all messages up to and including the one specified by the
200      *                    delivery tag, <tt>false</tt> to just acknowledge that message.
201      */
202 
203     public void acknowledgeMessage(long deliveryTag, boolean multiple)
204     {
205         if (_logger.isDebugEnabled())
206         {
207             _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
208         }
209         // acknowledge this message
210         if (multiple)
211         {
212             for (Long messageTag : _unacknowledgedMessageTags)
213             {
214                 ifmessageTag <= deliveryTag )
215                 {
216                     addUnacked(messageTag.intValue());
217                     _unacknowledgedMessageTags.remove(messageTag);
218                 }
219             }
220             //empty the list of unack messages
221 
222         }
223         else
224         {
225             addUnacked((intdeliveryTag);
226             _unacknowledgedMessageTags.remove(deliveryTag);
227         }
228 
229         long prefetch = getAMQConnection().getMaxPrefetch();
230 
231         if (unackedCount >= prefetch/|| maxAckDelay <= 0)
232         {
233             flushAcknowledgments();
234         }
235     }
236 
237     void flushAcknowledgments()
238     {
239         synchronized (unacked)
240         {
241             if (unackedCount > 0)
242             {
243                 messageAcknowledge
244                     (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
245                 clearUnacked();
246             }
247         }
248     }
249 
250     void messageAcknowledge(RangeSet ranges, boolean accept)
251     {
252         Session ssn = getQpidSession();
253         for (Range range : ranges)
254         {
255             ssn.processed(range);
256         }
257         ssn.flushProcessed(accept ? BATCH : NONE);
258         if (accept)
259         {
260             ssn.messageAccept(ranges, UNRELIABLE);
261         }
262     }
263 
264     /**
265      * Bind a queue with an exchange.
266      *
267      @param queueName    Specifies the name of the queue to bind. If the queue name is empty,
268      *                     refers to the current
269      *                     queue for the session, which is the last declared queue.
270      @param exchangeName The exchange name.
271      @param routingKey   Specifies the routing key for the binding.
272      @param arguments    0_8 specific
273      */
274     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
275                               final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
276             throws AMQException, FailoverException
277     {
278         Map args = FiledTableSupport.convertToMap(arguments);
279         // this is there only becasue the broker may expect a value for x-match
280         if! args.containsKey("x-match") )
281         {
282             args.put("x-match""any");
283         }
284 
285         for (AMQShortString rk: destination.getBindingKeys())
286         {
287             _logger.debug("Binding queue : " + queueName.toString() " exchange: " + exchangeName.toString() " using binding key " + rk.asString());
288             getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
289         }
290         // We need to sync so that we get notify of an error.
291         getQpidSession().sync();
292         getCurrentException();
293     }
294 
295 
296     /**
297      * Close this session.
298      *
299      @param timeout no used / 0_8 specific
300      @throws AMQException
301      @throws FailoverException
302      */
303     public void sendClose(long timeoutthrows AMQException, FailoverException
304     {
305         if (flushTask != null)
306         {
307             flushTask.cancel();
308         }
309         flushAcknowledgments();
310         getQpidSession().sync();
311         getQpidSession().close();
312         getCurrentException();
313     }
314 
315 
316     /**
317      * Commit the receipt and the delivery of all messages exchanged by this session resources.
318      */
319     public void sendCommit() throws AMQException, FailoverException
320     {
321         getQpidSession().setAutoSync(true);
322         try
323         {
324             getQpidSession().txCommit();
325         }
326         finally
327         {
328             getQpidSession().setAutoSync(false);
329         }
330         // We need to sync so that we get notify of an error.
331         getCurrentException();
332     }
333 
334     /**
335      * Create a queue with a given name.
336      *
337      @param name       The queue name
338      @param autoDelete If this field is set and the exclusive field is also set,
339      *                   then the queue is deleted when the connection closes.
340      @param durable    If set when creating a new queue,
341      *                   the queue will be marked as durable.
342      @param exclusive  Exclusive queues can only be used from one connection at a time.
343      @param arguments  Exclusive queues can only be used from one connection at a time.
344      @throws AMQException
345      @throws FailoverException
346      */
347     public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
348                                 final boolean exclusive, Map<String, Object> argumentsthrows AMQException, FailoverException
349     {
350         getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE,
351                                       autoDelete ? Option.AUTO_DELETE : Option.NONE,
352                                       exclusive ? Option.EXCLUSIVE : Option.NONE);
353         // We need to sync so that we get notify of an error.
354         getQpidSession().sync();
355         getCurrentException();
356     }
357 
358     /**
359      * This method asks the broker to redeliver all unacknowledged messages
360      *
361      @throws AMQException
362      @throws FailoverException
363      */
364     public void sendRecover() throws AMQException, FailoverException
365     {
366         // release all unack messages
367         RangeSet ranges = new RangeSet();
368         while (true)
369         {
370             Long tag = _unacknowledgedMessageTags.poll();
371             if (tag == null)
372             {
373                 break;
374             }
375             ranges.add((int) (longtag);
376         }
377         getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
378         // We need to sync so that we get notify of an error.
379         getQpidSession().sync();
380         getCurrentException();
381     }
382 
383     public void releaseForRollback()
384     {
385         if (_dispatcher != null)
386         {
387             _dispatcher.rollback();
388         }
389         getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
390         _txRangeSet.clear();
391         _txSize = 0;
392     }
393 
394     /**
395      * Release (0_8 notion of Reject) an acquired message
396      *
397      @param deliveryTag the message ID
398      @param requeue     always true
399      */
400     public void rejectMessage(long deliveryTag, boolean requeue)
401     {
402         // The value of requeue is always true
403         RangeSet ranges = new RangeSet();
404         ranges.add((intdeliveryTag);
405         getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
406         //I don't think we need to sync
407     }
408 
409     /**
410      * Create an 0_10 message consumer
411      */
412     public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
413                                                       final int prefetchLow, final boolean noLocal,
414                                                       final boolean exclusive, String messageSelector,
415                                                       final FieldTable ft, final boolean noConsume,
416                                                       final boolean autoClosethrows JMSException
417     {
418 
419         final AMQProtocolHandler protocolHandler = getProtocolHandler();
420         return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
421                                              _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
422                                              prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
423     }
424 
425     /**
426      * Bind a queue with an exchange.
427      */
428 
429     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
430     throws JMSException
431     {
432         return isQueueBound(exchangeName,queueName,routingKey,null);
433     }
434 
435     public boolean isQueueBound(final AMQDestination destinationthrows JMSException
436     {
437         return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
438     }
439 
440     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
441     throws JMSException
442     {
443         String rk = "";
444         boolean res;
445         if (bindingKeys != null && bindingKeys.length>0)
446         {
447             rk = bindingKeys[0].toString();
448         }
449         else if (routingKey != null)
450         {
451             rk = routingKey.toString();
452         }
453 
454         ExchangeBoundResult bindingQueryResult =
455             getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get();
456 
457         if (rk == null)
458         {
459             res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
460         }
461         else
462         {
463             res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
464                     .getQueueNotMatched());
465         }
466         return res;
467     }
468 
469     /**
470      * This method is invoked when a consumer is creted
471      * Registers the consumer with the broker
472      */
473     public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
474                             boolean nowait, String messageSelector, int tag)
475             throws AMQException, FailoverException
476     {
477         boolean preAcquire;
478         try
479         {
480             preAcquire = ! consumer.isNoConsume()  &&
481                     (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
482                     || !(consumer.getDestination() instanceof AMQQueue);
483             getQpidSession().messageSubscribe
484                 (queueName.toString(), String.valueOf(tag),
485                  getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
486                  preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, null,
487                  consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
488         }
489         catch (JMSException e)
490         {
491             throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
492         }
493 
494         String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
495 
496         if (! prefetch())
497         {
498             getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
499         }
500         else
501         {
502             getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
503         }
504         getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
505         // We need to sync so that we get notify of an error.
506         // only if not immediat prefetch
507         if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
508         {
509             // set the flow
510             getQpidSession().messageFlow(consumerTag,
511                                          MessageCreditUnit.MESSAGE,
512                                          getAMQConnection().getMaxPrefetch());
513         }
514         getQpidSession().sync();
515         getCurrentException();
516     }
517 
518     /**
519      * Create an 0_10 message producer
520      */
521     public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
522                                                       final boolean immediate, final boolean waitUntilSent,
523                                                       long producerId)
524     {
525         return new BasicMessageProducer_0_10(_connection, (AMQDestinationdestination, _transacted, _channelId, this,
526                                              getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
527 
528     }
529 
530     /**
531      * creates an exchange if it does not already exist
532      */
533     public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type,
534                                     final AMQProtocolHandler protocolHandler, final boolean nowait)
535             throws AMQException, FailoverException
536     {
537         getQpidSession().exchangeDeclare(name.toString(),
538                                         type.toString(),
539                                         null,
540                                         null,
541                                         name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
542         // We need to sync so that we get notify of an error.
543         getQpidSession().sync();
544         getCurrentException();
545     }
546 
547     /**
548      * Declare a queue with the given queueName
549      */
550     public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
551             throws AMQException, FailoverException
552     {
553         // do nothing this is only used by 0_8
554     }
555 
556     /**
557      * Declare a queue with the given queueName
558      */
559     public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
560                                                final boolean noLocal)
561             throws AMQException, FailoverException
562     {
563         AMQShortString res;
564         if (amqd.getAMQQueueName() == null)
565         {
566             // generate a name for this queue
567             res = new AMQShortString("TempQueue" + UUID.randomUUID());
568         }
569         else
570         {
571             res = amqd.getAMQQueueName();
572         }
573         Map<String,Object> arguments = null;
574         if (noLocal)
575         {
576             arguments = new HashMap<String,Object>();
577             arguments.put("no-local"true);
578         }
579         getQpidSession().queueDeclare(res.toString(), null, arguments,
580                                       amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
581                                       amqd.isDurable() ? Option.DURABLE : Option.NONE,
582                                       !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
583         // passive --> false
584         // We need to sync so that we get notify of an error.
585         getQpidSession().sync();
586         getCurrentException();
587         return res;
588     }
589 
590     /**
591      * deletes a queue
592      */
593     public void sendQueueDelete(final AMQShortString queueNamethrows AMQException, FailoverException
594     {
595         getQpidSession().queueDelete(queueName.toString());
596         // ifEmpty --> false
597         // ifUnused --> false
598         // We need to sync so that we get notify of an error.
599         getQpidSession().sync();
600         getCurrentException();
601     }
602 
603     /**
604      * Activate/deactivate the message flow for all the consumers of this session.
605      */
606     public void sendSuspendChannel(boolean suspendthrows AMQException, FailoverException
607     {
608         if (suspend)
609         {
610             for (BasicMessageConsumer consumer : _consumers.values())
611             {
612                 getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
613             }
614         }
615         else
616         {
617             for (BasicMessageConsumer_0_10 consumer : _consumers.values())
618             {
619                 String consumerTag = String.valueOf(consumer.getConsumerTag());
620                 //only set if msg list is null
621                 try
622                 {
623                     if (! prefetch())
624                     {
625                         if (consumer.getMessageListener() != null)
626                         {
627                             getQpidSession().messageFlow(consumerTag,
628                                                          MessageCreditUnit.MESSAGE, 1);
629                         }
630                     }
631                     else
632                     {
633                         getQpidSession()
634                             .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
635                                          getAMQConnection().getMaxPrefetch());
636                     }
637                     getQpidSession()
638                         .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
639                 }
640                 catch (Exception e)
641                 {
642                     throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", e);
643                 }
644             }
645         }
646         // We need to sync so that we get notify of an error.
647         getQpidSession().sync();
648         getCurrentException();
649     }
650 
651 
652     public void sendRollback() throws AMQException, FailoverException
653     {
654         getQpidSession().txRollback();
655         // We need to sync so that we get notify of an error.
656         getQpidSession().sync();
657         getCurrentException();
658     }
659 
660     //------ Private methods
661     /**
662      * Access to the underlying Qpid Session
663      *
664      @return The associated Qpid Session.
665      */
666     protected Session getQpidSession()
667     {
668         return _qpidSession;
669     }
670 
671 
672     /**
673      * Get the latest thrown exception.
674      *
675      @throws org.apache.qpid.AMQException get the latest thrown error.
676      */
677     public void getCurrentException() throws AMQException
678     {
679         synchronized (_currentExceptionLock)
680         {
681             if (_currentException != null)
682             {
683                 SessionException se = _currentException;
684                 _currentException = null;
685                 ExecutionException ee = se.getException();
686                 int code;
687                 if (ee == null)
688                 {
689                     code = 0;
690                 }
691                 else
692                 {
693                     code = ee.getErrorCode().getValue();
694                 }
695                 throw new AMQException
696                     (AMQConstant.getConstant(code), se.getMessage(), se);
697             }
698         }
699     }
700 
701     public void opened(Session ssn) {}
702 
703     public void message(Session ssn, MessageTransfer xfr)
704     {
705         messageReceived(new UnprocessedMessage_0_10(xfr));
706     }
707 
708     public void exception(Session ssn, SessionException exc)
709     {
710         synchronized (_currentExceptionLock)
711         {
712             _currentException = exc;
713         }
714     }
715 
716     public void closed(Session ssn) {}
717 
718     protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
719                                           final boolean noLocal)
720             throws AMQException
721     {
722         /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
723         return new FailoverNoopSupport<AMQShortString, AMQException>(
724                 new FailoverProtectedOperation<AMQShortString, AMQException>()
725                 {
726                     public AMQShortString execute() throws AMQException, FailoverException
727                     {
728                         // Generate the queue name if the destination indicates that a client generated name is to be used.
729                         if (amqd.isNameRequired())
730                         {
731                             String binddingKey = "";
732                             for(AMQShortString key : amqd.getBindingKeys())
733                             {
734                                binddingKey = binddingKey + "_" + key.toString();
735                             }
736                             amqd.setQueueName(new AMQShortStringbinddingKey + "@"
737                                     + amqd.getExchangeName().toString() "_" + UUID.randomUUID()));
738                         }
739                         return send0_10QueueDeclare(amqd, protocolHandler, noLocal);
740                     }
741                 }, _connection).execute();
742     }
743 
744 
745     void start() throws AMQException
746     {
747         super.start();
748         for(BasicMessageConsumer  c:  _consumers.values())
749         {
750               c.start();
751         }
752     }
753 
754 
755     void stop() throws AMQException
756     {
757         super.stop();
758         for(BasicMessageConsumer  c:  _consumers.values())
759         {
760               c.stop();
761         }
762     }
763 
764 
765 
766 
767     public TopicSubscriber createDurableSubscriber(Topic topic, String namethrows JMSException
768     {
769 
770         checkNotClosed();
771         AMQTopic origTopic=checkValidTopic(topic);
772         AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
773 
774         TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
775         if (subscriber != null)
776         {
777             if (subscriber.getTopic().equals(topic))
778             {
779                 throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
780                         + name);
781             }
782             else
783             {
784                 unsubscribe(name);
785             }
786         }
787         else
788         {
789             AMQShortString topicName;
790             if (topic instanceof AMQTopic)
791             {
792                 topicName=((AMQTopictopic).getBindingKeys()[0];
793             }
794             else
795             {
796                 topicName=new AMQShortString(topic.getTopicName());
797             }
798 
799             if (_strictAMQP)
800             {
801                 if (_strictAMQPFATAL)
802                 {
803                     throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
804                 }
805                 else
806                 {
807                     _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
808                             "for creation durableSubscriber. Requesting queue deletion regardless.");
809                 }
810 
811                 deleteQueue(dest.getAMQQueueName());
812             }
813             else
814             {
815                 // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
816                 // says we must trash the subscription.
817                 if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
818                         && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
819                 {
820                     deleteQueue(dest.getAMQQueueName());
821                 }
822             }
823         }
824 
825         subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
826 
827         _subscriptions.put(name, subscriber);
828         _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
829 
830         return subscriber;
831     }
832 
833     protected Long requestQueueDepth(AMQDestination amqd)
834     {
835         return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
836     }
837 
838 
839     /**
840      * Store non committed messages for this session
841      * With 0.10 messages are consumed with window mode, we must send a completion
842      * before the window size is reached so credits don't dry up.
843      @param id
844      */
845     @Override protected void addDeliveredMessage(long id)
846     {
847         _txRangeSet.add((intid);
848         _txSize++;
849         // this is a heuristic, we may want to have that configurable
850         if (_connection.getMaxPrefetch() == ||
851                 _connection.getMaxPrefetch() != && _txSize % (_connection.getMaxPrefetch() 2== 0)
852         {
853             // send completed so consumer credits don't dry up
854             messageAcknowledge(_txRangeSet, false);
855         }
856     }
857 
858     @Override public void commit() throws JMSException
859     {
860         checkTransacted();
861         try
862         {
863             if_txSize > )
864             {
865                 messageAcknowledge(_txRangeSet, true);
866                 _txRangeSet.clear();
867                 _txSize = 0;
868             }
869             sendCommit();
870         }
871         catch (AMQException e)
872         {
873             throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
874         }
875         catch (FailoverException e)
876         {
877             throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
878         }
879     }
880 
881     protected final boolean tagLE(long tag1, long tag2)
882     {
883         return Serial.le((inttag1, (inttag2);
884     }
885 
886     protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
887     {
888         return Serial.lt((intcurrentMark, (intdeliveryTag);
889     }
890 
891     public AMQMessageDelegateFactory getMessageDelegateFactory()
892     {
893         return AMQMessageDelegateFactory.FACTORY_0_10;
894     }
895 
896 }