MessageStoreTool.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.
018  *
019  *
020  */
021 package org.apache.qpid.tools.messagestore;
022 
023 import org.apache.commons.cli.Option;
024 import org.apache.commons.cli.OptionBuilder;
025 import org.apache.commons.configuration.ConfigurationException;
026 import org.apache.qpid.configuration.Configuration;
027 import org.apache.qpid.server.exchange.Exchange;
028 import org.apache.qpid.server.registry.ApplicationRegistry;
029 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
030 import org.apache.qpid.server.store.MemoryMessageStore;
031 import org.apache.qpid.server.virtualhost.VirtualHost;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.tools.messagestore.commands.Clear;
034 import org.apache.qpid.tools.messagestore.commands.Command;
035 import org.apache.qpid.tools.messagestore.commands.Copy;
036 import org.apache.qpid.tools.messagestore.commands.Dump;
037 import org.apache.qpid.tools.messagestore.commands.Help;
038 import org.apache.qpid.tools.messagestore.commands.List;
039 import org.apache.qpid.tools.messagestore.commands.Load;
040 import org.apache.qpid.tools.messagestore.commands.Quit;
041 import org.apache.qpid.tools.messagestore.commands.Select;
042 import org.apache.qpid.tools.messagestore.commands.Show;
043 import org.apache.qpid.tools.messagestore.commands.Move;
044 import org.apache.qpid.tools.messagestore.commands.Purge;
045 import org.apache.qpid.tools.utils.CommandParser;
046 import org.apache.qpid.tools.utils.Console;
047 import org.apache.qpid.tools.utils.SimpleCommandParser;
048 import org.apache.qpid.tools.utils.SimpleConsole;
049 import org.slf4j.Logger;
050 import org.slf4j.LoggerFactory;
051 
052 import java.io.BufferedReader;
053 import java.io.BufferedWriter;
054 import java.io.File;
055 import java.io.FileReader;
056 import java.io.InputStream;
057 import java.io.InputStreamReader;
058 import java.io.OutputStream;
059 import java.io.OutputStreamWriter;
060 import java.util.Collection;
061 import java.util.HashMap;
062 import java.util.LinkedList;
063 import java.util.Map;
064 import java.util.StringTokenizer;
065 
066 /**
067  * MessageStoreTool.
068  */
069 public class MessageStoreTool
070 {
071     /** Text outputted at the start of each console.*/
072     private static final String BOILER_PLATE = "MessageStoreTool - for examining Persistent Qpid Broker MessageStore instances";
073 
074     /** I/O Wrapper. */
075     protected Console _console;
076 
077     /** Batch mode flag. */
078     protected boolean _batchMode;
079 
080     /** Internal State object. */
081     private State _state = new State();
082 
083     private HashMap<String, Command> _commands = new HashMap<String, Command>();
084 
085     /** SLF4J Logger. */
086     private static Logger _devlog = LoggerFactory.getLogger(MessageStoreTool.class);
087 
088     /** Loaded configuration file. */
089     private Configuration _config;
090 
091     /** Control used for main run loop. */
092     private boolean _running = true;
093     private boolean _initialised = false;
094 
095     //---------------------------------------------------------------------------------------------------/
096 
097     public static void main(String[] argsthrows Configuration.InitException
098     {
099 
100         MessageStoreTool tool = new MessageStoreTool(args);
101 
102         tool.start();
103     }
104 
105 
106     public MessageStoreTool(String[] argsthrows Configuration.InitException
107     {
108         this(args, System.in, System.out);
109     }
110 
111     public MessageStoreTool(String[] args, InputStream in, OutputStream outthrows Configuration.InitException
112     {
113         BufferedReader consoleReader = new BufferedReader(new InputStreamReader(in));
114         BufferedWriter consoleWriter = new BufferedWriter(new OutputStreamWriter(out));
115 
116         Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this)));
117         _batchMode = false;
118 
119         _console = new SimpleConsole(consoleWriter, consoleReader);
120 
121         _config = new Configuration();
122 
123         setOptions();
124         _config.processCommandline(args);
125     }
126 
127 
128     private void setOptions()
129     {
130         Option help = new Option("h""help", false, "print this message");
131         Option version = new Option("v""version", false, "print the version information and exit");
132         Option configFile =
133                 OptionBuilder.withArgName("file").hasArg()
134                         .withDescription("use given configuration file By "
135                                          "default looks for a file named "
136                                          + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME)
137                         .withLongOpt("config")
138                         .create("c");
139 
140         _config.setOption(help);
141         _config.setOption(version);
142         _config.setOption(configFile);
143     }
144 
145     public State getState()
146     {
147         return _state;
148     }
149 
150     public Map<String, Command> getCommands()
151     {
152         return _commands;
153     }
154 
155     public void setConfigurationFile(String configfilethrows Configuration.InitException
156     {
157         _config.loadConfig(new File(configfile));
158         setup();
159     }
160 
161     public Console getConsole()
162     {
163         return _console;
164     }
165 
166     public void setConsole(Console console)
167     {
168         _console = console;
169     }
170 
171     /**
172      * Simple ShutdownHook to cleanly shutdown the databases
173      */
174     class ShutdownHook implements Runnable
175     {
176         MessageStoreTool _tool;
177 
178         ShutdownHook(MessageStoreTool messageStoreTool)
179         {
180             _tool = messageStoreTool;
181         }
182 
183         public void run()
184         {
185             _tool.quit();
186         }
187     }
188 
189     public void quit()
190     {
191         _running = false;
192 
193         if (_initialised)
194         {
195             ApplicationRegistry.remove(1);
196         }
197 
198         _console.println("...exiting");
199 
200         _console.close();
201     }
202 
203     public void setBatchMode(boolean batchmode)
204     {
205         _batchMode = batchmode;
206     }
207 
208     /**
209      * Main loop
210      */
211     protected void start()
212     {
213         setup();
214 
215         if (!_initialised)
216         {
217             System.exit(1);
218         }
219 
220         _console.println("");
221 
222         _console.println(BOILER_PLATE);        
223 
224         runCLI();
225     }
226 
227     private void setup()
228     {
229         loadDefaultVirtualHosts();
230 
231         loadCommands();
232 
233         _state.clearAll();
234     }
235 
236     private void loadCommands()
237     {
238         _commands.clear();
239         //todo Dynamically load the classes that exis in com.redhat.etp.qpid.commands
240         _commands.put("close"new Clear(this));
241         _commands.put("copy"new Copy(this));
242         _commands.put("dump"new Dump(this));
243         _commands.put("help"new Help(this));
244         _commands.put("list"new List(this));
245         _commands.put("load"new Load(this));
246         _commands.put("move"new Move(this));
247         _commands.put("purge"new Purge(this));
248         _commands.put("quit"new Quit(this));
249         _commands.put("select"new Select(this));
250         _commands.put("show"new Show(this));
251     }
252 
253     private void loadDefaultVirtualHosts()
254     {
255         final File configFile = _config.getConfigFile();
256 
257         loadVirtualHosts(configFile);
258     }
259 
260     private void loadVirtualHosts(File configFile)
261     {
262 
263         if (!configFile.exists())
264         {
265             _devlog.error("Config file not found:" + configFile.getAbsolutePath());
266             return;
267         }
268         else
269         {
270             _devlog.debug("using config file :" + configFile.getAbsolutePath());
271         }
272 
273         try
274         {
275             ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
276 
277             ApplicationRegistry.remove(1);
278 
279             ApplicationRegistry.initialise(registry);
280 
281             checkMessageStores();
282             _initialised = true;
283         }
284         catch (ConfigurationException e)
285         {
286             _console.println("Unable to load configuration due to configuration error: " + e.getMessage());
287             e.printStackTrace();
288         }
289         catch (Exception e)
290         {
291             _console.println("Unable to load configuration due to: " + e.getMessage());
292             e.printStackTrace();
293         }
294 
295 
296     }
297 
298     private void checkMessageStores()
299     {
300         Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
301 
302         boolean warning = false;
303         for (VirtualHost vhost : vhosts)
304         {
305             if (vhost.getTransactionLog() instanceof MemoryMessageStore)
306             {
307                 _console.println("WARNING: Virtualhost '" + vhost.getName() "' is using a MemoryMessageStore. "
308                                  "Changes will not persist.");
309                 warning = true;
310             }
311         }
312 
313         if (warning)
314         {
315             _console.println("");
316             _console.println("Please ensure you are using the correct config file currently using '"
317                              + _config.getConfigFile().getAbsolutePath() "'");
318             _console.println("New config file can be specifed by 'load <config file>' or -c on the commandline.");
319             _console.println("");
320         }
321     }
322 
323     private void runCLI()
324     {
325         while (_running)
326         {
327             if (!_batchMode)
328             {
329                 printPrompt();
330             }
331 
332             String[] args = _console.readCommand();
333 
334             while (args != null)
335             {
336                 exec(args);
337 
338                 if (_running)
339                 {
340                     if (!_batchMode)
341                     {
342                         printPrompt();
343                     }
344 
345                     args = _console.readCommand();
346                 }
347             }
348         }
349     }
350 
351     private void printPrompt()
352     {
353         _console.print(prompt());
354     }
355 
356 
357     /**
358      * Execute a script (batch mode).
359      *
360      @param script The file script
361      */
362     protected void runScripts(String script)
363     {
364         //Store Current State
365         boolean oldBatch = _batchMode;
366         CommandParser oldParser = _console.getCommandParser();
367         setBatchMode(true);
368 
369         try
370         {
371             _devlog.debug("Running script '" + script + "'");
372 
373             _console.setCommandParser(new SimpleCommandParser(new BufferedReader(new FileReader(script))));
374 
375             start();
376         }
377         catch (java.io.FileNotFoundException e)
378         {
379             _devlog.error("Script not found: '" + script + "' due to:" + e.getMessage());
380         }
381 
382         //Restore previous state
383         _console.setCommandParser(oldParser);
384         setBatchMode(oldBatch);
385     }
386 
387     public String prompt()
388     {
389         String state = _state.toString();
390         if (state != null && state.length() != 0)
391         {
392             return state + ":bdb$ ";
393         }
394         else
395         {
396             return "bdb$ ";
397         }
398     }
399 
400     /**
401      * Execute the command.
402      *
403      @param args [command, arg0, arg1...].
404      */
405     protected void exec(String[] args)
406     {
407         // Comment lines start with a #
408         if (args.length == || args[0].startsWith("#"))
409         {
410             return;
411         }
412 
413         final String command = args[0];
414 
415         Command cmd = _commands.get(command);
416 
417         if (cmd == null)
418         {
419             _console.println("Command not understood: " + command);
420         }
421         else
422         {
423             cmd.execute(args);
424         }
425     }
426 
427 
428     /**
429      * Displays usage info.
430      */
431     protected static void help()
432     {
433         System.out.println(BOILER_PLATE);
434         System.out.println("Usage: java " + MessageStoreTool.class " [Options]");
435         System.out.println("       [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\"");
436     }
437 
438 
439     /**
440      * This class is used to store the current state of the tool.
441      *
442      * This is then interrogated by the various commands to augment their behaviour.
443      *
444      *
445      */
446     public class State
447     {
448         private VirtualHost _vhost = null;
449         private AMQQueue _queue = null;
450         private Exchange _exchange = null;
451         private java.util.List<Long> _msgids = null;
452 
453         public State()
454         {
455         }
456 
457         public void setQueue(AMQQueue queue)
458         {
459             _queue = queue;
460         }
461 
462         public AMQQueue getQueue()
463         {
464             return _queue;
465         }
466 
467         public void setVhost(VirtualHost vhost)
468         {
469             _vhost = vhost;
470         }
471 
472         public VirtualHost getVhost()
473         {
474             return _vhost;
475         }
476 
477         public Exchange getExchange()
478         {
479             return _exchange;
480         }
481 
482         public void setExchange(Exchange exchange)
483         {
484             _exchange = exchange;
485         }
486 
487         public String toString()
488         {
489             StringBuilder status = new StringBuilder();
490 
491             if (_vhost != null)
492             {
493                 status.append(_vhost.getName());
494 
495                 if (_exchange != null)
496                 {
497                     status.append("[");
498                     status.append(_exchange.getName());
499                     status.append("]");
500 
501                     if (_queue != null)
502                     {
503                         status.append("->'");
504                         status.append(_queue.getName());
505                         status.append("'");
506 
507                         if (_msgids != null)
508                         {
509                             status.append(printMessages());
510                         }
511                     }
512                 }
513             }
514 
515             return status.toString();
516         }
517 
518 
519         public String printMessages()
520         {
521             StringBuilder sb = new StringBuilder();
522 
523             Long previous = null;
524 
525             Long start = null;
526             for (Long id : _msgids)
527             {
528                 if (previous != null)
529                 {
530                     if (id == previous + 1)
531                     {
532                         if (start == null)
533                         {
534                             start = previous;
535                         }
536                     }
537                     else
538                     {
539                         if (start != null)
540                         {
541                             sb.append(",");
542                             sb.append(start);
543                             sb.append("-");
544                             sb.append(id);
545                             start = null;
546                         }
547                         else
548                         {
549                             sb.append(",");
550                             sb.append(previous);
551                         }
552                     }
553                 }
554 
555                 previous = id;
556             }
557 
558             if (start != null)
559             {
560                 sb.append(",");
561                 sb.append(start);
562                 sb.append("-");
563                 sb.append(_msgids.get(_msgids.size() 1));
564             }
565             else
566             {
567                 sb.append(",");
568                 sb.append(previous);
569             }
570 
571             // surround list in ()
572             sb.replace(01"(");
573             sb.append(")");
574             return sb.toString();
575         }
576 
577         public void clearAll()
578         {
579             _vhost = null;
580             clearExchange();
581         }
582 
583         public void clearExchange()
584         {
585             _exchange = null;
586             clearQueue();
587         }
588 
589         public void clearQueue()
590         {
591             _queue = null;
592             clearMessages();
593         }
594 
595         public void clearMessages()
596         {
597             _msgids = null;
598         }
599 
600         /**
601          * A common location to provide parsing of the message id string
602          * utilised by a number of the commands.
603          * The String is comma separated list of ids that can be individual ids
604          * or a range (4-10)
605          *
606          @param msgString string of msg ids to parse 1,2,4-10
607          */
608         public void setMessages(String msgString)
609         {
610             StringTokenizer tok = new StringTokenizer(msgString, ",");
611 
612             if (tok.hasMoreTokens())
613             {
614                 _msgids = new LinkedList<Long>();
615             }
616 
617             while (tok.hasMoreTokens())
618             {
619                 String next = tok.nextToken();
620                 if (next.contains("-"))
621                 {
622                     Long start = Long.parseLong(next.substring(0, next.indexOf("-")));
623                     Long end = Long.parseLong(next.substring(next.indexOf("-"1));
624 
625                     if (end >= start)
626                     {
627                         for (long l = start; l <= end; l++)
628                         {
629                             _msgids.add(l);
630                         }
631                     }
632                 }
633                 else
634                 {
635                     _msgids.add(Long.parseLong(next));
636                 }
637             }
638 
639         }
640 
641         public void setMessages(java.util.List<Long> msgids)
642         {
643             _msgids = msgids;
644         }
645 
646         public java.util.List<Long> getMessages()
647         {
648             return _msgids;
649         }
650     }//Class State
651 
652 }//Class MessageStoreTool