001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 *
020 */
021 package org.apache.qpid.tools.messagestore.commands;
022
023 import org.apache.qpid.framing.AMQShortString;
024 import org.apache.qpid.server.queue.QueueEntryImpl;
025 import org.apache.qpid.server.queue.AMQQueue;
026 import org.apache.qpid.server.queue.QueueEntry;
027 import org.apache.qpid.server.store.StoreContext;
028 import org.apache.qpid.tools.messagestore.MessageStoreTool;
029
030 import java.util.LinkedList;
031 import java.util.List;
032
033 public class Move extends AbstractCommand
034 {
035
036 /**
037 * Since the Coopy command is not associated with a real channel we can safely create our own store context
038 * for use in the few methods that require one.
039 */
040 protected StoreContext _storeContext = new StoreContext();
041
042 public Move(MessageStoreTool tool)
043 {
044 super(tool);
045 }
046
047 public String help()
048 {
049 return "Move messages between queues.";/*\n" +
050 "The currently selected message set will be moved to the specifed queue.\n" +
051 "Alternatively the values can be provided on the command line.";*/
052 }
053
054 public String usage()
055 {
056 return "move to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]";
057 }
058
059 public String getCommand()
060 {
061 return "move";
062 }
063
064 public void execute(String... args)
065 {
066 AMQQueue toQueue = null;
067 AMQQueue fromQueue = _tool.getState().getQueue();
068 java.util.List<Long> msgids = _tool.getState().getMessages();
069
070 if (args.length >= 2)
071 {
072 for (String arg : args)
073 {
074 if (arg.startsWith("to="))
075 {
076 String queueName = arg.substring(arg.indexOf("=") + 1);
077 toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
078 }
079
080 if (arg.startsWith("from="))
081 {
082 String queueName = arg.substring(arg.indexOf("=") + 1);
083 fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
084 }
085
086 if (arg.startsWith("msgids="))
087 {
088 String msgidStr = arg.substring(arg.indexOf("=") + 1);
089
090 // Record the current message selection
091 java.util.List<Long> currentIDs = _tool.getState().getMessages();
092
093 // Use the ToolState class to perform the messasge parsing
094 _tool.getState().setMessages(msgidStr);
095 msgids = _tool.getState().getMessages();
096
097 // Reset the original selection of messages
098 _tool.getState().setMessages(currentIDs);
099 }
100 }
101 }
102
103 if (!checkRequirements(fromQueue, toQueue, msgids))
104 {
105 return;
106 }
107
108 processIDs(fromQueue, toQueue, msgids);
109 }
110
111 private void processIDs(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids)
112 {
113 Long previous = null;
114 Long start = null;
115
116 if (msgids == null)
117 {
118 msgids = allMessageIDs(fromQueue);
119 }
120
121 if (msgids == null || msgids.size() == 0)
122 {
123 _console.println("No Messages to move.");
124 return;
125 }
126
127 for (long id : msgids)
128 {
129 if (previous != null)
130 {
131 if (id == previous + 1)
132 {
133 if (start == null)
134 {
135 start = previous;
136 }
137 }
138 else
139 {
140 if (start != null)
141 {
142 //move a range of ids
143 doCommand(fromQueue, start, id, toQueue);
144 start = null;
145 }
146 else
147 {
148 //move a single id
149 doCommand(fromQueue, id, id, toQueue);
150 }
151 }
152 }
153
154 previous = id;
155 }
156
157 if (start != null)
158 {
159 //move a range of ids
160 doCommand(fromQueue, start, previous, toQueue);
161 }
162 }
163
164 private List<Long> allMessageIDs(AMQQueue fromQueue)
165 {
166 List<Long> ids = new LinkedList<Long>();
167
168 if (fromQueue != null)
169 {
170 List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue();
171 if (messages != null)
172 {
173 for (QueueEntry msg : messages)
174 {
175 ids.add(msg.getMessage().getMessageId());
176 }
177 }
178 }
179
180 return ids;
181 }
182
183 protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, List<Long> msgids)
184 {
185 if (toQueue == null)
186 {
187 _console.println("Destination queue not specifed.");
188 _console.println(usage());
189 return false;
190 }
191
192 if (fromQueue == null)
193 {
194 _console.println("Source queue not specifed.");
195 _console.println(usage());
196 return false;
197 }
198
199 return true;
200 }
201
202 protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue)
203 {
204 fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext);
205 }
206 }
|