001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport.network.ssl;
022
023 import java.nio.ByteBuffer;
024
025 import javax.net.ssl.SSLEngine;
026 import javax.net.ssl.SSLEngineResult;
027 import javax.net.ssl.SSLException;
028 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
029 import javax.net.ssl.SSLEngineResult.Status;
030
031 import org.apache.qpid.transport.Receiver;
032 import org.apache.qpid.transport.TransportException;
033 import org.apache.qpid.transport.util.Logger;
034
035 public class SSLReceiver implements Receiver<ByteBuffer>
036 {
037 private Receiver<ByteBuffer> delegate;
038 private SSLEngine engine;
039 private SSLSender sender;
040 private int sslBufSize;
041 private ByteBuffer appData;
042 private ByteBuffer localBuffer;
043 private boolean dataCached = false;
044 private final Object notificationToken;
045
046 private static final Logger log = Logger.get(SSLReceiver.class);
047
048 public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender)
049 {
050 this.engine = engine;
051 this.delegate = delegate;
052 this.sender = sender;
053 this.sslBufSize = engine.getSession().getApplicationBufferSize();
054 appData = ByteBuffer.allocate(sslBufSize);
055 localBuffer = ByteBuffer.allocate(sslBufSize);
056 notificationToken = sender.getNotificationToken();
057 }
058
059 public void closed()
060 {
061 delegate.closed();
062 }
063
064 public void exception(Throwable t)
065 {
066 delegate.exception(t);
067 }
068
069 private ByteBuffer addPreviouslyUnreadData(ByteBuffer buf)
070 {
071 if (dataCached)
072 {
073 ByteBuffer b = ByteBuffer.allocate(localBuffer.remaining() + buf.remaining());
074 b.put(localBuffer);
075 b.put(buf);
076 b.flip();
077 dataCached = false;
078 return b;
079 }
080 else
081 {
082 return buf;
083 }
084 }
085
086 public void received(ByteBuffer buf)
087 {
088 ByteBuffer netData = addPreviouslyUnreadData(buf);
089
090 HandshakeStatus handshakeStatus;
091 Status status;
092
093 while (netData.hasRemaining())
094 {
095 try
096 {
097 SSLEngineResult result = engine.unwrap(netData, appData);
098 int read = result.bytesProduced();
099 status = result.getStatus();
100 handshakeStatus = result.getHandshakeStatus();
101
102 if (read > 0)
103 {
104 int limit = appData.limit();
105 appData.limit(appData.position());
106 appData.position(appData.position() - read);
107
108 ByteBuffer data = appData.slice();
109
110 appData.limit(limit);
111 appData.position(appData.position() + read);
112
113 delegate.received(data);
114 }
115
116
117 switch(status)
118 {
119 case CLOSED:
120 synchronized(notificationToken)
121 {
122 notificationToken.notifyAll();
123 }
124 return;
125
126 case BUFFER_OVERFLOW:
127 appData = ByteBuffer.allocate(sslBufSize);
128 continue;
129
130 case BUFFER_UNDERFLOW:
131 localBuffer.clear();
132 localBuffer.put(netData);
133 localBuffer.flip();
134 dataCached = true;
135 break;
136
137 case OK:
138 break; // do nothing
139
140 default:
141 throw new IllegalStateException("SSLReceiver: Invalid State " + status);
142 }
143
144 switch (handshakeStatus)
145 {
146 case NEED_UNWRAP:
147 if (netData.hasRemaining())
148 {
149 continue;
150 }
151 break;
152
153 case NEED_TASK:
154 sender.doTasks();
155 handshakeStatus = engine.getHandshakeStatus();
156
157 case NEED_WRAP:
158 case FINISHED:
159 case NOT_HANDSHAKING:
160 synchronized(notificationToken)
161 {
162 notificationToken.notifyAll();
163 }
164 break;
165
166 default:
167 throw new IllegalStateException("SSLReceiver: Invalid State " + status);
168 }
169
170
171 }
172 catch(SSLException e)
173 {
174 throw new TransportException("Error in SSLReceiver",e);
175 }
176
177 }
178 }
179 }
|