001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022
023 import junit.framework.TestCase;
024 import junit.framework.Assert;
025 import org.apache.log4j.Logger;
026 import org.apache.qpid.client.transport.TransportConnection;
027 import org.apache.qpid.client.AMQConnection;
028 import org.apache.qpid.client.AMQSession;
029 import org.apache.qpid.client.AMQQueue;
030 import org.apache.qpid.client.AMQDestination;
031 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
032 import org.apache.qpid.test.utils.QpidTestCase;
033 import org.apache.qpid.url.URLSyntaxException;
034 import org.apache.qpid.AMQException;
035 import org.apache.qpid.framing.AMQShortString;
036 import org.apache.qpid.framing.FieldTable;
037
038 import javax.jms.*;
039 import javax.naming.NamingException;
040 import javax.naming.Context;
041 import javax.naming.spi.InitialContextFactory;
042 import java.util.Hashtable;
043 import java.util.HashMap;
044 import java.util.Map;
045
046 public class PriorityTest extends QpidTestCase
047 {
048 private static final int TIMEOUT = 1500;
049
050
051 private static final Logger _logger = Logger.getLogger(PriorityTest.class);
052
053 protected final String QUEUE = "PriorityQueue";
054
055 private static final int MSG_COUNT = 50;
056
057 private Connection producerConnection;
058 private MessageProducer producer;
059 private Session producerSession;
060 private Queue queue;
061 private Connection consumerConnection;
062 private Session consumerSession;
063
064
065 private MessageConsumer consumer;
066
067 protected void setUp() throws Exception
068 {
069 super.setUp();
070
071 producerConnection = getConnection();
072 producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
073
074 producerConnection.start();
075
076 consumerConnection = getConnection();
077 consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
078
079 }
080
081 protected void tearDown() throws Exception
082 {
083 producerConnection.close();
084 consumerConnection.close();
085 super.tearDown();
086 }
087
088 public void testPriority() throws JMSException, NamingException, AMQException
089 {
090 final Map<String,Object> arguments = new HashMap<String, Object>();
091 arguments.put("x-qpid-priorities",10);
092 ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
093 queue = new AMQQueue("amq.direct",QUEUE);
094 ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
095 producer = producerSession.createProducer(queue);
096
097 for (int msg = 0; msg < MSG_COUNT; msg++)
098 {
099 producer.setPriority(msg % 10);
100 producer.send(nextMessage(msg, false, producerSession, producer));
101 }
102 producerSession.commit();
103 producer.close();
104 producerSession.close();
105 producerConnection.close();
106
107 consumer = consumerSession.createConsumer(queue);
108 consumerConnection.start();
109 Message received;
110 int receivedCount = 0;
111 Message previous = null;
112 int messageCount = 0;
113 while((received = consumer.receive(1000))!=null)
114 {
115 messageCount++;
116 if(previous != null)
117 {
118 assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) );
119 }
120
121 previous = received;
122 receivedCount++;
123 }
124
125 assertEquals("Incorrect number of message received", 50, receivedCount);
126 }
127
128 public void testOddOrdering() throws AMQException, JMSException
129 {
130 final Map<String,Object> arguments = new HashMap<String, Object>();
131 arguments.put("x-qpid-priorities",3);
132 ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
133 queue = new AMQQueue("amq.direct",QUEUE);
134 ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
135 producer = producerSession.createProducer(queue);
136
137 // In order ABC
138 producer.setPriority(9);
139 producer.send(nextMessage(1, false, producerSession, producer));
140 producer.setPriority(4);
141 producer.send(nextMessage(2, false, producerSession, producer));
142 producer.setPriority(1);
143 producer.send(nextMessage(3, false, producerSession, producer));
144
145 // Out of order BAC
146 producer.setPriority(4);
147 producer.send(nextMessage(4, false, producerSession, producer));
148 producer.setPriority(9);
149 producer.send(nextMessage(5, false, producerSession, producer));
150 producer.setPriority(1);
151 producer.send(nextMessage(6, false, producerSession, producer));
152
153 // Out of order BCA
154 producer.setPriority(4);
155 producer.send(nextMessage(7, false, producerSession, producer));
156 producer.setPriority(1);
157 producer.send(nextMessage(8, false, producerSession, producer));
158 producer.setPriority(9);
159 producer.send(nextMessage(9, false, producerSession, producer));
160
161 // Reverse order CBA
162 producer.setPriority(1);
163 producer.send(nextMessage(10, false, producerSession, producer));
164 producer.setPriority(4);
165 producer.send(nextMessage(11, false, producerSession, producer));
166 producer.setPriority(9);
167 producer.send(nextMessage(12, false, producerSession, producer));
168 producerSession.commit();
169
170 consumer = consumerSession.createConsumer(queue);
171 consumerConnection.start();
172
173 Message msg = consumer.receive(TIMEOUT);
174 assertEquals(1, msg.getIntProperty("msg"));
175 msg = consumer.receive(TIMEOUT);
176 assertEquals(5, msg.getIntProperty("msg"));
177 msg = consumer.receive(TIMEOUT);
178 assertEquals(9, msg.getIntProperty("msg"));
179 msg = consumer.receive(TIMEOUT);
180 assertEquals(12, msg.getIntProperty("msg"));
181
182 msg = consumer.receive(TIMEOUT);
183 assertEquals(2, msg.getIntProperty("msg"));
184 msg = consumer.receive(TIMEOUT);
185 assertEquals(4, msg.getIntProperty("msg"));
186 msg = consumer.receive(TIMEOUT);
187 assertEquals(7, msg.getIntProperty("msg"));
188 msg = consumer.receive(TIMEOUT);
189 assertEquals(11, msg.getIntProperty("msg"));
190
191 msg = consumer.receive(TIMEOUT);
192 assertEquals(3, msg.getIntProperty("msg"));
193 msg = consumer.receive(TIMEOUT);
194 assertEquals(6, msg.getIntProperty("msg"));
195 msg = consumer.receive(TIMEOUT);
196 assertEquals(8, msg.getIntProperty("msg"));
197 msg = consumer.receive(TIMEOUT);
198 assertEquals(10, msg.getIntProperty("msg"));
199 }
200
201 private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
202 {
203 Message send = producerSession.createTextMessage("Message: " + msg);
204 send.setIntProperty("msg", msg);
205
206 return send;
207 }
208
209
210 }
|