Move.java
001 /*
002  *  Licensed to the Apache Software Foundation (ASF) under one
003  *  or more contributor license agreements.  See the NOTICE file
004  *  distributed with this work for additional information
005  *  regarding copyright ownership.  The ASF licenses this file
006  *  to you under the Apache License, Version 2.0 (the
007  *  "License"); you may not use this file except in compliance
008  *  with the License.  You may obtain a copy of the License at
009  *
010  *    http://www.apache.org/licenses/LICENSE-2.0
011  *
012  *  Unless required by applicable law or agreed to in writing,
013  *  software distributed under the License is distributed on an
014  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  *  KIND, either express or implied.  See the License for the
016  *  specific language governing permissions and limitations
017  *  under the License.    
018  *
019  
020  */
021 package org.apache.qpid.tools.messagestore.commands;
022 
023 import org.apache.qpid.framing.AMQShortString;
024 import org.apache.qpid.server.queue.QueueEntryImpl;
025 import org.apache.qpid.server.queue.AMQQueue;
026 import org.apache.qpid.server.queue.QueueEntry;
027 import org.apache.qpid.server.store.StoreContext;
028 import org.apache.qpid.tools.messagestore.MessageStoreTool;
029 
030 import java.util.LinkedList;
031 import java.util.List;
032 
033 public class Move extends AbstractCommand
034 {
035 
036     /**
037      * Since the Coopy command is not associated with a real channel we can safely create our own store context
038      * for use in the few methods that require one.
039      */
040     protected StoreContext _storeContext = new StoreContext();
041 
042     public Move(MessageStoreTool tool)
043     {
044         super(tool);
045     }
046 
047     public String help()
048     {
049         return "Move messages between queues.";/*\n" +
050                "The currently selected message set will be moved to the specifed queue.\n" +
051                "Alternatively the values can be provided on the command line.";*/
052     }
053 
054     public String usage()
055     {
056         return "move to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]";
057     }
058 
059     public String getCommand()
060     {
061         return "move";
062     }
063 
064     public void execute(String... args)
065     {
066         AMQQueue toQueue = null;
067         AMQQueue fromQueue = _tool.getState().getQueue();
068         java.util.List<Long> msgids = _tool.getState().getMessages();
069 
070         if (args.length >= 2)
071         {
072             for (String arg : args)
073             {
074                 if (arg.startsWith("to="))
075                 {
076                     String queueName = arg.substring(arg.indexOf("="1);
077                     toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
078                 }
079 
080                 if (arg.startsWith("from="))
081                 {
082                     String queueName = arg.substring(arg.indexOf("="1);
083                     fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
084                 }
085 
086                 if (arg.startsWith("msgids="))
087                 {
088                     String msgidStr = arg.substring(arg.indexOf("="1);
089 
090                     // Record the current message selection
091                     java.util.List<Long> currentIDs = _tool.getState().getMessages();
092 
093                     // Use the ToolState class to perform the messasge parsing
094                     _tool.getState().setMessages(msgidStr);
095                     msgids = _tool.getState().getMessages();
096 
097                     // Reset the original selection of messages
098                     _tool.getState().setMessages(currentIDs);
099                 }
100             }
101         }
102 
103         if (!checkRequirements(fromQueue, toQueue, msgids))
104         {
105             return;
106         }
107 
108         processIDs(fromQueue, toQueue, msgids);
109     }
110 
111     private void processIDs(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids)
112     {
113         Long previous = null;
114         Long start = null;
115 
116         if (msgids == null)
117         {
118             msgids = allMessageIDs(fromQueue);
119         }
120 
121         if (msgids == null || msgids.size() == 0)
122         {
123             _console.println("No Messages to move.");
124             return;
125         }
126 
127         for (long id : msgids)
128         {
129             if (previous != null)
130             {
131                 if (id == previous + 1)
132                 {
133                     if (start == null)
134                     {
135                         start = previous;
136                     }
137                 }
138                 else
139                 {
140                     if (start != null)
141                     {
142                         //move a range of ids
143                         doCommand(fromQueue, start, id, toQueue);
144                         start = null;
145                     }
146                     else
147                     {
148                         //move a single id
149                         doCommand(fromQueue, id, id, toQueue);
150                     }
151                 }
152             }
153 
154             previous = id;
155         }
156 
157         if (start != null)
158         {
159             //move a range of ids
160             doCommand(fromQueue, start, previous, toQueue);
161         }
162     }
163 
164     private List<Long> allMessageIDs(AMQQueue fromQueue)
165     {
166         List<Long> ids = new LinkedList<Long>();
167 
168         if (fromQueue != null)
169         {
170             List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue();
171             if (messages != null)
172             {
173                 for (QueueEntry msg : messages)
174                 {
175                     ids.add(msg.getMessage().getMessageId());
176                 }
177             }
178         }
179 
180         return ids;
181     }
182 
183     protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, List<Long> msgids)
184     {
185         if (toQueue == null)
186         {
187             _console.println("Destination queue not specifed.");
188             _console.println(usage());
189             return false;
190         }
191 
192         if (fromQueue == null)
193         {
194             _console.println("Source queue not specifed.");
195             _console.println(usage());
196             return false;
197         }
198 
199         return true;
200     }
201 
202     protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue)
203     {
204         fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext);
205     }
206 }