Main.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server;
022 
023 import java.io.File;
024 import java.io.IOException;
025 import java.net.BindException;
026 import java.net.InetAddress;
027 import java.net.InetSocketAddress;
028 
029 import org.apache.commons.cli.CommandLine;
030 import org.apache.commons.cli.HelpFormatter;
031 import org.apache.commons.cli.Option;
032 import org.apache.commons.cli.OptionBuilder;
033 import org.apache.commons.cli.Options;
034 import org.apache.commons.cli.ParseException;
035 import org.apache.commons.cli.PosixParser;
036 import org.apache.log4j.BasicConfigurator;
037 import org.apache.log4j.Logger;
038 import org.apache.log4j.xml.DOMConfigurator;
039 import org.apache.mina.common.ByteBuffer;
040 import org.apache.mina.common.FixedSizeByteBufferAllocator;
041 import org.apache.mina.common.IoAcceptor;
042 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
043 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
044 import org.apache.mina.util.NewThreadExecutor;
045 import org.apache.qpid.common.QpidProperties;
046 import org.apache.qpid.framing.ProtocolVersion;
047 import org.apache.qpid.pool.ReadWriteThreadModel;
048 import org.apache.qpid.server.configuration.ServerConfiguration;
049 import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
050 import org.apache.qpid.server.protocol.AMQPProtocolProvider;
051 import org.apache.qpid.server.registry.ApplicationRegistry;
052 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
053 
054 /**
055  * Main entry point for AMQPD.
056  *
057  */
058 @SuppressWarnings({"AccessStaticViaInstance"})
059 public class Main
060 {
061     private static final Logger _logger = Logger.getLogger(Main.class);
062     public static final Logger _brokerLogger = Logger.getLogger("Qpid.Broker");
063 
064     private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
065 
066     private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
067     public static final String QPID_HOME = "QPID_HOME";
068     private static final int IPV4_ADDRESS_LENGTH = 4;
069 
070     private static final char IPV4_LITERAL_SEPARATOR = '.';
071 
072     protected static class InitException extends Exception
073     {
074         InitException(String msg, Throwable cause)
075         {
076             super(msg, cause);
077         }
078     }
079 
080     protected final Options options = new Options();
081     protected CommandLine commandLine;
082 
083     protected Main(String[] args)
084     {
085         setOptions(options);
086         if (parseCommandline(args))
087         {
088             execute();
089         }
090     }
091 
092     protected boolean parseCommandline(String[] args)
093     {
094         try
095         {
096             commandLine = new PosixParser().parse(options, args);
097 
098             return true;
099         }
100         catch (ParseException e)
101         {
102             System.err.println("Error: " + e.getMessage());
103             HelpFormatter formatter = new HelpFormatter();
104             formatter.printHelp("Qpid", options, true);
105 
106             return false;
107         }
108     }
109 
110     protected void setOptions(Options options)
111     {
112         Option help = new Option("h""help", false, "print this message");
113         Option version = new Option("v""version", false, "print the version information and exit");
114         Option configFile =
115                 OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
116                         .create("c");
117         Option port =
118                 OptionBuilder.withArgName("port").hasArg()
119                         .withDescription("listen on the specified port. Overrides any value in the config file")
120                         .withLongOpt("port").create("p");
121         Option mport =
122                 OptionBuilder.withArgName("mport").hasArg()
123                         .withDescription("listen on the specified management port. Overrides any value in the config file")
124                         .withLongOpt("mport").create("m");
125 
126 
127         Option bind =
128                 OptionBuilder.withArgName("bind").hasArg()
129                         .withDescription("bind to the specified address. Overrides any value in the config file")
130                         .withLongOpt("bind").create("b");
131         Option logconfig =
132                 OptionBuilder.withArgName("logconfig").hasArg()
133                         .withDescription("use the specified log4j xml configuration file. By "
134                                          "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
135                                          " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
136         Option logwatchconfig =
137                 OptionBuilder.withArgName("logwatch").hasArg()
138                         .withDescription("monitor the log file configuration file for changes. Units are seconds. "
139                                          "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
140 
141         options.addOption(help);
142         options.addOption(version);
143         options.addOption(configFile);
144         options.addOption(logconfig);
145         options.addOption(logwatchconfig);
146         options.addOption(port);
147         options.addOption(mport);
148         options.addOption(bind);
149     }
150 
151     protected void execute()
152     {
153         // note this understands either --help or -h. If an option only has a long name you can use that but if
154         // an option has a short name and a long name you must use the short name here.
155         if (commandLine.hasOption("h"))
156         {
157             HelpFormatter formatter = new HelpFormatter();
158             formatter.printHelp("Qpid", options, true);
159         }
160         else if (commandLine.hasOption("v"))
161         {
162             String ver = QpidProperties.getVersionString();
163 
164             StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
165 
166             boolean first = true;
167             for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
168             {
169                 if (first)
170                 {
171                     first = false;
172                 }
173                 else
174                 {
175                     protocol.append(", ");
176                 }
177 
178                 protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
179 
180             }
181 
182             System.out.println(ver + " (" + protocol + ")");
183         }
184         else
185         {
186             try
187             {
188                 startup();
189             }
190             catch (InitException e)
191             {
192                 System.out.println(e.getMessage());
193                 _brokerLogger.error("Initialisation Error : " + e.getMessage());
194                 shutdown(1);
195             }
196             catch (Throwable e)
197             {
198                 System.out.println("Error initialising message broker: " + e);
199                 _brokerLogger.error("Error initialising message broker: " + e);
200                 e.printStackTrace();
201                 shutdown(1);
202             }
203         }
204     }
205 
206     protected void shutdown(int status)
207     {
208         ApplicationRegistry.removeAll();
209         System.exit(status);
210     }
211 
212     protected void startup() throws Exception
213     {
214         final String QpidHome = System.getProperty(QPID_HOME);
215         final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
216         final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
217         if (!configFile.exists())
218         {
219             String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
220 
221             if (QpidHome == null)
222             {
223                 error = error + "\nNote: " + QPID_HOME + " is not set.";
224             }
225 
226             throw new InitException(error, null);
227         }
228         else
229         {
230             System.out.println("Using configuration file " + configFile.getAbsolutePath());
231         }
232 
233         String logConfig = commandLine.getOptionValue("l");
234         String logWatchConfig = commandLine.getOptionValue("w""0");
235         if (logConfig != null)
236         {
237             File logConfigFile = new File(logConfig);
238             configureLogging(logConfigFile, logWatchConfig);
239         }
240         else
241         {
242             File configFileDirectory = configFile.getParentFile();
243             File logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
244             configureLogging(logConfigFile, logWatchConfig);
245         }
246 
247         ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
248         ServerConfiguration serverConfig = config.getConfiguration();
249         updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
250 
251         ApplicationRegistry.initialise(config);
252 
253         //fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues
254         // that are causing the broker build to pick up the wrong properties file and hence say
255         // Starting Qpid Client 
256         _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
257                            " build: " + QpidProperties.getBuildVersion());
258 
259         ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers());
260 
261         // the MINA default is currently to use the pooled allocator although this may change in future
262         // once more testing of the performance of the simple allocator has been done
263         if (!serverConfig.getEnablePooledAllocator())
264         {
265             ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
266         }
267 
268         if(serverConfig.getUseBiasedWrites())
269         {
270             System.setProperty("org.apache.qpid.use_write_biased_pool","true");
271         }
272 
273         int port = serverConfig.getPort();
274 
275         String portStr = commandLine.getOptionValue("p");
276         if (portStr != null)
277         {
278             try
279             {
280                 port = Integer.parseInt(portStr);
281             }
282             catch (NumberFormatException e)
283             {
284                 throw new InitException("Invalid port: " + portStr, e);
285             }
286         }
287         
288         bind(port, serverConfig);
289     }
290 
291     /**
292      * Update the configuration data with the management port.
293      @param configuration
294      @param managementPort The string from the command line
295      */
296     private void updateManagementPort(ServerConfiguration configuration, String managementPort)
297     {
298         if (managementPort != null)
299         {
300             try
301             {
302                 configuration.setJMXManagementPort(Integer.parseInt(managementPort));
303             }
304             catch (NumberFormatException e)
305             {
306                 _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
307             }
308         }
309     }
310 
311     protected void bind(int port, ServerConfiguration configthrows BindException
312     {
313         String bindAddr = commandLine.getOptionValue("b");
314         if (bindAddr == null)
315         {
316             bindAddr = config.getBind();
317         }
318 
319         try
320         {
321             IoAcceptor acceptor;
322             
323             if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO())
324             {
325                 _logger.warn("Using Qpid Multithreaded IO Processing");
326                 acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors()new NewThreadExecutor());
327             }
328             else
329             {
330                 _logger.warn("Using Mina IO Processing");
331                 acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors()new NewThreadExecutor());
332             }
333             
334             SocketAcceptorConfig sconfig = (SocketAcceptorConfigacceptor.getDefaultConfig();
335             SocketSessionConfig sc = (SocketSessionConfigsconfig.getSessionConfig();
336 
337             sc.setReceiveBufferSize(config.getReceiveBufferSize());
338             sc.setSendBufferSize(config.getWriteBufferSize());
339             sc.setTcpNoDelay(config.getTcpNoDelay());
340 
341             // if we do not use the executor pool threading model we get the default leader follower
342             // implementation provided by MINA
343             if (config.getEnableExecutorPool())
344             {
345                 sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
346             }
347 
348             if (!config.getEnableSSL() || !config.getSSLOnly())
349             {
350                 AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
351                 InetSocketAddress bindAddress;
352                 if (bindAddr.equals("wildcard"))
353                 {
354                     bindAddress = new InetSocketAddress(port);
355                 }
356                 else
357                 {
358                     bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
359                 }
360 
361                 bind(acceptor, bindAddress, handler, sconfig);
362 
363                 //fixme  qpid.AMQP should be using qpidproperties to get value
364                 _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
365             }
366 
367             if (config.getEnableSSL())
368             {
369                 AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
370                 try
371                 {
372 
373                     bind(acceptor, new InetSocketAddress(config.getSSLPort()), handler, sconfig);
374 
375                     //fixme  qpid.AMQP should be using qpidproperties to get value
376                     _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort());
377 
378                 }
379                 catch (IOException e)
380                 {
381                     _brokerLogger.error("Unable to listen on SSL port: " + e, e);
382                 }
383             }
384 
385             //fixme  qpid.AMQP should be using qpidproperties to get value
386             _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
387                                " build: " + QpidProperties.getBuildVersion());
388         }
389         catch (Exception e)
390         {
391             _logger.error("Unable to bind service to registry: " + e, e);
392             //fixme this need tidying up
393             throw new BindException(e.getMessage());
394         }
395     }
396 
397     /**
398      * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
399      *
400      @param acceptor
401      @param bindAddress
402      @param handler
403      @param sconfig
404      *
405      @throws IOException from the acceptor.bind command
406      */
407     private void bind(IoAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfigthrows IOException
408     {
409         acceptor.bind(bindAddress, handler, sconfig);
410 
411         ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
412     }
413 
414     public static void main(String[] args)
415     {
416 
417         new Main(args);
418     }
419 
420     private byte[] parseIP(String addressthrows Exception
421     {
422         char[] literalBuffer = address.toCharArray();
423         int byteCount = 0;
424         int currByte = 0;
425         byte[] ip = new byte[IPV4_ADDRESS_LENGTH];
426         for (int i = 0; i < literalBuffer.length; i++)
427         {
428             char currChar = literalBuffer[i];
429             if ((currChar >= '0'&& (currChar <= '9'))
430             {
431                 currByte = (currByte * 10(Character.digit(currChar, 100xFF);
432             }
433 
434             if (currChar == IPV4_LITERAL_SEPARATOR || (i + == literalBuffer.length))
435             {
436                 ip[byteCount++(bytecurrByte;
437                 currByte = 0;
438             }
439         }
440 
441         if (byteCount != 4)
442         {
443             throw new Exception("Invalid IP address: " + address);
444         }
445         return ip;
446     }
447 
448     private void configureLogging(File logConfigFile, String logWatchConfig)
449     {
450         int logWatchTime = 0;
451         try
452         {
453             logWatchTime = Integer.parseInt(logWatchConfig);
454         }
455         catch (NumberFormatException e)
456         {
457             System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
458                                "a non-negative integer. Using default of zero (no watching configured");
459         }
460 
461         if (logConfigFile.exists() && logConfigFile.canRead())
462         {
463             System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
464             if (logWatchTime > 0)
465             {
466                 System.out.println("log file " + logConfigFile.getAbsolutePath() " will be checked for changes every "
467                                    + logWatchTime + " seconds");
468                 // log4j expects the watch interval in milliseconds
469                 DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000);
470             }
471             else
472             {
473                 DOMConfigurator.configure(logConfigFile.getAbsolutePath());
474             }
475         }
476         else
477         {
478             System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
479             System.err.println("Using basic log4j configuration");
480             BasicConfigurator.configure();
481         }
482     }
483 
484 }