AMQQueueDeferredOrderingTest.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client;
022 
023 import javax.jms.Connection;
024 import javax.jms.Session;
025 import javax.jms.JMSException;
026 import javax.jms.Message;
027 import javax.jms.MessageConsumer;
028 import javax.jms.MessageProducer;
029 import javax.jms.TextMessage;
030 
031 import org.apache.qpid.framing.AMQShortString;
032 import org.apache.qpid.test.utils.QpidTestCase;
033 import org.apache.qpid.client.transport.TransportConnection;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036 
037 public class AMQQueueDeferredOrderingTest extends QpidTestCase
038 {
039 
040     private static final int NUM_MESSAGES = 1000;
041 
042     private Connection con;
043     private Session session;
044     private AMQQueue queue;
045     private MessageConsumer consumer;
046 
047     private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
048 
049     private ASyncProducer producerThread;
050 
051     private class ASyncProducer extends Thread
052     {
053 
054         private MessageProducer producer;
055         private final Logger _logger = LoggerFactory.getLogger(ASyncProducer.class);
056         private Session session;
057         private int start;
058         private int end;
059 
060         public ASyncProducer(AMQQueue q, int start, int endthrows Exception
061         {
062             this.session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
063             this._logger.info("Create Consumer of Q1");
064             this.producer = this.session.createProducer(q);
065             this.start = start;
066             this.end = end;
067         }
068 
069         public void run()
070         {
071             try
072             {
073                 this._logger.info("Starting to send messages");
074                 for (int i = start; i < end && !interrupted(); i++)
075                 {
076                     producer.send(session.createTextMessage(Integer.toString(i)));
077                 }
078                 this._logger.info("Sent " (end - start" messages");
079             }
080             catch (JMSException e)
081             {
082                 throw new RuntimeException(e);
083             }
084         }
085     }
086 
087     protected void setUp() throws Exception
088     {
089         super.setUp();
090         TransportConnection.createVMBroker(1);
091 
092         _logger.info("Create Connection");
093         con = getConnection();
094         _logger.info("Create Session");
095         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
096         _logger.info("Create Q");
097         queue = new AMQQueue(new AMQShortString("amq.direct")new AMQShortString("Q")new AMQShortString("Q"),
098                 false, true);
099         _logger.info("Create Consumer of Q");
100         consumer = session.createConsumer(queue);
101         _logger.info("Start Connection");
102         con.start();
103     }
104 
105     public void testPausedOrder() throws Exception
106     {
107 
108         // Setup initial messages
109         _logger.info("Creating first producer thread");
110         producerThread = new ASyncProducer(queue, 0, NUM_MESSAGES / 2);
111         producerThread.start();
112         // Wait for them to be done
113         producerThread.join();
114 
115         // Setup second set of messages to produce while we consume
116         _logger.info("Creating second producer thread");
117         producerThread = new ASyncProducer(queue, NUM_MESSAGES / 2, NUM_MESSAGES);
118         producerThread.start();
119 
120         // Start consuming and checking they're in order
121         _logger.info("Consuming messages");
122         for (int i = 0; i < NUM_MESSAGES; i++)
123         {
124             Message msg = consumer.receive(3000);
125             assertNotNull("Message should not be null", msg);
126             assertTrue("Message should be a text message", msg instanceof TextMessage);
127             assertEquals("Message content does not match expected", Integer.toString(i)((TextMessagemsg).getText());
128         }
129     }
130 
131     protected void tearDown() throws Exception
132     {
133         _logger.info("Interuptting producer thread");
134         producerThread.interrupt();
135         _logger.info("Closing connection");
136         con.close();
137 
138         TransportConnection.killAllVMBrokers();
139         super.tearDown();
140     }
141 
142     public static junit.framework.Test suite()
143     {
144         return new junit.framework.TestSuite(AMQQueueDeferredOrderingTest.class);
145     }
146 
147 }