001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport;
022
023 import org.apache.qpid.transport.network.ConnectionBinding;
024 import org.apache.qpid.transport.network.io.IoTransport;
025 import org.apache.qpid.transport.util.Logger;
026 import org.apache.qpid.transport.util.Waiter;
027 import org.apache.qpid.util.Strings;
028
029 import java.util.ArrayList;
030 import java.util.HashMap;
031 import java.util.List;
032 import java.util.Map;
033
034 import java.util.UUID;
035
036 import javax.security.sasl.SaslClient;
037 import javax.security.sasl.SaslServer;
038
039 import static org.apache.qpid.transport.Connection.State.*;
040
041
042 /**
043 * Connection
044 *
045 * @author Rafael H. Schloming
046 *
047 * @todo the channels map should probably be replaced with something
048 * more efficient, e.g. an array or a map implementation that can use
049 * short instead of Short
050 */
051
052 public class Connection extends ConnectionInvoker
053 implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
054 {
055
056 private static final Logger log = Logger.get(Connection.class);
057
058 enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
059
060 class DefaultConnectionListener implements ConnectionListener
061 {
062 public void opened(Connection conn) {}
063 public void exception(Connection conn, ConnectionException exception)
064 {
065 log.error(exception, "connection exception");
066 }
067 public void closed(Connection conn) {}
068 }
069
070 private ConnectionDelegate delegate;
071 private Sender<ProtocolEvent> sender;
072
073 final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
074 final private Map<Integer,Session> channels = new HashMap<Integer,Session>();
075
076 private State state = NEW;
077 private Object lock = new Object();
078 private long timeout = 60000;
079 private ConnectionListener listener = new DefaultConnectionListener();
080 private ConnectionException error = null;
081
082 private int channelMax = 1;
083 private String locale;
084 private SaslServer saslServer;
085 private SaslClient saslClient;
086 private long idleTimeout = 0;
087
088 // want to make this final
089 private int _connectionId;
090
091 public Connection() {}
092
093 public void setConnectionDelegate(ConnectionDelegate delegate)
094 {
095 this.delegate = delegate;
096 }
097
098 public void setConnectionListener(ConnectionListener listener)
099 {
100 if (listener == null)
101 {
102 this.listener = new DefaultConnectionListener();
103 }
104 else
105 {
106 this.listener = listener;
107 }
108 }
109
110 public Sender<ProtocolEvent> getSender()
111 {
112 return sender;
113 }
114
115 public void setSender(Sender<ProtocolEvent> sender)
116 {
117 this.sender = sender;
118 sender.setIdleTimeout(idleTimeout);
119 }
120
121 void setState(State state)
122 {
123 synchronized (lock)
124 {
125 this.state = state;
126 lock.notifyAll();
127 }
128 }
129
130 void setLocale(String locale)
131 {
132 this.locale = locale;
133 }
134
135 String getLocale()
136 {
137 return locale;
138 }
139
140 void setSaslServer(SaslServer saslServer)
141 {
142 this.saslServer = saslServer;
143 }
144
145 SaslServer getSaslServer()
146 {
147 return saslServer;
148 }
149
150 void setSaslClient(SaslClient saslClient)
151 {
152 this.saslClient = saslClient;
153 }
154
155 SaslClient getSaslClient()
156 {
157 return saslClient;
158 }
159
160 public void connect(String host, int port, String vhost, String username, String password)
161 {
162 connect(host, port, vhost, username, password, false);
163 }
164
165 public void connect(String host, int port, String vhost, String username, String password, boolean ssl)
166 {
167 connect(host, port, vhost, username, password, false,"PLAIN");
168 }
169
170 public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs)
171 {
172 synchronized (lock)
173 {
174 state = OPENING;
175
176 delegate = new ClientDelegate(vhost, username, password,saslMechs);
177
178 IoTransport.connect(host, port, ConnectionBinding.get(this), ssl);
179 send(new ProtocolHeader(1, 0, 10));
180
181 Waiter w = new Waiter(lock, timeout);
182 while (w.hasTime() && state == OPENING && error == null)
183 {
184 w.await();
185 }
186
187 if (error != null)
188 {
189 ConnectionException t = error;
190 error = null;
191 try
192 {
193 close();
194 }
195 catch (ConnectionException ce)
196 {
197 if (!(t instanceof ProtocolVersionException))
198 {
199 throw ce;
200 }
201 }
202 t.rethrow();
203 }
204
205 switch (state)
206 {
207 case OPENING:
208 close();
209 throw new ConnectionException("connect() timed out");
210 case OPEN:
211 break;
212 case CLOSED:
213 throw new ConnectionException("connect() aborted");
214 default:
215 throw new IllegalStateException(String.valueOf(state));
216 }
217 }
218
219 listener.opened(this);
220 }
221
222 public Session createSession()
223 {
224 return createSession(0);
225 }
226
227 public Session createSession(long expiry)
228 {
229 return createSession(UUID.randomUUID().toString(), expiry);
230 }
231
232 public Session createSession(String name)
233 {
234 return createSession(name, 0);
235 }
236
237 public Session createSession(String name, long expiry)
238 {
239 return createSession(Strings.toUTF8(name), expiry);
240 }
241
242 public Session createSession(byte[] name, long expiry)
243 {
244 return createSession(new Binary(name), expiry);
245 }
246
247 public Session createSession(Binary name, long expiry)
248 {
249 synchronized (lock)
250 {
251 Session ssn = new Session(this, name, expiry);
252 sessions.put(name, ssn);
253 map(ssn);
254 ssn.attach();
255 return ssn;
256 }
257 }
258
259 void removeSession(Session ssn)
260 {
261 synchronized (lock)
262 {
263 sessions.remove(ssn.getName());
264 }
265 }
266
267 public void setConnectionId(int id)
268 {
269 _connectionId = id;
270 }
271
272 public int getConnectionId()
273 {
274 return _connectionId;
275 }
276
277 public ConnectionDelegate getConnectionDelegate()
278 {
279 return delegate;
280 }
281
282 public void received(ProtocolEvent event)
283 {
284 log.debug("RECV: [%s] %s", this, event);
285 event.delegate(this, delegate);
286 }
287
288 public void send(ProtocolEvent event)
289 {
290 log.debug("SEND: [%s] %s", this, event);
291 Sender s = sender;
292 if (s == null)
293 {
294 throw new ConnectionException("connection closed");
295 }
296 s.send(event);
297 }
298
299 public void flush()
300 {
301 log.debug("FLUSH: [%s]", this);
302 sender.flush();
303 }
304
305 protected void invoke(Method method)
306 {
307 method.setChannel(0);
308 send(method);
309 if (!method.isBatch())
310 {
311 flush();
312 }
313 }
314
315 public void dispatch(Method method)
316 {
317 Session ssn = getSession(method.getChannel());
318 ssn.received(method);
319 }
320
321 public int getChannelMax()
322 {
323 return channelMax;
324 }
325
326 void setChannelMax(int max)
327 {
328 channelMax = max;
329 }
330
331 private int map(Session ssn)
332 {
333 synchronized (lock)
334 {
335 for (int i = 0; i < getChannelMax(); i++)
336 {
337 if (!channels.containsKey(i))
338 {
339 map(ssn, i);
340 return i;
341 }
342 }
343
344 throw new RuntimeException("no more channels available");
345 }
346 }
347
348 void map(Session ssn, int channel)
349 {
350 synchronized (lock)
351 {
352 channels.put(channel, ssn);
353 ssn.setChannel(channel);
354 }
355 }
356
357 void unmap(Session ssn)
358 {
359 synchronized (lock)
360 {
361 channels.remove(ssn.getChannel());
362 }
363 }
364
365 Session getSession(int channel)
366 {
367 synchronized (lock)
368 {
369 return channels.get(channel);
370 }
371 }
372
373 public void resume()
374 {
375 synchronized (lock)
376 {
377 for (Session ssn : sessions.values())
378 {
379 map(ssn);
380 ssn.attach();
381 ssn.resume();
382 }
383 }
384 }
385
386 public void exception(ConnectionException e)
387 {
388 synchronized (lock)
389 {
390 switch (state)
391 {
392 case OPENING:
393 case CLOSING:
394 error = e;
395 lock.notifyAll();
396 return;
397 }
398 }
399
400 listener.exception(this, e);
401 }
402
403 public void exception(Throwable t)
404 {
405 exception(new ConnectionException(t));
406 }
407
408 void closeCode(ConnectionClose close)
409 {
410 synchronized (lock)
411 {
412 for (Session ssn : channels.values())
413 {
414 ssn.closeCode(close);
415 }
416 ConnectionCloseCode code = close.getReplyCode();
417 if (code != ConnectionCloseCode.NORMAL)
418 {
419 exception(new ConnectionException(close));
420 }
421 }
422 }
423
424 public void closed()
425 {
426 if (state == OPEN)
427 {
428 exception(new ConnectionException("connection aborted"));
429 }
430
431 log.debug("connection closed: %s", this);
432
433 synchronized (lock)
434 {
435 List<Session> values = new ArrayList<Session>(channels.values());
436 for (Session ssn : values)
437 {
438 ssn.closed();
439 }
440
441 sender = null;
442 setState(CLOSED);
443 }
444
445 listener.closed(this);
446 }
447
448 public void close()
449 {
450 synchronized (lock)
451 {
452 switch (state)
453 {
454 case OPEN:
455 state = CLOSING;
456 connectionClose(ConnectionCloseCode.NORMAL, null);
457 Waiter w = new Waiter(lock, timeout);
458 while (w.hasTime() && state == CLOSING && error == null)
459 {
460 w.await();
461 }
462
463 if (error != null)
464 {
465 close();
466 throw new ConnectionException(error);
467 }
468
469 switch (state)
470 {
471 case CLOSING:
472 close();
473 throw new ConnectionException("close() timed out");
474 case CLOSED:
475 break;
476 default:
477 throw new IllegalStateException(String.valueOf(state));
478 }
479 break;
480 case CLOSED:
481 break;
482 default:
483 if (sender != null)
484 {
485 sender.close();
486 w = new Waiter(lock, timeout);
487 while (w.hasTime() && sender != null && error == null)
488 {
489 w.await();
490 }
491
492 if (error != null)
493 {
494 throw new ConnectionException(error);
495 }
496
497 if (sender != null)
498 {
499 throw new ConnectionException("close() timed out");
500 }
501 }
502 break;
503 }
504 }
505 }
506
507 public void setIdleTimeout(long l)
508 {
509 idleTimeout = l;
510 if (sender != null)
511 {
512 sender.setIdleTimeout(l);
513 }
514 }
515
516 public long getIdleTimeout()
517 {
518 return idleTimeout;
519 }
520
521 public String toString()
522 {
523 return String.format("conn:%x", System.identityHashCode(this));
524 }
525
526 }
|