SlowMessageStore.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.store;
022 
023 import org.apache.commons.configuration.Configuration;
024 import org.apache.log4j.Logger;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.framing.AMQShortString;
027 import org.apache.qpid.framing.FieldTable;
028 import org.apache.qpid.framing.abstraction.ContentChunk;
029 import org.apache.qpid.server.virtualhost.VirtualHost;
030 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
031 import org.apache.qpid.server.exchange.Exchange;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.server.queue.MessageMetaData;
034 import org.apache.qpid.server.transactionlog.TransactionLog;
035 import org.apache.qpid.server.routing.RoutingTable;
036 
037 import java.util.HashMap;
038 import java.util.Iterator;
039 
040 public class SlowMessageStore implements TransactionLog, RoutingTable
041 {
042     private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
043     private static final String DELAYS = "delays";
044     private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
045     private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
046     private long _defaultDelay = 0L;
047     private TransactionLog _realTransactionLog = new MemoryMessageStore();
048     private RoutingTable _realRoutingTable = (RoutingTable)_realTransactionLog;
049     private static final String PRE = "pre";
050     private static final String POST = "post";
051     private String DEFAULT_DELAY = "default";
052 
053     public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration configthrows Exception
054     {
055         _logger.warn("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
056         Configuration delays = config.getStoreConfiguration().subset(DELAYS);
057 
058         configureDelays(delays);
059 
060         String transactionLogClass = config.getTransactionLogClass();
061 
062         if (delays.containsKey(DEFAULT_DELAY))
063         {
064             _defaultDelay = delays.getLong(DEFAULT_DELAY);
065             _logger.warn("Delay is:" + _defaultDelay);
066         }
067 
068         if (transactionLogClass != null)
069         {
070             Class clazz = Class.forName(transactionLogClass);
071             if (clazz != this.getClass())
072             {
073 
074                 Object o = clazz.newInstance();
075 
076                 if (!(instanceof TransactionLog))
077                 {
078                     throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class ". Class " + clazz +
079                     " does not.");
080                 }
081                 _realTransactionLog = (TransactionLogo;
082             }
083         }
084         _realTransactionLog.configure(virtualHost, base , config);
085     }
086 
087     private void configureDelays(Configuration config)
088     {
089         Iterator delays = config.getKeys();
090 
091         while (delays.hasNext())
092         {
093             String key = (Stringdelays.next();
094             if (key.endsWith(PRE))
095             {
096                 _preDelays.put(key.substring(0, key.length() - PRE.length() 1), config.getLong(key));
097             }
098             else if (key.endsWith(POST))
099             {
100                 _postDelays.put(key.substring(0, key.length() - POST.length() 1), config.getLong(key));
101             }
102         }
103     }
104 
105     private void doPostDelay(String method)
106     {
107         long delay = lookupDelay(_postDelays, method);
108         doDelay(delay);
109     }
110 
111     private void doPreDelay(String method)
112     {
113         long delay = lookupDelay(_preDelays, method);
114         doDelay(delay);
115     }
116 
117     private long lookupDelay(HashMap<String, Long> delays, String method)
118     {
119         Long delay = delays.get(method);
120         return (delay == null? _defaultDelay : delay;
121     }
122 
123     private void doDelay(long delay)
124     {
125         if (delay > 0)
126         {
127             long start = System.nanoTime();
128             try
129             {
130 
131                 Thread.sleep(delay);
132             }
133             catch (InterruptedException e)
134             {
135                 _logger.warn("Interrupted : " + e);
136             }
137 
138             long slept = (System.nanoTime() - start1000000;
139             
140             if (slept >= delay)
141             {
142                 _logger.info("Done sleep for:" + slept+":"+delay);
143             }
144             else
145             {
146                 _logger.info("Only sleep for:" + slept + " re-sleeping");
147                 doDelay(delay - slept);
148             }
149         }
150     }
151 
152     // ***** MessageStore Interface.
153 
154     public void close() throws Exception
155     {
156         doPreDelay("close");
157         _realTransactionLog.close();
158         doPostDelay("close");
159     }
160 
161     public void createExchange(Exchange exchangethrows AMQException
162     {
163         doPreDelay("createExchange");
164         _realRoutingTable.createExchange(exchange);
165         doPostDelay("createExchange");
166     }
167 
168     public void removeExchange(Exchange exchangethrows AMQException
169     {
170         doPreDelay("removeExchange");
171         _realRoutingTable.removeExchange(exchange);
172         doPostDelay("removeExchange");
173     }
174 
175     public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
176     {
177         doPreDelay("bindQueue");
178         _realRoutingTable.bindQueue(exchange, routingKey, queue, args);
179         doPostDelay("bindQueue");
180     }
181 
182     public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
183     {
184         doPreDelay("unbindQueue");
185         _realRoutingTable.unbindQueue(exchange, routingKey, queue, args);
186         doPostDelay("unbindQueue");
187     }
188 
189     public void createQueue(AMQQueue queuethrows AMQException
190     {
191         createQueue(queue, null);
192     }
193 
194     public void createQueue(AMQQueue queue, FieldTable argumentsthrows AMQException
195     {
196         doPreDelay("createQueue");
197         _realRoutingTable.createQueue(queue, arguments);
198         doPostDelay("createQueue");
199     }
200 
201     public void removeQueue(AMQQueue queuethrows AMQException
202     {
203         doPreDelay("removeQueue");
204         _realRoutingTable.removeQueue(queue);
205         doPostDelay("removeQueue");
206     }
207 
208     public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageIdthrows AMQException
209     {
210         doPreDelay("enqueueMessage");
211         _realTransactionLog.enqueueMessage(context, queue, messageId);
212         doPostDelay("enqueueMessage");
213     }
214 
215     public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageIdthrows AMQException
216     {
217         doPreDelay("dequeueMessage");
218         _realTransactionLog.dequeueMessage(context, queue, messageId);
219         doPostDelay("dequeueMessage");
220     }
221 
222     public void beginTran(StoreContext contextthrows AMQException
223     {
224         doPreDelay("beginTran");
225         _realTransactionLog.beginTran(context);
226         doPostDelay("beginTran");
227     }
228 
229     public void commitTran(StoreContext contextthrows AMQException
230     {
231         doPreDelay("commitTran");
232         _realTransactionLog.commitTran(context);
233         doPostDelay("commitTran");
234     }
235 
236     public void abortTran(StoreContext contextthrows AMQException
237     {
238         doPreDelay("abortTran");
239         _realTransactionLog.abortTran(context);
240         doPostDelay("abortTran");
241     }
242 
243     public boolean inTran(StoreContext context)
244     {
245         doPreDelay("inTran");
246         boolean b = _realTransactionLog.inTran(context);
247         doPostDelay("inTran");
248         return b;
249     }
250 
251     public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBodythrows AMQException
252     {
253         doPreDelay("storeContentBodyChunk");
254         _realTransactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
255         doPostDelay("storeContentBodyChunk");
256     }
257 
258     public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaDatathrows AMQException
259     {
260         doPreDelay("storeMessageMetaData");
261         _realTransactionLog.storeMessageMetaData(context, messageId, messageMetaData);
262         doPostDelay("storeMessageMetaData");
263     }
264 
265     public MessageMetaData getMessageMetaData(StoreContext context, Long messageIdthrows AMQException
266     {
267         doPreDelay("getMessageMetaData");
268         MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId);
269         doPostDelay("getMessageMetaData");
270         return mmd;
271     }
272 
273     public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int indexthrows AMQException
274     {
275         doPreDelay("getContentBodyChunk");
276         ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index);
277         doPostDelay("getContentBodyChunk");
278         return c;
279     }
280 
281     public boolean isPersistent()
282     {
283         return _realTransactionLog.isPersistent();
284     }
285 
286 }