ExtractResendAndRequeue.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server;
022 
023 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
024 import org.apache.qpid.server.queue.QueueEntry;
025 import org.apache.qpid.server.subscription.Subscription;
026 import org.apache.qpid.server.store.StoreContext;
027 import org.apache.qpid.AMQException;
028 import org.apache.log4j.Logger;
029 
030 import java.util.Map;
031 
032 public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
033 {
034     private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
035 
036     private Map<Long, QueueEntry> _msgToRequeue;
037     private Map<Long, QueueEntry> _msgToResend;
038     private boolean _requeueIfUnabletoResend;
039     private StoreContext _storeContext;
040     private UnacknowledgedMessageMap _unacknowledgedMessageMap;
041 
042     public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap, 
043                                    Map<Long, QueueEntry> msgToRequeue,
044                                    Map<Long, QueueEntry> msgToResend,
045                                    boolean requeueIfUnabletoResend,
046                                    StoreContext storeContext)
047     {
048         _unacknowledgedMessageMap = unacknowledgedMessageMap;
049         _msgToRequeue = msgToRequeue;
050         _msgToResend = msgToResend;
051         _requeueIfUnabletoResend = requeueIfUnabletoResend;
052         _storeContext = storeContext;
053     }
054 
055     public boolean callback(final long deliveryTag, QueueEntry queueEntrythrows AMQException
056     {
057        
058         queueEntry.setRedelivered(true);
059         final Subscription subscription = queueEntry.getDeliveredSubscription();
060         if (subscription != null)
061         {
062             // Consumer exists
063             if (!subscription.isClosed())
064             {
065                 _msgToResend.put(deliveryTag, queueEntry);
066             }
067             else // consumer has gone
068             {
069                 _msgToRequeue.put(deliveryTag, queueEntry);
070             }
071         }
072         else
073         {
074             // Message has no consumer tag, so was "delivered" to a GET
075             // or consumer no longer registered
076             // cannot resend, so re-queue.
077             if (!queueEntry.isQueueDeleted())
078             {
079                 if (_requeueIfUnabletoResend)
080                 {
081                     _msgToRequeue.put(deliveryTag, queueEntry);
082                 }
083                 else
084                 {
085                     queueEntry.dequeueAndDelete(_storeContext);
086                     _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry);
087                 }
088             }
089             else
090             {
091                 queueEntry.dequeueAndDelete(_storeContext);
092                 _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry);
093             }
094         }
095 
096         // false means continue processing
097         return false;
098     }
099 
100     public void visitComplete()
101     {
102         _unacknowledgedMessageMap.clear();
103     }
104 
105 }