001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport;
022
023
024 import org.apache.qpid.transport.network.Frame;
025
026 import org.apache.qpid.transport.util.Logger;
027 import org.apache.qpid.transport.util.Waiter;
028
029 import java.nio.ByteBuffer;
030 import java.util.ArrayList;
031 import java.util.Arrays;
032 import java.util.HashMap;
033 import java.util.List;
034 import java.util.Map;
035 import java.util.concurrent.atomic.AtomicBoolean;
036 import java.util.concurrent.Semaphore;
037 import java.util.concurrent.TimeUnit;
038
039 import static org.apache.qpid.transport.Option.*;
040 import static org.apache.qpid.transport.Session.State.*;
041 import static org.apache.qpid.transport.util.Functions.*;
042 import static org.apache.qpid.util.Serial.*;
043 import static org.apache.qpid.util.Strings.*;
044
045 /**
046 * Session
047 *
048 * @author Rafael H. Schloming
049 */
050
051 public class Session extends SessionInvoker
052 {
053
054 private static final Logger log = Logger.get(Session.class);
055
056 enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED }
057
058 class DefaultSessionListener implements SessionListener
059 {
060
061 public void opened(Session ssn) {}
062
063 public void message(Session ssn, MessageTransfer xfr)
064 {
065 log.info("message: %s", xfr);
066 }
067
068 public void exception(Session ssn, SessionException exc)
069 {
070 log.error(exc, "session exception");
071 }
072
073 public void closed(Session ssn) {}
074 }
075
076 public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
077
078 private Connection connection;
079 private Binary name;
080 private long expiry;
081 private int channel;
082 private SessionDelegate delegate = new SessionDelegate();
083 private SessionListener listener = new DefaultSessionListener();
084 private long timeout = 60000;
085 private boolean autoSync = false;
086
087 private boolean incomingInit;
088 // incoming command count
089 private int commandsIn;
090 // completed incoming commands
091 private final Object processedLock = new Object();
092 private RangeSet processed;
093 private int maxProcessed;
094 private int syncPoint;
095
096 // outgoing command count
097 private int commandsOut = 0;
098 private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)];
099 private int commandBytes = 0;
100 private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024);
101 private int maxComplete = commandsOut - 1;
102 private boolean needSync = false;
103
104 private State state = NEW;
105
106 // transfer flow control
107 private volatile boolean flowControl = false;
108 private Semaphore credit = new Semaphore(0);
109
110 Session(Connection connection, Binary name, long expiry)
111 {
112 this.connection = connection;
113 this.name = name;
114 this.expiry = expiry;
115 initReceiver();
116 }
117
118 public Connection getConnection()
119 {
120 return connection;
121 }
122
123 public Binary getName()
124 {
125 return name;
126 }
127
128 void setExpiry(long expiry)
129 {
130 this.expiry = expiry;
131 }
132
133 int getChannel()
134 {
135 return channel;
136 }
137
138 void setChannel(int channel)
139 {
140 this.channel = channel;
141 }
142
143 public void setSessionListener(SessionListener listener)
144 {
145 if (listener == null)
146 {
147 this.listener = new DefaultSessionListener();
148 }
149 else
150 {
151 this.listener = listener;
152 }
153 }
154
155 public SessionListener getSessionListener()
156 {
157 return listener;
158 }
159
160 public void setAutoSync(boolean value)
161 {
162 synchronized (commands)
163 {
164 this.autoSync = value;
165 }
166 }
167
168 void setState(State state)
169 {
170 synchronized (commands)
171 {
172 this.state = state;
173 commands.notifyAll();
174 }
175 }
176
177 void setFlowControl(boolean value)
178 {
179 flowControl = value;
180 }
181
182 void addCredit(int value)
183 {
184 credit.release(value);
185 }
186
187 void drainCredit()
188 {
189 credit.drainPermits();
190 }
191
192 void acquireCredit()
193 {
194 if (flowControl)
195 {
196 try
197 {
198 if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS))
199 {
200 throw new SessionException
201 ("timed out waiting for message credit");
202 }
203 }
204 catch (InterruptedException e)
205 {
206 throw new SessionException
207 ("interrupted while waiting for credit", null, e);
208 }
209 }
210 }
211
212 private void initReceiver()
213 {
214 synchronized (processedLock)
215 {
216 incomingInit = false;
217 processed = new RangeSet();
218 }
219 }
220
221 void attach()
222 {
223 initReceiver();
224 sessionAttach(name.getBytes());
225 // XXX: when the broker and client support full session
226 // recovery we should use expiry as the requested timeout
227 sessionRequestTimeout(0);
228 }
229
230 void resume()
231 {
232 synchronized (commands)
233 {
234 for (int i = maxComplete + 1; lt(i, commandsOut); i++)
235 {
236 Method m = commands[mod(i, commands.length)];
237 if (m != null)
238 {
239 sessionCommandPoint(m.getId(), 0);
240 send(m);
241 }
242 }
243
244 sessionCommandPoint(commandsOut, 0);
245 sessionFlush(COMPLETED);
246 }
247 }
248
249 void dump()
250 {
251 synchronized (commands)
252 {
253 for (Method m : commands)
254 {
255 if (m != null)
256 {
257 System.out.println(m);
258 }
259 }
260 }
261 }
262
263 final void commandPoint(int id)
264 {
265 synchronized (processedLock)
266 {
267 this.commandsIn = id;
268 if (!incomingInit)
269 {
270 incomingInit = true;
271 maxProcessed = commandsIn - 1;
272 syncPoint = maxProcessed;
273 }
274 }
275 }
276
277 public int getCommandsOut()
278 {
279 return commandsOut;
280 }
281
282 public int getCommandsIn()
283 {
284 return commandsIn;
285 }
286
287 public int nextCommandId()
288 {
289 return commandsIn++;
290 }
291
292 final void identify(Method cmd)
293 {
294 if (!incomingInit)
295 {
296 throw new IllegalStateException();
297 }
298
299 int id = nextCommandId();
300 cmd.setId(id);
301
302 if(log.isDebugEnabled())
303 {
304 log.debug("ID: [%s] %s", this.channel, id);
305 }
306
307 //if ((id % 65536) == 0)
308 if ((id & 0xff) == 0)
309 {
310 flushProcessed(TIMELY_REPLY);
311 }
312 }
313
314 public void processed(Method command)
315 {
316 processed(command.getId());
317 }
318
319 public void processed(int command)
320 {
321 processed(new Range(command, command));
322 }
323
324 public void processed(int lower, int upper)
325 {
326
327 processed(new Range(lower, upper));
328 }
329
330 public void processed(Range range)
331 {
332 log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed);
333
334 boolean flush;
335 synchronized (processedLock)
336 {
337 log.debug("%s", processed);
338
339 if (ge(range.getUpper(), commandsIn))
340 {
341 throw new IllegalArgumentException
342 ("range exceeds max received command-id: " + range);
343 }
344
345 processed.add(range);
346 Range first = processed.getFirst();
347 int lower = first.getLower();
348 int upper = first.getUpper();
349 int old = maxProcessed;
350 if (le(lower, maxProcessed + 1))
351 {
352 maxProcessed = max(maxProcessed, upper);
353 }
354 boolean synced = ge(maxProcessed, syncPoint);
355 flush = lt(old, syncPoint) && synced;
356 if (synced)
357 {
358 syncPoint = maxProcessed;
359 }
360 }
361 if (flush)
362 {
363 flushProcessed();
364 }
365 }
366
367 void flushExpected()
368 {
369 RangeSet rs = new RangeSet();
370 synchronized (processedLock)
371 {
372 if (incomingInit)
373 {
374 rs.add(commandsIn);
375 }
376 }
377 sessionExpected(rs, null);
378 }
379
380 public void flushProcessed(Option ... options)
381 {
382 RangeSet copy;
383 synchronized (processedLock)
384 {
385 copy = processed.copy();
386 }
387
388 synchronized (commands)
389 {
390 if (state == DETACHED)
391 {
392 return;
393 }
394 sessionCompleted(copy, options);
395 }
396 }
397
398 void knownComplete(RangeSet kc)
399 {
400 synchronized (processedLock)
401 {
402 RangeSet newProcessed = new RangeSet();
403 for (Range pr : processed)
404 {
405 for (Range kr : kc)
406 {
407 for (Range r : pr.subtract(kr))
408 {
409 newProcessed.add(r);
410 }
411 }
412 }
413 this.processed = newProcessed;
414 }
415 }
416
417 void syncPoint()
418 {
419 int id = getCommandsIn() - 1;
420 log.debug("%s synced to %d", this, id);
421 boolean flush;
422 synchronized (processedLock)
423 {
424 syncPoint = id;
425 flush = ge(maxProcessed, syncPoint);
426 }
427 if (flush)
428 {
429 flushProcessed();
430 }
431 }
432
433 boolean complete(int lower, int upper)
434 {
435 //avoid autoboxing
436 if(log.isDebugEnabled())
437 {
438 log.debug("%s complete(%d, %d)", this, lower, upper);
439 }
440 synchronized (commands)
441 {
442 int old = maxComplete;
443 for (int id = max(maxComplete, lower); le(id, upper); id++)
444 {
445 int idx = mod(id, commands.length);
446 Method m = commands[idx];
447 if (m != null)
448 {
449 commandBytes -= m.getBodySize();
450 }
451 commands[idx] = null;
452 }
453 if (le(lower, maxComplete + 1))
454 {
455 maxComplete = max(maxComplete, upper);
456 }
457 log.debug("%s commands remaining: %s", this, commandsOut - maxComplete);
458 commands.notifyAll();
459 return gt(maxComplete, old);
460 }
461 }
462
463 void received(Method m)
464 {
465 m.delegate(this, delegate);
466 }
467
468 private void send(Method m)
469 {
470 m.setChannel(channel);
471 connection.send(m);
472
473 if (!m.isBatch())
474 {
475 connection.flush();
476 }
477 }
478
479 final private boolean isFull(int id)
480 {
481 return id - maxComplete >= commands.length || commandBytes >= byteLimit;
482 }
483
484 public void invoke(Method m)
485 {
486 if (m.getEncodedTrack() == Frame.L4)
487 {
488 if (m.hasPayload())
489 {
490 acquireCredit();
491 }
492
493 synchronized (commands)
494 {
495 if (state == DETACHED && m.isUnreliable())
496 {
497 return;
498 }
499
500 if (state != OPEN && state != CLOSED)
501 {
502 Waiter w = new Waiter(commands, timeout);
503 while (w.hasTime() && (state != OPEN && state != CLOSED))
504 {
505 w.await();
506 }
507 }
508
509 switch (state)
510 {
511 case OPEN:
512 break;
513 case CLOSED:
514 throw new SessionClosedException();
515 default:
516 throw new SessionException
517 (String.format
518 ("timed out waiting for session to become open " +
519 "(state=%s)", state));
520 }
521
522 int next = commandsOut++;
523 m.setId(next);
524
525 if (isFull(next))
526 {
527 Waiter w = new Waiter(commands, timeout);
528 while (w.hasTime() && isFull(next))
529 {
530 if (state == OPEN)
531 {
532 try
533 {
534 sessionFlush(COMPLETED);
535 }
536 catch (SenderException e)
537 {
538 if (expiry > 0)
539 {
540 // if expiry is > 0 then this will
541 // happen again on resume
542 log.error(e, "error sending flush (full replay buffer)");
543 }
544 else
545 {
546 e.rethrow();
547 }
548 }
549 }
550 w.await();
551 }
552 }
553
554 if (isFull(next))
555 {
556 throw new SessionException("timed out waiting for completion");
557 }
558
559 if (next == 0)
560 {
561 sessionCommandPoint(0, 0);
562 }
563 if (expiry > 0)
564 {
565 commands[mod(next, commands.length)] = m;
566 commandBytes += m.getBodySize();
567 }
568 if (autoSync)
569 {
570 m.setSync(true);
571 }
572 needSync = !m.isSync();
573 try
574 {
575 send(m);
576 }
577 catch (SenderException e)
578 {
579 if (expiry > 0)
580 {
581 // if expiry is > 0 then this will happen
582 // again on resume
583 log.error(e, "error sending command");
584 }
585 else
586 {
587 e.rethrow();
588 }
589 }
590 if (autoSync)
591 {
592 sync();
593 }
594
595 // flush every 64K commands to avoid ambiguity on
596 // wraparound
597 if ((next % 65536) == 0)
598 {
599 try
600 {
601 sessionFlush(COMPLETED);
602 }
603 catch (SenderException e)
604 {
605 if (expiry > 0)
606 {
607 // if expiry is > 0 then this will happen
608 // again on resume
609 log.error(e, "error sending flush (periodic)");
610 }
611 else
612 {
613 e.rethrow();
614 }
615 }
616 }
617 }
618 }
619 else
620 {
621 send(m);
622 }
623 }
624
625 public void sync()
626 {
627 sync(timeout);
628 }
629
630 public void sync(long timeout)
631 {
632 log.debug("%s sync()", this);
633 synchronized (commands)
634 {
635 int point = commandsOut - 1;
636
637 if (needSync && lt(maxComplete, point))
638 {
639 executionSync(SYNC);
640 }
641
642 Waiter w = new Waiter(commands, timeout);
643 while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
644 {
645 log.debug("%s waiting for[%d]: %d, %s", this, point,
646 maxComplete, commands);
647 w.await();
648 }
649
650 if (lt(maxComplete, point))
651 {
652 if (state == CLOSED)
653 {
654 throw new SessionException(getException());
655 }
656 else
657 {
658 throw new SessionException
659 (String.format
660 ("timed out waiting for sync: complete = %s, point = %s", maxComplete, point));
661 }
662 }
663 }
664 }
665
666 private Map<Integer,ResultFuture<?>> results =
667 new HashMap<Integer,ResultFuture<?>>();
668 private ExecutionException exception = null;
669
670 void result(int command, Struct result)
671 {
672 ResultFuture<?> future;
673 synchronized (results)
674 {
675 future = results.remove(command);
676 }
677 future.set(result);
678 }
679
680 void setException(ExecutionException exc)
681 {
682 synchronized (results)
683 {
684 if (exception != null)
685 {
686 throw new IllegalStateException
687 (String.format
688 ("too many exceptions: %s, %s", exception, exc));
689 }
690 exception = exc;
691 }
692 }
693
694 private ConnectionClose close = null;
695
696 void closeCode(ConnectionClose close)
697 {
698 this.close = close;
699 }
700
701 ExecutionException getException()
702 {
703 synchronized (results)
704 {
705 return exception;
706 }
707 }
708
709 protected <T> Future<T> invoke(Method m, Class<T> klass)
710 {
711 synchronized (commands)
712 {
713 int command = commandsOut;
714 ResultFuture<T> future = new ResultFuture<T>(klass);
715 synchronized (results)
716 {
717 results.put(command, future);
718 }
719 invoke(m);
720 return future;
721 }
722 }
723
724 private class ResultFuture<T> implements Future<T>
725 {
726
727 private final Class<T> klass;
728 private T result;
729
730 private ResultFuture(Class<T> klass)
731 {
732 this.klass = klass;
733 }
734
735 private void set(Struct result)
736 {
737 synchronized (this)
738 {
739 this.result = klass.cast(result);
740 notifyAll();
741 }
742 }
743
744 public T get(long timeout)
745 {
746 synchronized (this)
747 {
748 Waiter w = new Waiter(this, timeout);
749 while (w.hasTime() && state != CLOSED && !isDone())
750 {
751 log.debug("%s waiting for result: %s", Session.this, this);
752 w.await();
753 }
754 }
755
756 if (isDone())
757 {
758 return result;
759 }
760 else if (state == CLOSED)
761 {
762 throw new SessionException(getException());
763 }
764 else
765 {
766 throw new SessionException
767 (String.format("%s timed out waiting for result: %s",
768 Session.this, this));
769 }
770 }
771
772 public T get()
773 {
774 return get(timeout);
775 }
776
777 public boolean isDone()
778 {
779 return result != null;
780 }
781
782 public String toString()
783 {
784 return String.format("Future(%s)", isDone() ? result : klass);
785 }
786
787 }
788
789 public final void messageTransfer(String destination,
790 MessageAcceptMode acceptMode,
791 MessageAcquireMode acquireMode,
792 Header header,
793 byte[] body,
794 Option ... _options) {
795 messageTransfer(destination, acceptMode, acquireMode, header,
796 ByteBuffer.wrap(body), _options);
797 }
798
799 public final void messageTransfer(String destination,
800 MessageAcceptMode acceptMode,
801 MessageAcquireMode acquireMode,
802 Header header,
803 String body,
804 Option ... _options) {
805 messageTransfer(destination, acceptMode, acquireMode, header,
806 toUTF8(body), _options);
807 }
808
809 public void close()
810 {
811 synchronized (commands)
812 {
813 state = CLOSING;
814 // XXX: we manually set the expiry to zero here to
815 // simulate full session recovery in brokers that don't
816 // support it, we should remove this line when there is
817 // broker support for full session resume:
818 expiry = 0;
819 sessionRequestTimeout(0);
820 sessionDetach(name.getBytes());
821 Waiter w = new Waiter(commands, timeout);
822 while (w.hasTime() && state != CLOSED)
823 {
824 w.await();
825 }
826
827 if (state != CLOSED)
828 {
829 throw new SessionException("close() timed out");
830 }
831
832 connection.removeSession(this);
833 }
834 }
835
836 public void exception(Throwable t)
837 {
838 log.error(t, "caught exception");
839 }
840
841 public void closed()
842 {
843 synchronized (commands)
844 {
845 if (expiry == 0)
846 {
847 state = CLOSED;
848 }
849 else
850 {
851 state = DETACHED;
852 }
853
854 commands.notifyAll();
855
856 synchronized (results)
857 {
858 for (ResultFuture<?> result : results.values())
859 {
860 synchronized(result)
861 {
862 result.notifyAll();
863 }
864 }
865 }
866 }
867 }
868
869 public String toString()
870 {
871 return String.format("ssn:%s", name);
872 }
873
874 }
|