0001 package org.apache.qpid.server.queue;
0002
0003 import java.util.ArrayList;
0004 import java.util.EnumSet;
0005 import java.util.List;
0006 import java.util.Set;
0007 import java.util.concurrent.CopyOnWriteArrayList;
0008 import java.util.concurrent.Executor;
0009 import java.util.concurrent.atomic.AtomicBoolean;
0010 import java.util.concurrent.atomic.AtomicInteger;
0011 import java.util.concurrent.atomic.AtomicLong;
0012 import java.util.concurrent.atomic.AtomicReference;
0013
0014 import javax.management.JMException;
0015
0016 import org.apache.log4j.Logger;
0017 import org.apache.qpid.AMQException;
0018 import org.apache.qpid.framing.AMQShortString;
0019 import org.apache.qpid.framing.FieldTable;
0020 import org.apache.qpid.pool.ReadWriteRunnable;
0021 import org.apache.qpid.pool.ReferenceCountingExecutorService;
0022 import org.apache.qpid.server.configuration.QueueConfiguration;
0023 import org.apache.qpid.server.exchange.Exchange;
0024 import org.apache.qpid.server.management.ManagedObject;
0025 import org.apache.qpid.server.output.ProtocolOutputConverter;
0026 import org.apache.qpid.server.registry.ApplicationRegistry;
0027 import org.apache.qpid.server.store.StoreContext;
0028 import org.apache.qpid.server.subscription.Subscription;
0029 import org.apache.qpid.server.subscription.SubscriptionList;
0030 import org.apache.qpid.server.transactionlog.TransactionLog;
0031 import org.apache.qpid.server.virtualhost.VirtualHost;
0032
0033 /*
0034 *
0035 * Licensed to the Apache Software Foundation (ASF) under one
0036 * or more contributor license agreements. See the NOTICE file
0037 * distributed with this work for additional information
0038 * regarding copyright ownership. The ASF licenses this file
0039 * to you under the Apache License, Version 2.0 (the
0040 * "License"); you may not use this file except in compliance
0041 * with the License. You may obtain a copy of the License at
0042 *
0043 * http://www.apache.org/licenses/LICENSE-2.0
0044 *
0045 * Unless required by applicable law or agreed to in writing,
0046 * software distributed under the License is distributed on an
0047 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0048 * KIND, either express or implied. See the License for the
0049 * specific language governing permissions and limitations
0050 * under the License.
0051 *
0052 */
0053 public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
0054 {
0055 private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
0056
0057 private final AMQShortString _name;
0058
0059 /** null means shared */
0060 private final AMQShortString _owner;
0061
0062 private final boolean _durable;
0063
0064 /** If true, this queue is deleted when the last subscriber is removed */
0065 private final boolean _autoDelete;
0066
0067 private final VirtualHost _virtualHost;
0068
0069 /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
0070 private final ExchangeBindings _bindings = new ExchangeBindings(this);
0071
0072 private final AtomicBoolean _deleted = new AtomicBoolean(false);
0073
0074 private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
0075
0076 private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
0077
0078 private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
0079
0080 private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
0081
0082 protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
0083 private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
0084
0085 private volatile Subscription _exclusiveSubscriber;
0086
0087 protected final QueueEntryList _entries;
0088
0089 private final AMQQueueMBean _managedObject;
0090 private final Executor _asyncDelivery;
0091 private final AtomicLong _totalMessagesReceived = new AtomicLong();
0092
0093 /** max allowed size(KB) of a single message */
0094 public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
0095
0096 /** max allowed number of messages on a queue. */
0097 public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
0098
0099 /** max queue depth for the queue */
0100 public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
0101
0102 /** maximum message age before alerts occur */
0103 public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
0104
0105 /** the minimum interval between sending out consecutive alerts of the same type */
0106 public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
0107
0108 private static final int MAX_ASYNC_DELIVERIES = 10;
0109
0110 private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
0111
0112 private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
0113 private AtomicReference _asynchronousRunner = new AtomicReference(null);
0114 private AtomicInteger _deliveredMessages = new AtomicInteger();
0115 private AtomicBoolean _stopped = new AtomicBoolean(false);
0116
0117 protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
0118 throws AMQException
0119 {
0120 this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
0121 }
0122
0123 protected SimpleAMQQueue(AMQShortString name,
0124 boolean durable,
0125 AMQShortString owner,
0126 boolean autoDelete,
0127 VirtualHost virtualHost,
0128 QueueEntryListFactory entryListFactory)
0129 throws AMQException
0130 {
0131
0132 if (name == null)
0133 {
0134 throw new IllegalArgumentException("Queue name must not be null");
0135 }
0136
0137 if (virtualHost == null)
0138 {
0139 throw new IllegalArgumentException("Virtual Host must not be null");
0140 }
0141
0142 _name = name;
0143 _durable = durable;
0144 _owner = owner;
0145 _autoDelete = autoDelete;
0146 _virtualHost = virtualHost;
0147 _entries = entryListFactory.createQueueEntryList(this);
0148
0149 _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
0150
0151 try
0152 {
0153 _managedObject = new AMQQueueMBean(this);
0154 _managedObject.register();
0155 }
0156 catch (JMException e)
0157 {
0158 throw new AMQException("AMQQueue MBean creation has failed ", e);
0159 }
0160
0161 resetNotifications();
0162
0163 }
0164
0165 public void resetNotifications()
0166 {
0167 // This ensure that the notification checks for the configured alerts are created.
0168 setMaximumMessageAge(_maximumMessageAge);
0169 setMaximumMessageCount(_maximumMessageCount);
0170 setMaximumMessageSize(_maximumMessageSize);
0171 setMaximumQueueDepth(_maximumQueueDepth);
0172 }
0173
0174 // ------ Getters and Setters
0175
0176 public AMQShortString getName()
0177 {
0178 return _name;
0179 }
0180
0181 public boolean isDurable()
0182 {
0183 return _durable;
0184 }
0185
0186 public boolean isAutoDelete()
0187 {
0188 return _autoDelete;
0189 }
0190
0191 public AMQShortString getOwner()
0192 {
0193 return _owner;
0194 }
0195
0196 public VirtualHost getVirtualHost()
0197 {
0198 return _virtualHost;
0199 }
0200
0201 // ------ bind and unbind
0202
0203 public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
0204 {
0205 exchange.registerQueue(routingKey, this, arguments);
0206 if (isDurable() && exchange.isDurable())
0207 {
0208 _virtualHost.getRoutingTable().bindQueue(exchange, routingKey, this, arguments);
0209 }
0210
0211 _bindings.addBinding(routingKey, arguments, exchange);
0212 }
0213
0214 public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
0215 {
0216 exchange.deregisterQueue(routingKey, this, arguments);
0217 if (isDurable() && exchange.isDurable())
0218 {
0219 _virtualHost.getRoutingTable().unbindQueue(exchange, routingKey, this, arguments);
0220 }
0221
0222 boolean removed = _bindings.remove(routingKey, arguments, exchange);
0223 if (!removed)
0224 {
0225 _logger.error("Mismatch between queue bindings and exchange record of bindings");
0226 }
0227 }
0228
0229 public List<ExchangeBinding> getExchangeBindings()
0230 {
0231 return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings());
0232 }
0233
0234 // ------ Manage Subscriptions
0235
0236 public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
0237 {
0238
0239 if (isExclusiveSubscriber())
0240 {
0241 throw new ExistingExclusiveSubscription();
0242 }
0243
0244 if (exclusive)
0245 {
0246 if (getConsumerCount() != 0)
0247 {
0248 throw new ExistingSubscriptionPreventsExclusive();
0249 }
0250 else
0251 {
0252 _exclusiveSubscriber = subscription;
0253
0254 }
0255 }
0256
0257 _activeSubscriberCount.incrementAndGet();
0258 subscription.setStateListener(this);
0259 subscription.setLastSeenEntry(null, _entries.getHead());
0260
0261 if (!isDeleted())
0262 {
0263 subscription.setQueue(this);
0264 _subscriptionList.add(subscription);
0265 if (isDeleted())
0266 {
0267 subscription.queueDeleted(this);
0268 }
0269 }
0270 else
0271 {
0272 // TODO
0273 }
0274
0275 deliverAsync(subscription);
0276
0277 }
0278
0279 public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
0280 {
0281 if (subscription == null)
0282 {
0283 throw new NullPointerException("subscription argument is null");
0284 }
0285
0286 boolean removed = _subscriptionList.remove(subscription);
0287
0288 if (removed)
0289 {
0290 subscription.close();
0291 // No longer can the queue have an exclusive consumer
0292 setExclusiveSubscriber(null);
0293
0294 QueueEntry lastSeen;
0295
0296 while ((lastSeen = subscription.getLastSeenEntry()) != null)
0297 {
0298 subscription.setLastSeenEntry(lastSeen, null);
0299 }
0300
0301 // auto-delete queues must be deleted if there are no remaining subscribers
0302
0303 if (_autoDelete && getConsumerCount() == 0)
0304 {
0305 if (_logger.isInfoEnabled())
0306 {
0307 _logger.info("Auto-deleteing queue:" + this);
0308 }
0309
0310 delete();
0311
0312 // we need to manually fire the event to the removed subscription (which was the last one left for this
0313 // queue. This is because the delete method uses the subscription set which has just been cleared
0314 subscription.queueDeleted(this);
0315 }
0316 }
0317
0318 }
0319
0320 // ------ Enqueue / Dequeue
0321
0322 public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
0323 {
0324
0325 incrementQueueCount();
0326 incrementQueueSize(message);
0327
0328 _totalMessagesReceived.incrementAndGet();
0329
0330 QueueEntry entry;
0331 Subscription exclusiveSub = _exclusiveSubscriber;
0332
0333 if (exclusiveSub != null)
0334 {
0335 exclusiveSub.getSendLock();
0336
0337 try
0338 {
0339 entry = _entries.add(message);
0340
0341 deliverToSubscription(exclusiveSub, entry);
0342
0343 // where there is more than one producer there's a reasonable chance that even though there is
0344 // no "queueing" we do not deliver because we get an interleving of _entries.add and
0345 // deliverToSubscription between threads. Therefore have one more try.
0346 if (!(entry.isAcquired() || entry.isDeleted()))
0347 {
0348 deliverToSubscription(exclusiveSub, entry);
0349 }
0350 }
0351 finally
0352 {
0353 exclusiveSub.releaseSendLock();
0354 }
0355 }
0356 else
0357 {
0358 entry = _entries.add(message);
0359 /*
0360
0361 iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
0362
0363 */
0364 SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
0365 SubscriptionList.SubscriptionNode nextNode = node.getNext();
0366 if (nextNode == null)
0367 {
0368 nextNode = _subscriptionList.getHead().getNext();
0369 }
0370 while (nextNode != null)
0371 {
0372 if (_lastSubscriptionNode.compareAndSet(node, nextNode))
0373 {
0374 break;
0375 }
0376 else
0377 {
0378 node = _lastSubscriptionNode.get();
0379 nextNode = node.getNext();
0380 if (nextNode == null)
0381 {
0382 nextNode = _subscriptionList.getHead().getNext();
0383 }
0384 }
0385 }
0386
0387 // always do one extra loop after we believe we've finished
0388 // this catches the case where we *just* miss an update
0389 int loops = 2;
0390
0391 while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
0392 {
0393 if (nextNode == null)
0394 {
0395 loops--;
0396 nextNode = _subscriptionList.getHead();
0397 }
0398 else
0399 {
0400 // if subscription at end, and active, offer
0401 Subscription sub = nextNode.getSubscription();
0402 deliverToSubscription(sub, entry);
0403 }
0404 nextNode = nextNode.getNext();
0405
0406 }
0407 }
0408
0409 if (entry.immediateAndNotDelivered())
0410 {
0411 //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content
0412 // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks
0413 // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses.
0414 entry.acquire();
0415 entry.dequeueAndDelete(storeContext);
0416 }
0417 else if (!(entry.isAcquired() || entry.isDeleted()))
0418 {
0419 checkSubscriptionsNotAheadOfDelivery(entry);
0420
0421 deliverAsync();
0422 }
0423
0424 _managedObject.checkForNotification(entry.getMessage());
0425
0426 return entry;
0427 }
0428
0429 private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
0430 throws AMQException
0431 {
0432
0433 sub.getSendLock();
0434 try
0435 {
0436 if (subscriptionReadyAndHasInterest(sub, entry)
0437 && !sub.isSuspended())
0438 {
0439 if (!sub.wouldSuspend(entry))
0440 {
0441 if (!sub.isBrowser() && !entry.acquire(sub))
0442 {
0443 // restore credit here that would have been taken away by wouldSuspend since we didn't manage
0444 // to acquire the entry for this subscription
0445 sub.restoreCredit(entry);
0446 }
0447 else
0448 {
0449
0450 deliverMessage(sub, entry);
0451
0452 }
0453 }
0454 }
0455 }
0456 finally
0457 {
0458 sub.releaseSendLock();
0459 }
0460 }
0461
0462 protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
0463 {
0464 // This method is only required for queues which mess with ordering
0465 // Simple Queues don't :-)
0466 }
0467
0468 private void incrementQueueSize(final AMQMessage message)
0469 {
0470 getAtomicQueueSize().addAndGet(message.getSize());
0471 }
0472
0473 private void incrementQueueCount()
0474 {
0475 getAtomicQueueCount().incrementAndGet();
0476 }
0477
0478 private void deliverMessage(final Subscription sub, final QueueEntry entry)
0479 throws AMQException
0480 {
0481 _deliveredMessages.incrementAndGet();
0482 sub.send(entry);
0483
0484 }
0485
0486 private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
0487 {
0488
0489 // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
0490 // interest in.
0491 QueueEntry node = sub.getLastSeenEntry();
0492 while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
0493 {
0494
0495 QueueEntry newNode = _entries.next(node);
0496 if (newNode != null)
0497 {
0498 sub.setLastSeenEntry(node, newNode);
0499 node = sub.getLastSeenEntry();
0500 }
0501 else
0502 {
0503 node = null;
0504 break;
0505 }
0506
0507 }
0508
0509 if (node == entry)
0510 {
0511 // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
0512 // good
0513 return true;
0514 }
0515 else
0516 {
0517 // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing
0518 // no-one else has updated it to something furhter on in the list
0519 //TODO - check
0520 //updateLastSeenEntry(sub, entry);
0521 return false;
0522 }
0523
0524 }
0525
0526 private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry)
0527 {
0528 QueueEntry node = sub.getLastSeenEntry();
0529
0530 if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
0531 {
0532 do
0533 {
0534 if (sub.setLastSeenEntry(node, entry))
0535 {
0536 return;
0537 }
0538 else
0539 {
0540 node = sub.getLastSeenEntry();
0541 }
0542 }
0543 while (node != null && entry.compareTo(node) < 0);
0544 }
0545
0546 }
0547
0548 public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
0549 {
0550
0551 SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
0552 // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
0553 while (subscriberIter.advance())
0554 {
0555 Subscription sub = subscriberIter.getNode().getSubscription();
0556
0557 // we don't make browsers send the same stuff twice
0558 if (!sub.isBrowser())
0559 {
0560 updateLastSeenEntry(sub, entry);
0561 }
0562 }
0563
0564 deliverAsync();
0565
0566 }
0567
0568 /**
0569 * Only call from queue Entry
0570 * @param storeContext
0571 * @param entry
0572 * @throws FailedDequeueException
0573 */
0574 public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
0575 {
0576 decrementQueueCount();
0577 decrementQueueSize(entry);
0578 if (entry.acquiredBySubscription())
0579 {
0580 _deliveredMessages.decrementAndGet();
0581 }
0582
0583 try
0584 {
0585 AMQMessage msg = entry.getMessage();
0586 if (msg.isPersistent())
0587 {
0588 _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
0589 }
0590
0591 }
0592 catch (MessageCleanupException e)
0593 {
0594 // Message was dequeued, but could not then be deleted
0595 // though it is no longer referenced. This should be very
0596 // rare and can be detected and cleaned up on recovery or
0597 // done through some form of manual intervention.
0598 _logger.error(e, e);
0599 }
0600 catch (AMQException e)
0601 {
0602 throw new FailedDequeueException(_name.toString(), e);
0603 }
0604
0605 }
0606
0607 private void decrementQueueSize(final QueueEntry entry)
0608 {
0609 getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
0610 }
0611
0612 void decrementQueueCount()
0613 {
0614 getAtomicQueueCount().decrementAndGet();
0615 }
0616
0617 public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
0618 {
0619 /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
0620 entry to resend and move back the subscription pointer. */
0621
0622 subscription.getSendLock();
0623 try
0624 {
0625 if (!subscription.isClosed())
0626 {
0627 deliverMessage(subscription, entry);
0628 return true;
0629 }
0630 else
0631 {
0632 return false;
0633 }
0634 }
0635 finally
0636 {
0637 subscription.releaseSendLock();
0638 }
0639 }
0640
0641 public int getConsumerCount()
0642 {
0643 return _subscriptionList.size();
0644 }
0645
0646 public int getActiveConsumerCount()
0647 {
0648 return _activeSubscriberCount.get();
0649 }
0650
0651 public boolean isUnused()
0652 {
0653 return getConsumerCount() == 0;
0654 }
0655
0656 public boolean isEmpty()
0657 {
0658 return getMessageCount() == 0;
0659 }
0660
0661 public int getMessageCount()
0662 {
0663 return getAtomicQueueCount().get();
0664 }
0665
0666 public long getQueueDepth()
0667 {
0668 return getAtomicQueueSize().get();
0669 }
0670
0671 public int getUndeliveredMessageCount()
0672 {
0673 int count = getMessageCount() - _deliveredMessages.get();
0674 if (count < 0)
0675 {
0676 return 0;
0677 }
0678 else
0679 {
0680 return count;
0681 }
0682 }
0683
0684 public long getReceivedMessageCount()
0685 {
0686 return _totalMessagesReceived.get();
0687 }
0688
0689 public long getOldestMessageArrivalTime()
0690 {
0691 QueueEntry entry = getOldestQueueEntry();
0692 return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
0693 }
0694
0695 protected QueueEntry getOldestQueueEntry()
0696 {
0697 return _entries.next(_entries.getHead());
0698 }
0699
0700 public boolean isDeleted()
0701 {
0702 return _deleted.get();
0703 }
0704
0705 public List<QueueEntry> getMessagesOnTheQueue()
0706 {
0707 ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
0708 QueueEntryIterator queueListIterator = _entries.iterator();
0709 while (queueListIterator.advance())
0710 {
0711 QueueEntry node = queueListIterator.getNode();
0712 if (node != null && !node.isDeleted())
0713 {
0714 entryList.add(node);
0715 }
0716 }
0717 return entryList;
0718
0719 }
0720
0721 public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
0722 {
0723 if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
0724 {
0725 _activeSubscriberCount.decrementAndGet();
0726
0727 }
0728 else if (newState == Subscription.State.ACTIVE)
0729 {
0730 if (oldState != Subscription.State.ACTIVE)
0731 {
0732 _activeSubscriberCount.incrementAndGet();
0733
0734 }
0735 deliverAsync(sub);
0736 }
0737 }
0738
0739 public int compareTo(final AMQQueue o)
0740 {
0741 return _name.compareTo(o.getName());
0742 }
0743
0744 public AtomicInteger getAtomicQueueCount()
0745 {
0746 return _atomicQueueCount;
0747 }
0748
0749 public AtomicLong getAtomicQueueSize()
0750 {
0751 return _atomicQueueSize;
0752 }
0753
0754 private boolean isExclusiveSubscriber()
0755 {
0756 return _exclusiveSubscriber != null;
0757 }
0758
0759 private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
0760 {
0761 _exclusiveSubscriber = exclusiveSubscriber;
0762 }
0763
0764 public static interface QueueEntryFilter
0765 {
0766 public boolean accept(QueueEntry entry);
0767
0768 public boolean filterComplete();
0769 }
0770
0771 public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
0772 {
0773 return getMessagesOnTheQueue(new QueueEntryFilter()
0774 {
0775
0776 public boolean accept(QueueEntry entry)
0777 {
0778 final long messageId = entry.getMessage().getMessageId();
0779 return messageId >= fromMessageId && messageId <= toMessageId;
0780 }
0781
0782 public boolean filterComplete()
0783 {
0784 return false;
0785 }
0786 });
0787 }
0788
0789 public QueueEntry getMessageOnTheQueue(final long messageId)
0790 {
0791 List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
0792 {
0793 private boolean _complete;
0794
0795 public boolean accept(QueueEntry entry)
0796 {
0797 _complete = entry.getMessage().getMessageId() == messageId;
0798 return _complete;
0799 }
0800
0801 public boolean filterComplete()
0802 {
0803 return _complete;
0804 }
0805 });
0806 return entries.isEmpty() ? null : entries.get(0);
0807 }
0808
0809 public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
0810 {
0811 ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
0812 QueueEntryIterator queueListIterator = _entries.iterator();
0813 while (queueListIterator.advance() && !filter.filterComplete())
0814 {
0815 QueueEntry node = queueListIterator.getNode();
0816 if (!node.isDeleted() && filter.accept(node))
0817 {
0818 entryList.add(node);
0819 }
0820 }
0821 return entryList;
0822
0823 }
0824
0825
0826 public void moveMessagesToAnotherQueue(final long fromMessageId,
0827 final long toMessageId,
0828 String queueName,
0829 StoreContext storeContext)
0830 {
0831 // The move is a two step process. First the messages are moved in the _transactionLog.
0832 // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the
0833 // existing queue.
0834 // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery
0835 // this is done as the message is recieved.
0836 // So The final step is to enqueue the messages on the new queue.
0837
0838 AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
0839 TransactionLog transactionLog = getVirtualHost().getTransactionLog();
0840
0841 List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
0842 {
0843
0844 public boolean accept(QueueEntry entry)
0845 {
0846 final long messageId = entry.getMessage().getMessageId();
0847 return (messageId >= fromMessageId)
0848 && (messageId <= toMessageId)
0849 && entry.acquire();
0850 }
0851
0852 public boolean filterComplete()
0853 {
0854 return false;
0855 }
0856 });
0857
0858 try
0859 {
0860 transactionLog.beginTran(storeContext);
0861
0862 // Move the messages in the transaction log.
0863 for (QueueEntry entry : entries)
0864 {
0865 AMQMessage message = entry.getMessage();
0866
0867 if (message.isPersistent() && toQueue.isDurable())
0868 {
0869 transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
0870 }
0871 // dequeue will remove the messages from the queue
0872 entry.dequeue(storeContext);
0873 }
0874
0875 // Commit and flush the move transcations.
0876 try
0877 {
0878 transactionLog.commitTran(storeContext);
0879 }
0880 catch (AMQException e)
0881 {
0882 throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
0883 }
0884 }
0885 catch (AMQException e)
0886 {
0887 try
0888 {
0889 transactionLog.abortTran(storeContext);
0890 }
0891 catch (AMQException rollbackEx)
0892 {
0893 _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
0894 }
0895 throw new RuntimeException(e);
0896 }
0897
0898 try
0899 {
0900 // Add messages to new queue
0901 for (QueueEntry entry : entries)
0902 {
0903 toQueue.enqueue(storeContext, entry.getMessage());
0904 }
0905 }
0906 catch (MessageCleanupException e)
0907 {
0908 throw new RuntimeException(e);
0909 }
0910 catch (AMQException e)
0911 {
0912 throw new RuntimeException(e);
0913 }
0914
0915 }
0916
0917 public void copyMessagesToAnotherQueue(final long fromMessageId,
0918 final long toMessageId,
0919 String queueName,
0920 final StoreContext storeContext)
0921 {
0922 AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
0923 TransactionLog transactionLog = getVirtualHost().getTransactionLog();
0924
0925 List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
0926 {
0927
0928 public boolean accept(QueueEntry entry)
0929 {
0930 final long messageId = entry.getMessage().getMessageId();
0931 if ((messageId >= fromMessageId)
0932 && (messageId <= toMessageId))
0933 {
0934 if (!entry.isDeleted())
0935 {
0936 return true;
0937 }
0938 }
0939
0940 return false;
0941 }
0942
0943 public boolean filterComplete()
0944 {
0945 return false;
0946 }
0947 });
0948
0949 try
0950 {
0951 transactionLog.beginTran(storeContext);
0952
0953 // Move the messages in on the transaction log.
0954 for (QueueEntry entry : entries)
0955 {
0956 AMQMessage message = entry.getMessage();
0957
0958 if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable())
0959 {
0960 transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
0961 }
0962 }
0963
0964 // Commit and flush the move transcations.
0965 try
0966 {
0967 transactionLog.commitTran(storeContext);
0968 }
0969 catch (AMQException e)
0970 {
0971 throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
0972 }
0973 }
0974 catch (AMQException e)
0975 {
0976 try
0977 {
0978 transactionLog.abortTran(storeContext);
0979 }
0980 catch (AMQException rollbackEx)
0981 {
0982 _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
0983 }
0984 throw new RuntimeException(e);
0985 }
0986
0987 try
0988 {
0989 for (QueueEntry entry : entries)
0990 {
0991 if (!entry.isDeleted())
0992 {
0993 toQueue.enqueue(storeContext, entry.getMessage());
0994 }
0995 }
0996 }
0997 catch (MessageCleanupException e)
0998 {
0999 throw new RuntimeException(e);
1000 }
1001 catch (AMQException e)
1002 {
1003 throw new RuntimeException(e);
1004 }
1005
1006 }
1007
1008 public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
1009 {
1010
1011 try
1012 {
1013 QueueEntryIterator queueListIterator = _entries.iterator();
1014
1015 while (queueListIterator.advance())
1016 {
1017 QueueEntry node = queueListIterator.getNode();
1018
1019 final long messageId = node.getMessage().getMessageId();
1020
1021 if ((messageId >= fromMessageId)
1022 && (messageId <= toMessageId)
1023 && !node.isDeleted()
1024 && node.acquire())
1025 {
1026 node.dequeueAndDelete(storeContext);
1027 }
1028
1029 }
1030 }
1031 catch (AMQException e)
1032 {
1033 throw new RuntimeException(e);
1034 }
1035
1036 }
1037
1038 // ------ Management functions
1039
1040 public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
1041 {
1042 QueueEntryIterator queueListIterator = _entries.iterator();
1043 boolean noDeletes = true;
1044
1045 while (noDeletes && queueListIterator.advance())
1046 {
1047 QueueEntry node = queueListIterator.getNode();
1048 if (!node.isDeleted() && node.acquire())
1049 {
1050 node.dequeueAndDelete(storeContext);
1051 noDeletes = false;
1052 }
1053
1054 }
1055 }
1056
1057 public long clearQueue(StoreContext storeContext) throws AMQException
1058 {
1059
1060 QueueEntryIterator queueListIterator = _entries.iterator();
1061 long count = 0;
1062
1063 while (queueListIterator.advance())
1064 {
1065 QueueEntry node = queueListIterator.getNode();
1066 if (!node.isDeleted() && node.acquire())
1067 {
1068 node.dequeueAndDelete(storeContext);
1069 count++;
1070 }
1071
1072 }
1073 return count;
1074
1075 }
1076
1077 public void addQueueDeleteTask(final Task task)
1078 {
1079 _deleteTaskList.add(task);
1080 }
1081
1082 public int delete() throws AMQException
1083 {
1084 if (!_deleted.getAndSet(true))
1085 {
1086
1087 SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
1088
1089 while (subscriptionIter.advance())
1090 {
1091 Subscription s = subscriptionIter.getNode().getSubscription();
1092 if (s != null)
1093 {
1094 s.queueDeleted(this);
1095 }
1096 }
1097
1098 _bindings.deregister();
1099 _virtualHost.getQueueRegistry().unregisterQueue(_name);
1100
1101 _managedObject.unregister();
1102 for (Task task : _deleteTaskList)
1103 {
1104 task.doTask(this);
1105 }
1106
1107 _deleteTaskList.clear();
1108 stop();
1109 }
1110 return getMessageCount();
1111
1112 }
1113
1114 public void stop()
1115 {
1116 if (!_stopped.getAndSet(true))
1117 {
1118 ReferenceCountingExecutorService.getInstance().releaseExecutorService();
1119 }
1120 }
1121
1122 public void deliverAsync()
1123 {
1124 _stateChangeCount.incrementAndGet();
1125
1126 Runner runner = new Runner();
1127
1128 if (_asynchronousRunner.compareAndSet(null, runner))
1129 {
1130 _asyncDelivery.execute(runner);
1131 }
1132 }
1133
1134 public void deliverAsync(Subscription sub)
1135 {
1136 _asyncDelivery.execute(new SubFlushRunner(sub));
1137 }
1138
1139 private class Runner implements ReadWriteRunnable
1140 {
1141 public void run()
1142 {
1143 try
1144 {
1145 processQueue(this);
1146 }
1147 catch (AMQException e)
1148 {
1149 _logger.error(e);
1150 }
1151
1152 }
1153
1154 public boolean isRead()
1155 {
1156 return false;
1157 }
1158
1159 public boolean isWrite()
1160 {
1161 return true;
1162 }
1163 }
1164
1165 private class SubFlushRunner implements ReadWriteRunnable
1166 {
1167 private final Subscription _sub;
1168
1169 public SubFlushRunner(Subscription sub)
1170 {
1171 _sub = sub;
1172 }
1173
1174 public void run()
1175 {
1176 boolean complete = false;
1177 try
1178 {
1179 complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
1180
1181 }
1182 catch (AMQException e)
1183 {
1184 _logger.error(e);
1185 }
1186 if (!complete && !_sub.isSuspended())
1187 {
1188 _asyncDelivery.execute(this);
1189 }
1190
1191 }
1192
1193 public boolean isRead()
1194 {
1195 return false;
1196 }
1197
1198 public boolean isWrite()
1199 {
1200 return true;
1201 }
1202 }
1203
1204 public void flushSubscription(Subscription sub) throws AMQException
1205 {
1206 flushSubscription(sub, Long.MAX_VALUE);
1207 }
1208
1209 public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException
1210 {
1211 boolean atTail = false;
1212
1213 while (!sub.isSuspended() && !atTail && iterations != 0)
1214 {
1215 try
1216 {
1217 sub.getSendLock();
1218 atTail = attemptDelivery(sub);
1219 if (atTail && sub.isAutoClose())
1220 {
1221 unregisterSubscription(sub);
1222
1223 ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
1224 converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
1225 }
1226 else if (!atTail)
1227 {
1228 iterations--;
1229 }
1230 }
1231 finally
1232 {
1233 sub.releaseSendLock();
1234 }
1235 }
1236
1237 // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
1238 // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
1239 // which would give us memory "leak".
1240
1241 if (!isExclusiveSubscriber())
1242 {
1243 advanceAllSubscriptions();
1244 }
1245 return atTail;
1246 }
1247
1248 private boolean attemptDelivery(Subscription sub) throws AMQException
1249 {
1250 boolean atTail = false;
1251 boolean advanced = false;
1252 boolean subActive = sub.isActive();
1253 if (subActive)
1254 {
1255 QueueEntry node = moveSubscriptionToNextNode(sub);
1256 if (!(node.isAcquired() || node.isDeleted()))
1257 {
1258 if (!sub.isSuspended())
1259 {
1260 if (sub.hasInterest(node))
1261 {
1262 if (!sub.wouldSuspend(node))
1263 {
1264 if (!sub.isBrowser() && !node.acquire(sub))
1265 {
1266 sub.restoreCredit(node);
1267 }
1268 else
1269 {
1270 deliverMessage(sub, node);
1271
1272 if (sub.isBrowser())
1273 {
1274 QueueEntry newNode = _entries.next(node);
1275
1276 if (newNode != null)
1277 {
1278 advanced = true;
1279 sub.setLastSeenEntry(node, newNode);
1280 node = sub.getLastSeenEntry();
1281 }
1282 }
1283 }
1284
1285 }
1286 else // Not enough Credit for message and wouldSuspend
1287 {
1288 //QPID-1187 - Treat the subscription as suspended for this message
1289 // and wait for the message to be removed to continue delivery.
1290 subActive = false;
1291 node.addStateChangeListener(new QueueEntryListener(sub, node));
1292 }
1293 }
1294 else
1295 {
1296 // this subscription is not interested in this node so we can skip over it
1297 QueueEntry newNode = _entries.next(node);
1298 if (newNode != null)
1299 {
1300 sub.setLastSeenEntry(node, newNode);
1301 }
1302 }
1303 }
1304
1305 }
1306 atTail = (_entries.next(node) == null) && !advanced;
1307 }
1308 return atTail || !subActive;
1309 }
1310
1311 protected void advanceAllSubscriptions() throws AMQException
1312 {
1313 SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
1314 while (subscriberIter.advance())
1315 {
1316 SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
1317 Subscription sub = subNode.getSubscription();
1318 moveSubscriptionToNextNode(sub);
1319 }
1320 }
1321
1322 private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
1323 throws AMQException
1324 {
1325 QueueEntry node = sub.getLastSeenEntry();
1326
1327 while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
1328 {
1329 if (!node.isAcquired() && !node.isDeleted() && node.expired())
1330 {
1331 if (node.acquire())
1332 {
1333 // creating a new final store context per message seems wasteful.
1334 final StoreContext reapingStoreContext = new StoreContext();
1335 node.dequeueAndDelete(reapingStoreContext);
1336 }
1337 }
1338 QueueEntry newNode = _entries.next(node);
1339 if (newNode != null)
1340 {
1341 sub.setLastSeenEntry(node, newNode);
1342 node = sub.getLastSeenEntry();
1343 }
1344 else
1345 {
1346 break;
1347 }
1348
1349 }
1350 return node;
1351 }
1352
1353 private void processQueue(Runnable runner) throws AMQException
1354 {
1355 long stateChangeCount;
1356 long previousStateChangeCount = Long.MIN_VALUE;
1357 boolean deliveryIncomplete = true;
1358
1359 int extraLoops = 1;
1360 Long iterations = new Long(MAX_ASYNC_DELIVERIES);
1361
1362 _asynchronousRunner.compareAndSet(runner, null);
1363
1364 while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
1365 {
1366 // we want to have one extra loop after every subscription has reached the point where it cannot move
1367 // further, just in case the advance of one subscription in the last loop allows a different subscription to
1368 // move forward in the next iteration
1369
1370 if (previousStateChangeCount != stateChangeCount)
1371 {
1372 extraLoops = 1;
1373 }
1374
1375 previousStateChangeCount = stateChangeCount;
1376 deliveryIncomplete = _subscriptionList.size() != 0;
1377 boolean done = true;
1378
1379 SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
1380 //iterate over the subscribers and try to advance their pointer
1381 while (subscriptionIter.advance())
1382 {
1383 boolean closeConsumer = false;
1384 Subscription sub = subscriptionIter.getNode().getSubscription();
1385 sub.getSendLock();
1386 try
1387 {
1388 if (sub != null)
1389 {
1390
1391 QueueEntry node = moveSubscriptionToNextNode(sub);
1392 if (node != null)
1393 {
1394 done = attemptDelivery(sub);
1395 }
1396 }
1397 if (done)
1398 {
1399 if (extraLoops == 0)
1400 {
1401 deliveryIncomplete = false;
1402 if (sub.isAutoClose())
1403 {
1404 unregisterSubscription(sub);
1405
1406 ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
1407 converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
1408 }
1409 }
1410 else
1411 {
1412 extraLoops--;
1413 }
1414 }
1415 else
1416 {
1417 iterations--;
1418 extraLoops = 1;
1419 }
1420 }
1421 finally
1422 {
1423 sub.releaseSendLock();
1424 }
1425 }
1426 _asynchronousRunner.set(null);
1427 }
1428
1429 // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
1430 // therefore we should schedule this runner again (unless someone beats us to it :-) ).
1431 if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
1432 {
1433 _asyncDelivery.execute(runner);
1434 }
1435 }
1436
1437
1438 public void checkMessageStatus() throws AMQException
1439 {
1440
1441 final StoreContext storeContext = new StoreContext();
1442
1443 QueueEntryIterator queueListIterator = _entries.iterator();
1444
1445 while (queueListIterator.advance())
1446 {
1447 QueueEntry node = queueListIterator.getNode();
1448 if (!node.isDeleted() && node.expired() && node.acquire())
1449 {
1450 node.dequeueAndDelete(storeContext);
1451 }
1452 else
1453 {
1454 _managedObject.checkForNotification(node.getMessage());
1455 }
1456 }
1457
1458 }
1459
1460 public long getMinimumAlertRepeatGap()
1461 {
1462 return _minimumAlertRepeatGap;
1463 }
1464
1465 public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
1466 {
1467 _minimumAlertRepeatGap = minimumAlertRepeatGap;
1468 }
1469
1470 public long getMaximumMessageAge()
1471 {
1472 return _maximumMessageAge;
1473 }
1474
1475 public void setMaximumMessageAge(long maximumMessageAge)
1476 {
1477 _maximumMessageAge = maximumMessageAge;
1478 if (maximumMessageAge == 0L)
1479 {
1480 _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
1481 }
1482 else
1483 {
1484 _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
1485 }
1486 }
1487
1488 public long getMaximumMessageCount()
1489 {
1490 return _maximumMessageCount;
1491 }
1492
1493 public void setMaximumMessageCount(final long maximumMessageCount)
1494 {
1495 _maximumMessageCount = maximumMessageCount;
1496 if (maximumMessageCount == 0L)
1497 {
1498 _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
1499 }
1500 else
1501 {
1502 _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
1503 }
1504
1505 }
1506
1507 public long getMaximumQueueDepth()
1508 {
1509 return _maximumQueueDepth;
1510 }
1511
1512 // Sets the queue depth, the max queue size
1513 public void setMaximumQueueDepth(final long maximumQueueDepth)
1514 {
1515 _maximumQueueDepth = maximumQueueDepth;
1516 if (maximumQueueDepth == 0L)
1517 {
1518 _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
1519 }
1520 else
1521 {
1522 _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
1523 }
1524
1525 }
1526
1527 public long getMaximumMessageSize()
1528 {
1529 return _maximumMessageSize;
1530 }
1531
1532 public void setMaximumMessageSize(final long maximumMessageSize)
1533 {
1534 _maximumMessageSize = maximumMessageSize;
1535 if (maximumMessageSize == 0L)
1536 {
1537 _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
1538 }
1539 else
1540 {
1541 _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
1542 }
1543 }
1544
1545 public Set<NotificationCheck> getNotificationChecks()
1546 {
1547 return _notificationChecks;
1548 }
1549
1550 public ManagedObject getManagedObject()
1551 {
1552 return _managedObject;
1553 }
1554
1555 private final class QueueEntryListener implements QueueEntry.StateChangeListener
1556 {
1557 private final QueueEntry _entry;
1558 private final Subscription _sub;
1559
1560 public QueueEntryListener(final Subscription sub, final QueueEntry entry)
1561 {
1562 _entry = entry;
1563 _sub = sub;
1564 }
1565
1566 public boolean equals(Object o)
1567 {
1568 return _entry == ((QueueEntryListener) o)._entry && _sub == ((QueueEntryListener) o)._sub;
1569 }
1570
1571 public int hashCode()
1572 {
1573 return System.identityHashCode(_entry) ^ System.identityHashCode(_sub);
1574 }
1575
1576 public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
1577 {
1578 entry.removeStateChangeListener(this);
1579 deliverAsync(_sub);
1580 }
1581 }
1582
1583 public List<Long> getMessagesOnTheQueue(int num)
1584 {
1585 return getMessagesOnTheQueue(num, 0);
1586 }
1587
1588 public List<Long> getMessagesOnTheQueue(int num, int offset)
1589 {
1590 ArrayList<Long> ids = new ArrayList<Long>(num);
1591 QueueEntryIterator it = _entries.iterator();
1592 for (int i = 0; i < offset; i++)
1593 {
1594 it.advance();
1595 }
1596
1597 for (int i = 0; i < num && !it.atTail(); i++)
1598 {
1599 it.advance();
1600 ids.add(it.getNode().getMessage().getMessageId());
1601 }
1602 return ids;
1603 }
1604 }
|