001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021
022 package org.apache.qpid.server.failure;
023
024 import junit.framework.TestCase;
025 import org.apache.qpid.test.utils.QpidClientConnectionHelper;
026 import org.apache.qpid.client.failover.FailoverException;
027 import org.apache.qpid.AMQException;
028 import org.apache.qpid.protocol.AMQConstant;
029 import org.apache.log4j.Logger;
030
031 import javax.jms.JMSException;
032 import javax.jms.DeliveryMode;
033 import java.io.IOException;
034
035
036 /** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */
037 public class HeapExhaustion extends TestCase
038 {
039 private static final Logger _logger = Logger.getLogger(HeapExhaustion.class);
040
041 protected QpidClientConnectionHelper conn;
042 protected final String BROKER = "localhost";
043 protected final String vhost = "/test";
044 protected final String queue = "direct://amq.direct//queue";
045
046 protected String hundredK;
047 protected String megabyte;
048
049 protected String generatePayloadOfSize(Integer numBytes)
050 {
051 return new String(new byte[numBytes]);
052 }
053
054 protected void setUp() throws Exception
055 {
056 conn = new QpidClientConnectionHelper(BROKER);
057 conn.setVirtualHost(vhost);
058
059 try
060 {
061 conn.connect();
062 } catch (JMSException e)
063 {
064 e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
065 }
066 // clear queue
067 _logger.debug("setup: clearing test queue");
068 conn.consume(queue, 2000);
069
070 hundredK = generatePayloadOfSize(1024 * 100);
071 megabyte = generatePayloadOfSize(1024 * 1024);
072 }
073
074 protected void tearDown() throws Exception
075 {
076 conn.disconnect();
077 }
078
079
080 /**
081 * PUT at maximum rate (although we commit after each PUT) until failure
082 *
083 * @throws Exception on error
084 */
085 public void testUntilFailureTransient() throws Exception
086 {
087 int copies = 0;
088 int total = 0;
089 String payload = hundredK;
090 int size = payload.getBytes().length;
091 while (true)
092 {
093 conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
094 copies++;
095 total += size;
096 System.out.println("put copy " + copies + " OK for total bytes: " + total);
097 }
098 }
099
100 /**
101 * PUT at lower rate (5 per second) until failure
102 *
103 * @throws Exception on error
104 */
105 public void testUntilFailureWithDelaysTransient() throws Exception
106 {
107 int copies = 0;
108 int total = 0;
109 String payload = hundredK;
110 int size = payload.getBytes().length;
111 while (true)
112 {
113 conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
114 copies++;
115 total += size;
116 System.out.println("put copy " + copies + " OK for total bytes: " + total);
117 Thread.sleep(200);
118 }
119 }
120
121 public static void noDelay()
122 {
123 HeapExhaustion he = new HeapExhaustion();
124
125 try
126 {
127 he.setUp();
128 }
129 catch (Exception e)
130 {
131 _logger.info("Unable to connect");
132 System.exit(0);
133 }
134
135 try
136 {
137 _logger.info("Running testUntilFailure");
138 try
139 {
140 he.testUntilFailureTransient();
141 }
142 catch (FailoverException fe)
143 {
144 _logger.error("Caught failover:" + fe);
145 }
146 _logger.info("Finishing Connection ");
147
148 try
149 {
150 he.tearDown();
151 }
152 catch (JMSException jmse)
153 {
154 if (((AMQException) jmse.getLinkedException()).getErrorCode() == AMQConstant.REQUEST_TIMEOUT)
155 {
156 _logger.info("Successful test of testUntilFailure");
157 }
158 else
159 {
160 _logger.error("Test Failed due to:" + jmse);
161 }
162 }
163 }
164 catch (Exception e)
165 {
166 _logger.error("Test Failed due to:" + e);
167 }
168 }
169
170 public static void withDelay()
171 {
172 HeapExhaustion he = new HeapExhaustion();
173
174 try
175 {
176 he.setUp();
177 }
178 catch (Exception e)
179 {
180 _logger.info("Unable to connect");
181 System.exit(0);
182 }
183
184 try
185 {
186 _logger.info("Running testUntilFailure");
187 try
188 {
189 he.testUntilFailureWithDelaysTransient();
190 }
191 catch (FailoverException fe)
192 {
193 _logger.error("Caught failover:" + fe);
194 }
195 _logger.info("Finishing Connection ");
196
197 try
198 {
199 he.tearDown();
200 }
201 catch (JMSException jmse)
202 {
203 if (((AMQException) jmse.getLinkedException()).getErrorCode() == AMQConstant.REQUEST_TIMEOUT)
204 {
205 _logger.info("Successful test of testUntilFailure");
206 }
207 else
208 {
209 _logger.error("Test Failed due to:" + jmse);
210 }
211 }
212 }
213 catch (Exception e)
214 {
215 _logger.error("Test Failed due to:" + e);
216 }
217 }
218
219 public static void main(String args[])
220 {
221 noDelay();
222
223
224 try
225 {
226 System.out.println("Restart failed broker now to retest broker with delays in send.");
227 System.in.read();
228 }
229 catch (IOException e)
230 {
231 _logger.info("Continuing");
232 }
233
234 withDelay();
235 }
236 }
|