001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 *
020 */
021 package org.apache.qpid.tools.messagestore;
022
023 import org.apache.commons.cli.Option;
024 import org.apache.commons.cli.OptionBuilder;
025 import org.apache.commons.configuration.ConfigurationException;
026 import org.apache.qpid.configuration.Configuration;
027 import org.apache.qpid.server.exchange.Exchange;
028 import org.apache.qpid.server.registry.ApplicationRegistry;
029 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
030 import org.apache.qpid.server.store.MemoryMessageStore;
031 import org.apache.qpid.server.virtualhost.VirtualHost;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.tools.messagestore.commands.Clear;
034 import org.apache.qpid.tools.messagestore.commands.Command;
035 import org.apache.qpid.tools.messagestore.commands.Copy;
036 import org.apache.qpid.tools.messagestore.commands.Dump;
037 import org.apache.qpid.tools.messagestore.commands.Help;
038 import org.apache.qpid.tools.messagestore.commands.List;
039 import org.apache.qpid.tools.messagestore.commands.Load;
040 import org.apache.qpid.tools.messagestore.commands.Quit;
041 import org.apache.qpid.tools.messagestore.commands.Select;
042 import org.apache.qpid.tools.messagestore.commands.Show;
043 import org.apache.qpid.tools.messagestore.commands.Move;
044 import org.apache.qpid.tools.messagestore.commands.Purge;
045 import org.apache.qpid.tools.utils.CommandParser;
046 import org.apache.qpid.tools.utils.Console;
047 import org.apache.qpid.tools.utils.SimpleCommandParser;
048 import org.apache.qpid.tools.utils.SimpleConsole;
049 import org.slf4j.Logger;
050 import org.slf4j.LoggerFactory;
051
052 import java.io.BufferedReader;
053 import java.io.BufferedWriter;
054 import java.io.File;
055 import java.io.FileReader;
056 import java.io.InputStream;
057 import java.io.InputStreamReader;
058 import java.io.OutputStream;
059 import java.io.OutputStreamWriter;
060 import java.util.Collection;
061 import java.util.HashMap;
062 import java.util.LinkedList;
063 import java.util.Map;
064 import java.util.StringTokenizer;
065
066 /**
067 * MessageStoreTool.
068 */
069 public class MessageStoreTool
070 {
071 /** Text outputted at the start of each console.*/
072 private static final String BOILER_PLATE = "MessageStoreTool - for examining Persistent Qpid Broker MessageStore instances";
073
074 /** I/O Wrapper. */
075 protected Console _console;
076
077 /** Batch mode flag. */
078 protected boolean _batchMode;
079
080 /** Internal State object. */
081 private State _state = new State();
082
083 private HashMap<String, Command> _commands = new HashMap<String, Command>();
084
085 /** SLF4J Logger. */
086 private static Logger _devlog = LoggerFactory.getLogger(MessageStoreTool.class);
087
088 /** Loaded configuration file. */
089 private Configuration _config;
090
091 /** Control used for main run loop. */
092 private boolean _running = true;
093 private boolean _initialised = false;
094
095 //---------------------------------------------------------------------------------------------------/
096
097 public static void main(String[] args) throws Configuration.InitException
098 {
099
100 MessageStoreTool tool = new MessageStoreTool(args);
101
102 tool.start();
103 }
104
105
106 public MessageStoreTool(String[] args) throws Configuration.InitException
107 {
108 this(args, System.in, System.out);
109 }
110
111 public MessageStoreTool(String[] args, InputStream in, OutputStream out) throws Configuration.InitException
112 {
113 BufferedReader consoleReader = new BufferedReader(new InputStreamReader(in));
114 BufferedWriter consoleWriter = new BufferedWriter(new OutputStreamWriter(out));
115
116 Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this)));
117 _batchMode = false;
118
119 _console = new SimpleConsole(consoleWriter, consoleReader);
120
121 _config = new Configuration();
122
123 setOptions();
124 _config.processCommandline(args);
125 }
126
127
128 private void setOptions()
129 {
130 Option help = new Option("h", "help", false, "print this message");
131 Option version = new Option("v", "version", false, "print the version information and exit");
132 Option configFile =
133 OptionBuilder.withArgName("file").hasArg()
134 .withDescription("use given configuration file By "
135 + "default looks for a file named "
136 + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME)
137 .withLongOpt("config")
138 .create("c");
139
140 _config.setOption(help);
141 _config.setOption(version);
142 _config.setOption(configFile);
143 }
144
145 public State getState()
146 {
147 return _state;
148 }
149
150 public Map<String, Command> getCommands()
151 {
152 return _commands;
153 }
154
155 public void setConfigurationFile(String configfile) throws Configuration.InitException
156 {
157 _config.loadConfig(new File(configfile));
158 setup();
159 }
160
161 public Console getConsole()
162 {
163 return _console;
164 }
165
166 public void setConsole(Console console)
167 {
168 _console = console;
169 }
170
171 /**
172 * Simple ShutdownHook to cleanly shutdown the databases
173 */
174 class ShutdownHook implements Runnable
175 {
176 MessageStoreTool _tool;
177
178 ShutdownHook(MessageStoreTool messageStoreTool)
179 {
180 _tool = messageStoreTool;
181 }
182
183 public void run()
184 {
185 _tool.quit();
186 }
187 }
188
189 public void quit()
190 {
191 _running = false;
192
193 if (_initialised)
194 {
195 ApplicationRegistry.remove(1);
196 }
197
198 _console.println("...exiting");
199
200 _console.close();
201 }
202
203 public void setBatchMode(boolean batchmode)
204 {
205 _batchMode = batchmode;
206 }
207
208 /**
209 * Main loop
210 */
211 protected void start()
212 {
213 setup();
214
215 if (!_initialised)
216 {
217 System.exit(1);
218 }
219
220 _console.println("");
221
222 _console.println(BOILER_PLATE);
223
224 runCLI();
225 }
226
227 private void setup()
228 {
229 loadDefaultVirtualHosts();
230
231 loadCommands();
232
233 _state.clearAll();
234 }
235
236 private void loadCommands()
237 {
238 _commands.clear();
239 //todo Dynamically load the classes that exis in com.redhat.etp.qpid.commands
240 _commands.put("close", new Clear(this));
241 _commands.put("copy", new Copy(this));
242 _commands.put("dump", new Dump(this));
243 _commands.put("help", new Help(this));
244 _commands.put("list", new List(this));
245 _commands.put("load", new Load(this));
246 _commands.put("move", new Move(this));
247 _commands.put("purge", new Purge(this));
248 _commands.put("quit", new Quit(this));
249 _commands.put("select", new Select(this));
250 _commands.put("show", new Show(this));
251 }
252
253 private void loadDefaultVirtualHosts()
254 {
255 final File configFile = _config.getConfigFile();
256
257 loadVirtualHosts(configFile);
258 }
259
260 private void loadVirtualHosts(File configFile)
261 {
262
263 if (!configFile.exists())
264 {
265 _devlog.error("Config file not found:" + configFile.getAbsolutePath());
266 return;
267 }
268 else
269 {
270 _devlog.debug("using config file :" + configFile.getAbsolutePath());
271 }
272
273 try
274 {
275 ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
276
277 ApplicationRegistry.remove(1);
278
279 ApplicationRegistry.initialise(registry);
280
281 checkMessageStores();
282 _initialised = true;
283 }
284 catch (ConfigurationException e)
285 {
286 _console.println("Unable to load configuration due to configuration error: " + e.getMessage());
287 e.printStackTrace();
288 }
289 catch (Exception e)
290 {
291 _console.println("Unable to load configuration due to: " + e.getMessage());
292 e.printStackTrace();
293 }
294
295
296 }
297
298 private void checkMessageStores()
299 {
300 Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
301
302 boolean warning = false;
303 for (VirtualHost vhost : vhosts)
304 {
305 if (vhost.getTransactionLog() instanceof MemoryMessageStore)
306 {
307 _console.println("WARNING: Virtualhost '" + vhost.getName() + "' is using a MemoryMessageStore. "
308 + "Changes will not persist.");
309 warning = true;
310 }
311 }
312
313 if (warning)
314 {
315 _console.println("");
316 _console.println("Please ensure you are using the correct config file currently using '"
317 + _config.getConfigFile().getAbsolutePath() + "'");
318 _console.println("New config file can be specifed by 'load <config file>' or -c on the commandline.");
319 _console.println("");
320 }
321 }
322
323 private void runCLI()
324 {
325 while (_running)
326 {
327 if (!_batchMode)
328 {
329 printPrompt();
330 }
331
332 String[] args = _console.readCommand();
333
334 while (args != null)
335 {
336 exec(args);
337
338 if (_running)
339 {
340 if (!_batchMode)
341 {
342 printPrompt();
343 }
344
345 args = _console.readCommand();
346 }
347 }
348 }
349 }
350
351 private void printPrompt()
352 {
353 _console.print(prompt());
354 }
355
356
357 /**
358 * Execute a script (batch mode).
359 *
360 * @param script The file script
361 */
362 protected void runScripts(String script)
363 {
364 //Store Current State
365 boolean oldBatch = _batchMode;
366 CommandParser oldParser = _console.getCommandParser();
367 setBatchMode(true);
368
369 try
370 {
371 _devlog.debug("Running script '" + script + "'");
372
373 _console.setCommandParser(new SimpleCommandParser(new BufferedReader(new FileReader(script))));
374
375 start();
376 }
377 catch (java.io.FileNotFoundException e)
378 {
379 _devlog.error("Script not found: '" + script + "' due to:" + e.getMessage());
380 }
381
382 //Restore previous state
383 _console.setCommandParser(oldParser);
384 setBatchMode(oldBatch);
385 }
386
387 public String prompt()
388 {
389 String state = _state.toString();
390 if (state != null && state.length() != 0)
391 {
392 return state + ":bdb$ ";
393 }
394 else
395 {
396 return "bdb$ ";
397 }
398 }
399
400 /**
401 * Execute the command.
402 *
403 * @param args [command, arg0, arg1...].
404 */
405 protected void exec(String[] args)
406 {
407 // Comment lines start with a #
408 if (args.length == 0 || args[0].startsWith("#"))
409 {
410 return;
411 }
412
413 final String command = args[0];
414
415 Command cmd = _commands.get(command);
416
417 if (cmd == null)
418 {
419 _console.println("Command not understood: " + command);
420 }
421 else
422 {
423 cmd.execute(args);
424 }
425 }
426
427
428 /**
429 * Displays usage info.
430 */
431 protected static void help()
432 {
433 System.out.println(BOILER_PLATE);
434 System.out.println("Usage: java " + MessageStoreTool.class + " [Options]");
435 System.out.println(" [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\"");
436 }
437
438
439 /**
440 * This class is used to store the current state of the tool.
441 *
442 * This is then interrogated by the various commands to augment their behaviour.
443 *
444 *
445 */
446 public class State
447 {
448 private VirtualHost _vhost = null;
449 private AMQQueue _queue = null;
450 private Exchange _exchange = null;
451 private java.util.List<Long> _msgids = null;
452
453 public State()
454 {
455 }
456
457 public void setQueue(AMQQueue queue)
458 {
459 _queue = queue;
460 }
461
462 public AMQQueue getQueue()
463 {
464 return _queue;
465 }
466
467 public void setVhost(VirtualHost vhost)
468 {
469 _vhost = vhost;
470 }
471
472 public VirtualHost getVhost()
473 {
474 return _vhost;
475 }
476
477 public Exchange getExchange()
478 {
479 return _exchange;
480 }
481
482 public void setExchange(Exchange exchange)
483 {
484 _exchange = exchange;
485 }
486
487 public String toString()
488 {
489 StringBuilder status = new StringBuilder();
490
491 if (_vhost != null)
492 {
493 status.append(_vhost.getName());
494
495 if (_exchange != null)
496 {
497 status.append("[");
498 status.append(_exchange.getName());
499 status.append("]");
500
501 if (_queue != null)
502 {
503 status.append("->'");
504 status.append(_queue.getName());
505 status.append("'");
506
507 if (_msgids != null)
508 {
509 status.append(printMessages());
510 }
511 }
512 }
513 }
514
515 return status.toString();
516 }
517
518
519 public String printMessages()
520 {
521 StringBuilder sb = new StringBuilder();
522
523 Long previous = null;
524
525 Long start = null;
526 for (Long id : _msgids)
527 {
528 if (previous != null)
529 {
530 if (id == previous + 1)
531 {
532 if (start == null)
533 {
534 start = previous;
535 }
536 }
537 else
538 {
539 if (start != null)
540 {
541 sb.append(",");
542 sb.append(start);
543 sb.append("-");
544 sb.append(id);
545 start = null;
546 }
547 else
548 {
549 sb.append(",");
550 sb.append(previous);
551 }
552 }
553 }
554
555 previous = id;
556 }
557
558 if (start != null)
559 {
560 sb.append(",");
561 sb.append(start);
562 sb.append("-");
563 sb.append(_msgids.get(_msgids.size() - 1));
564 }
565 else
566 {
567 sb.append(",");
568 sb.append(previous);
569 }
570
571 // surround list in ()
572 sb.replace(0, 1, "(");
573 sb.append(")");
574 return sb.toString();
575 }
576
577 public void clearAll()
578 {
579 _vhost = null;
580 clearExchange();
581 }
582
583 public void clearExchange()
584 {
585 _exchange = null;
586 clearQueue();
587 }
588
589 public void clearQueue()
590 {
591 _queue = null;
592 clearMessages();
593 }
594
595 public void clearMessages()
596 {
597 _msgids = null;
598 }
599
600 /**
601 * A common location to provide parsing of the message id string
602 * utilised by a number of the commands.
603 * The String is comma separated list of ids that can be individual ids
604 * or a range (4-10)
605 *
606 * @param msgString string of msg ids to parse 1,2,4-10
607 */
608 public void setMessages(String msgString)
609 {
610 StringTokenizer tok = new StringTokenizer(msgString, ",");
611
612 if (tok.hasMoreTokens())
613 {
614 _msgids = new LinkedList<Long>();
615 }
616
617 while (tok.hasMoreTokens())
618 {
619 String next = tok.nextToken();
620 if (next.contains("-"))
621 {
622 Long start = Long.parseLong(next.substring(0, next.indexOf("-")));
623 Long end = Long.parseLong(next.substring(next.indexOf("-") + 1));
624
625 if (end >= start)
626 {
627 for (long l = start; l <= end; l++)
628 {
629 _msgids.add(l);
630 }
631 }
632 }
633 else
634 {
635 _msgids.add(Long.parseLong(next));
636 }
637 }
638
639 }
640
641 public void setMessages(java.util.List<Long> msgids)
642 {
643 _msgids = msgids;
644 }
645
646 public java.util.List<Long> getMessages()
647 {
648 return _msgids;
649 }
650 }//Class State
651
652 }//Class MessageStoreTool
|