QueueDepthWithSelectorTest.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 
022 package org.apache.qpid.server.queue;
023 
024 import junit.framework.TestCase;
025 import org.apache.log4j.Level;
026 import org.apache.log4j.Logger;
027 import org.apache.log4j.PropertyConfigurator;
028 import org.apache.qpid.AMQException;
029 import org.apache.qpid.client.AMQDestination;
030 import org.apache.qpid.client.AMQSession;
031 import org.apache.qpid.client.transport.TransportConnection;
032 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
033 
034 import javax.jms.Connection;
035 import javax.jms.ConnectionFactory;
036 import javax.jms.JMSException;
037 import javax.jms.Message;
038 import javax.jms.MessageConsumer;
039 import javax.jms.MessageProducer;
040 import javax.jms.Queue;
041 import javax.jms.Session;
042 import javax.naming.Context;
043 import javax.naming.NamingException;
044 import javax.naming.spi.InitialContextFactory;
045 import java.util.Hashtable;
046 
047 /**
048  * Test Case to ensure that messages are correctly returned.
049  * This includes checking:
050  * - The message is returned.
051  * - The broker doesn't leak memory.
052  * - The broker's state is correct after test.
053  */
054 public class QueueDepthWithSelectorTest extends TestCase
055 {
056     private static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class);
057 
058     protected final String BROKER = "vm://:1";
059     protected final String VHOST = "test";
060     protected final String QUEUE = this.getClass().getName();
061 
062     private Context _context;
063 
064     private Connection _clientConnection, _producerConnection;
065     private Session _clientSession, _producerSession;
066     private MessageProducer _producer;
067     private MessageConsumer _consumer;
068 
069     private static final int MSG_COUNT = 50;
070 
071     private Message[] _messages = new Message[MSG_COUNT];
072 
073     protected void setUp() throws Exception
074     {
075 
076         System.err.println("amqj.logging.level:" + System.getProperty("amqj.logging.level"));
077         System.err.println("_logger.level:" + _logger.getLevel());
078         System.err.println("_logger.isE-Error:" + _logger.isEnabledFor(Level.ERROR));
079         System.err.println("_logger.isE-Warn:" + _logger.isEnabledFor(Level.WARN));
080         System.err.println("_logger.isInfo:" + _logger.isInfoEnabled() ":" + _logger.isEnabledFor(Level.INFO));
081         System.err.println("_logger.isDebug:" + _logger.isDebugEnabled() ":" + _logger.isEnabledFor(Level.DEBUG));
082         System.err.println("_logger.isTrace:" + _logger.isTraceEnabled() ":" + _logger.isEnabledFor(Level.TRACE));
083 
084         System.err.println(Logger.getRootLogger().getLoggerRepository());
085 
086         if (BROKER.startsWith("vm://"))
087         {
088             TransportConnection.createVMBroker(1);
089         }
090         InitialContextFactory factory = new PropertiesFileInitialContextFactory();
091 
092         Hashtable<String, String> env = new Hashtable<String, String>();
093 
094         env.put("connectionfactory.connection""amqp://guest:guest@TTL_TEST_ID/" + VHOST + "?brokerlist='" + BROKER + "'");
095         env.put("queue.queue", QUEUE);
096 
097         _context = factory.getInitialContext(env);
098 
099     }
100 
101     protected void tearDown() throws Exception
102     {
103         super.tearDown();
104 
105         if (_producerConnection != null)
106         {
107             _producerConnection.close();
108         }
109 
110         if (_clientConnection != null)
111         {
112             _clientConnection.close();
113         }
114 
115         if (BROKER.startsWith("vm://"))
116         {
117             TransportConnection.killAllVMBrokers();
118         }
119     }
120 
121     public void test() throws Exception
122     {
123 
124         init();
125         //Send messages
126         _logger.info("Starting to send messages");
127         for (int msg = 0; msg < MSG_COUNT; msg++)
128         {
129             _producer.send(nextMessage(msg));
130         }
131         _logger.info("Closing connection");
132         //Close the connection.. .giving the broker time to clean up its state.
133         _producerConnection.close();
134 
135         //Verify we get all the messages.
136         _logger.info("Verifying messages");
137         verifyAllMessagesRecevied();
138 
139         //Close the connection.. .giving the broker time to clean up its state.
140         _clientConnection.close();
141 
142         //Verify Broker state
143         _logger.info("Verifying broker state");
144         verifyBrokerState();
145     }
146 
147     private void init() throws NamingException, JMSException
148     {
149         _messages = new Message[MSG_COUNT];
150 
151         //Create Producer
152         _producerConnection = ((ConnectionFactory_context.lookup("connection")).createConnection();
153         _producerConnection.start();
154         _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
155         _producer = _producerSession.createProducer((Queue_context.lookup("queue"));
156 
157         // Create consumer
158         _clientConnection = ((ConnectionFactory_context.lookup("connection")).createConnection();
159         _clientConnection.start();
160         _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
161         _consumer = _clientSession.createConsumer((Queue_context.lookup("queue")"key = 23");
162     }
163 
164     private void verifyBrokerState()
165     {
166         try
167         {
168             _clientConnection = ((ConnectionFactory_context.lookup("connection")).createConnection();
169 
170             _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
171         }
172         catch (Exception e)
173         {
174             fail(e.getMessage());
175         }
176 
177         try
178         {
179             Thread.sleep(2000);
180             long queueDepth = ((AMQSession_clientSession).getQueueDepth((AMQDestination_context.lookup("queue"));
181             assertEquals("Session reports Queue depth not as expected"0, queueDepth);
182         }
183         catch (InterruptedException e)
184         {
185             fail(e.getMessage());
186         }
187         catch (NamingException e)
188         {
189             fail(e.getMessage());
190         }
191         catch (AMQException e)
192         {
193             fail(e.getMessage());
194         }
195         finally
196         {
197             try
198             {
199                 _clientConnection.close();
200             }
201             catch (JMSException e)
202             {
203                 fail(e.getMessage());
204             }
205         }
206 
207     }
208 
209     private void verifyAllMessagesRecevied() throws Exception
210     {
211 
212         boolean[] msgIdRecevied = new boolean[MSG_COUNT];
213 
214         for (int i = 0; i < MSG_COUNT; i++)
215         {
216             _messages[i= _consumer.receive(1000);
217             assertNotNull("should have received a message but didn't", _messages[i]);
218         }
219         long queueDepth = ((AMQSession_clientSession).getQueueDepth((AMQDestination_context.lookup("queue"));
220         assertEquals("Session reports Queue depth not as expected"0, queueDepth);
221 
222         //Check received messages
223         int msgId = 0;
224         for (Message msg : _messages)
225         {
226             assertNotNull("Message should not be null", msg);
227             assertEquals("msgId was wrong", msgId, msg.getIntProperty("ID"));
228             assertFalse("Already received msg id " + msgId, msgIdRecevied[msgId]);
229             msgIdRecevied[msgIdtrue;
230             msgId++;
231         }
232 
233         //Check all received
234         for (msgId = 0; msgId < MSG_COUNT; msgId++)
235         {
236             assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]);
237         }
238     }
239 
240     /**
241      * Get the next message putting the given count into the intProperties as ID.
242      *
243      @param msgNo the message count to store as ID.
244      *
245      @return
246      *
247      @throws JMSException
248      */
249 
250     private Message nextMessage(int msgNothrows JMSException
251     {
252         Message send = _producerSession.createTextMessage("MessageReturnTest");
253         send.setIntProperty("ID", msgNo);
254         send.setIntProperty("key"23);
255         return send;
256     }
257 
258 }