001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server;
022
023 import java.io.File;
024 import java.io.IOException;
025 import java.net.BindException;
026 import java.net.InetAddress;
027 import java.net.InetSocketAddress;
028
029 import org.apache.commons.cli.CommandLine;
030 import org.apache.commons.cli.HelpFormatter;
031 import org.apache.commons.cli.Option;
032 import org.apache.commons.cli.OptionBuilder;
033 import org.apache.commons.cli.Options;
034 import org.apache.commons.cli.ParseException;
035 import org.apache.commons.cli.PosixParser;
036 import org.apache.log4j.BasicConfigurator;
037 import org.apache.log4j.Logger;
038 import org.apache.log4j.xml.DOMConfigurator;
039 import org.apache.mina.common.ByteBuffer;
040 import org.apache.mina.common.FixedSizeByteBufferAllocator;
041 import org.apache.mina.common.IoAcceptor;
042 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
043 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
044 import org.apache.mina.util.NewThreadExecutor;
045 import org.apache.qpid.common.QpidProperties;
046 import org.apache.qpid.framing.ProtocolVersion;
047 import org.apache.qpid.pool.ReadWriteThreadModel;
048 import org.apache.qpid.server.configuration.ServerConfiguration;
049 import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
050 import org.apache.qpid.server.protocol.AMQPProtocolProvider;
051 import org.apache.qpid.server.registry.ApplicationRegistry;
052 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
053
054 /**
055 * Main entry point for AMQPD.
056 *
057 */
058 @SuppressWarnings({"AccessStaticViaInstance"})
059 public class Main
060 {
061 private static final Logger _logger = Logger.getLogger(Main.class);
062 public static final Logger _brokerLogger = Logger.getLogger("Qpid.Broker");
063
064 private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
065
066 private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
067 public static final String QPID_HOME = "QPID_HOME";
068 private static final int IPV4_ADDRESS_LENGTH = 4;
069
070 private static final char IPV4_LITERAL_SEPARATOR = '.';
071
072 protected static class InitException extends Exception
073 {
074 InitException(String msg, Throwable cause)
075 {
076 super(msg, cause);
077 }
078 }
079
080 protected final Options options = new Options();
081 protected CommandLine commandLine;
082
083 protected Main(String[] args)
084 {
085 setOptions(options);
086 if (parseCommandline(args))
087 {
088 execute();
089 }
090 }
091
092 protected boolean parseCommandline(String[] args)
093 {
094 try
095 {
096 commandLine = new PosixParser().parse(options, args);
097
098 return true;
099 }
100 catch (ParseException e)
101 {
102 System.err.println("Error: " + e.getMessage());
103 HelpFormatter formatter = new HelpFormatter();
104 formatter.printHelp("Qpid", options, true);
105
106 return false;
107 }
108 }
109
110 protected void setOptions(Options options)
111 {
112 Option help = new Option("h", "help", false, "print this message");
113 Option version = new Option("v", "version", false, "print the version information and exit");
114 Option configFile =
115 OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
116 .create("c");
117 Option port =
118 OptionBuilder.withArgName("port").hasArg()
119 .withDescription("listen on the specified port. Overrides any value in the config file")
120 .withLongOpt("port").create("p");
121 Option mport =
122 OptionBuilder.withArgName("mport").hasArg()
123 .withDescription("listen on the specified management port. Overrides any value in the config file")
124 .withLongOpt("mport").create("m");
125
126
127 Option bind =
128 OptionBuilder.withArgName("bind").hasArg()
129 .withDescription("bind to the specified address. Overrides any value in the config file")
130 .withLongOpt("bind").create("b");
131 Option logconfig =
132 OptionBuilder.withArgName("logconfig").hasArg()
133 .withDescription("use the specified log4j xml configuration file. By "
134 + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
135 + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
136 Option logwatchconfig =
137 OptionBuilder.withArgName("logwatch").hasArg()
138 .withDescription("monitor the log file configuration file for changes. Units are seconds. "
139 + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
140
141 options.addOption(help);
142 options.addOption(version);
143 options.addOption(configFile);
144 options.addOption(logconfig);
145 options.addOption(logwatchconfig);
146 options.addOption(port);
147 options.addOption(mport);
148 options.addOption(bind);
149 }
150
151 protected void execute()
152 {
153 // note this understands either --help or -h. If an option only has a long name you can use that but if
154 // an option has a short name and a long name you must use the short name here.
155 if (commandLine.hasOption("h"))
156 {
157 HelpFormatter formatter = new HelpFormatter();
158 formatter.printHelp("Qpid", options, true);
159 }
160 else if (commandLine.hasOption("v"))
161 {
162 String ver = QpidProperties.getVersionString();
163
164 StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
165
166 boolean first = true;
167 for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
168 {
169 if (first)
170 {
171 first = false;
172 }
173 else
174 {
175 protocol.append(", ");
176 }
177
178 protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
179
180 }
181
182 System.out.println(ver + " (" + protocol + ")");
183 }
184 else
185 {
186 try
187 {
188 startup();
189 }
190 catch (InitException e)
191 {
192 System.out.println(e.getMessage());
193 _brokerLogger.error("Initialisation Error : " + e.getMessage());
194 shutdown(1);
195 }
196 catch (Throwable e)
197 {
198 System.out.println("Error initialising message broker: " + e);
199 _brokerLogger.error("Error initialising message broker: " + e);
200 e.printStackTrace();
201 shutdown(1);
202 }
203 }
204 }
205
206 protected void shutdown(int status)
207 {
208 ApplicationRegistry.removeAll();
209 System.exit(status);
210 }
211
212 protected void startup() throws Exception
213 {
214 final String QpidHome = System.getProperty(QPID_HOME);
215 final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
216 final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
217 if (!configFile.exists())
218 {
219 String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
220
221 if (QpidHome == null)
222 {
223 error = error + "\nNote: " + QPID_HOME + " is not set.";
224 }
225
226 throw new InitException(error, null);
227 }
228 else
229 {
230 System.out.println("Using configuration file " + configFile.getAbsolutePath());
231 }
232
233 String logConfig = commandLine.getOptionValue("l");
234 String logWatchConfig = commandLine.getOptionValue("w", "0");
235 if (logConfig != null)
236 {
237 File logConfigFile = new File(logConfig);
238 configureLogging(logConfigFile, logWatchConfig);
239 }
240 else
241 {
242 File configFileDirectory = configFile.getParentFile();
243 File logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
244 configureLogging(logConfigFile, logWatchConfig);
245 }
246
247 ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
248 ServerConfiguration serverConfig = config.getConfiguration();
249 updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
250
251 ApplicationRegistry.initialise(config);
252
253 //fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues
254 // that are causing the broker build to pick up the wrong properties file and hence say
255 // Starting Qpid Client
256 _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
257 + " build: " + QpidProperties.getBuildVersion());
258
259 ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers());
260
261 // the MINA default is currently to use the pooled allocator although this may change in future
262 // once more testing of the performance of the simple allocator has been done
263 if (!serverConfig.getEnablePooledAllocator())
264 {
265 ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
266 }
267
268 if(serverConfig.getUseBiasedWrites())
269 {
270 System.setProperty("org.apache.qpid.use_write_biased_pool","true");
271 }
272
273 int port = serverConfig.getPort();
274
275 String portStr = commandLine.getOptionValue("p");
276 if (portStr != null)
277 {
278 try
279 {
280 port = Integer.parseInt(portStr);
281 }
282 catch (NumberFormatException e)
283 {
284 throw new InitException("Invalid port: " + portStr, e);
285 }
286 }
287
288 bind(port, serverConfig);
289 }
290
291 /**
292 * Update the configuration data with the management port.
293 * @param configuration
294 * @param managementPort The string from the command line
295 */
296 private void updateManagementPort(ServerConfiguration configuration, String managementPort)
297 {
298 if (managementPort != null)
299 {
300 try
301 {
302 configuration.setJMXManagementPort(Integer.parseInt(managementPort));
303 }
304 catch (NumberFormatException e)
305 {
306 _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
307 }
308 }
309 }
310
311 protected void bind(int port, ServerConfiguration config) throws BindException
312 {
313 String bindAddr = commandLine.getOptionValue("b");
314 if (bindAddr == null)
315 {
316 bindAddr = config.getBind();
317 }
318
319 try
320 {
321 IoAcceptor acceptor;
322
323 if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO())
324 {
325 _logger.warn("Using Qpid Multithreaded IO Processing");
326 acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor());
327 }
328 else
329 {
330 _logger.warn("Using Mina IO Processing");
331 acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor());
332 }
333
334 SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
335 SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
336
337 sc.setReceiveBufferSize(config.getReceiveBufferSize());
338 sc.setSendBufferSize(config.getWriteBufferSize());
339 sc.setTcpNoDelay(config.getTcpNoDelay());
340
341 // if we do not use the executor pool threading model we get the default leader follower
342 // implementation provided by MINA
343 if (config.getEnableExecutorPool())
344 {
345 sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
346 }
347
348 if (!config.getEnableSSL() || !config.getSSLOnly())
349 {
350 AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
351 InetSocketAddress bindAddress;
352 if (bindAddr.equals("wildcard"))
353 {
354 bindAddress = new InetSocketAddress(port);
355 }
356 else
357 {
358 bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
359 }
360
361 bind(acceptor, bindAddress, handler, sconfig);
362
363 //fixme qpid.AMQP should be using qpidproperties to get value
364 _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
365 }
366
367 if (config.getEnableSSL())
368 {
369 AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
370 try
371 {
372
373 bind(acceptor, new InetSocketAddress(config.getSSLPort()), handler, sconfig);
374
375 //fixme qpid.AMQP should be using qpidproperties to get value
376 _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort());
377
378 }
379 catch (IOException e)
380 {
381 _brokerLogger.error("Unable to listen on SSL port: " + e, e);
382 }
383 }
384
385 //fixme qpid.AMQP should be using qpidproperties to get value
386 _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
387 + " build: " + QpidProperties.getBuildVersion());
388 }
389 catch (Exception e)
390 {
391 _logger.error("Unable to bind service to registry: " + e, e);
392 //fixme this need tidying up
393 throw new BindException(e.getMessage());
394 }
395 }
396
397 /**
398 * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
399 *
400 * @param acceptor
401 * @param bindAddress
402 * @param handler
403 * @param sconfig
404 *
405 * @throws IOException from the acceptor.bind command
406 */
407 private void bind(IoAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
408 {
409 acceptor.bind(bindAddress, handler, sconfig);
410
411 ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
412 }
413
414 public static void main(String[] args)
415 {
416
417 new Main(args);
418 }
419
420 private byte[] parseIP(String address) throws Exception
421 {
422 char[] literalBuffer = address.toCharArray();
423 int byteCount = 0;
424 int currByte = 0;
425 byte[] ip = new byte[IPV4_ADDRESS_LENGTH];
426 for (int i = 0; i < literalBuffer.length; i++)
427 {
428 char currChar = literalBuffer[i];
429 if ((currChar >= '0') && (currChar <= '9'))
430 {
431 currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF);
432 }
433
434 if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length))
435 {
436 ip[byteCount++] = (byte) currByte;
437 currByte = 0;
438 }
439 }
440
441 if (byteCount != 4)
442 {
443 throw new Exception("Invalid IP address: " + address);
444 }
445 return ip;
446 }
447
448 private void configureLogging(File logConfigFile, String logWatchConfig)
449 {
450 int logWatchTime = 0;
451 try
452 {
453 logWatchTime = Integer.parseInt(logWatchConfig);
454 }
455 catch (NumberFormatException e)
456 {
457 System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
458 + "a non-negative integer. Using default of zero (no watching configured");
459 }
460
461 if (logConfigFile.exists() && logConfigFile.canRead())
462 {
463 System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
464 if (logWatchTime > 0)
465 {
466 System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
467 + logWatchTime + " seconds");
468 // log4j expects the watch interval in milliseconds
469 DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000);
470 }
471 else
472 {
473 DOMConfigurator.configure(logConfigFile.getAbsolutePath());
474 }
475 }
476 else
477 {
478 System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
479 System.err.println("Using basic log4j configuration");
480 BasicConfigurator.configure();
481 }
482 }
483
484 }
|