HeapExhaustion.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 
022 package org.apache.qpid.server.failure;
023 
024 import junit.framework.TestCase;
025 import org.apache.qpid.test.utils.QpidClientConnectionHelper;
026 import org.apache.qpid.client.failover.FailoverException;
027 import org.apache.qpid.AMQException;
028 import org.apache.qpid.protocol.AMQConstant;
029 import org.apache.log4j.Logger;
030 
031 import javax.jms.JMSException;
032 import javax.jms.DeliveryMode;
033 import java.io.IOException;
034 
035 
036 /** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */
037 public class HeapExhaustion extends TestCase
038 {
039     private static final Logger _logger = Logger.getLogger(HeapExhaustion.class);
040 
041     protected QpidClientConnectionHelper conn;
042     protected final String BROKER = "localhost";
043     protected final String vhost = "/test";
044     protected final String queue = "direct://amq.direct//queue";
045 
046     protected String hundredK;
047     protected String megabyte;
048 
049     protected String generatePayloadOfSize(Integer numBytes)
050     {
051         return new String(new byte[numBytes]);
052     }
053 
054     protected void setUp() throws Exception
055     {
056         conn = new QpidClientConnectionHelper(BROKER);
057         conn.setVirtualHost(vhost);
058 
059         try
060         {
061             conn.connect();
062         catch (JMSException e)
063         {
064             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
065         }
066         // clear queue
067         _logger.debug("setup: clearing test queue");
068         conn.consume(queue, 2000);
069 
070         hundredK = generatePayloadOfSize(1024 100);
071         megabyte = generatePayloadOfSize(1024 1024);
072     }
073 
074     protected void tearDown() throws Exception
075     {
076         conn.disconnect();
077     }
078 
079 
080     /**
081      * PUT at maximum rate (although we commit after each PUT) until failure
082      *
083      @throws Exception on error
084      */
085     public void testUntilFailureTransient() throws Exception
086     {
087         int copies = 0;
088         int total = 0;
089         String payload = hundredK;
090         int size = payload.getBytes().length;
091         while (true)
092         {
093             conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
094             copies++;
095             total += size;
096             System.out.println("put copy " + copies + " OK for total bytes: " + total);
097         }
098     }
099 
100     /**
101      * PUT at lower rate (5 per second) until failure
102      *
103      @throws Exception on error
104      */
105     public void testUntilFailureWithDelaysTransient() throws Exception
106     {
107         int copies = 0;
108         int total = 0;
109         String payload = hundredK;
110         int size = payload.getBytes().length;
111         while (true)
112         {
113             conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
114             copies++;
115             total += size;
116             System.out.println("put copy " + copies + " OK for total bytes: " + total);
117             Thread.sleep(200);
118         }
119     }
120 
121     public static void noDelay()
122     {
123         HeapExhaustion he = new HeapExhaustion();
124 
125         try
126         {
127             he.setUp();
128         }
129         catch (Exception e)
130         {
131             _logger.info("Unable to connect");
132             System.exit(0);
133         }
134 
135         try
136         {
137             _logger.info("Running testUntilFailure");
138             try
139             {
140                 he.testUntilFailureTransient();
141             }
142             catch (FailoverException fe)
143             {
144                 _logger.error("Caught failover:" + fe);
145             }
146             _logger.info("Finishing Connection ");
147 
148             try
149             {
150                 he.tearDown();
151             }
152             catch (JMSException jmse)
153             {
154                 if (((AMQExceptionjmse.getLinkedException()).getErrorCode() == AMQConstant.REQUEST_TIMEOUT)
155                 {
156                     _logger.info("Successful test of testUntilFailure");
157                 }
158                 else
159                 {
160                     _logger.error("Test Failed due to:" + jmse);
161                 }
162             }
163         }
164         catch (Exception e)
165         {
166             _logger.error("Test Failed due to:" + e);
167         }
168     }
169 
170     public static void withDelay()
171     {
172         HeapExhaustion he = new HeapExhaustion();
173 
174         try
175         {
176             he.setUp();
177         }
178         catch (Exception e)
179         {
180             _logger.info("Unable to connect");
181             System.exit(0);
182         }
183 
184         try
185         {
186             _logger.info("Running testUntilFailure");
187             try
188             {
189                 he.testUntilFailureWithDelaysTransient();
190             }
191             catch (FailoverException fe)
192             {
193                 _logger.error("Caught failover:" + fe);
194             }
195             _logger.info("Finishing Connection ");
196 
197             try
198             {
199                 he.tearDown();
200             }
201             catch (JMSException jmse)
202             {
203                 if (((AMQExceptionjmse.getLinkedException()).getErrorCode() == AMQConstant.REQUEST_TIMEOUT)
204                 {
205                     _logger.info("Successful test of testUntilFailure");
206                 }
207                 else
208                 {
209                     _logger.error("Test Failed due to:" + jmse);
210                 }
211             }
212         }
213         catch (Exception e)
214         {
215             _logger.error("Test Failed due to:" + e);
216         }
217     }
218 
219     public static void main(String args[])
220     {
221         noDelay();
222 
223 
224         try
225         {
226             System.out.println("Restart failed broker now to retest broker with delays in send.");
227             System.in.read();
228         }
229         catch (IOException e)
230         {
231             _logger.info("Continuing");
232         }
233 
234         withDelay();
235     }
236 }