Connection.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport;
022 
023 import org.apache.qpid.transport.network.ConnectionBinding;
024 import org.apache.qpid.transport.network.io.IoTransport;
025 import org.apache.qpid.transport.util.Logger;
026 import org.apache.qpid.transport.util.Waiter;
027 import org.apache.qpid.util.Strings;
028 
029 import java.util.ArrayList;
030 import java.util.HashMap;
031 import java.util.List;
032 import java.util.Map;
033 
034 import java.util.UUID;
035 
036 import javax.security.sasl.SaslClient;
037 import javax.security.sasl.SaslServer;
038 
039 import static org.apache.qpid.transport.Connection.State.*;
040 
041 
042 /**
043  * Connection
044  *
045  @author Rafael H. Schloming
046  *
047  * @todo the channels map should probably be replaced with something
048  * more efficient, e.g. an array or a map implementation that can use
049  * short instead of Short
050  */
051 
052 public class Connection extends ConnectionInvoker
053     implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
054 {
055 
056     private static final Logger log = Logger.get(Connection.class);
057 
058     enum State NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
059 
060     class DefaultConnectionListener implements ConnectionListener
061     {
062         public void opened(Connection conn) {}
063         public void exception(Connection conn, ConnectionException exception)
064         {
065             log.error(exception, "connection exception");
066         }
067         public void closed(Connection conn) {}
068     }
069 
070     private ConnectionDelegate delegate;
071     private Sender<ProtocolEvent> sender;
072 
073     final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
074     final private Map<Integer,Session> channels = new HashMap<Integer,Session>();
075 
076     private State state = NEW;
077     private Object lock = new Object();
078     private long timeout = 60000;
079     private ConnectionListener listener = new DefaultConnectionListener();
080     private ConnectionException error = null;
081 
082     private int channelMax = 1;
083     private String locale;
084     private SaslServer saslServer;
085     private SaslClient saslClient;
086     private long idleTimeout = 0;
087     
088     // want to make this final
089     private int _connectionId;
090 
091     public Connection() {}
092 
093     public void setConnectionDelegate(ConnectionDelegate delegate)
094     {
095         this.delegate = delegate;
096     }
097 
098     public void setConnectionListener(ConnectionListener listener)
099     {
100         if (listener == null)
101         {
102             this.listener = new DefaultConnectionListener();
103         }
104         else
105         {
106             this.listener = listener;
107         }
108     }
109 
110     public Sender<ProtocolEvent> getSender()
111     {
112         return sender;
113     }
114 
115     public void setSender(Sender<ProtocolEvent> sender)
116     {
117         this.sender = sender;
118         sender.setIdleTimeout(idleTimeout);         
119     }
120 
121     void setState(State state)
122     {
123         synchronized (lock)
124         {
125             this.state = state;
126             lock.notifyAll();
127         }
128     }
129 
130     void setLocale(String locale)
131     {
132         this.locale = locale;
133     }
134 
135     String getLocale()
136     {
137         return locale;
138     }
139 
140     void setSaslServer(SaslServer saslServer)
141     {
142         this.saslServer = saslServer;
143     }
144 
145     SaslServer getSaslServer()
146     {
147         return saslServer;
148     }
149 
150     void setSaslClient(SaslClient saslClient)
151     {
152         this.saslClient = saslClient;
153     }
154 
155     SaslClient getSaslClient()
156     {
157         return saslClient;
158     }
159 
160     public void connect(String host, int port, String vhost, String username, String password)
161     {
162         connect(host, port, vhost, username, password, false);
163     }
164     
165     public void connect(String host, int port, String vhost, String username, String password, boolean ssl)
166     {
167         connect(host, port, vhost, username, password, false,"PLAIN");
168     }
169 
170     public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs)
171     {
172         synchronized (lock)
173         {
174             state = OPENING;
175 
176             delegate = new ClientDelegate(vhost, username, password,saslMechs);
177 
178             IoTransport.connect(host, port, ConnectionBinding.get(this), ssl);
179             send(new ProtocolHeader(1010));
180 
181             Waiter w = new Waiter(lock, timeout);
182             while (w.hasTime() && state == OPENING && error == null)
183             {
184                 w.await();
185             }
186 
187             if (error != null)
188             {
189                 ConnectionException t = error;
190                 error = null;
191                 try
192                 {
193                     close();
194                 }
195                 catch (ConnectionException ce)
196                 {
197                     if (!(instanceof ProtocolVersionException))
198                     {
199                         throw ce;
200                     }
201                 }
202                 t.rethrow();
203             }
204 
205             switch (state)
206             {
207             case OPENING:
208                 close();
209                 throw new ConnectionException("connect() timed out");
210             case OPEN:
211                 break;
212             case CLOSED:
213                 throw new ConnectionException("connect() aborted");
214             default:
215                 throw new IllegalStateException(String.valueOf(state));
216             }
217         }
218 
219         listener.opened(this);
220     }
221 
222     public Session createSession()
223     {
224         return createSession(0);
225     }
226 
227     public Session createSession(long expiry)
228     {
229         return createSession(UUID.randomUUID().toString(), expiry);
230     }
231 
232     public Session createSession(String name)
233     {
234         return createSession(name, 0);
235     }
236 
237     public Session createSession(String name, long expiry)
238     {
239         return createSession(Strings.toUTF8(name), expiry);
240     }
241 
242     public Session createSession(byte[] name, long expiry)
243     {
244         return createSession(new Binary(name), expiry);
245     }
246 
247     public Session createSession(Binary name, long expiry)
248     {
249         synchronized (lock)
250         {
251             Session ssn = new Session(this, name, expiry);
252             sessions.put(name, ssn);
253             map(ssn);
254             ssn.attach();
255             return ssn;
256         }
257     }
258 
259     void removeSession(Session ssn)
260     {
261         synchronized (lock)
262         {
263             sessions.remove(ssn.getName());
264         }
265     }
266 
267     public void setConnectionId(int id)
268     {
269         _connectionId = id;
270     }
271 
272     public int getConnectionId()
273     {
274         return _connectionId;
275     }
276 
277     public ConnectionDelegate getConnectionDelegate()
278     {
279         return delegate;
280     }
281 
282     public void received(ProtocolEvent event)
283     {
284         log.debug("RECV: [%s] %s", this, event);
285         event.delegate(this, delegate);
286     }
287 
288     public void send(ProtocolEvent event)
289     {
290         log.debug("SEND: [%s] %s", this, event);
291         Sender s = sender;
292         if (s == null)
293         {
294             throw new ConnectionException("connection closed");
295         }
296         s.send(event);
297     }
298 
299     public void flush()
300     {
301         log.debug("FLUSH: [%s]"this);
302         sender.flush();
303     }
304 
305     protected void invoke(Method method)
306     {
307         method.setChannel(0);
308         send(method);
309         if (!method.isBatch())
310         {
311             flush();
312         }
313     }
314 
315     public void dispatch(Method method)
316     {
317         Session ssn = getSession(method.getChannel());
318         ssn.received(method);
319     }
320 
321     public int getChannelMax()
322     {
323         return channelMax;
324     }
325 
326     void setChannelMax(int max)
327     {
328         channelMax = max;
329     }
330 
331     private int map(Session ssn)
332     {
333         synchronized (lock)
334         {
335             for (int i = 0; i < getChannelMax(); i++)
336             {
337                 if (!channels.containsKey(i))
338                 {
339                     map(ssn, i);
340                     return i;
341                 }
342             }
343 
344             throw new RuntimeException("no more channels available");
345         }
346     }
347 
348     void map(Session ssn, int channel)
349     {
350         synchronized (lock)
351         {
352             channels.put(channel, ssn);
353             ssn.setChannel(channel);
354         }
355     }
356 
357     void unmap(Session ssn)
358     {
359         synchronized (lock)
360         {
361             channels.remove(ssn.getChannel());
362         }
363     }
364 
365     Session getSession(int channel)
366     {
367         synchronized (lock)
368         {
369             return channels.get(channel);
370         }
371     }
372 
373     public void resume()
374     {
375         synchronized (lock)
376         {
377             for (Session ssn : sessions.values())
378             {
379                 map(ssn);
380                 ssn.attach();
381                 ssn.resume();
382             }
383         }
384     }
385 
386     public void exception(ConnectionException e)
387     {
388         synchronized (lock)
389         {
390             switch (state)
391             {
392             case OPENING:
393             case CLOSING:
394                 error = e;
395                 lock.notifyAll();
396                 return;
397             }
398         }
399 
400         listener.exception(this, e);
401     }
402 
403     public void exception(Throwable t)
404     {
405         exception(new ConnectionException(t));
406     }
407 
408     void closeCode(ConnectionClose close)
409     {
410         synchronized (lock)
411         {
412             for (Session ssn : channels.values())
413             {
414                 ssn.closeCode(close);
415             }
416             ConnectionCloseCode code = close.getReplyCode();
417             if (code != ConnectionCloseCode.NORMAL)
418             {
419                 exception(new ConnectionException(close));
420             }
421         }
422     }
423 
424     public void closed()
425     {
426         if (state == OPEN)
427         {
428             exception(new ConnectionException("connection aborted"));
429         }
430 
431         log.debug("connection closed: %s"this);
432 
433         synchronized (lock)
434         {
435             List<Session> values = new ArrayList<Session>(channels.values());
436             for (Session ssn : values)
437             {
438                 ssn.closed();
439             }
440 
441             sender = null;
442             setState(CLOSED);
443         }
444 
445         listener.closed(this);
446     }
447 
448     public void close()
449     {
450         synchronized (lock)
451         {
452             switch (state)
453             {
454             case OPEN:
455                 state = CLOSING;
456                 connectionClose(ConnectionCloseCode.NORMAL, null);
457                 Waiter w = new Waiter(lock, timeout);
458                 while (w.hasTime() && state == CLOSING && error == null)
459                 {
460                     w.await();
461                 }
462 
463                 if (error != null)
464                 {
465                     close();
466                     throw new ConnectionException(error);
467                 }
468 
469                 switch (state)
470                 {
471                 case CLOSING:
472                     close();
473                     throw new ConnectionException("close() timed out");
474                 case CLOSED:
475                     break;
476                 default:
477                     throw new IllegalStateException(String.valueOf(state));
478                 }
479                 break;
480             case CLOSED:
481                 break;
482             default:
483                 if (sender != null)
484                 {
485                     sender.close();
486                     w = new Waiter(lock, timeout);
487                     while (w.hasTime() && sender != null && error == null)
488                     {
489                         w.await();
490                     }
491 
492                     if (error != null)
493                     {
494                         throw new ConnectionException(error);
495                     }
496 
497                     if (sender != null)
498                     {
499                         throw new ConnectionException("close() timed out");
500                     }
501                 }
502                 break;
503             }
504         }
505     }
506 
507     public void setIdleTimeout(long l)
508     {
509         idleTimeout = l;       
510         if (sender != null)
511         {            
512             sender.setIdleTimeout(l);    
513         }
514     }
515     
516     public long getIdleTimeout()
517     {
518         return idleTimeout;
519     }
520     
521     public String toString()
522     {
523         return String.format("conn:%x", System.identityHashCode(this));
524     }
525 
526 }