SimpleAMQQueue.java
0001 package org.apache.qpid.server.queue;
0002 
0003 import java.util.ArrayList;
0004 import java.util.EnumSet;
0005 import java.util.List;
0006 import java.util.Set;
0007 import java.util.concurrent.CopyOnWriteArrayList;
0008 import java.util.concurrent.Executor;
0009 import java.util.concurrent.atomic.AtomicBoolean;
0010 import java.util.concurrent.atomic.AtomicInteger;
0011 import java.util.concurrent.atomic.AtomicLong;
0012 import java.util.concurrent.atomic.AtomicReference;
0013 
0014 import javax.management.JMException;
0015 
0016 import org.apache.log4j.Logger;
0017 import org.apache.qpid.AMQException;
0018 import org.apache.qpid.framing.AMQShortString;
0019 import org.apache.qpid.framing.FieldTable;
0020 import org.apache.qpid.pool.ReadWriteRunnable;
0021 import org.apache.qpid.pool.ReferenceCountingExecutorService;
0022 import org.apache.qpid.server.configuration.QueueConfiguration;
0023 import org.apache.qpid.server.exchange.Exchange;
0024 import org.apache.qpid.server.management.ManagedObject;
0025 import org.apache.qpid.server.output.ProtocolOutputConverter;
0026 import org.apache.qpid.server.registry.ApplicationRegistry;
0027 import org.apache.qpid.server.store.StoreContext;
0028 import org.apache.qpid.server.subscription.Subscription;
0029 import org.apache.qpid.server.subscription.SubscriptionList;
0030 import org.apache.qpid.server.transactionlog.TransactionLog;
0031 import org.apache.qpid.server.virtualhost.VirtualHost;
0032 
0033 /*
0034 *
0035 * Licensed to the Apache Software Foundation (ASF) under one
0036 * or more contributor license agreements.  See the NOTICE file
0037 * distributed with this work for additional information
0038 * regarding copyright ownership.  The ASF licenses this file
0039 * to you under the Apache License, Version 2.0 (the
0040 * "License"); you may not use this file except in compliance
0041 * with the License.  You may obtain a copy of the License at
0042 *
0043 *   http://www.apache.org/licenses/LICENSE-2.0
0044 *
0045 * Unless required by applicable law or agreed to in writing,
0046 * software distributed under the License is distributed on an
0047 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0048 * KIND, either express or implied.  See the License for the
0049 * specific language governing permissions and limitations
0050 * under the License.
0051 *
0052 */
0053 public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
0054 {
0055     private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
0056 
0057     private final AMQShortString _name;
0058 
0059     /** null means shared */
0060     private final AMQShortString _owner;
0061 
0062     private final boolean _durable;
0063 
0064     /** If true, this queue is deleted when the last subscriber is removed */
0065     private final boolean _autoDelete;
0066 
0067     private final VirtualHost _virtualHost;
0068 
0069     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
0070     private final ExchangeBindings _bindings = new ExchangeBindings(this);
0071 
0072     private final AtomicBoolean _deleted = new AtomicBoolean(false);
0073 
0074     private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
0075 
0076     private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
0077 
0078     private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
0079 
0080     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
0081 
0082     protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
0083     private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
0084 
0085     private volatile Subscription _exclusiveSubscriber;
0086 
0087     protected final QueueEntryList _entries;
0088 
0089     private final AMQQueueMBean _managedObject;
0090     private final Executor _asyncDelivery;
0091     private final AtomicLong _totalMessagesReceived = new AtomicLong();
0092 
0093     /** max allowed size(KB) of a single message */
0094     public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
0095 
0096     /** max allowed number of messages on a queue. */
0097     public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
0098 
0099     /** max queue depth for the queue */
0100     public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
0101 
0102     /** maximum message age before alerts occur */
0103     public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
0104 
0105     /** the minimum interval between sending out consecutive alerts of the same type */
0106     public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
0107 
0108     private static final int MAX_ASYNC_DELIVERIES = 10;
0109 
0110     private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
0111 
0112     private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
0113     private AtomicReference _asynchronousRunner = new AtomicReference(null);
0114     private AtomicInteger _deliveredMessages = new AtomicInteger();
0115     private AtomicBoolean _stopped = new AtomicBoolean(false);
0116 
0117     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
0118             throws AMQException
0119     {
0120         this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
0121     }
0122 
0123     protected SimpleAMQQueue(AMQShortString name,
0124                              boolean durable,
0125                              AMQShortString owner,
0126                              boolean autoDelete,
0127                              VirtualHost virtualHost,
0128                              QueueEntryListFactory entryListFactory)
0129             throws AMQException
0130     {
0131 
0132         if (name == null)
0133         {
0134             throw new IllegalArgumentException("Queue name must not be null");
0135         }
0136 
0137         if (virtualHost == null)
0138         {
0139             throw new IllegalArgumentException("Virtual Host must not be null");
0140         }
0141 
0142         _name = name;
0143         _durable = durable;
0144         _owner = owner;
0145         _autoDelete = autoDelete;
0146         _virtualHost = virtualHost;
0147         _entries = entryListFactory.createQueueEntryList(this);
0148 
0149         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
0150 
0151         try
0152         {
0153             _managedObject = new AMQQueueMBean(this);
0154             _managedObject.register();
0155         }
0156         catch (JMException e)
0157         {
0158             throw new AMQException("AMQQueue MBean creation has failed ", e);
0159         }
0160 
0161         resetNotifications();
0162 
0163     }
0164 
0165     public void resetNotifications()
0166     {
0167         // This ensure that the notification checks for the configured alerts are created.
0168         setMaximumMessageAge(_maximumMessageAge);
0169         setMaximumMessageCount(_maximumMessageCount);
0170         setMaximumMessageSize(_maximumMessageSize);
0171         setMaximumQueueDepth(_maximumQueueDepth);
0172     }
0173 
0174     // ------ Getters and Setters
0175 
0176     public AMQShortString getName()
0177     {
0178         return _name;
0179     }
0180 
0181     public boolean isDurable()
0182     {
0183         return _durable;
0184     }
0185 
0186     public boolean isAutoDelete()
0187     {
0188         return _autoDelete;
0189     }
0190 
0191     public AMQShortString getOwner()
0192     {
0193         return _owner;
0194     }
0195 
0196     public VirtualHost getVirtualHost()
0197     {
0198         return _virtualHost;
0199     }
0200 
0201     // ------ bind and unbind
0202 
0203     public void bind(Exchange exchange, AMQShortString routingKey, FieldTable argumentsthrows AMQException
0204     {
0205         exchange.registerQueue(routingKey, this, arguments);
0206         if (isDurable() && exchange.isDurable())
0207         {
0208             _virtualHost.getRoutingTable().bindQueue(exchange, routingKey, this, arguments);
0209         }
0210 
0211         _bindings.addBinding(routingKey, arguments, exchange);
0212     }
0213 
0214     public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable argumentsthrows AMQException
0215     {
0216         exchange.deregisterQueue(routingKey, this, arguments);
0217         if (isDurable() && exchange.isDurable())
0218         {
0219             _virtualHost.getRoutingTable().unbindQueue(exchange, routingKey, this, arguments);
0220         }
0221 
0222         boolean removed = _bindings.remove(routingKey, arguments, exchange);
0223         if (!removed)
0224         {
0225             _logger.error("Mismatch between queue bindings and exchange record of bindings");
0226         }
0227     }
0228 
0229     public List<ExchangeBinding> getExchangeBindings()
0230     {
0231         return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings());
0232     }
0233 
0234     // ------ Manage Subscriptions
0235 
0236     public synchronized void registerSubscription(final Subscription subscription, final boolean exclusivethrows AMQException
0237     {
0238 
0239         if (isExclusiveSubscriber())
0240         {
0241             throw new ExistingExclusiveSubscription();
0242         }
0243 
0244         if (exclusive)
0245         {
0246             if (getConsumerCount() != 0)
0247             {
0248                 throw new ExistingSubscriptionPreventsExclusive();
0249             }
0250             else
0251             {
0252                 _exclusiveSubscriber = subscription;
0253 
0254             }
0255         }
0256 
0257         _activeSubscriberCount.incrementAndGet();
0258         subscription.setStateListener(this);
0259         subscription.setLastSeenEntry(null, _entries.getHead());
0260 
0261         if (!isDeleted())
0262         {
0263             subscription.setQueue(this);
0264             _subscriptionList.add(subscription);
0265             if (isDeleted())
0266             {
0267                 subscription.queueDeleted(this);
0268             }
0269         }
0270         else
0271         {
0272             // TODO
0273         }
0274 
0275         deliverAsync(subscription);
0276 
0277     }
0278 
0279     public synchronized void unregisterSubscription(final Subscription subscriptionthrows AMQException
0280     {
0281         if (subscription == null)
0282         {
0283             throw new NullPointerException("subscription argument is null");
0284         }
0285 
0286         boolean removed = _subscriptionList.remove(subscription);
0287 
0288         if (removed)
0289         {
0290             subscription.close();
0291             // No longer can the queue have an exclusive consumer
0292             setExclusiveSubscriber(null);
0293 
0294             QueueEntry lastSeen;
0295 
0296             while ((lastSeen = subscription.getLastSeenEntry()) != null)
0297             {
0298                 subscription.setLastSeenEntry(lastSeen, null);
0299             }
0300 
0301             // auto-delete queues must be deleted if there are no remaining subscribers
0302 
0303             if (_autoDelete && getConsumerCount() == 0)
0304             {
0305                 if (_logger.isInfoEnabled())
0306                 {
0307                     _logger.info("Auto-deleteing queue:" this);
0308                 }
0309 
0310                 delete();
0311 
0312                 // we need to manually fire the event to the removed subscription (which was the last one left for this
0313                 // queue. This is because the delete method uses the subscription set which has just been cleared
0314                 subscription.queueDeleted(this);
0315             }
0316         }
0317 
0318     }
0319 
0320     // ------ Enqueue / Dequeue
0321 
0322     public QueueEntry enqueue(StoreContext storeContext, AMQMessage messagethrows AMQException
0323     {
0324 
0325         incrementQueueCount();
0326         incrementQueueSize(message);
0327 
0328         _totalMessagesReceived.incrementAndGet();
0329 
0330         QueueEntry entry;
0331         Subscription exclusiveSub = _exclusiveSubscriber;
0332 
0333         if (exclusiveSub != null)
0334         {
0335             exclusiveSub.getSendLock();
0336 
0337             try
0338             {
0339                 entry = _entries.add(message);
0340 
0341                 deliverToSubscription(exclusiveSub, entry);
0342 
0343                 // where there is more than one producer there's a reasonable chance that even though there is
0344                 // no "queueing" we do not deliver because we get an interleving of _entries.add and
0345                 // deliverToSubscription between threads.  Therefore have one more try. 
0346                 if (!(entry.isAcquired() || entry.isDeleted()))
0347                 {
0348                     deliverToSubscription(exclusiveSub, entry);
0349                 }
0350             }
0351             finally
0352             {
0353                 exclusiveSub.releaseSendLock();
0354             }
0355         }
0356         else
0357         {
0358             entry = _entries.add(message);
0359             /*
0360 
0361             iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
0362 
0363              */
0364             SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
0365             SubscriptionList.SubscriptionNode nextNode = node.getNext();
0366             if (nextNode == null)
0367             {
0368                 nextNode = _subscriptionList.getHead().getNext();
0369             }
0370             while (nextNode != null)
0371             {
0372                 if (_lastSubscriptionNode.compareAndSet(node, nextNode))
0373                 {
0374                     break;
0375                 }
0376                 else
0377                 {
0378                     node = _lastSubscriptionNode.get();
0379                     nextNode = node.getNext();
0380                     if (nextNode == null)
0381                     {
0382                         nextNode = _subscriptionList.getHead().getNext();
0383                     }
0384                 }
0385             }
0386 
0387             // always do one extra loop after we believe we've finished
0388             // this catches the case where we *just* miss an update
0389             int loops = 2;
0390 
0391             while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
0392             {
0393                 if (nextNode == null)
0394                 {
0395                     loops--;
0396                     nextNode = _subscriptionList.getHead();
0397                 }
0398                 else
0399                 {
0400                     // if subscription at end, and active, offer
0401                     Subscription sub = nextNode.getSubscription();
0402                     deliverToSubscription(sub, entry);
0403                 }
0404                 nextNode = nextNode.getNext();
0405 
0406             }
0407         }
0408 
0409         if (entry.immediateAndNotDelivered())
0410         {
0411             //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content
0412             // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks
0413             // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses. 
0414             entry.acquire();
0415             entry.dequeueAndDelete(storeContext);
0416         }
0417         else if (!(entry.isAcquired() || entry.isDeleted()))
0418         {
0419             checkSubscriptionsNotAheadOfDelivery(entry);
0420 
0421             deliverAsync();
0422         }
0423 
0424         _managedObject.checkForNotification(entry.getMessage());
0425         
0426         return entry;
0427     }
0428 
0429     private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
0430             throws AMQException
0431     {
0432 
0433         sub.getSendLock();
0434         try
0435         {
0436             if (subscriptionReadyAndHasInterest(sub, entry)
0437                 && !sub.isSuspended())
0438             {
0439                 if (!sub.wouldSuspend(entry))
0440                 {
0441                     if (!sub.isBrowser() && !entry.acquire(sub))
0442                     {
0443                         // restore credit here that would have been taken away by wouldSuspend since we didn't manage
0444                         // to acquire the entry for this subscription
0445                         sub.restoreCredit(entry);
0446                     }
0447                     else
0448                     {
0449 
0450                         deliverMessage(sub, entry);
0451 
0452                     }
0453                 }
0454             }
0455         }
0456         finally
0457         {
0458             sub.releaseSendLock();
0459         }
0460     }
0461 
0462     protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
0463     {
0464         // This method is only required for queues which mess with ordering
0465         // Simple Queues don't :-)
0466     }
0467 
0468     private void incrementQueueSize(final AMQMessage message)
0469     {
0470         getAtomicQueueSize().addAndGet(message.getSize());
0471     }
0472 
0473     private void incrementQueueCount()
0474     {
0475         getAtomicQueueCount().incrementAndGet();
0476     }
0477 
0478     private void deliverMessage(final Subscription sub, final QueueEntry entry)
0479             throws AMQException
0480     {
0481         _deliveredMessages.incrementAndGet();
0482         sub.send(entry);
0483 
0484     }
0485 
0486     private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
0487     {
0488 
0489         // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
0490         // interest in.
0491         QueueEntry node = sub.getLastSeenEntry();
0492         while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
0493         {
0494 
0495             QueueEntry newNode = _entries.next(node);
0496             if (newNode != null)
0497             {
0498                 sub.setLastSeenEntry(node, newNode);
0499                 node = sub.getLastSeenEntry();
0500             }
0501             else
0502             {
0503                 node = null;
0504                 break;
0505             }
0506 
0507         }
0508 
0509         if (node == entry)
0510         {
0511             // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
0512             // good
0513             return true;
0514         }
0515         else
0516         {
0517             // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing
0518             // no-one else has updated it to something furhter on in the list
0519             //TODO - check
0520             //updateLastSeenEntry(sub, entry);
0521             return false;
0522         }
0523 
0524     }
0525 
0526     private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry)
0527     {
0528         QueueEntry node = sub.getLastSeenEntry();
0529 
0530         if (node != null && entry.compareTo(node&& sub.hasInterest(entry))
0531         {
0532             do
0533             {
0534                 if (sub.setLastSeenEntry(node, entry))
0535                 {
0536                     return;
0537                 }
0538                 else
0539                 {
0540                     node = sub.getLastSeenEntry();
0541                 }
0542             }
0543             while (node != null && entry.compareTo(node0);
0544         }
0545 
0546     }
0547 
0548     public void requeue(StoreContext storeContext, QueueEntry entrythrows AMQException
0549     {
0550 
0551         SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
0552         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
0553         while (subscriberIter.advance())
0554         {
0555             Subscription sub = subscriberIter.getNode().getSubscription();
0556 
0557             // we don't make browsers send the same stuff twice
0558             if (!sub.isBrowser())
0559             {
0560                 updateLastSeenEntry(sub, entry);
0561             }
0562         }
0563 
0564         deliverAsync();
0565 
0566     }
0567 
0568     /**
0569      * Only call from queue Entry
0570      @param storeContext
0571      @param entry
0572      @throws FailedDequeueException
0573      */
0574     public void dequeue(StoreContext storeContext, QueueEntry entrythrows FailedDequeueException
0575     {
0576         decrementQueueCount();
0577         decrementQueueSize(entry);
0578         if (entry.acquiredBySubscription())
0579         {
0580             _deliveredMessages.decrementAndGet();
0581         }
0582 
0583         try
0584         {
0585             AMQMessage msg = entry.getMessage();
0586             if (msg.isPersistent())
0587             {
0588                 _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
0589             }
0590 
0591         }
0592         catch (MessageCleanupException e)
0593         {
0594             // Message was dequeued, but could not then be deleted
0595             // though it is no longer referenced. This should be very
0596             // rare and can be detected and cleaned up on recovery or
0597             // done through some form of manual intervention.
0598             _logger.error(e, e);
0599         }
0600         catch (AMQException e)
0601         {
0602             throw new FailedDequeueException(_name.toString(), e);
0603         }
0604 
0605     }
0606 
0607     private void decrementQueueSize(final QueueEntry entry)
0608     {
0609         getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
0610     }
0611 
0612     void decrementQueueCount()
0613     {
0614         getAtomicQueueCount().decrementAndGet();
0615     }
0616 
0617     public boolean resend(final QueueEntry entry, final Subscription subscriptionthrows AMQException
0618     {
0619         /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
0620                   entry to resend and move back the subscription pointer. */
0621 
0622         subscription.getSendLock();
0623         try
0624         {
0625             if (!subscription.isClosed())
0626             {
0627                 deliverMessage(subscription, entry);
0628                 return true;
0629             }
0630             else
0631             {
0632                 return false;
0633             }
0634         }
0635         finally
0636         {
0637             subscription.releaseSendLock();
0638         }
0639     }
0640 
0641     public int getConsumerCount()
0642     {
0643         return _subscriptionList.size();
0644     }
0645 
0646     public int getActiveConsumerCount()
0647     {
0648         return _activeSubscriberCount.get();
0649     }
0650 
0651     public boolean isUnused()
0652     {
0653         return getConsumerCount() == 0;
0654     }
0655 
0656     public boolean isEmpty()
0657     {
0658         return getMessageCount() == 0;
0659     }
0660 
0661     public int getMessageCount()
0662     {
0663         return getAtomicQueueCount().get();
0664     }
0665 
0666     public long getQueueDepth()
0667     {
0668         return getAtomicQueueSize().get();
0669     }
0670 
0671     public int getUndeliveredMessageCount()
0672     {
0673         int count = getMessageCount() - _deliveredMessages.get();
0674         if (count < 0)
0675         {
0676             return 0;
0677         }
0678         else
0679         {
0680             return count;
0681         }
0682     }
0683 
0684     public long getReceivedMessageCount()
0685     {
0686         return _totalMessagesReceived.get();
0687     }
0688 
0689     public long getOldestMessageArrivalTime()
0690     {
0691         QueueEntry entry = getOldestQueueEntry();
0692         return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
0693     }
0694 
0695     protected QueueEntry getOldestQueueEntry()
0696     {
0697         return _entries.next(_entries.getHead());
0698     }
0699 
0700     public boolean isDeleted()
0701     {
0702         return _deleted.get();
0703     }
0704 
0705     public List<QueueEntry> getMessagesOnTheQueue()
0706     {
0707         ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
0708         QueueEntryIterator queueListIterator = _entries.iterator();
0709         while (queueListIterator.advance())
0710         {
0711             QueueEntry node = queueListIterator.getNode();
0712             if (node != null && !node.isDeleted())
0713             {
0714                 entryList.add(node);
0715             }
0716         }
0717         return entryList;
0718 
0719     }
0720 
0721     public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
0722     {
0723         if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
0724         {
0725             _activeSubscriberCount.decrementAndGet();
0726 
0727         }
0728         else if (newState == Subscription.State.ACTIVE)
0729         {
0730             if (oldState != Subscription.State.ACTIVE)
0731             {
0732                 _activeSubscriberCount.incrementAndGet();
0733 
0734             }
0735             deliverAsync(sub);
0736         }
0737     }
0738 
0739     public int compareTo(final AMQQueue o)
0740     {
0741         return _name.compareTo(o.getName());
0742     }
0743 
0744     public AtomicInteger getAtomicQueueCount()
0745     {
0746         return _atomicQueueCount;
0747     }
0748 
0749     public AtomicLong getAtomicQueueSize()
0750     {
0751         return _atomicQueueSize;
0752     }
0753 
0754     private boolean isExclusiveSubscriber()
0755     {
0756         return _exclusiveSubscriber != null;
0757     }
0758 
0759     private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
0760     {
0761         _exclusiveSubscriber = exclusiveSubscriber;
0762     }
0763 
0764     public static interface QueueEntryFilter
0765     {
0766         public boolean accept(QueueEntry entry);
0767 
0768         public boolean filterComplete();
0769     }
0770 
0771     public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
0772     {
0773         return getMessagesOnTheQueue(new QueueEntryFilter()
0774         {
0775 
0776             public boolean accept(QueueEntry entry)
0777             {
0778                 final long messageId = entry.getMessage().getMessageId();
0779                 return messageId >= fromMessageId && messageId <= toMessageId;
0780             }
0781 
0782             public boolean filterComplete()
0783             {
0784                 return false;
0785             }
0786         });
0787     }
0788 
0789     public QueueEntry getMessageOnTheQueue(final long messageId)
0790     {
0791         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
0792         {
0793             private boolean _complete;
0794 
0795             public boolean accept(QueueEntry entry)
0796             {
0797                 _complete = entry.getMessage().getMessageId() == messageId;
0798                 return _complete;
0799             }
0800 
0801             public boolean filterComplete()
0802             {
0803                 return _complete;
0804             }
0805         });
0806         return entries.isEmpty() null : entries.get(0);
0807     }
0808 
0809     public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
0810     {
0811         ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
0812         QueueEntryIterator queueListIterator = _entries.iterator();
0813         while (queueListIterator.advance() && !filter.filterComplete())
0814         {
0815             QueueEntry node = queueListIterator.getNode();
0816             if (!node.isDeleted() && filter.accept(node))
0817             {
0818                 entryList.add(node);
0819             }
0820         }
0821         return entryList;
0822 
0823     }
0824 
0825 
0826     public void moveMessagesToAnotherQueue(final long fromMessageId,
0827                                            final long toMessageId,
0828                                            String queueName,
0829                                            StoreContext storeContext)
0830     {
0831         // The move is a two step process. First the messages are moved in the _transactionLog.
0832         // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the
0833         // existing queue.
0834         // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery
0835         // this is done as the message is recieved.
0836         // So The final step is to enqueue the messages on the new queue.
0837 
0838         AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
0839         TransactionLog transactionLog = getVirtualHost().getTransactionLog();
0840 
0841         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
0842         {
0843 
0844             public boolean accept(QueueEntry entry)
0845             {
0846                 final long messageId = entry.getMessage().getMessageId();
0847                 return (messageId >= fromMessageId)
0848                        && (messageId <= toMessageId)
0849                        && entry.acquire();
0850             }
0851 
0852             public boolean filterComplete()
0853             {
0854                 return false;
0855             }
0856         });
0857 
0858         try
0859         {
0860             transactionLog.beginTran(storeContext);
0861 
0862             // Move the messages in the transaction log.
0863             for (QueueEntry entry : entries)
0864             {
0865                 AMQMessage message = entry.getMessage();
0866 
0867                 if (message.isPersistent() && toQueue.isDurable())
0868                 {
0869                     transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
0870                 }
0871                 // dequeue will remove the messages from the queue
0872                 entry.dequeue(storeContext);
0873             }
0874 
0875             // Commit and flush the move transcations.
0876             try
0877             {
0878                 transactionLog.commitTran(storeContext);
0879             }
0880             catch (AMQException e)
0881             {
0882                 throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
0883             }
0884         }
0885         catch (AMQException e)
0886         {
0887             try
0888             {
0889                 transactionLog.abortTran(storeContext);
0890             }
0891             catch (AMQException rollbackEx)
0892             {
0893                 _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
0894             }
0895             throw new RuntimeException(e);
0896         }
0897 
0898         try
0899         {
0900             // Add messages to new queue
0901             for (QueueEntry entry : entries)
0902             {
0903                 toQueue.enqueue(storeContext, entry.getMessage());
0904             }
0905         }
0906         catch (MessageCleanupException e)
0907         {
0908             throw new RuntimeException(e);
0909         }
0910         catch (AMQException e)
0911         {
0912             throw new RuntimeException(e);
0913         }
0914 
0915     }
0916 
0917     public void copyMessagesToAnotherQueue(final long fromMessageId,
0918                                            final long toMessageId,
0919                                            String queueName,
0920                                            final StoreContext storeContext)
0921     {
0922         AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
0923         TransactionLog transactionLog = getVirtualHost().getTransactionLog();
0924 
0925         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
0926         {
0927 
0928             public boolean accept(QueueEntry entry)
0929             {
0930                 final long messageId = entry.getMessage().getMessageId();
0931                 if ((messageId >= fromMessageId)
0932                     && (messageId <= toMessageId))
0933                 {
0934                     if (!entry.isDeleted())
0935                     {
0936                         return true;
0937                     }
0938                 }
0939 
0940                 return false;
0941             }
0942 
0943             public boolean filterComplete()
0944             {
0945                 return false;
0946             }
0947         });
0948 
0949         try
0950         {
0951             transactionLog.beginTran(storeContext);
0952 
0953             // Move the messages in on the transaction log.
0954             for (QueueEntry entry : entries)
0955             {
0956                 AMQMessage message = entry.getMessage();
0957 
0958                 if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable())
0959                 {
0960                     transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
0961                 }
0962             }
0963 
0964             // Commit and flush the move transcations.
0965             try
0966             {
0967                 transactionLog.commitTran(storeContext);
0968             }
0969             catch (AMQException e)
0970             {
0971                 throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
0972             }
0973         }
0974         catch (AMQException e)
0975         {
0976             try
0977             {
0978                 transactionLog.abortTran(storeContext);
0979             }
0980             catch (AMQException rollbackEx)
0981             {
0982                 _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
0983             }
0984             throw new RuntimeException(e);
0985         }
0986 
0987         try
0988         {
0989             for (QueueEntry entry : entries)
0990             {
0991                 if (!entry.isDeleted())
0992                 {
0993                     toQueue.enqueue(storeContext, entry.getMessage());
0994                 }
0995             }
0996         }
0997         catch (MessageCleanupException e)
0998         {
0999             throw new RuntimeException(e);
1000         }
1001         catch (AMQException e)
1002         {
1003             throw new RuntimeException(e);
1004         }
1005 
1006     }
1007 
1008     public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
1009     {
1010 
1011         try
1012         {
1013             QueueEntryIterator queueListIterator = _entries.iterator();
1014 
1015             while (queueListIterator.advance())
1016             {
1017                 QueueEntry node = queueListIterator.getNode();
1018 
1019                 final long messageId = node.getMessage().getMessageId();
1020 
1021                 if ((messageId >= fromMessageId)
1022                     && (messageId <= toMessageId)
1023                     && !node.isDeleted()
1024                     && node.acquire())
1025                 {
1026                     node.dequeueAndDelete(storeContext);
1027                 }
1028 
1029             }
1030         }
1031         catch (AMQException e)
1032         {
1033             throw new RuntimeException(e);
1034         }
1035 
1036     }
1037 
1038     // ------ Management functions
1039 
1040     public void deleteMessageFromTop(StoreContext storeContextthrows AMQException
1041     {
1042         QueueEntryIterator queueListIterator = _entries.iterator();
1043         boolean noDeletes = true;
1044 
1045         while (noDeletes && queueListIterator.advance())
1046         {
1047             QueueEntry node = queueListIterator.getNode();
1048             if (!node.isDeleted() && node.acquire())
1049             {
1050                 node.dequeueAndDelete(storeContext);
1051                 noDeletes = false;
1052             }
1053 
1054         }
1055     }
1056 
1057     public long clearQueue(StoreContext storeContextthrows AMQException
1058     {
1059 
1060         QueueEntryIterator queueListIterator = _entries.iterator();
1061         long count = 0;
1062 
1063         while (queueListIterator.advance())
1064         {
1065             QueueEntry node = queueListIterator.getNode();
1066             if (!node.isDeleted() && node.acquire())
1067             {
1068                 node.dequeueAndDelete(storeContext);
1069                 count++;
1070             }
1071 
1072         }
1073         return count;
1074 
1075     }
1076 
1077     public void addQueueDeleteTask(final Task task)
1078     {
1079         _deleteTaskList.add(task);
1080     }
1081 
1082     public int delete() throws AMQException
1083     {
1084         if (!_deleted.getAndSet(true))
1085         {
1086 
1087             SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
1088 
1089             while (subscriptionIter.advance())
1090             {
1091                 Subscription s = subscriptionIter.getNode().getSubscription();
1092                 if (s != null)
1093                 {
1094                     s.queueDeleted(this);
1095                 }
1096             }
1097 
1098             _bindings.deregister();
1099             _virtualHost.getQueueRegistry().unregisterQueue(_name);
1100 
1101             _managedObject.unregister();
1102             for (Task task : _deleteTaskList)
1103             {
1104                 task.doTask(this);
1105             }
1106 
1107             _deleteTaskList.clear();
1108             stop();
1109         }
1110         return getMessageCount();
1111 
1112     }
1113 
1114     public void stop()
1115     {
1116         if (!_stopped.getAndSet(true))
1117         {
1118             ReferenceCountingExecutorService.getInstance().releaseExecutorService();
1119         }
1120     }
1121 
1122     public void deliverAsync()
1123     {
1124         _stateChangeCount.incrementAndGet();
1125 
1126         Runner runner = new Runner();
1127 
1128         if (_asynchronousRunner.compareAndSet(null, runner))
1129         {
1130             _asyncDelivery.execute(runner);
1131         }
1132     }
1133 
1134     public void deliverAsync(Subscription sub)
1135     {
1136         _asyncDelivery.execute(new SubFlushRunner(sub));
1137     }
1138 
1139     private class Runner implements ReadWriteRunnable
1140     {
1141         public void run()
1142         {
1143             try
1144             {
1145                 processQueue(this);
1146             }
1147             catch (AMQException e)
1148             {
1149                 _logger.error(e);
1150             }
1151 
1152         }
1153 
1154         public boolean isRead()
1155         {
1156             return false;
1157         }
1158 
1159         public boolean isWrite()
1160         {
1161             return true;
1162         }
1163     }
1164 
1165     private class SubFlushRunner implements ReadWriteRunnable
1166     {
1167         private final Subscription _sub;
1168 
1169         public SubFlushRunner(Subscription sub)
1170         {
1171             _sub = sub;
1172         }
1173 
1174         public void run()
1175         {
1176             boolean complete = false;
1177             try
1178             {
1179                 complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
1180 
1181             }
1182             catch (AMQException e)
1183             {
1184                 _logger.error(e);
1185             }
1186             if (!complete && !_sub.isSuspended())
1187             {
1188                 _asyncDelivery.execute(this);
1189             }
1190 
1191         }
1192 
1193         public boolean isRead()
1194         {
1195             return false;
1196         }
1197 
1198         public boolean isWrite()
1199         {
1200             return true;
1201         }
1202     }
1203 
1204     public void flushSubscription(Subscription subthrows AMQException
1205     {
1206         flushSubscription(sub, Long.MAX_VALUE);
1207     }
1208 
1209     public boolean flushSubscription(Subscription sub, Long iterationsthrows AMQException
1210     {
1211         boolean atTail = false;
1212 
1213         while (!sub.isSuspended() && !atTail && iterations != 0)
1214         {
1215             try 
1216             {
1217                 sub.getSendLock();
1218                 atTail =  attemptDelivery(sub);
1219                 if (atTail && sub.isAutoClose())
1220                 {
1221                     unregisterSubscription(sub);
1222 
1223                     ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
1224                     converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
1225                 }
1226                 else if (!atTail)
1227                 {
1228                     iterations--;
1229                 }
1230             }
1231             finally
1232             {
1233                 sub.releaseSendLock();
1234             }
1235         }
1236 
1237         // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
1238         // next entry they are interested in yet.  This would lead to holding on to references to expired messages, etc
1239         // which would give us memory "leak".
1240 
1241         if (!isExclusiveSubscriber())
1242         {
1243             advanceAllSubscriptions();
1244         }
1245         return atTail;
1246     }
1247 
1248     private boolean attemptDelivery(Subscription subthrows AMQException
1249     {
1250         boolean atTail = false;
1251         boolean advanced = false;
1252         boolean subActive = sub.isActive();
1253         if (subActive)
1254         {
1255             QueueEntry node = moveSubscriptionToNextNode(sub);
1256             if (!(node.isAcquired() || node.isDeleted()))
1257             {
1258                 if (!sub.isSuspended())
1259                 {
1260                     if (sub.hasInterest(node))
1261                     {
1262                         if (!sub.wouldSuspend(node))
1263                         {
1264                             if (!sub.isBrowser() && !node.acquire(sub))
1265                             {
1266                                 sub.restoreCredit(node);
1267                             }
1268                             else
1269                             {
1270                                 deliverMessage(sub, node);
1271 
1272                                 if (sub.isBrowser())
1273                                 {
1274                                     QueueEntry newNode = _entries.next(node);
1275 
1276                                     if (newNode != null)
1277                                     {
1278                                         advanced = true;
1279                                         sub.setLastSeenEntry(node, newNode);
1280                                         node = sub.getLastSeenEntry();
1281                                     }
1282                                 }
1283                             }
1284 
1285                         }
1286                         else // Not enough Credit for message and wouldSuspend
1287                         {
1288                             //QPID-1187 - Treat the subscription as suspended for this message
1289                             // and wait for the message to be removed to continue delivery.
1290                             subActive = false;
1291                             node.addStateChangeListener(new QueueEntryListener(sub, node));
1292                         }
1293                     }
1294                     else
1295                     {
1296                         // this subscription is not interested in this node so we can skip over it
1297                         QueueEntry newNode = _entries.next(node);
1298                         if (newNode != null)
1299                         {
1300                             sub.setLastSeenEntry(node, newNode);
1301                         }
1302                     }
1303                 }
1304 
1305             }
1306             atTail = (_entries.next(node== null&& !advanced;
1307         }
1308         return atTail || !subActive;
1309     }
1310 
1311     protected void advanceAllSubscriptions() throws AMQException
1312     {
1313         SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
1314         while (subscriberIter.advance())
1315         {
1316             SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
1317             Subscription sub = subNode.getSubscription();
1318             moveSubscriptionToNextNode(sub);
1319         }
1320     }
1321 
1322     private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
1323             throws AMQException
1324     {
1325         QueueEntry node = sub.getLastSeenEntry();
1326 
1327         while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
1328         {
1329             if (!node.isAcquired() && !node.isDeleted() && node.expired())
1330             {
1331                 if (node.acquire())
1332                 {
1333                     // creating a new final store context per message seems wasteful.
1334                     final StoreContext reapingStoreContext = new StoreContext();
1335                     node.dequeueAndDelete(reapingStoreContext);
1336                 }
1337             }
1338             QueueEntry newNode = _entries.next(node);
1339             if (newNode != null)
1340             {
1341                 sub.setLastSeenEntry(node, newNode);
1342                 node = sub.getLastSeenEntry();
1343             }
1344             else
1345             {
1346                 break;
1347             }
1348 
1349         }
1350         return node;
1351     }
1352 
1353     private void processQueue(Runnable runnerthrows AMQException
1354     {
1355         long stateChangeCount;
1356         long previousStateChangeCount = Long.MIN_VALUE;
1357         boolean deliveryIncomplete = true;
1358 
1359         int extraLoops = 1;
1360         Long iterations = new Long(MAX_ASYNC_DELIVERIES);
1361 
1362         _asynchronousRunner.compareAndSet(runner, null);
1363 
1364         while (iterations != && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete&& _asynchronousRunner.compareAndSet(null, runner))
1365         {
1366             // we want to have one extra loop after every subscription has reached the point where it cannot move
1367             // further, just in case the advance of one subscription in the last loop allows a different subscription to
1368             // move forward in the next iteration
1369 
1370             if (previousStateChangeCount != stateChangeCount)
1371             {
1372                 extraLoops = 1;
1373             }
1374 
1375             previousStateChangeCount = stateChangeCount;
1376             deliveryIncomplete = _subscriptionList.size() != 0;
1377             boolean done = true;
1378 
1379             SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
1380             //iterate over the subscribers and try to advance their pointer
1381             while (subscriptionIter.advance())
1382             {
1383                 boolean closeConsumer = false;
1384                 Subscription sub = subscriptionIter.getNode().getSubscription();
1385                 sub.getSendLock();
1386                 try
1387                 {
1388                     if (sub != null)
1389                     {
1390 
1391                         QueueEntry node = moveSubscriptionToNextNode(sub);
1392                         if (node != null)
1393                         {
1394                             done = attemptDelivery(sub);
1395                         }
1396                     }
1397                     if (done)
1398                     {
1399                         if (extraLoops == 0)
1400                         {
1401                             deliveryIncomplete = false;
1402                             if (sub.isAutoClose())
1403                             {
1404                                 unregisterSubscription(sub);
1405 
1406                                 ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
1407                                 converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
1408                             }
1409                         }
1410                         else
1411                         {
1412                             extraLoops--;
1413                         }
1414                     }
1415                     else
1416                     {
1417                         iterations--;
1418                         extraLoops = 1;
1419                     }
1420                 }
1421                 finally
1422                 {
1423                     sub.releaseSendLock();
1424                 }
1425             }
1426             _asynchronousRunner.set(null);
1427         }
1428 
1429         // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
1430         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
1431         if (iterations == && _asynchronousRunner.compareAndSet(null, runner))
1432         {
1433             _asyncDelivery.execute(runner);
1434         }
1435     }
1436 
1437     
1438     public void checkMessageStatus() throws AMQException
1439     {
1440 
1441         final StoreContext storeContext = new StoreContext();
1442 
1443         QueueEntryIterator queueListIterator = _entries.iterator();
1444 
1445         while (queueListIterator.advance())
1446         {
1447             QueueEntry node = queueListIterator.getNode();
1448             if (!node.isDeleted() && node.expired() && node.acquire())
1449             {
1450                 node.dequeueAndDelete(storeContext);
1451             
1452             else 
1453             {
1454                 _managedObject.checkForNotification(node.getMessage());
1455             }
1456         }
1457 
1458     }
1459 
1460     public long getMinimumAlertRepeatGap()
1461     {
1462         return _minimumAlertRepeatGap;
1463     }
1464 
1465     public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
1466     {
1467         _minimumAlertRepeatGap = minimumAlertRepeatGap;
1468     }
1469 
1470     public long getMaximumMessageAge()
1471     {
1472         return _maximumMessageAge;
1473     }
1474 
1475     public void setMaximumMessageAge(long maximumMessageAge)
1476     {
1477         _maximumMessageAge = maximumMessageAge;
1478         if (maximumMessageAge == 0L)
1479         {
1480             _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
1481         }
1482         else
1483         {
1484             _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
1485         }
1486     }
1487 
1488     public long getMaximumMessageCount()
1489     {
1490         return _maximumMessageCount;
1491     }
1492 
1493     public void setMaximumMessageCount(final long maximumMessageCount)
1494     {
1495         _maximumMessageCount = maximumMessageCount;
1496         if (maximumMessageCount == 0L)
1497         {
1498             _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
1499         }
1500         else
1501         {
1502             _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
1503         }
1504 
1505     }
1506 
1507     public long getMaximumQueueDepth()
1508     {
1509         return _maximumQueueDepth;
1510     }
1511 
1512     // Sets the queue depth, the max queue size
1513     public void setMaximumQueueDepth(final long maximumQueueDepth)
1514     {
1515         _maximumQueueDepth = maximumQueueDepth;
1516         if (maximumQueueDepth == 0L)
1517         {
1518             _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
1519         }
1520         else
1521         {
1522             _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
1523         }
1524 
1525     }
1526 
1527     public long getMaximumMessageSize()
1528     {
1529         return _maximumMessageSize;
1530     }
1531 
1532     public void setMaximumMessageSize(final long maximumMessageSize)
1533     {
1534         _maximumMessageSize = maximumMessageSize;
1535         if (maximumMessageSize == 0L)
1536         {
1537             _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
1538         }
1539         else
1540         {
1541             _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
1542         }
1543     }
1544 
1545     public Set<NotificationCheck> getNotificationChecks()
1546     {
1547         return _notificationChecks;
1548     }
1549 
1550     public ManagedObject getManagedObject()
1551     {
1552         return _managedObject;
1553     }
1554 
1555     private final class QueueEntryListener implements QueueEntry.StateChangeListener
1556     {
1557         private final QueueEntry _entry;
1558         private final Subscription _sub;
1559 
1560         public QueueEntryListener(final Subscription sub, final QueueEntry entry)
1561         {
1562             _entry = entry;
1563             _sub = sub;
1564         }
1565 
1566         public boolean equals(Object o)
1567         {
1568             return _entry == ((QueueEntryListenero)._entry && _sub == ((QueueEntryListenero)._sub;
1569         }
1570 
1571         public int hashCode()
1572         {
1573             return System.identityHashCode(_entry^ System.identityHashCode(_sub);
1574         }
1575 
1576         public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
1577         {
1578             entry.removeStateChangeListener(this);
1579             deliverAsync(_sub);
1580         }
1581     }
1582 
1583     public List<Long> getMessagesOnTheQueue(int num)
1584     {
1585         return getMessagesOnTheQueue(num, 0);
1586     }
1587 
1588     public List<Long> getMessagesOnTheQueue(int num, int offset)
1589     {
1590         ArrayList<Long> ids = new ArrayList<Long>(num);
1591         QueueEntryIterator it = _entries.iterator();
1592         for (int i = 0; i < offset; i++)
1593         {
1594             it.advance();
1595         }
1596 
1597         for (int i = 0; i < num && !it.atTail(); i++)
1598         {
1599             it.advance();
1600             ids.add(it.getNode().getMessage().getMessageId());
1601         }
1602         return ids;
1603     }
1604 }