SessionDelegate.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport;
022 
023 import org.apache.qpid.transport.util.Logger;
024 
025 
026 /**
027  * SessionDelegate
028  *
029  @author Rafael H. Schloming
030  */
031 
032 public class SessionDelegate
033     extends MethodDelegate<Session>
034     implements ProtocolDelegate<Session>
035 {
036     private static final Logger log = Logger.get(SessionDelegate.class);
037 
038     public void init(Session ssn, ProtocolHeader hdr) { }
039 
040     public void control(Session ssn, Method method) {
041         method.dispatch(ssn, this);
042     }
043 
044     public void command(Session ssn, Method method) {
045         ssn.identify(method);
046         method.dispatch(ssn, this);
047         if (!method.hasPayload())
048         {
049             ssn.processed(method);
050         }
051     }
052 
053     public void error(Session ssn, ProtocolError error) { }
054 
055     public void handle(Session ssn, Method method)
056     {
057         log.warn("UNHANDLED: [%s] %s", ssn, method);
058     }
059 
060     @Override public void sessionAttached(Session ssn, SessionAttached atc)
061     {
062         ssn.setState(Session.State.OPEN);
063     }
064 
065     @Override public void sessionTimeout(Session ssn, SessionTimeout t)
066     {
067         // XXX: we ignore this right now, we should uncomment this
068         // when full session resume is supported:
069         // ssn.setExpiry(t.getTimeout());
070     }
071 
072     @Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
073     {
074         RangeSet ranges = cmp.getCommands();
075         RangeSet known = null;
076         if (cmp.getTimelyReply())
077         {
078             known = new RangeSet();
079         }
080 
081         if (ranges != null)
082         {
083             for (Range range : ranges)
084             {
085                 boolean advanced = ssn.complete(range.getLower(), range.getUpper());
086                 if (advanced && known != null)
087                 {
088                     known.add(range);
089                 }
090             }
091         }
092 
093         if (known != null)
094         {
095             ssn.sessionKnownCompleted(known);
096         }
097     }
098 
099     @Override public void sessionKnownCompleted(Session ssn, SessionKnownCompleted kcmp)
100     {
101         RangeSet kc = kcmp.getCommands();
102         if (kc != null)
103         {
104             ssn.knownComplete(kc);
105         }
106     }
107 
108     @Override public void sessionFlush(Session ssn, SessionFlush flush)
109     {
110         if (flush.getCompleted())
111         {
112             ssn.flushProcessed();
113         }
114         if (flush.getConfirmed())
115         {
116            ssn.flushProcessed();
117         }
118         if (flush.getExpected())
119         {
120             ssn.flushExpected();
121         }
122     }
123 
124     @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp)
125     {
126         ssn.commandPoint(scp.getCommandId());
127     }
128 
129     @Override public void executionSync(Session ssn, ExecutionSync sync)
130     {
131         ssn.syncPoint();
132     }
133 
134     @Override public void executionResult(Session ssn, ExecutionResult result)
135     {
136         ssn.result(result.getCommandId(), result.getValue());
137     }
138 
139     @Override public void executionException(Session ssn, ExecutionException exc)
140     {
141         ssn.setException(exc);
142     }
143 
144     @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
145     {
146         ssn.getSessionListener().message(ssn, xfr);
147     }
148 
149     @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
150     {
151         if ("".equals(sfm.getDestination()) &&
152             MessageFlowMode.CREDIT.equals(sfm.getFlowMode()))
153         {
154             ssn.setFlowControl(true);
155         }
156         else
157         {
158             super.messageSetFlowMode(ssn, sfm);
159         }
160     }
161 
162     @Override public void messageFlow(Session ssn, MessageFlow flow)
163     {
164         if ("".equals(flow.getDestination()) &&
165             MessageCreditUnit.MESSAGE.equals(flow.getUnit()))
166         {
167             ssn.addCredit((intflow.getValue());
168         }
169         else
170         {
171             super.messageFlow(ssn, flow);
172         }
173     }
174 
175     @Override public void messageStop(Session ssn, MessageStop stop)
176     {
177         if ("".equals(stop.getDestination()))
178         {
179             ssn.drainCredit();
180         }
181         else
182         {
183             super.messageStop(ssn, stop);
184         }
185     }
186 
187 }