AMQIoTransportProtocolSession.java
001 package org.apache.qpid.client.protocol;
002 /*
003  
004  * Licensed to the Apache Software Foundation (ASF) under one
005  * or more contributor license agreements.  See the NOTICE file
006  * distributed with this work for additional information
007  * regarding copyright ownership.  The ASF licenses this file
008  * to you under the Apache License, Version 2.0 (the
009  * "License"); you may not use this file except in compliance
010  * with the License.  You may obtain a copy of the License at
011  
012  *   http://www.apache.org/licenses/LICENSE-2.0
013  
014  * Unless required by applicable law or agreed to in writing,
015  * software distributed under the License is distributed on an
016  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017  * KIND, either express or implied.  See the License for the
018  * specific language governing permissions and limitations
019  * under the License.
020  
021  */
022 
023 
024 import java.util.UUID;
025 
026 import javax.security.sasl.SaslClient;
027 
028 import org.apache.commons.lang.StringUtils;
029 import org.apache.mina.common.IdleStatus;
030 import org.apache.qpid.AMQException;
031 import org.apache.qpid.client.AMQConnection;
032 import org.apache.qpid.client.ConnectionTuneParameters;
033 import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
034 import org.apache.qpid.client.state.AMQState;
035 import org.apache.qpid.framing.AMQDataBlock;
036 import org.apache.qpid.framing.AMQMethodBody;
037 import org.apache.qpid.framing.AMQShortString;
038 import org.apache.qpid.framing.ProtocolInitiation;
039 import org.apache.qpid.framing.ProtocolVersion;
040 import org.apache.qpid.transport.Sender;
041 
042 public class AMQIoTransportProtocolSession extends AMQProtocolSession
043 {
044 
045     protected Sender<java.nio.ByteBuffer> _ioSender;
046     private SaslClient _saslClient;
047     private ConnectionTuneParameters _connectionTuneParameters;
048     
049     public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
050     {
051         super(protocolHandler, connection);
052     }
053     
054     @Override
055     public void closeProtocolSession(boolean waitLastthrows AMQException
056     {
057         _ioSender.close();
058         _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
059     }
060 
061     @Override
062     public void init()
063     {
064         _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer());
065         _ioSender.flush();
066     }
067 
068     @Override
069     protected AMQShortString generateQueueName()
070     {
071         int id;
072         synchronized (_queueIdLock)
073         {
074             id = _queueId++;
075         }
076         return new AMQShortString("tmp_" + UUID.randomUUID() "_" + id);
077     }
078     
079     @Override
080     public AMQConnection getAMQConnection()
081     {
082         return _connection;
083     }
084     
085     @Override
086     public SaslClient getSaslClient()
087     {
088         return _saslClient;
089     }
090     
091     @Override
092     public void setSaslClient(SaslClient client)
093     {
094         _saslClient = client;
095     }
096     
097     /** @param delay delay in seconds (not ms) */
098     @Override
099     void initHeartbeats(int delay)
100     {
101         if (delay > 0)
102         {
103             // FIXME: actually do something here
104             HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
105         }
106     }
107     
108     @Override
109     public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBodythrows AMQException
110     {
111         // FIXME?
112         _protocolHandler.methodBodyReceived(channel, amqMethodBody, null);
113     }
114     
115     @Override
116     public void writeFrame(AMQDataBlock frame, boolean wait)
117     {      
118         _ioSender.send(frame.toNioByteBuffer());
119         if (wait)
120         {
121             _ioSender.flush();
122         }
123     }
124     
125     @Override
126     public void setSender(Sender<java.nio.ByteBuffer> sender)
127     {
128         _ioSender = sender;
129     }
130  
131     @Override
132     public ConnectionTuneParameters getConnectionTuneParameters()
133     {
134         return _connectionTuneParameters;
135     }
136     
137     @Override
138     public void setConnectionTuneParameters(ConnectionTuneParameters params)
139     {
140         _connectionTuneParameters = params;
141         AMQConnection con = getAMQConnection();
142         con.setMaximumChannelCount(params.getChannelMax());
143         con.setMaximumFrameSize(params.getFrameMax());
144         initHeartbeats((intparams.getHeartbeat());
145     }
146 }