001 package org.apache.qpid.client.protocol;
002 /*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 *
021 */
022
023
024 import java.util.UUID;
025
026 import javax.security.sasl.SaslClient;
027
028 import org.apache.commons.lang.StringUtils;
029 import org.apache.mina.common.IdleStatus;
030 import org.apache.qpid.AMQException;
031 import org.apache.qpid.client.AMQConnection;
032 import org.apache.qpid.client.ConnectionTuneParameters;
033 import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
034 import org.apache.qpid.client.state.AMQState;
035 import org.apache.qpid.framing.AMQDataBlock;
036 import org.apache.qpid.framing.AMQMethodBody;
037 import org.apache.qpid.framing.AMQShortString;
038 import org.apache.qpid.framing.ProtocolInitiation;
039 import org.apache.qpid.framing.ProtocolVersion;
040 import org.apache.qpid.transport.Sender;
041
042 public class AMQIoTransportProtocolSession extends AMQProtocolSession
043 {
044
045 protected Sender<java.nio.ByteBuffer> _ioSender;
046 private SaslClient _saslClient;
047 private ConnectionTuneParameters _connectionTuneParameters;
048
049 public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
050 {
051 super(protocolHandler, connection);
052 }
053
054 @Override
055 public void closeProtocolSession(boolean waitLast) throws AMQException
056 {
057 _ioSender.close();
058 _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
059 }
060
061 @Override
062 public void init()
063 {
064 _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer());
065 _ioSender.flush();
066 }
067
068 @Override
069 protected AMQShortString generateQueueName()
070 {
071 int id;
072 synchronized (_queueIdLock)
073 {
074 id = _queueId++;
075 }
076 return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id);
077 }
078
079 @Override
080 public AMQConnection getAMQConnection()
081 {
082 return _connection;
083 }
084
085 @Override
086 public SaslClient getSaslClient()
087 {
088 return _saslClient;
089 }
090
091 @Override
092 public void setSaslClient(SaslClient client)
093 {
094 _saslClient = client;
095 }
096
097 /** @param delay delay in seconds (not ms) */
098 @Override
099 void initHeartbeats(int delay)
100 {
101 if (delay > 0)
102 {
103 // FIXME: actually do something here
104 HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
105 }
106 }
107
108 @Override
109 public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
110 {
111 // FIXME?
112 _protocolHandler.methodBodyReceived(channel, amqMethodBody, null);
113 }
114
115 @Override
116 public void writeFrame(AMQDataBlock frame, boolean wait)
117 {
118 _ioSender.send(frame.toNioByteBuffer());
119 if (wait)
120 {
121 _ioSender.flush();
122 }
123 }
124
125 @Override
126 public void setSender(Sender<java.nio.ByteBuffer> sender)
127 {
128 _ioSender = sender;
129 }
130
131 @Override
132 public ConnectionTuneParameters getConnectionTuneParameters()
133 {
134 return _connectionTuneParameters;
135 }
136
137 @Override
138 public void setConnectionTuneParameters(ConnectionTuneParameters params)
139 {
140 _connectionTuneParameters = params;
141 AMQConnection con = getAMQConnection();
142 con.setMaximumChannelCount(params.getChannelMax());
143 con.setMaximumFrameSize(params.getFrameMax());
144 initHeartbeats((int) params.getHeartbeat());
145 }
146 }
|