001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server;
022
023 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
024 import org.apache.qpid.server.queue.QueueEntry;
025 import org.apache.qpid.server.subscription.Subscription;
026 import org.apache.qpid.server.store.StoreContext;
027 import org.apache.qpid.AMQException;
028 import org.apache.log4j.Logger;
029
030 import java.util.Map;
031
032 public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
033 {
034 private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
035
036 private Map<Long, QueueEntry> _msgToRequeue;
037 private Map<Long, QueueEntry> _msgToResend;
038 private boolean _requeueIfUnabletoResend;
039 private StoreContext _storeContext;
040 private UnacknowledgedMessageMap _unacknowledgedMessageMap;
041
042 public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
043 Map<Long, QueueEntry> msgToRequeue,
044 Map<Long, QueueEntry> msgToResend,
045 boolean requeueIfUnabletoResend,
046 StoreContext storeContext)
047 {
048 _unacknowledgedMessageMap = unacknowledgedMessageMap;
049 _msgToRequeue = msgToRequeue;
050 _msgToResend = msgToResend;
051 _requeueIfUnabletoResend = requeueIfUnabletoResend;
052 _storeContext = storeContext;
053 }
054
055 public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException
056 {
057
058 queueEntry.setRedelivered(true);
059 final Subscription subscription = queueEntry.getDeliveredSubscription();
060 if (subscription != null)
061 {
062 // Consumer exists
063 if (!subscription.isClosed())
064 {
065 _msgToResend.put(deliveryTag, queueEntry);
066 }
067 else // consumer has gone
068 {
069 _msgToRequeue.put(deliveryTag, queueEntry);
070 }
071 }
072 else
073 {
074 // Message has no consumer tag, so was "delivered" to a GET
075 // or consumer no longer registered
076 // cannot resend, so re-queue.
077 if (!queueEntry.isQueueDeleted())
078 {
079 if (_requeueIfUnabletoResend)
080 {
081 _msgToRequeue.put(deliveryTag, queueEntry);
082 }
083 else
084 {
085 queueEntry.dequeueAndDelete(_storeContext);
086 _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry);
087 }
088 }
089 else
090 {
091 queueEntry.dequeueAndDelete(_storeContext);
092 _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry);
093 }
094 }
095
096 // false means continue processing
097 return false;
098 }
099
100 public void visitComplete()
101 {
102 _unacknowledgedMessageMap.clear();
103 }
104
105 }
|