001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021
022 package org.apache.qpid.server.queue;
023
024 import junit.framework.TestCase;
025 import org.apache.log4j.Level;
026 import org.apache.log4j.Logger;
027 import org.apache.log4j.PropertyConfigurator;
028 import org.apache.qpid.AMQException;
029 import org.apache.qpid.client.AMQDestination;
030 import org.apache.qpid.client.AMQSession;
031 import org.apache.qpid.client.transport.TransportConnection;
032 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
033
034 import javax.jms.Connection;
035 import javax.jms.ConnectionFactory;
036 import javax.jms.JMSException;
037 import javax.jms.Message;
038 import javax.jms.MessageConsumer;
039 import javax.jms.MessageProducer;
040 import javax.jms.Queue;
041 import javax.jms.Session;
042 import javax.naming.Context;
043 import javax.naming.NamingException;
044 import javax.naming.spi.InitialContextFactory;
045 import java.util.Hashtable;
046
047 /**
048 * Test Case to ensure that messages are correctly returned.
049 * This includes checking:
050 * - The message is returned.
051 * - The broker doesn't leak memory.
052 * - The broker's state is correct after test.
053 */
054 public class QueueDepthWithSelectorTest extends TestCase
055 {
056 private static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class);
057
058 protected final String BROKER = "vm://:1";
059 protected final String VHOST = "test";
060 protected final String QUEUE = this.getClass().getName();
061
062 private Context _context;
063
064 private Connection _clientConnection, _producerConnection;
065 private Session _clientSession, _producerSession;
066 private MessageProducer _producer;
067 private MessageConsumer _consumer;
068
069 private static final int MSG_COUNT = 50;
070
071 private Message[] _messages = new Message[MSG_COUNT];
072
073 protected void setUp() throws Exception
074 {
075
076 System.err.println("amqj.logging.level:" + System.getProperty("amqj.logging.level"));
077 System.err.println("_logger.level:" + _logger.getLevel());
078 System.err.println("_logger.isE-Error:" + _logger.isEnabledFor(Level.ERROR));
079 System.err.println("_logger.isE-Warn:" + _logger.isEnabledFor(Level.WARN));
080 System.err.println("_logger.isInfo:" + _logger.isInfoEnabled() + ":" + _logger.isEnabledFor(Level.INFO));
081 System.err.println("_logger.isDebug:" + _logger.isDebugEnabled() + ":" + _logger.isEnabledFor(Level.DEBUG));
082 System.err.println("_logger.isTrace:" + _logger.isTraceEnabled() + ":" + _logger.isEnabledFor(Level.TRACE));
083
084 System.err.println(Logger.getRootLogger().getLoggerRepository());
085
086 if (BROKER.startsWith("vm://"))
087 {
088 TransportConnection.createVMBroker(1);
089 }
090 InitialContextFactory factory = new PropertiesFileInitialContextFactory();
091
092 Hashtable<String, String> env = new Hashtable<String, String>();
093
094 env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID/" + VHOST + "?brokerlist='" + BROKER + "'");
095 env.put("queue.queue", QUEUE);
096
097 _context = factory.getInitialContext(env);
098
099 }
100
101 protected void tearDown() throws Exception
102 {
103 super.tearDown();
104
105 if (_producerConnection != null)
106 {
107 _producerConnection.close();
108 }
109
110 if (_clientConnection != null)
111 {
112 _clientConnection.close();
113 }
114
115 if (BROKER.startsWith("vm://"))
116 {
117 TransportConnection.killAllVMBrokers();
118 }
119 }
120
121 public void test() throws Exception
122 {
123
124 init();
125 //Send messages
126 _logger.info("Starting to send messages");
127 for (int msg = 0; msg < MSG_COUNT; msg++)
128 {
129 _producer.send(nextMessage(msg));
130 }
131 _logger.info("Closing connection");
132 //Close the connection.. .giving the broker time to clean up its state.
133 _producerConnection.close();
134
135 //Verify we get all the messages.
136 _logger.info("Verifying messages");
137 verifyAllMessagesRecevied();
138
139 //Close the connection.. .giving the broker time to clean up its state.
140 _clientConnection.close();
141
142 //Verify Broker state
143 _logger.info("Verifying broker state");
144 verifyBrokerState();
145 }
146
147 private void init() throws NamingException, JMSException
148 {
149 _messages = new Message[MSG_COUNT];
150
151 //Create Producer
152 _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
153 _producerConnection.start();
154 _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
155 _producer = _producerSession.createProducer((Queue) _context.lookup("queue"));
156
157 // Create consumer
158 _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
159 _clientConnection.start();
160 _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
161 _consumer = _clientSession.createConsumer((Queue) _context.lookup("queue"), "key = 23");
162 }
163
164 private void verifyBrokerState()
165 {
166 try
167 {
168 _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
169
170 _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
171 }
172 catch (Exception e)
173 {
174 fail(e.getMessage());
175 }
176
177 try
178 {
179 Thread.sleep(2000);
180 long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue"));
181 assertEquals("Session reports Queue depth not as expected", 0, queueDepth);
182 }
183 catch (InterruptedException e)
184 {
185 fail(e.getMessage());
186 }
187 catch (NamingException e)
188 {
189 fail(e.getMessage());
190 }
191 catch (AMQException e)
192 {
193 fail(e.getMessage());
194 }
195 finally
196 {
197 try
198 {
199 _clientConnection.close();
200 }
201 catch (JMSException e)
202 {
203 fail(e.getMessage());
204 }
205 }
206
207 }
208
209 private void verifyAllMessagesRecevied() throws Exception
210 {
211
212 boolean[] msgIdRecevied = new boolean[MSG_COUNT];
213
214 for (int i = 0; i < MSG_COUNT; i++)
215 {
216 _messages[i] = _consumer.receive(1000);
217 assertNotNull("should have received a message but didn't", _messages[i]);
218 }
219 long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue"));
220 assertEquals("Session reports Queue depth not as expected", 0, queueDepth);
221
222 //Check received messages
223 int msgId = 0;
224 for (Message msg : _messages)
225 {
226 assertNotNull("Message should not be null", msg);
227 assertEquals("msgId was wrong", msgId, msg.getIntProperty("ID"));
228 assertFalse("Already received msg id " + msgId, msgIdRecevied[msgId]);
229 msgIdRecevied[msgId] = true;
230 msgId++;
231 }
232
233 //Check all received
234 for (msgId = 0; msgId < MSG_COUNT; msgId++)
235 {
236 assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]);
237 }
238 }
239
240 /**
241 * Get the next message putting the given count into the intProperties as ID.
242 *
243 * @param msgNo the message count to store as ID.
244 *
245 * @return
246 *
247 * @throws JMSException
248 */
249
250 private Message nextMessage(int msgNo) throws JMSException
251 {
252 Message send = _producerSession.createTextMessage("MessageReturnTest");
253 send.setIntProperty("ID", msgNo);
254 send.setIntProperty("key", 23);
255 return send;
256 }
257
258 }
|