Session.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport;
022 
023 
024 import org.apache.qpid.transport.network.Frame;
025 
026 import org.apache.qpid.transport.util.Logger;
027 import org.apache.qpid.transport.util.Waiter;
028 
029 import java.nio.ByteBuffer;
030 import java.util.ArrayList;
031 import java.util.Arrays;
032 import java.util.HashMap;
033 import java.util.List;
034 import java.util.Map;
035 import java.util.concurrent.atomic.AtomicBoolean;
036 import java.util.concurrent.Semaphore;
037 import java.util.concurrent.TimeUnit;
038 
039 import static org.apache.qpid.transport.Option.*;
040 import static org.apache.qpid.transport.Session.State.*;
041 import static org.apache.qpid.transport.util.Functions.*;
042 import static org.apache.qpid.util.Serial.*;
043 import static org.apache.qpid.util.Strings.*;
044 
045 /**
046  * Session
047  *
048  @author Rafael H. Schloming
049  */
050 
051 public class Session extends SessionInvoker
052 {
053 
054     private static final Logger log = Logger.get(Session.class);
055 
056     enum State NEW, DETACHED, OPEN, CLOSING, CLOSED }
057 
058     class DefaultSessionListener implements SessionListener
059     {
060 
061         public void opened(Session ssn) {}
062 
063         public void message(Session ssn, MessageTransfer xfr)
064         {
065             log.info("message: %s", xfr);
066         }
067 
068         public void exception(Session ssn, SessionException exc)
069         {
070             log.error(exc, "session exception");
071         }
072 
073         public void closed(Session ssn) {}
074     }
075 
076     public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
077 
078     private Connection connection;
079     private Binary name;
080     private long expiry;
081     private int channel;
082     private SessionDelegate delegate = new SessionDelegate();
083     private SessionListener listener = new DefaultSessionListener();
084     private long timeout = 60000;
085     private boolean autoSync = false;
086 
087     private boolean incomingInit;
088     // incoming command count
089     private int commandsIn;
090     // completed incoming commands
091     private final Object processedLock = new Object();
092     private RangeSet processed;
093     private int maxProcessed;
094     private int syncPoint;
095 
096     // outgoing command count
097     private int commandsOut = 0;
098     private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit"64*1024)];
099     private int commandBytes = 0;
100     private int byteLimit = Integer.getInteger("qpid.session.byte_limit"1024*1024);
101     private int maxComplete = commandsOut - 1;
102     private boolean needSync = false;
103 
104     private State state = NEW;
105 
106     // transfer flow control
107     private volatile boolean flowControl = false;
108     private Semaphore credit = new Semaphore(0);
109 
110     Session(Connection connection, Binary name, long expiry)
111     {
112         this.connection = connection;
113         this.name = name;
114         this.expiry = expiry;
115         initReceiver();
116     }
117 
118     public Connection getConnection()
119     {
120         return connection;
121     }
122 
123     public Binary getName()
124     {
125         return name;
126     }
127 
128     void setExpiry(long expiry)
129     {
130         this.expiry = expiry;
131     }
132 
133     int getChannel()
134     {
135         return channel;
136     }
137 
138     void setChannel(int channel)
139     {
140         this.channel = channel;
141     }
142 
143     public void setSessionListener(SessionListener listener)
144     {
145         if (listener == null)
146         {
147             this.listener = new DefaultSessionListener();
148         }
149         else
150         {
151             this.listener = listener;
152         }
153     }
154 
155     public SessionListener getSessionListener()
156     {
157         return listener;
158     }
159 
160     public void setAutoSync(boolean value)
161     {
162         synchronized (commands)
163         {
164             this.autoSync = value;
165         }
166     }
167 
168     void setState(State state)
169     {
170         synchronized (commands)
171         {
172             this.state = state;
173             commands.notifyAll();
174         }
175     }
176 
177     void setFlowControl(boolean value)
178     {
179         flowControl = value;
180     }
181 
182     void addCredit(int value)
183     {
184         credit.release(value);
185     }
186 
187     void drainCredit()
188     {
189         credit.drainPermits();
190     }
191 
192     void acquireCredit()
193     {
194         if (flowControl)
195         {
196             try
197             {
198                 if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS))
199                 {
200                     throw new SessionException
201                         ("timed out waiting for message credit");
202                 }
203             }
204             catch (InterruptedException e)
205             {
206                 throw new SessionException
207                     ("interrupted while waiting for credit", null, e);
208             }
209         }
210     }
211 
212     private void initReceiver()
213     {
214         synchronized (processedLock)
215         {
216             incomingInit = false;
217             processed = new RangeSet();
218         }
219     }
220 
221     void attach()
222     {
223         initReceiver();
224         sessionAttach(name.getBytes());
225         // XXX: when the broker and client support full session
226         // recovery we should use expiry as the requested timeout
227         sessionRequestTimeout(0);
228     }
229 
230     void resume()
231     {
232         synchronized (commands)
233         {
234             for (int i = maxComplete + 1; lt(i, commandsOut); i++)
235             {
236                 Method m = commands[mod(i, commands.length)];
237                 if (m != null)
238                 {
239                     sessionCommandPoint(m.getId()0);
240                     send(m);
241                 }
242             }
243 
244             sessionCommandPoint(commandsOut, 0);
245             sessionFlush(COMPLETED);
246         }
247     }
248 
249     void dump()
250     {
251         synchronized (commands)
252         {
253             for (Method m : commands)
254             {
255                 if (m != null)
256                 {
257                     System.out.println(m);
258                 }
259             }
260         }
261     }
262 
263     final void commandPoint(int id)
264     {
265         synchronized (processedLock)
266         {
267             this.commandsIn = id;
268             if (!incomingInit)
269             {
270                 incomingInit = true;
271                 maxProcessed = commandsIn - 1;
272                 syncPoint = maxProcessed;
273             }
274         }
275     }
276 
277     public int getCommandsOut()
278     {
279         return commandsOut;
280     }
281 
282     public int getCommandsIn()
283     {
284         return commandsIn;
285     }
286 
287     public int nextCommandId()
288     {
289         return commandsIn++;
290     }
291 
292     final void identify(Method cmd)
293     {
294         if (!incomingInit)
295         {
296             throw new IllegalStateException();
297         }
298 
299         int id = nextCommandId();
300         cmd.setId(id);
301 
302         if(log.isDebugEnabled())
303         {
304             log.debug("ID: [%s] %s"this.channel, id);
305         }
306 
307         //if ((id % 65536) == 0)
308         if ((id & 0xff== 0)
309         {
310             flushProcessed(TIMELY_REPLY);
311         }
312     }
313 
314     public void processed(Method command)
315     {
316         processed(command.getId());
317     }
318 
319     public void processed(int command)
320     {
321         processed(new Range(command, command));
322     }
323 
324     public void processed(int lower, int upper)
325     {
326 
327         processed(new Range(lower, upper));
328     }
329 
330     public void processed(Range range)
331     {
332         log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed);
333 
334         boolean flush;
335         synchronized (processedLock)
336         {
337             log.debug("%s", processed);
338 
339             if (ge(range.getUpper(), commandsIn))
340             {
341                 throw new IllegalArgumentException
342                     ("range exceeds max received command-id: " + range);
343             }
344 
345             processed.add(range);
346             Range first = processed.getFirst();
347             int lower = first.getLower();
348             int upper = first.getUpper();
349             int old = maxProcessed;
350             if (le(lower, maxProcessed + 1))
351             {
352                 maxProcessed = max(maxProcessed, upper);
353             }
354             boolean synced = ge(maxProcessed, syncPoint);
355             flush = lt(old, syncPoint&& synced;
356             if (synced)
357             {
358                 syncPoint = maxProcessed;
359             }
360         }
361         if (flush)
362         {
363             flushProcessed();
364         }
365     }
366 
367     void flushExpected()
368     {
369         RangeSet rs = new RangeSet();
370         synchronized (processedLock)
371         {
372             if (incomingInit)
373             {
374                 rs.add(commandsIn);
375             }
376         }
377         sessionExpected(rs, null);
378     }
379 
380     public void flushProcessed(Option ... options)
381     {
382         RangeSet copy;
383         synchronized (processedLock)
384         {
385             copy = processed.copy();
386         }
387 
388         synchronized (commands)
389         {
390             if (state == DETACHED)
391             {
392                 return;
393             }
394             sessionCompleted(copy, options);
395         }
396     }
397 
398     void knownComplete(RangeSet kc)
399     {
400         synchronized (processedLock)
401         {
402             RangeSet newProcessed = new RangeSet();
403             for (Range pr : processed)
404             {
405                 for (Range kr : kc)
406                 {
407                     for (Range r : pr.subtract(kr))
408                     {
409                         newProcessed.add(r);
410                     }
411                 }
412             }
413             this.processed = newProcessed;
414         }
415     }
416 
417     void syncPoint()
418     {
419         int id = getCommandsIn() 1;
420         log.debug("%s synced to %d", this, id);
421         boolean flush;
422         synchronized (processedLock)
423         {
424             syncPoint = id;
425             flush = ge(maxProcessed, syncPoint);
426         }
427         if (flush)
428         {
429             flushProcessed();
430         }
431     }
432 
433     boolean complete(int lower, int upper)
434     {
435         //avoid autoboxing
436         if(log.isDebugEnabled())
437         {
438             log.debug("%s complete(%d, %d)", this, lower, upper);
439         }
440         synchronized (commands)
441         {
442             int old = maxComplete;
443             for (int id = max(maxComplete, lower); le(id, upper); id++)
444             {
445                 int idx = mod(id, commands.length);
446                 Method m = commands[idx];
447                 if (m != null)
448                 {
449                     commandBytes -= m.getBodySize();
450                 }
451                 commands[idxnull;
452             }
453             if (le(lower, maxComplete + 1))
454             {
455                 maxComplete = max(maxComplete, upper);
456             }
457             log.debug("%s   commands remaining: %s", this, commandsOut - maxComplete);
458             commands.notifyAll();
459             return gt(maxComplete, old);
460         }
461     }
462 
463     void received(Method m)
464     {
465         m.delegate(this, delegate);
466     }
467 
468     private void send(Method m)
469     {
470         m.setChannel(channel);
471         connection.send(m);
472 
473         if (!m.isBatch())
474         {
475             connection.flush();
476         }
477     }
478 
479     final private boolean isFull(int id)
480     {
481         return id - maxComplete >= commands.length || commandBytes >= byteLimit;
482     }
483 
484     public void invoke(Method m)
485     {
486         if (m.getEncodedTrack() == Frame.L4)
487         {
488             if (m.hasPayload())
489             {
490                 acquireCredit();
491             }
492 
493             synchronized (commands)
494             {
495                 if (state == DETACHED && m.isUnreliable())
496                 {
497                     return;
498                 }
499 
500                 if (state != OPEN && state != CLOSED)
501                 {
502                     Waiter w = new Waiter(commands, timeout);
503                     while (w.hasTime() && (state != OPEN && state != CLOSED))
504                     {
505                         w.await();
506                     }
507                 }
508 
509                 switch (state)
510                 {
511                 case OPEN:
512                     break;
513                 case CLOSED:
514                     throw new SessionClosedException();
515                 default:
516                     throw new SessionException
517                         (String.format
518                          ("timed out waiting for session to become open " +
519                           "(state=%s)", state));
520                 }
521 
522                 int next = commandsOut++;
523                 m.setId(next);
524 
525                 if (isFull(next))
526                 {
527                     Waiter w = new Waiter(commands, timeout);
528                     while (w.hasTime() && isFull(next))
529                     {
530                         if (state == OPEN)
531                         {
532                             try
533                             {
534                                 sessionFlush(COMPLETED);
535                             }
536                             catch (SenderException e)
537                             {
538                                 if (expiry > 0)
539                                 {
540                                     // if expiry is > 0 then this will
541                                     // happen again on resume
542                                     log.error(e, "error sending flush (full replay buffer)");
543                                 }
544                                 else
545                                 {
546                                     e.rethrow();
547                                 }
548                             }
549                         }
550                         w.await();
551                     }
552                 }
553 
554                 if (isFull(next))
555                 {
556                     throw new SessionException("timed out waiting for completion");
557                 }
558 
559                 if (next == 0)
560                 {
561                     sessionCommandPoint(00);
562                 }
563                 if (expiry > 0)
564                 {
565                     commands[mod(next, commands.length)] = m;
566                     commandBytes += m.getBodySize();
567                 }
568                 if (autoSync)
569                 {
570                     m.setSync(true);
571                 }
572                 needSync = !m.isSync();
573                 try
574                 {
575                     send(m);
576                 }
577                 catch (SenderException e)
578                 {
579                     if (expiry > 0)
580                     {
581                         // if expiry is > 0 then this will happen
582                         // again on resume
583                         log.error(e, "error sending command");
584                     }
585                     else
586                     {
587                         e.rethrow();
588                     }
589                 }
590                 if (autoSync)
591                 {
592                     sync();
593                 }
594 
595                 // flush every 64K commands to avoid ambiguity on
596                 // wraparound
597                 if ((next % 65536== 0)
598                 {
599                     try
600                     {
601                         sessionFlush(COMPLETED);
602                     }
603                     catch (SenderException e)
604                     {
605                         if (expiry > 0)
606                         {
607                             // if expiry is > 0 then this will happen
608                             // again on resume
609                             log.error(e, "error sending flush (periodic)");
610                         }
611                         else
612                         {
613                             e.rethrow();
614                         }
615                     }
616                 }
617             }
618         }
619         else
620         {
621             send(m);
622         }
623     }
624 
625     public void sync()
626     {
627         sync(timeout);
628     }
629 
630     public void sync(long timeout)
631     {
632         log.debug("%s sync()"this);
633         synchronized (commands)
634         {
635             int point = commandsOut - 1;
636 
637             if (needSync && lt(maxComplete, point))
638             {
639                 executionSync(SYNC);
640             }
641 
642             Waiter w = new Waiter(commands, timeout);
643             while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
644             {
645                 log.debug("%s   waiting for[%d]: %d, %s", this, point,
646                           maxComplete, commands);
647                 w.await();
648             }
649 
650             if (lt(maxComplete, point))
651             {
652                 if (state == CLOSED)
653                 {
654                     throw new SessionException(getException());
655                 }
656                 else
657                 {
658                     throw new SessionException
659                         (String.format
660                          ("timed out waiting for sync: complete = %s, point = %s", maxComplete, point));
661                 }
662             }
663         }
664     }
665 
666     private Map<Integer,ResultFuture<?>> results =
667         new HashMap<Integer,ResultFuture<?>>();
668     private ExecutionException exception = null;
669 
670     void result(int command, Struct result)
671     {
672         ResultFuture<?> future;
673         synchronized (results)
674         {
675             future = results.remove(command);
676         }
677         future.set(result);
678     }
679 
680     void setException(ExecutionException exc)
681     {
682         synchronized (results)
683         {
684             if (exception != null)
685             {
686                 throw new IllegalStateException
687                     (String.format
688                      ("too many exceptions: %s, %s", exception, exc));
689             }
690             exception = exc;
691         }
692     }
693 
694     private ConnectionClose close = null;
695 
696     void closeCode(ConnectionClose close)
697     {
698         this.close = close;
699     }
700 
701     ExecutionException getException()
702     {
703         synchronized (results)
704         {
705             return exception;
706         }
707     }
708 
709     protected <T> Future<T> invoke(Method m, Class<T> klass)
710     {
711         synchronized (commands)
712         {
713             int command = commandsOut;
714             ResultFuture<T> future = new ResultFuture<T>(klass);
715             synchronized (results)
716             {
717                 results.put(command, future);
718             }
719             invoke(m);
720             return future;
721         }
722     }
723 
724     private class ResultFuture<T> implements Future<T>
725     {
726 
727         private final Class<T> klass;
728         private T result;
729 
730         private ResultFuture(Class<T> klass)
731         {
732             this.klass = klass;
733         }
734 
735         private void set(Struct result)
736         {
737             synchronized (this)
738             {
739                 this.result = klass.cast(result);
740                 notifyAll();
741             }
742         }
743 
744         public T get(long timeout)
745         {
746             synchronized (this)
747             {
748                 Waiter w = new Waiter(this, timeout);
749                 while (w.hasTime() && state != CLOSED && !isDone())
750                 {
751                     log.debug("%s waiting for result: %s", Session.this, this);
752                     w.await();
753                 }
754             }
755 
756             if (isDone())
757             {
758                 return result;
759             }
760             else if (state == CLOSED)
761             {
762                 throw new SessionException(getException());
763             }
764             else
765             {
766                 throw new SessionException
767                     (String.format("%s timed out waiting for result: %s",
768                                    Session.this, this));
769             }
770         }
771 
772         public T get()
773         {
774             return get(timeout);
775         }
776 
777         public boolean isDone()
778         {
779             return result != null;
780         }
781 
782         public String toString()
783         {
784             return String.format("Future(%s)", isDone() ? result : klass);
785         }
786 
787     }
788 
789     public final void messageTransfer(String destination,
790                                       MessageAcceptMode acceptMode,
791                                       MessageAcquireMode acquireMode,
792                                       Header header,
793                                       byte[] body,
794                                       Option ... _options) {
795         messageTransfer(destination, acceptMode, acquireMode, header,
796                         ByteBuffer.wrap(body), _options);
797     }
798 
799     public final void messageTransfer(String destination,
800                                       MessageAcceptMode acceptMode,
801                                       MessageAcquireMode acquireMode,
802                                       Header header,
803                                       String body,
804                                       Option ... _options) {
805         messageTransfer(destination, acceptMode, acquireMode, header,
806                         toUTF8(body), _options);
807     }
808 
809     public void close()
810     {
811         synchronized (commands)
812         {
813             state = CLOSING;
814             // XXX: we manually set the expiry to zero here to
815             // simulate full session recovery in brokers that don't
816             // support it, we should remove this line when there is
817             // broker support for full session resume:
818             expiry = 0;
819             sessionRequestTimeout(0);
820             sessionDetach(name.getBytes());
821             Waiter w = new Waiter(commands, timeout);
822             while (w.hasTime() && state != CLOSED)
823             {
824                 w.await();
825             }
826 
827             if (state != CLOSED)
828             {
829                 throw new SessionException("close() timed out");
830             }
831 
832             connection.removeSession(this);
833         }
834     }
835 
836     public void exception(Throwable t)
837     {
838         log.error(t, "caught exception");
839     }
840 
841     public void closed()
842     {
843         synchronized (commands)
844         {
845             if (expiry == 0)
846             {
847                 state = CLOSED;
848             }
849             else
850             {
851                 state = DETACHED;
852             }
853 
854             commands.notifyAll();
855 
856             synchronized (results)
857             {
858                 for (ResultFuture<?> result : results.values())
859                 {
860                     synchronized(result)
861                     {
862                         result.notifyAll();
863                     }
864                 }
865             }
866         }
867     }
868 
869     public String toString()
870     {
871         return String.format("ssn:%s", name);
872     }
873 
874 }