001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 *
020 */
021 package org.apache.qpid.client;
022
023 import java.util.Hashtable;
024 import java.util.concurrent.CountDownLatch;
025 import java.util.concurrent.TimeUnit;
026
027 import javax.jms.Connection;
028 import javax.jms.ConnectionFactory;
029 import javax.jms.JMSException;
030 import javax.jms.Message;
031 import javax.jms.MessageConsumer;
032 import javax.jms.MessageListener;
033 import javax.jms.MessageProducer;
034 import javax.jms.Queue;
035 import javax.jms.Session;
036 import javax.naming.Context;
037 import javax.naming.spi.InitialContextFactory;
038
039 import org.apache.qpid.client.transport.TransportConnection;
040 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
041 import org.apache.qpid.test.utils.QpidTestCase;
042 import org.slf4j.Logger;
043 import org.slf4j.LoggerFactory;
044
045 /**
046 * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
047 * <p/>
048 * The message delivery process:
049 * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
050 * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
051 * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
052 * session can run in any order and a synchronous put/poll will block the dispatcher).
053 * <p/>
054 * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
055 * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
056 */
057 public class DispatcherTest extends QpidTestCase
058 {
059 private static final Logger _logger = LoggerFactory.getLogger(DispatcherTest.class);
060
061 Context _context;
062
063 private static final int MSG_COUNT = 6;
064 private int _receivedCount = 0;
065 private int _receivedCountWhileStopped = 0;
066 private Connection _clientConnection, _producerConnection;
067 private MessageConsumer _consumer;
068 MessageProducer _producer;
069 Session _clientSession, _producerSession;
070
071 private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); // all messages Sent Lock
072 private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); // all messages Sent Lock
073
074 private volatile boolean _connectionStopped = false;
075
076 protected void setUp() throws Exception
077 {
078 super.setUp();
079
080 InitialContextFactory factory = new PropertiesFileInitialContextFactory();
081
082 Hashtable<String, String> env = new Hashtable<String, String>();
083
084 // Create Client 1
085 _clientConnection = getConnection();
086
087 _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
088
089 Queue queue = _clientSession.createQueue(this.getClass().getName());
090 _consumer = _clientSession.createConsumer(queue);
091
092 // Create Producer
093 _producerConnection = getConnection();
094
095 _producerConnection.start();
096
097 _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
098
099 _producer = _producerSession.createProducer(queue);
100
101 for (int msg = 0; msg < MSG_COUNT; msg++)
102 {
103 _producer.send(_producerSession.createTextMessage("Message " + msg));
104 }
105 }
106
107 protected void tearDown() throws Exception
108 {
109
110 _clientConnection.close();
111
112 _producerConnection.close();
113 super.tearDown();
114 }
115
116 public void testAsynchronousRecieve()
117 {
118 _logger.info("Test Start");
119
120 assertTrue(!((AMQConnection) _clientConnection).started());
121
122 // Set default Message Listener
123 try
124 {
125 _consumer.setMessageListener(new MessageListener()
126 {
127 public void onMessage(Message message)
128 {
129 _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message);
130
131 _receivedCount++;
132
133 if (_receivedCount == MSG_COUNT)
134 {
135 _allFirstMessagesSent.countDown();
136 }
137
138 if (_connectionStopped)
139 {
140 _logger.info("Running with Message:" + _receivedCount);
141 }
142
143 if (_connectionStopped && (_allFirstMessagesSent.getCount() == 0))
144 {
145 _receivedCountWhileStopped++;
146 }
147
148 if (_allFirstMessagesSent.getCount() == 0)
149 {
150 if (_receivedCount == (MSG_COUNT * 2))
151 {
152 _allSecondMessagesSent.countDown();
153 }
154 }
155 }
156 });
157
158 assertTrue("Connecion should not be started", !((AMQConnection) _clientConnection).started());
159 _clientConnection.start();
160 }
161 catch (JMSException e)
162 {
163 _logger.error("Error Setting Default ML on consumer1");
164 }
165
166 try
167 {
168 _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
169 }
170 catch (InterruptedException e)
171 {
172 // do nothing
173 }
174
175 try
176 {
177 assertTrue("Connecion should be started", ((AMQConnection) _clientConnection).started());
178 _clientConnection.stop();
179 _connectionStopped = true;
180 }
181 catch (JMSException e)
182 {
183 _logger.error("Error stopping connection");
184 }
185
186 try
187 {
188 _logger.error("Send additional messages");
189
190 for (int msg = 0; msg < MSG_COUNT; msg++)
191 {
192 _producer.send(_producerSession.createTextMessage("Message " + msg));
193 }
194 }
195 catch (JMSException e)
196 {
197 _logger.error("Unable to send additional messages", e);
198 }
199
200 try
201 {
202 Thread.sleep(1000);
203 }
204 catch (InterruptedException e)
205 {
206 // ignore
207 }
208
209 try
210 {
211 _logger.info("Restarting connection");
212
213 _connectionStopped = false;
214 _clientConnection.start();
215 }
216 catch (JMSException e)
217 {
218 _logger.error("Error Setting Better ML on consumer1", e);
219 }
220
221 _logger.info("Waiting upto 2 seconds for messages");
222
223 try
224 {
225 _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
226 }
227 catch (InterruptedException e)
228 {
229 // do nothing
230 }
231
232 assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount());
233 assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount());
234 assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount);
235 assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped);
236 }
237
238 public static junit.framework.Test suite()
239 {
240 return new junit.framework.TestSuite(DispatcherTest.class);
241 }
242 }
|