PriorityTest.java
001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022 
023 import junit.framework.TestCase;
024 import junit.framework.Assert;
025 import org.apache.log4j.Logger;
026 import org.apache.qpid.client.transport.TransportConnection;
027 import org.apache.qpid.client.AMQConnection;
028 import org.apache.qpid.client.AMQSession;
029 import org.apache.qpid.client.AMQQueue;
030 import org.apache.qpid.client.AMQDestination;
031 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
032 import org.apache.qpid.test.utils.QpidTestCase;
033 import org.apache.qpid.url.URLSyntaxException;
034 import org.apache.qpid.AMQException;
035 import org.apache.qpid.framing.AMQShortString;
036 import org.apache.qpid.framing.FieldTable;
037 
038 import javax.jms.*;
039 import javax.naming.NamingException;
040 import javax.naming.Context;
041 import javax.naming.spi.InitialContextFactory;
042 import java.util.Hashtable;
043 import java.util.HashMap;
044 import java.util.Map;
045 
046 public class PriorityTest extends QpidTestCase
047 {
048     private static final int TIMEOUT = 1500;
049 
050 
051     private static final Logger _logger = Logger.getLogger(PriorityTest.class);
052 
053     protected final String QUEUE = "PriorityQueue";
054 
055     private static final int MSG_COUNT = 50;
056 
057     private Connection producerConnection;
058     private MessageProducer producer;
059     private Session producerSession;
060     private Queue queue;
061     private Connection consumerConnection;
062     private Session consumerSession;
063 
064 
065     private MessageConsumer consumer;
066     
067     protected void setUp() throws Exception
068     {
069         super.setUp();
070 
071         producerConnection = getConnection();
072         producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
073 
074         producerConnection.start();
075         
076         consumerConnection = getConnection();
077         consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
078         
079     }
080 
081     protected void tearDown() throws Exception
082     {
083         producerConnection.close();
084         consumerConnection.close();
085         super.tearDown();
086     }
087 
088     public void testPriority() throws JMSException, NamingException, AMQException
089     {
090         final Map<String,Object> arguments = new HashMap<String, Object>();
091         arguments.put("x-qpid-priorities",10);
092         ((AMQSessionproducerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
093         queue = new AMQQueue("amq.direct",QUEUE);
094         ((AMQSessionproducerSession).declareAndBind((AMQDestination)queue);
095         producer = producerSession.createProducer(queue);
096 
097         for (int msg = 0; msg < MSG_COUNT; msg++)
098         {
099             producer.setPriority(msg % 10);
100             producer.send(nextMessage(msg, false, producerSession, producer));
101         }
102         producerSession.commit();
103         producer.close();
104         producerSession.close();
105         producerConnection.close();
106 
107         consumer = consumerSession.createConsumer(queue);
108         consumerConnection.start();
109         Message received;
110         int receivedCount = 0;
111         Message previous = null;
112         int messageCount = 0;
113         while((received = consumer.receive(1000))!=null)
114         {   
115             messageCount++;
116             if(previous != null)
117             {
118                 assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg"" " + received.getIntProperty("msg"" " + previous.getJMSPriority() " " + received.getJMSPriority()(previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg"< received.getIntProperty("msg")) );
119             }
120 
121             previous = received;
122             receivedCount++;
123         }
124 
125         assertEquals("Incorrect number of message received"50, receivedCount);
126     }
127     
128     public void testOddOrdering() throws AMQException, JMSException
129     {
130         final Map<String,Object> arguments = new HashMap<String, Object>();
131         arguments.put("x-qpid-priorities",3);
132         ((AMQSessionproducerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
133         queue = new AMQQueue("amq.direct",QUEUE);
134         ((AMQSessionproducerSession).declareAndBind((AMQDestination)queue);
135         producer = producerSession.createProducer(queue);
136         
137         // In order ABC
138         producer.setPriority(9);
139         producer.send(nextMessage(1, false, producerSession, producer));
140         producer.setPriority(4);
141         producer.send(nextMessage(2, false, producerSession, producer));
142         producer.setPriority(1);
143         producer.send(nextMessage(3, false, producerSession, producer));
144 
145         // Out of order BAC
146         producer.setPriority(4);
147         producer.send(nextMessage(4, false, producerSession, producer));
148         producer.setPriority(9);
149         producer.send(nextMessage(5, false, producerSession, producer));
150         producer.setPriority(1);
151         producer.send(nextMessage(6, false, producerSession, producer));
152 
153         // Out of order BCA 
154         producer.setPriority(4);
155         producer.send(nextMessage(7, false, producerSession, producer));
156         producer.setPriority(1);
157         producer.send(nextMessage(8, false, producerSession, producer));
158         producer.setPriority(9);
159         producer.send(nextMessage(9, false, producerSession, producer));
160         
161         // Reverse order CBA
162         producer.setPriority(1);
163         producer.send(nextMessage(10, false, producerSession, producer));
164         producer.setPriority(4);
165         producer.send(nextMessage(11, false, producerSession, producer));
166         producer.setPriority(9);
167         producer.send(nextMessage(12, false, producerSession, producer));
168         producerSession.commit();
169         
170         consumer = consumerSession.createConsumer(queue);
171         consumerConnection.start();
172         
173         Message msg = consumer.receive(TIMEOUT);
174         assertEquals(1, msg.getIntProperty("msg"));
175         msg = consumer.receive(TIMEOUT);
176         assertEquals(5, msg.getIntProperty("msg"));
177         msg = consumer.receive(TIMEOUT);
178         assertEquals(9, msg.getIntProperty("msg"));
179         msg = consumer.receive(TIMEOUT);
180         assertEquals(12, msg.getIntProperty("msg"));
181         
182         msg = consumer.receive(TIMEOUT);
183         assertEquals(2, msg.getIntProperty("msg"));
184         msg = consumer.receive(TIMEOUT);
185         assertEquals(4, msg.getIntProperty("msg"));
186         msg = consumer.receive(TIMEOUT);
187         assertEquals(7, msg.getIntProperty("msg"));
188         msg = consumer.receive(TIMEOUT);
189         assertEquals(11, msg.getIntProperty("msg"));
190         
191         msg = consumer.receive(TIMEOUT);
192         assertEquals(3, msg.getIntProperty("msg"));
193         msg = consumer.receive(TIMEOUT);
194         assertEquals(6, msg.getIntProperty("msg"));
195         msg = consumer.receive(TIMEOUT);
196         assertEquals(8, msg.getIntProperty("msg"));
197         msg = consumer.receive(TIMEOUT);
198         assertEquals(10, msg.getIntProperty("msg"));
199     }
200 
201     private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producerthrows JMSException
202     {
203         Message send = producerSession.createTextMessage("Message: " + msg);
204         send.setIntProperty("msg", msg);
205 
206         return send;
207     }
208 
209 
210 }