TimeToLiveTest.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 
022 package org.apache.qpid.server.queue;
023 
024 import javax.jms.Connection;
025 import javax.jms.JMSException;
026 import javax.jms.Message;
027 import javax.jms.MessageConsumer;
028 import javax.jms.MessageProducer;
029 import javax.jms.Queue;
030 import javax.jms.Session;
031 
032 import junit.framework.Assert;
033 
034 import org.apache.log4j.Logger;
035 import org.apache.qpid.client.AMQDestination;
036 import org.apache.qpid.client.AMQSession;
037 import org.apache.qpid.test.utils.QpidTestCase;
038 
039 import java.util.concurrent.locks.ReentrantLock;
040 import java.util.concurrent.locks.Condition;
041 
042 public class TimeToLiveTest extends QpidTestCase
043 {
044     private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class);
045 
046     protected final String QUEUE = "TimeToLiveQueue";
047 
048     private final long TIME_TO_LIVE = 100L;
049 
050     private static final int MSG_COUNT = 50;
051     private static final long SERVER_TTL_TIMEOUT = 60000L;
052 
053     public void testPassiveTTL() throws Exception
054     {
055         //Create Client 1
056         Connection clientConnection = getConnection();
057         
058         Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
059         Queue queue = clientSession.createQueue(QUEUE)
060         
061         // Create then close the consumer so the queue is actually created
062         // Closing it then reopening it ensures that the consumer shouldn't get messages
063         // which should have expired and allows a shorter sleep period. See QPID-1418
064         
065         MessageConsumer consumer = clientSession.createConsumer(queue);
066         consumer.close();
067 
068         //Create Producer
069         Connection producerConnection = getConnection();
070 
071         producerConnection.start();
072 
073         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
074 
075         MessageProducer producer = producerSession.createProducer(queue);
076 
077         //Set TTL
078         int msg = 0;
079         producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
080 
081         producer.setTimeToLive(TIME_TO_LIVE);
082 
083         for (; msg < MSG_COUNT - 2; msg++)
084         {
085             producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
086         }
087 
088         //Reset TTL
089         producer.setTimeToLive(0L);
090         producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
091 
092         consumer = clientSession.createConsumer(queue);
093 
094         // Ensure we sleep the required amount of time.
095         ReentrantLock waitLock = new ReentrantLock();
096         Condition wait = waitLock.newCondition();
097         final long MILLIS = 1000000L;
098         long waitTime = TIME_TO_LIVE * MILLIS;
099         while (waitTime > 0)
100         {
101             try
102             {
103                 waitLock.lock();
104 
105                 waitTime = wait.awaitNanos(waitTime);
106             }
107             catch (InterruptedException e)
108             {
109                 //Stop if we are interrupted
110                 fail(e.getMessage());
111             }
112             finally
113             {
114                 waitLock.unlock();
115             }
116 
117         }
118 
119         clientConnection.start();
120 
121         //Receive Message 0
122         Message receivedFirst = consumer.receive(1000);
123         Message receivedSecond = consumer.receive(1000);
124         Message receivedThird = consumer.receive(1000);
125         
126         // Only first and last messages sent should survive expiry
127         Assert.assertNull("More messages received", receivedThird)
128 
129         Assert.assertNotNull("First message not received", receivedFirst);
130         Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
131         Assert.assertEquals("First message has incorrect TTL."0L, receivedFirst.getLongProperty("TTL"));
132 
133         Assert.assertNotNull("Final message not received", receivedSecond);
134         Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
135         Assert.assertEquals("Final message has incorrect TTL."0L, receivedSecond.getLongProperty("TTL"));
136 
137         clientConnection.close();
138 
139         producerConnection.close();
140     }
141 
142     private Message nextMessage(String msg, boolean first, Session producerSession, MessageProducer producerthrows JMSException
143     {
144         Message send = producerSession.createTextMessage("Message " + msg);
145         send.setBooleanProperty("first", first);
146         send.setLongProperty("TTL", producer.getTimeToLive());
147         return send;
148     }
149 
150 
151     /**
152      * Tests the expired messages get actively deleted even on queues which have no consumers
153      @throws Exception 
154      */
155     public void testActiveTTL() throws Exception
156     {
157         Connection producerConnection = getConnection();
158         AMQSession producerSession = (AMQSessionproducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
159         Queue queue = producerSession.createTemporaryQueue();
160         producerSession.declareAndBind((AMQDestinationqueue);
161         MessageProducer producer = producerSession.createProducer(queue);
162         producer.setTimeToLive(1000L);
163 
164         // send Messages
165         for(int i = 0; i < MSG_COUNT; i++)
166         {
167             producer.send(producerSession.createTextMessage("Message: "+i));
168         }
169         long failureTime = System.currentTimeMillis() 2*SERVER_TTL_TIMEOUT;
170 
171         // check Queue depth for up to TIMEOUT seconds
172         long messageCount;
173 
174         do
175         {
176             Thread.sleep(100);
177             messageCount = producerSession.getQueueDepth((AMQDestinationqueue);
178         }
179         while(messageCount > 0L && System.currentTimeMillis() < failureTime);
180 
181         assertEquals("Messages not automatically expired: "0L, messageCount);
182 
183         producer.close();
184         producerSession.close();
185         producerConnection.close();
186     }
187 
188 }