SSLSender.java
001 /*
002  * Licensed to the Apache Software Foundation (ASF) under one
003  * or more contributor license agreements.  See the NOTICE file
004  * distributed with this work for additional information
005  * regarding copyright ownership.  The ASF licenses this file
006  * to you under the Apache License, Version 2.0 (the
007  * "License"); you may not use this file except in compliance
008  * with the License.  You may obtain a copy of the License at
009  *
010  *   http://www.apache.org/licenses/LICENSE-2.0
011  *
012  * Unless required by applicable law or agreed to in writing,
013  * software distributed under the License is distributed on an
014  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015  * KIND, either express or implied.  See the License for the
016  * specific language governing permissions and limitations
017  * under the License.
018  *
019  */
020 package org.apache.qpid.transport.network.ssl;
021 
022 import java.nio.ByteBuffer;
023 import java.util.concurrent.atomic.AtomicBoolean;
024 
025 import javax.net.ssl.SSLEngine;
026 import javax.net.ssl.SSLEngineResult;
027 import javax.net.ssl.SSLException;
028 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
029 import javax.net.ssl.SSLEngineResult.Status;
030 
031 import org.apache.qpid.transport.Sender;
032 import org.apache.qpid.transport.SenderException;
033 import org.apache.qpid.transport.util.Logger;
034 
035 public class SSLSender implements Sender<ByteBuffer>
036 {
037     private Sender<ByteBuffer> delegate;
038     private SSLEngine engine;
039     private int sslBufSize;
040     private ByteBuffer netData;
041     
042     private final Object engineState = new Object();
043     private final AtomicBoolean closed = new AtomicBoolean(false);
044     
045     private static final Logger log = Logger.get(SSLSender.class);    
046     
047     public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate)
048     {
049         this.engine = engine;
050         this.delegate = delegate;        
051         sslBufSize = engine.getSession().getPacketBufferSize();
052         netData = ByteBuffer.allocate(sslBufSize);      
053     }
054 
055     public void close()
056     {
057         if (!closed.getAndSet(true))
058         {
059             if (engine.isOutboundDone())
060             {
061                 return;
062             }
063             log.debug("Closing SSL connection");
064             engine.closeOutbound();
065             try
066             {
067                 tearDownSSLConnection();            
068             }
069             catch(Exception e)
070             {
071                 throw new SenderException("Error closing SSL connection",e);
072             }
073             
074             while (!engine.isOutboundDone())
075             {
076                 synchronized(engineState)
077                 {
078                     try
079                     {
080                         engineState.wait();
081                     }
082                     catch(InterruptedException e)
083                     {
084                         // pass
085                     }
086                 }
087             }
088             delegate.close();
089         }
090     }
091 
092     private void tearDownSSLConnection() throws Exception
093     {
094         SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
095         Status status = result.getStatus();
096         int read   = result.bytesProduced();
097         while (status != Status.CLOSED)
098         {
099             if (status == Status.BUFFER_OVERFLOW)
100             {
101                 netData.clear();
102             }
103             if(read > 0)
104             {
105                 int limit = netData.limit();
106                 netData.limit(netData.position());
107                 netData.position(netData.position() - read);
108                 
109                 ByteBuffer data = netData.slice();
110                 
111                 netData.limit(limit);
112                 netData.position(netData.position() + read);
113                 
114                 delegate.send(data);
115                 flush();
116             }            
117             result = engine.wrap(ByteBuffer.allocate(0), netData);
118             status = result.getStatus();             
119             read   = result.bytesProduced();
120         }
121     }
122     
123     public void flush()
124     {
125         delegate.flush();        
126     }
127 
128     public void send(ByteBuffer appData)
129     {
130         if (closed.get())
131         {
132             throw new SenderException("SSL Sender is closed");
133         }
134 
135         HandshakeStatus handshakeStatus;
136         Status status;
137         
138         while(appData.hasRemaining())
139         {        
140 
141             int read = 0;
142             try
143             {
144                 SSLEngineResult result = engine.wrap(appData, netData);        
145                 read   = result.bytesProduced();
146                 status = result.getStatus();
147                 handshakeStatus = result.getHandshakeStatus();
148                 
149             }
150             catch(SSLException e)
151             {
152                 throw new SenderException("SSL, Error occurred while encrypting data",e);
153             }            
154             
155             if(read > 0)
156             {
157                 int limit = netData.limit();
158                 netData.limit(netData.position());
159                 netData.position(netData.position() - read);
160                 
161                 ByteBuffer data = netData.slice();
162                 
163                 netData.limit(limit);
164                 netData.position(netData.position() + read);
165                 
166                 delegate.send(data);
167             }
168             
169             switch(status
170             {
171                 case CLOSED:
172                     throw new SenderException("SSLEngine is closed");
173                 
174                 case BUFFER_OVERFLOW:
175                     netData.clear();
176                     continue;
177                     
178                 case OK:                        
179                     break// do nothing 
180                 
181                 default:
182                     throw new IllegalStateException("SSLReceiver: Invalid State " + status);
183             }          
184             
185             switch (handshakeStatus)
186             {
187                 case NEED_WRAP:
188                     if (netData.hasRemaining())
189                     {
190                         continue;
191                     }
192                 
193                 case NEED_TASK:
194                     doTasks();
195                     break;
196                    
197                 case NEED_UNWRAP:
198                     flush();
199                     synchronized(engineState)
200                     {
201                         try
202                         {
203                             engineState.wait();
204                         }
205                         catch(InterruptedException e)
206                         {
207                             // pass
208                         }
209                     }
210                     break;
211                     
212                 case FINISHED:                     
213                 case NOT_HANDSHAKING:
214                     break//do  nothing
215                       
216                 default:
217                     throw new IllegalStateException("SSLReceiver: Invalid State " + status);
218             }
219             
220         }
221     }
222     
223     public void doTasks() 
224     {
225         Runnable runnable;
226         while ((runnable = engine.getDelegatedTask()) != null) {
227             runnable.run();
228         }
229     }    
230     
231     public Object getNotificationToken()
232     {
233         return engineState;
234     }
235     
236     public void setIdleTimeout(long l)
237     {
238         delegate.setIdleTimeout(l);
239     }
240 }