001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 */
020 package org.apache.qpid.transport.network.ssl;
021
022 import java.nio.ByteBuffer;
023 import java.util.concurrent.atomic.AtomicBoolean;
024
025 import javax.net.ssl.SSLEngine;
026 import javax.net.ssl.SSLEngineResult;
027 import javax.net.ssl.SSLException;
028 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
029 import javax.net.ssl.SSLEngineResult.Status;
030
031 import org.apache.qpid.transport.Sender;
032 import org.apache.qpid.transport.SenderException;
033 import org.apache.qpid.transport.util.Logger;
034
035 public class SSLSender implements Sender<ByteBuffer>
036 {
037 private Sender<ByteBuffer> delegate;
038 private SSLEngine engine;
039 private int sslBufSize;
040 private ByteBuffer netData;
041
042 private final Object engineState = new Object();
043 private final AtomicBoolean closed = new AtomicBoolean(false);
044
045 private static final Logger log = Logger.get(SSLSender.class);
046
047 public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate)
048 {
049 this.engine = engine;
050 this.delegate = delegate;
051 sslBufSize = engine.getSession().getPacketBufferSize();
052 netData = ByteBuffer.allocate(sslBufSize);
053 }
054
055 public void close()
056 {
057 if (!closed.getAndSet(true))
058 {
059 if (engine.isOutboundDone())
060 {
061 return;
062 }
063 log.debug("Closing SSL connection");
064 engine.closeOutbound();
065 try
066 {
067 tearDownSSLConnection();
068 }
069 catch(Exception e)
070 {
071 throw new SenderException("Error closing SSL connection",e);
072 }
073
074 while (!engine.isOutboundDone())
075 {
076 synchronized(engineState)
077 {
078 try
079 {
080 engineState.wait();
081 }
082 catch(InterruptedException e)
083 {
084 // pass
085 }
086 }
087 }
088 delegate.close();
089 }
090 }
091
092 private void tearDownSSLConnection() throws Exception
093 {
094 SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
095 Status status = result.getStatus();
096 int read = result.bytesProduced();
097 while (status != Status.CLOSED)
098 {
099 if (status == Status.BUFFER_OVERFLOW)
100 {
101 netData.clear();
102 }
103 if(read > 0)
104 {
105 int limit = netData.limit();
106 netData.limit(netData.position());
107 netData.position(netData.position() - read);
108
109 ByteBuffer data = netData.slice();
110
111 netData.limit(limit);
112 netData.position(netData.position() + read);
113
114 delegate.send(data);
115 flush();
116 }
117 result = engine.wrap(ByteBuffer.allocate(0), netData);
118 status = result.getStatus();
119 read = result.bytesProduced();
120 }
121 }
122
123 public void flush()
124 {
125 delegate.flush();
126 }
127
128 public void send(ByteBuffer appData)
129 {
130 if (closed.get())
131 {
132 throw new SenderException("SSL Sender is closed");
133 }
134
135 HandshakeStatus handshakeStatus;
136 Status status;
137
138 while(appData.hasRemaining())
139 {
140
141 int read = 0;
142 try
143 {
144 SSLEngineResult result = engine.wrap(appData, netData);
145 read = result.bytesProduced();
146 status = result.getStatus();
147 handshakeStatus = result.getHandshakeStatus();
148
149 }
150 catch(SSLException e)
151 {
152 throw new SenderException("SSL, Error occurred while encrypting data",e);
153 }
154
155 if(read > 0)
156 {
157 int limit = netData.limit();
158 netData.limit(netData.position());
159 netData.position(netData.position() - read);
160
161 ByteBuffer data = netData.slice();
162
163 netData.limit(limit);
164 netData.position(netData.position() + read);
165
166 delegate.send(data);
167 }
168
169 switch(status)
170 {
171 case CLOSED:
172 throw new SenderException("SSLEngine is closed");
173
174 case BUFFER_OVERFLOW:
175 netData.clear();
176 continue;
177
178 case OK:
179 break; // do nothing
180
181 default:
182 throw new IllegalStateException("SSLReceiver: Invalid State " + status);
183 }
184
185 switch (handshakeStatus)
186 {
187 case NEED_WRAP:
188 if (netData.hasRemaining())
189 {
190 continue;
191 }
192
193 case NEED_TASK:
194 doTasks();
195 break;
196
197 case NEED_UNWRAP:
198 flush();
199 synchronized(engineState)
200 {
201 try
202 {
203 engineState.wait();
204 }
205 catch(InterruptedException e)
206 {
207 // pass
208 }
209 }
210 break;
211
212 case FINISHED:
213 case NOT_HANDSHAKING:
214 break; //do nothing
215
216 default:
217 throw new IllegalStateException("SSLReceiver: Invalid State " + status);
218 }
219
220 }
221 }
222
223 public void doTasks()
224 {
225 Runnable runnable;
226 while ((runnable = engine.getDelegatedTask()) != null) {
227 runnable.run();
228 }
229 }
230
231 public Object getNotificationToken()
232 {
233 return engineState;
234 }
235
236 public void setIdleTimeout(long l)
237 {
238 delegate.setIdleTimeout(l);
239 }
240 }
|