001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client;
022
023 import javax.jms.Connection;
024 import javax.jms.Session;
025 import javax.jms.JMSException;
026 import javax.jms.Message;
027 import javax.jms.MessageConsumer;
028 import javax.jms.MessageProducer;
029 import javax.jms.TextMessage;
030
031 import org.apache.qpid.framing.AMQShortString;
032 import org.apache.qpid.test.utils.QpidTestCase;
033 import org.apache.qpid.client.transport.TransportConnection;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036
037 public class AMQQueueDeferredOrderingTest extends QpidTestCase
038 {
039
040 private static final int NUM_MESSAGES = 1000;
041
042 private Connection con;
043 private Session session;
044 private AMQQueue queue;
045 private MessageConsumer consumer;
046
047 private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
048
049 private ASyncProducer producerThread;
050
051 private class ASyncProducer extends Thread
052 {
053
054 private MessageProducer producer;
055 private final Logger _logger = LoggerFactory.getLogger(ASyncProducer.class);
056 private Session session;
057 private int start;
058 private int end;
059
060 public ASyncProducer(AMQQueue q, int start, int end) throws Exception
061 {
062 this.session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
063 this._logger.info("Create Consumer of Q1");
064 this.producer = this.session.createProducer(q);
065 this.start = start;
066 this.end = end;
067 }
068
069 public void run()
070 {
071 try
072 {
073 this._logger.info("Starting to send messages");
074 for (int i = start; i < end && !interrupted(); i++)
075 {
076 producer.send(session.createTextMessage(Integer.toString(i)));
077 }
078 this._logger.info("Sent " + (end - start) + " messages");
079 }
080 catch (JMSException e)
081 {
082 throw new RuntimeException(e);
083 }
084 }
085 }
086
087 protected void setUp() throws Exception
088 {
089 super.setUp();
090 TransportConnection.createVMBroker(1);
091
092 _logger.info("Create Connection");
093 con = getConnection();
094 _logger.info("Create Session");
095 session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
096 _logger.info("Create Q");
097 queue = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString("Q"), new AMQShortString("Q"),
098 false, true);
099 _logger.info("Create Consumer of Q");
100 consumer = session.createConsumer(queue);
101 _logger.info("Start Connection");
102 con.start();
103 }
104
105 public void testPausedOrder() throws Exception
106 {
107
108 // Setup initial messages
109 _logger.info("Creating first producer thread");
110 producerThread = new ASyncProducer(queue, 0, NUM_MESSAGES / 2);
111 producerThread.start();
112 // Wait for them to be done
113 producerThread.join();
114
115 // Setup second set of messages to produce while we consume
116 _logger.info("Creating second producer thread");
117 producerThread = new ASyncProducer(queue, NUM_MESSAGES / 2, NUM_MESSAGES);
118 producerThread.start();
119
120 // Start consuming and checking they're in order
121 _logger.info("Consuming messages");
122 for (int i = 0; i < NUM_MESSAGES; i++)
123 {
124 Message msg = consumer.receive(3000);
125 assertNotNull("Message should not be null", msg);
126 assertTrue("Message should be a text message", msg instanceof TextMessage);
127 assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
128 }
129 }
130
131 protected void tearDown() throws Exception
132 {
133 _logger.info("Interuptting producer thread");
134 producerThread.interrupt();
135 _logger.info("Closing connection");
136 con.close();
137
138 TransportConnection.killAllVMBrokers();
139 super.tearDown();
140 }
141
142 public static junit.framework.Test suite()
143 {
144 return new junit.framework.TestSuite(AMQQueueDeferredOrderingTest.class);
145 }
146
147 }
|