ConnectionStartMethodHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.handler;
022 
023 import org.apache.qpid.AMQException;
024 import org.apache.qpid.client.protocol.AMQProtocolSession;
025 import org.apache.qpid.client.security.AMQCallbackHandler;
026 import org.apache.qpid.client.security.CallbackHandlerRegistry;
027 import org.apache.qpid.client.state.AMQState;
028 import org.apache.qpid.client.state.StateAwareMethodListener;
029 import org.apache.qpid.common.ClientProperties;
030 import org.apache.qpid.common.QpidProperties;
031 import org.apache.qpid.framing.AMQShortString;
032 import org.apache.qpid.framing.ConnectionStartBody;
033 import org.apache.qpid.framing.ConnectionStartOkBody;
034 import org.apache.qpid.framing.FieldTable;
035 import org.apache.qpid.framing.FieldTableFactory;
036 import org.apache.qpid.framing.ProtocolVersion;
037 
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040 
041 import javax.security.sasl.Sasl;
042 import javax.security.sasl.SaslClient;
043 import javax.security.sasl.SaslException;
044 
045 import java.io.UnsupportedEncodingException;
046 import java.util.HashSet;
047 import java.util.StringTokenizer;
048 
049 public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
050 {
051     private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
052 
053     private static final ConnectionStartMethodHandler _instance = new ConnectionStartMethodHandler();
054 
055     public static ConnectionStartMethodHandler getInstance()
056     {
057         return _instance;
058     }
059 
060     private ConnectionStartMethodHandler()
061     { }
062 
063     public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId)
064             throws AMQException
065     {
066         _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
067             "AMQMethodEvent evt): called");
068 
069         ProtocolVersion pv = new ProtocolVersion((bytebody.getVersionMajor()(bytebody.getVersionMinor());
070 
071         // For the purposes of interop, we can make the client accept the broker's version string.
072         // If it does, it then internally records the version as being the latest one that it understands.
073         // It needs to do this since frame lookup is done by version.
074         if (Boolean.getBoolean("qpid.accept.broker.version"&& !pv.isSupported())
075         {
076 
077             pv = ProtocolVersion.getLatestSupportedVersion();
078         }
079 
080         if (pv.isSupported())
081         {
082             session.setProtocolVersion(pv);
083 
084             try
085             {
086                 // Used to hold the SASL mechanism to authenticate with.
087                 String mechanism;
088 
089                 if (body.getMechanisms()== null)
090                 {
091                     throw new AMQException(null, "mechanism not specified in ConnectionStart method frame"null);
092                 }
093                 else
094                 {
095                     mechanism = chooseMechanism(body.getMechanisms());
096                     _log.debug("mechanism = " + mechanism);
097                 }
098 
099                 if (mechanism == null)
100                 {
101                     throw new AMQException(null, "No supported security mechanism found, passed: " new String(body.getMechanisms())null);
102                 }
103 
104                 byte[] saslResponse;
105                 try
106                 {
107                     SaslClient sc =
108                         Sasl.createSaslClient(new String[] { mechanism }, null, "AMQP""localhost", null,
109                             createCallbackHandler(mechanism, session));
110                     if (sc == null)
111                     {
112                         throw new AMQException(null, "Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism
113                             ". Please ensure all factories are registered. See DynamicSaslRegistrar for "
114                             " details of how to register non-standard SASL client providers."null);
115                     }
116 
117                     session.setSaslClient(sc);
118                     saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) null);
119                 }
120                 catch (SaslException e)
121                 {
122                     session.setSaslClient(null);
123                     throw new AMQException(null, "Unable to create SASL client: " + e, e);
124                 }
125 
126                 if (body.getLocales() == null)
127                 {
128                     throw new AMQException(null, "Locales is not defined in Connection Start method"null);
129                 }
130 
131                 final String locales = new String(body.getLocales()"utf8");
132                 final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
133                 String selectedLocale = null;
134                 if (tokenizer.hasMoreTokens())
135                 {
136                     selectedLocale = tokenizer.nextToken();
137                 }
138                 else
139                 {
140                     throw new AMQException(null, "No locales sent from server, passed: " + locales, null);
141                 }
142 
143                 session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED);
144                 FieldTable clientProperties = FieldTableFactory.newFieldTable();
145 
146                 clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),
147                     session.getClientID());
148                 clientProperties.setString(new AMQShortString(ClientProperties.product.toString()),
149                     QpidProperties.getProductName());
150                 clientProperties.setString(new AMQShortString(ClientProperties.version.toString()),
151                     QpidProperties.getReleaseVersion());
152                 clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
153 
154 
155                 ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales));
156                 // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
157                 // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
158                 // Be aware of possible changes to parameter order as versions change.
159                 session.writeFrame(connectionStartOkBody.generateFrame(channelId));
160                         
161             }
162             catch (UnsupportedEncodingException e)
163             {
164                 throw new AMQException(null, "Unable to decode data: " + e, e);
165             }
166         }
167         else
168         {
169             _log.error("Broker requested Protocol [" + body.getVersionMajor() "-" + body.getVersionMinor()
170                 "] which is not supported by this version of the client library");
171 
172             session.closeProtocolSession();
173         }
174     }
175 
176     private String getFullSystemInfo()
177     {
178         StringBuffer fullSystemInfo = new StringBuffer();
179         fullSystemInfo.append(System.getProperty("java.runtime.name"));
180         fullSystemInfo.append(", " + System.getProperty("java.runtime.version"));
181         fullSystemInfo.append(", " + System.getProperty("java.vendor"));
182         fullSystemInfo.append(", " + System.getProperty("os.arch"));
183         fullSystemInfo.append(", " + System.getProperty("os.name"));
184         fullSystemInfo.append(", " + System.getProperty("os.version"));
185         fullSystemInfo.append(", " + System.getProperty("sun.os.patch.level"));
186 
187         return fullSystemInfo.toString();
188     }
189 
190     private String chooseMechanism(byte[] availableMechanismsthrows UnsupportedEncodingException
191     {
192         final String mechanisms = new String(availableMechanisms, "utf8");
193         StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
194         HashSet mechanismSet = new HashSet();
195         while (tokenizer.hasMoreTokens())
196         {
197             mechanismSet.add(tokenizer.nextToken());
198         }
199 
200         String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
201         StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
202         while (prefTokenizer.hasMoreTokens())
203         {
204             String mech = prefTokenizer.nextToken();
205             if (mechanismSet.contains(mech))
206             {
207                 return mech;
208             }
209         }
210 
211         return null;
212     }
213 
214     private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession)
215         throws AMQException
216     {
217         Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
218         try
219         {
220             Object instance = mechanismClass.newInstance();
221             AMQCallbackHandler cbh = (AMQCallbackHandlerinstance;
222             cbh.initialise(protocolSession);
223 
224             return cbh;
225         }
226         catch (Exception e)
227         {
228             throw new AMQException(null, "Unable to create callback handler: " + e, e);
229         }
230     }
231 
232 }