DispatcherTest.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.
018  *
019  *
020  */
021 package org.apache.qpid.client;
022 
023 import java.util.Hashtable;
024 import java.util.concurrent.CountDownLatch;
025 import java.util.concurrent.TimeUnit;
026 
027 import javax.jms.Connection;
028 import javax.jms.ConnectionFactory;
029 import javax.jms.JMSException;
030 import javax.jms.Message;
031 import javax.jms.MessageConsumer;
032 import javax.jms.MessageListener;
033 import javax.jms.MessageProducer;
034 import javax.jms.Queue;
035 import javax.jms.Session;
036 import javax.naming.Context;
037 import javax.naming.spi.InitialContextFactory;
038 
039 import org.apache.qpid.client.transport.TransportConnection;
040 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
041 import org.apache.qpid.test.utils.QpidTestCase;
042 import org.slf4j.Logger;
043 import org.slf4j.LoggerFactory;
044 
045 /**
046  * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
047  <p/>
048  * The message delivery process:
049  * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
050  * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
051  * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
052  * session can run in any order and a synchronous put/poll will block the dispatcher).
053  <p/>
054  * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
055  * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
056  */
057 public class DispatcherTest extends QpidTestCase
058 {
059     private static final Logger _logger = LoggerFactory.getLogger(DispatcherTest.class);
060 
061     Context _context;
062 
063     private static final int MSG_COUNT = 6;
064     private int _receivedCount = 0;
065     private int _receivedCountWhileStopped = 0;
066     private Connection _clientConnection, _producerConnection;
067     private MessageConsumer _consumer;
068     MessageProducer _producer;
069     Session _clientSession, _producerSession;
070 
071     private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1)// all messages Sent Lock
072     private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1)// all messages Sent Lock
073 
074     private volatile boolean _connectionStopped = false;
075 
076     protected void setUp() throws Exception
077     {
078         super.setUp();
079 
080         InitialContextFactory factory = new PropertiesFileInitialContextFactory();
081 
082         Hashtable<String, String> env = new Hashtable<String, String>();
083 
084         // Create Client 1
085         _clientConnection = getConnection();
086 
087         _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
088 
089         Queue queue = _clientSession.createQueue(this.getClass().getName());
090         _consumer = _clientSession.createConsumer(queue);
091 
092         // Create Producer
093         _producerConnection = getConnection();
094 
095         _producerConnection.start();
096 
097         _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
098 
099         _producer = _producerSession.createProducer(queue);
100 
101         for (int msg = 0; msg < MSG_COUNT; msg++)
102         {
103             _producer.send(_producerSession.createTextMessage("Message " + msg));
104         }
105     }
106 
107     protected void tearDown() throws Exception
108     {
109 
110         _clientConnection.close();
111 
112         _producerConnection.close();
113         super.tearDown();
114     }
115 
116     public void testAsynchronousRecieve()
117     {
118         _logger.info("Test Start");
119 
120         assertTrue(!((AMQConnection_clientConnection).started());
121 
122         // Set default Message Listener
123         try
124         {
125             _consumer.setMessageListener(new MessageListener()
126                 {
127                     public void onMessage(Message message)
128                     {
129                         _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message);
130 
131                         _receivedCount++;
132 
133                         if (_receivedCount == MSG_COUNT)
134                         {
135                             _allFirstMessagesSent.countDown();
136                         }
137 
138                         if (_connectionStopped)
139                         {
140                             _logger.info("Running with Message:" + _receivedCount);
141                         }
142 
143                         if (_connectionStopped && (_allFirstMessagesSent.getCount() == 0))
144                         {
145                             _receivedCountWhileStopped++;
146                         }
147 
148                         if (_allFirstMessagesSent.getCount() == 0)
149                         {
150                             if (_receivedCount == (MSG_COUNT * 2))
151                             {
152                                 _allSecondMessagesSent.countDown();
153                             }
154                         }
155                     }
156                 });
157 
158             assertTrue("Connecion should not be started", !((AMQConnection_clientConnection).started());
159             _clientConnection.start();
160         }
161         catch (JMSException e)
162         {
163             _logger.error("Error Setting Default ML on consumer1");
164         }
165 
166         try
167         {
168             _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
169         }
170         catch (InterruptedException e)
171         {
172             // do nothing
173         }
174 
175         try
176         {
177             assertTrue("Connecion should be started"((AMQConnection_clientConnection).started());
178             _clientConnection.stop();
179             _connectionStopped = true;
180         }
181         catch (JMSException e)
182         {
183             _logger.error("Error stopping connection");
184         }
185 
186         try
187         {
188             _logger.error("Send additional messages");
189 
190             for (int msg = 0; msg < MSG_COUNT; msg++)
191             {
192                 _producer.send(_producerSession.createTextMessage("Message " + msg));
193             }
194         }
195         catch (JMSException e)
196         {
197             _logger.error("Unable to send additional messages", e);
198         }
199 
200         try
201         {
202             Thread.sleep(1000);
203         }
204         catch (InterruptedException e)
205         {
206             // ignore
207         }
208 
209         try
210         {
211             _logger.info("Restarting connection");
212 
213             _connectionStopped = false;
214             _clientConnection.start();
215         }
216         catch (JMSException e)
217         {
218             _logger.error("Error Setting Better ML on consumer1", e);
219         }
220 
221         _logger.info("Waiting upto 2 seconds for messages");
222 
223         try
224         {
225             _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
226         }
227         catch (InterruptedException e)
228         {
229             // do nothing
230         }
231 
232         assertEquals("Messages not received correctly"0, _allFirstMessagesSent.getCount());
233         assertEquals("Messages not received correctly"0, _allSecondMessagesSent.getCount());
234         assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount);
235         assertEquals("Messages received while stopped is not 0"0, _receivedCountWhileStopped);
236     }
237 
238     public static junit.framework.Test suite()
239     {
240         return new junit.framework.TestSuite(DispatcherTest.class);
241     }
242 }