SSLReceiver.java
001 /*
002 *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport.network.ssl;
022 
023 import java.nio.ByteBuffer;
024 
025 import javax.net.ssl.SSLEngine;
026 import javax.net.ssl.SSLEngineResult;
027 import javax.net.ssl.SSLException;
028 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
029 import javax.net.ssl.SSLEngineResult.Status;
030 
031 import org.apache.qpid.transport.Receiver;
032 import org.apache.qpid.transport.TransportException;
033 import org.apache.qpid.transport.util.Logger;
034 
035 public class SSLReceiver implements Receiver<ByteBuffer>
036 {
037     private Receiver<ByteBuffer> delegate;
038     private SSLEngine engine;
039     private SSLSender sender;    
040     private int sslBufSize;
041     private ByteBuffer appData;
042     private ByteBuffer localBuffer;
043     private boolean dataCached = false;
044     private final Object notificationToken;
045     
046     private static final Logger log = Logger.get(SSLReceiver.class);
047     
048     public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender)
049     {
050         this.engine = engine;
051         this.delegate = delegate;
052         this.sender = sender;
053         this.sslBufSize = engine.getSession().getApplicationBufferSize();   
054         appData = ByteBuffer.allocate(sslBufSize);
055         localBuffer = ByteBuffer.allocate(sslBufSize);
056         notificationToken = sender.getNotificationToken();
057     }
058     
059     public void closed()
060     {        
061        delegate.closed();
062     }
063 
064     public void exception(Throwable t)
065     {
066         delegate.exception(t);        
067     }
068     
069     private ByteBuffer addPreviouslyUnreadData(ByteBuffer buf)
070     {
071         if (dataCached)
072         {
073             ByteBuffer b = ByteBuffer.allocate(localBuffer.remaining() + buf.remaining());
074             b.put(localBuffer);
075             b.put(buf);
076             b.flip();
077             dataCached = false;
078             return b;
079         }
080         else
081         {
082             return buf;
083         }
084     }
085 
086     public void received(ByteBuffer buf)
087     {
088         ByteBuffer netData = addPreviouslyUnreadData(buf);
089         
090         HandshakeStatus handshakeStatus;
091         Status status;
092         
093         while (netData.hasRemaining())
094         {               
095             try
096             {
097                 SSLEngineResult result = engine.unwrap(netData, appData);
098                 int read = result.bytesProduced();
099                 status = result.getStatus();
100                 handshakeStatus = result.getHandshakeStatus();  
101                 
102                 if (read > 0)
103                 {
104                     int limit = appData.limit();
105                     appData.limit(appData.position());
106                     appData.position(appData.position() - read);
107                     
108                     ByteBuffer data = appData.slice();
109                     
110                     appData.limit(limit);
111                     appData.position(appData.position() + read);
112                     
113                     delegate.received(data);       
114                 }     
115                 
116                 
117                 switch(status
118                 {
119                     case CLOSED:
120                         synchronized(notificationToken)
121                         {
122                             notificationToken.notifyAll();
123                         }
124                         return;
125                     
126                     case BUFFER_OVERFLOW:
127                         appData = ByteBuffer.allocate(sslBufSize);
128                         continue;
129                      
130                     case BUFFER_UNDERFLOW:
131                         localBuffer.clear();
132                         localBuffer.put(netData);
133                         localBuffer.flip();
134                         dataCached = true;
135                         break;
136                         
137                     case OK:                        
138                         break// do nothing 
139                     
140                     default:
141                         throw new IllegalStateException("SSLReceiver: Invalid State " + status);
142                 }       
143                                
144                 switch (handshakeStatus)
145                 {
146                     case NEED_UNWRAP:
147                         if (netData.hasRemaining())
148                         {
149                             continue;
150                         }
151                         break;
152                     
153                     case NEED_TASK:
154                         sender.doTasks();
155                         handshakeStatus = engine.getHandshakeStatus();
156                        
157                     case NEED_WRAP: 
158                     case FINISHED:
159                     case NOT_HANDSHAKING:                        
160                         synchronized(notificationToken)
161                         {
162                             notificationToken.notifyAll();
163                         }
164                         break
165                         
166                     default:
167                         throw new IllegalStateException("SSLReceiver: Invalid State " + status);
168                 }
169                 
170                     
171             }
172             catch(SSLException e)
173             {
174                 throw new TransportException("Error in SSLReceiver",e);
175             }
176                
177         }
178     }
179 }