001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021
022 package org.apache.qpid.server.queue;
023
024 import javax.jms.Connection;
025 import javax.jms.JMSException;
026 import javax.jms.Message;
027 import javax.jms.MessageConsumer;
028 import javax.jms.MessageProducer;
029 import javax.jms.Queue;
030 import javax.jms.Session;
031
032 import junit.framework.Assert;
033
034 import org.apache.log4j.Logger;
035 import org.apache.qpid.client.AMQDestination;
036 import org.apache.qpid.client.AMQSession;
037 import org.apache.qpid.test.utils.QpidTestCase;
038
039 import java.util.concurrent.locks.ReentrantLock;
040 import java.util.concurrent.locks.Condition;
041
042 public class TimeToLiveTest extends QpidTestCase
043 {
044 private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class);
045
046 protected final String QUEUE = "TimeToLiveQueue";
047
048 private final long TIME_TO_LIVE = 100L;
049
050 private static final int MSG_COUNT = 50;
051 private static final long SERVER_TTL_TIMEOUT = 60000L;
052
053 public void testPassiveTTL() throws Exception
054 {
055 //Create Client 1
056 Connection clientConnection = getConnection();
057
058 Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
059 Queue queue = clientSession.createQueue(QUEUE);
060
061 // Create then close the consumer so the queue is actually created
062 // Closing it then reopening it ensures that the consumer shouldn't get messages
063 // which should have expired and allows a shorter sleep period. See QPID-1418
064
065 MessageConsumer consumer = clientSession.createConsumer(queue);
066 consumer.close();
067
068 //Create Producer
069 Connection producerConnection = getConnection();
070
071 producerConnection.start();
072
073 Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
074
075 MessageProducer producer = producerSession.createProducer(queue);
076
077 //Set TTL
078 int msg = 0;
079 producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
080
081 producer.setTimeToLive(TIME_TO_LIVE);
082
083 for (; msg < MSG_COUNT - 2; msg++)
084 {
085 producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
086 }
087
088 //Reset TTL
089 producer.setTimeToLive(0L);
090 producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
091
092 consumer = clientSession.createConsumer(queue);
093
094 // Ensure we sleep the required amount of time.
095 ReentrantLock waitLock = new ReentrantLock();
096 Condition wait = waitLock.newCondition();
097 final long MILLIS = 1000000L;
098 long waitTime = TIME_TO_LIVE * MILLIS;
099 while (waitTime > 0)
100 {
101 try
102 {
103 waitLock.lock();
104
105 waitTime = wait.awaitNanos(waitTime);
106 }
107 catch (InterruptedException e)
108 {
109 //Stop if we are interrupted
110 fail(e.getMessage());
111 }
112 finally
113 {
114 waitLock.unlock();
115 }
116
117 }
118
119 clientConnection.start();
120
121 //Receive Message 0
122 Message receivedFirst = consumer.receive(1000);
123 Message receivedSecond = consumer.receive(1000);
124 Message receivedThird = consumer.receive(1000);
125
126 // Only first and last messages sent should survive expiry
127 Assert.assertNull("More messages received", receivedThird);
128
129 Assert.assertNotNull("First message not received", receivedFirst);
130 Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
131 Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL"));
132
133 Assert.assertNotNull("Final message not received", receivedSecond);
134 Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
135 Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL"));
136
137 clientConnection.close();
138
139 producerConnection.close();
140 }
141
142 private Message nextMessage(String msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
143 {
144 Message send = producerSession.createTextMessage("Message " + msg);
145 send.setBooleanProperty("first", first);
146 send.setLongProperty("TTL", producer.getTimeToLive());
147 return send;
148 }
149
150
151 /**
152 * Tests the expired messages get actively deleted even on queues which have no consumers
153 * @throws Exception
154 */
155 public void testActiveTTL() throws Exception
156 {
157 Connection producerConnection = getConnection();
158 AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
159 Queue queue = producerSession.createTemporaryQueue();
160 producerSession.declareAndBind((AMQDestination) queue);
161 MessageProducer producer = producerSession.createProducer(queue);
162 producer.setTimeToLive(1000L);
163
164 // send Messages
165 for(int i = 0; i < MSG_COUNT; i++)
166 {
167 producer.send(producerSession.createTextMessage("Message: "+i));
168 }
169 long failureTime = System.currentTimeMillis() + 2*SERVER_TTL_TIMEOUT;
170
171 // check Queue depth for up to TIMEOUT seconds
172 long messageCount;
173
174 do
175 {
176 Thread.sleep(100);
177 messageCount = producerSession.getQueueDepth((AMQDestination) queue);
178 }
179 while(messageCount > 0L && System.currentTimeMillis() < failureTime);
180
181 assertEquals("Messages not automatically expired: ", 0L, messageCount);
182
183 producer.close();
184 producerSession.close();
185 producerConnection.close();
186 }
187
188 }
|