001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.handler;
022
023 import java.util.UUID;
024 import java.io.ByteArrayOutputStream;
025 import java.io.DataOutputStream;
026 import java.io.IOException;
027
028 import org.apache.qpid.AMQException;
029 import org.apache.qpid.framing.*;
030 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
031 import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
032 import org.apache.qpid.protocol.AMQConstant;
033 import org.apache.qpid.server.AMQChannel;
034 import org.apache.qpid.server.protocol.AMQProtocolSession;
035 import org.apache.qpid.server.state.AMQStateManager;
036 import org.apache.qpid.server.state.StateAwareMethodListener;
037 import org.apache.qpid.server.virtualhost.VirtualHost;
038
039 public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody>
040 {
041 private static ChannelOpenHandler _instance = new ChannelOpenHandler();
042
043 public static ChannelOpenHandler getInstance()
044 {
045 return _instance;
046 }
047
048 private ChannelOpenHandler()
049 {
050 }
051
052 public void methodReceived(AMQStateManager stateManager, ChannelOpenBody body, int channelId) throws AMQException
053 {
054 AMQProtocolSession session = stateManager.getProtocolSession();
055 VirtualHost virtualHost = session.getVirtualHost();
056
057 final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getTransactionLog()
058 );
059 session.addChannel(channel);
060
061 ChannelOpenOkBody response;
062
063 ProtocolVersion pv = session.getProtocolVersion();
064
065 if(pv.equals(ProtocolVersion.v8_0))
066 {
067 MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
068 response = methodRegistry.createChannelOpenOkBody();
069
070 }
071 else if(pv.equals(ProtocolVersion.v0_9))
072 {
073 MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
074 UUID uuid = UUID.randomUUID();
075 ByteArrayOutputStream output = new ByteArrayOutputStream();
076 DataOutputStream dataOut = new DataOutputStream(output);
077 try
078 {
079 dataOut.writeLong(uuid.getMostSignificantBits());
080 dataOut.writeLong(uuid.getLeastSignificantBits());
081 dataOut.flush();
082 dataOut.close();
083 }
084 catch (IOException e)
085 {
086 // This *really* shouldn't happen as we're not doing any I/O
087 throw new RuntimeException("I/O exception when writing to byte array", e);
088 }
089
090 // should really associate this channelId to the session
091 byte[] channelName = output.toByteArray();
092
093 response = methodRegistry.createChannelOpenOkBody(channelName);
094 }
095 else
096 {
097 throw new AMQException(AMQConstant.INTERNAL_ERROR, "Got channel open for protocol version not catered for: " + pv, null);
098 }
099
100
101 session.writeFrame(response.generateFrame(channelId));
102 }
103 }
|