001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.store;
022
023 import org.apache.commons.configuration.Configuration;
024 import org.apache.log4j.Logger;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.framing.AMQShortString;
027 import org.apache.qpid.framing.FieldTable;
028 import org.apache.qpid.framing.abstraction.ContentChunk;
029 import org.apache.qpid.server.virtualhost.VirtualHost;
030 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
031 import org.apache.qpid.server.exchange.Exchange;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.server.queue.MessageMetaData;
034 import org.apache.qpid.server.transactionlog.TransactionLog;
035 import org.apache.qpid.server.routing.RoutingTable;
036
037 import java.util.HashMap;
038 import java.util.Iterator;
039
040 public class SlowMessageStore implements TransactionLog, RoutingTable
041 {
042 private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
043 private static final String DELAYS = "delays";
044 private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
045 private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
046 private long _defaultDelay = 0L;
047 private TransactionLog _realTransactionLog = new MemoryMessageStore();
048 private RoutingTable _realRoutingTable = (RoutingTable)_realTransactionLog;
049 private static final String PRE = "pre";
050 private static final String POST = "post";
051 private String DEFAULT_DELAY = "default";
052
053 public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
054 {
055 _logger.warn("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
056 Configuration delays = config.getStoreConfiguration().subset(DELAYS);
057
058 configureDelays(delays);
059
060 String transactionLogClass = config.getTransactionLogClass();
061
062 if (delays.containsKey(DEFAULT_DELAY))
063 {
064 _defaultDelay = delays.getLong(DEFAULT_DELAY);
065 _logger.warn("Delay is:" + _defaultDelay);
066 }
067
068 if (transactionLogClass != null)
069 {
070 Class clazz = Class.forName(transactionLogClass);
071 if (clazz != this.getClass())
072 {
073
074 Object o = clazz.newInstance();
075
076 if (!(o instanceof TransactionLog))
077 {
078 throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
079 " does not.");
080 }
081 _realTransactionLog = (TransactionLog) o;
082 }
083 }
084 _realTransactionLog.configure(virtualHost, base , config);
085 }
086
087 private void configureDelays(Configuration config)
088 {
089 Iterator delays = config.getKeys();
090
091 while (delays.hasNext())
092 {
093 String key = (String) delays.next();
094 if (key.endsWith(PRE))
095 {
096 _preDelays.put(key.substring(0, key.length() - PRE.length() - 1), config.getLong(key));
097 }
098 else if (key.endsWith(POST))
099 {
100 _postDelays.put(key.substring(0, key.length() - POST.length() - 1), config.getLong(key));
101 }
102 }
103 }
104
105 private void doPostDelay(String method)
106 {
107 long delay = lookupDelay(_postDelays, method);
108 doDelay(delay);
109 }
110
111 private void doPreDelay(String method)
112 {
113 long delay = lookupDelay(_preDelays, method);
114 doDelay(delay);
115 }
116
117 private long lookupDelay(HashMap<String, Long> delays, String method)
118 {
119 Long delay = delays.get(method);
120 return (delay == null) ? _defaultDelay : delay;
121 }
122
123 private void doDelay(long delay)
124 {
125 if (delay > 0)
126 {
127 long start = System.nanoTime();
128 try
129 {
130
131 Thread.sleep(delay);
132 }
133 catch (InterruptedException e)
134 {
135 _logger.warn("Interrupted : " + e);
136 }
137
138 long slept = (System.nanoTime() - start) / 1000000;
139
140 if (slept >= delay)
141 {
142 _logger.info("Done sleep for:" + slept+":"+delay);
143 }
144 else
145 {
146 _logger.info("Only sleep for:" + slept + " re-sleeping");
147 doDelay(delay - slept);
148 }
149 }
150 }
151
152 // ***** MessageStore Interface.
153
154 public void close() throws Exception
155 {
156 doPreDelay("close");
157 _realTransactionLog.close();
158 doPostDelay("close");
159 }
160
161 public void createExchange(Exchange exchange) throws AMQException
162 {
163 doPreDelay("createExchange");
164 _realRoutingTable.createExchange(exchange);
165 doPostDelay("createExchange");
166 }
167
168 public void removeExchange(Exchange exchange) throws AMQException
169 {
170 doPreDelay("removeExchange");
171 _realRoutingTable.removeExchange(exchange);
172 doPostDelay("removeExchange");
173 }
174
175 public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
176 {
177 doPreDelay("bindQueue");
178 _realRoutingTable.bindQueue(exchange, routingKey, queue, args);
179 doPostDelay("bindQueue");
180 }
181
182 public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
183 {
184 doPreDelay("unbindQueue");
185 _realRoutingTable.unbindQueue(exchange, routingKey, queue, args);
186 doPostDelay("unbindQueue");
187 }
188
189 public void createQueue(AMQQueue queue) throws AMQException
190 {
191 createQueue(queue, null);
192 }
193
194 public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
195 {
196 doPreDelay("createQueue");
197 _realRoutingTable.createQueue(queue, arguments);
198 doPostDelay("createQueue");
199 }
200
201 public void removeQueue(AMQQueue queue) throws AMQException
202 {
203 doPreDelay("removeQueue");
204 _realRoutingTable.removeQueue(queue);
205 doPostDelay("removeQueue");
206 }
207
208 public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
209 {
210 doPreDelay("enqueueMessage");
211 _realTransactionLog.enqueueMessage(context, queue, messageId);
212 doPostDelay("enqueueMessage");
213 }
214
215 public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
216 {
217 doPreDelay("dequeueMessage");
218 _realTransactionLog.dequeueMessage(context, queue, messageId);
219 doPostDelay("dequeueMessage");
220 }
221
222 public void beginTran(StoreContext context) throws AMQException
223 {
224 doPreDelay("beginTran");
225 _realTransactionLog.beginTran(context);
226 doPostDelay("beginTran");
227 }
228
229 public void commitTran(StoreContext context) throws AMQException
230 {
231 doPreDelay("commitTran");
232 _realTransactionLog.commitTran(context);
233 doPostDelay("commitTran");
234 }
235
236 public void abortTran(StoreContext context) throws AMQException
237 {
238 doPreDelay("abortTran");
239 _realTransactionLog.abortTran(context);
240 doPostDelay("abortTran");
241 }
242
243 public boolean inTran(StoreContext context)
244 {
245 doPreDelay("inTran");
246 boolean b = _realTransactionLog.inTran(context);
247 doPostDelay("inTran");
248 return b;
249 }
250
251 public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
252 {
253 doPreDelay("storeContentBodyChunk");
254 _realTransactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
255 doPostDelay("storeContentBodyChunk");
256 }
257
258 public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
259 {
260 doPreDelay("storeMessageMetaData");
261 _realTransactionLog.storeMessageMetaData(context, messageId, messageMetaData);
262 doPostDelay("storeMessageMetaData");
263 }
264
265 public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
266 {
267 doPreDelay("getMessageMetaData");
268 MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId);
269 doPostDelay("getMessageMetaData");
270 return mmd;
271 }
272
273 public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
274 {
275 doPreDelay("getContentBodyChunk");
276 ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index);
277 doPostDelay("getContentBodyChunk");
278 return c;
279 }
280
281 public boolean isPersistent()
282 {
283 return _realTransactionLog.isPersistent();
284 }
285
286 }
|