001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport;
022
023 import org.apache.qpid.transport.util.Logger;
024
025
026 /**
027 * SessionDelegate
028 *
029 * @author Rafael H. Schloming
030 */
031
032 public class SessionDelegate
033 extends MethodDelegate<Session>
034 implements ProtocolDelegate<Session>
035 {
036 private static final Logger log = Logger.get(SessionDelegate.class);
037
038 public void init(Session ssn, ProtocolHeader hdr) { }
039
040 public void control(Session ssn, Method method) {
041 method.dispatch(ssn, this);
042 }
043
044 public void command(Session ssn, Method method) {
045 ssn.identify(method);
046 method.dispatch(ssn, this);
047 if (!method.hasPayload())
048 {
049 ssn.processed(method);
050 }
051 }
052
053 public void error(Session ssn, ProtocolError error) { }
054
055 public void handle(Session ssn, Method method)
056 {
057 log.warn("UNHANDLED: [%s] %s", ssn, method);
058 }
059
060 @Override public void sessionAttached(Session ssn, SessionAttached atc)
061 {
062 ssn.setState(Session.State.OPEN);
063 }
064
065 @Override public void sessionTimeout(Session ssn, SessionTimeout t)
066 {
067 // XXX: we ignore this right now, we should uncomment this
068 // when full session resume is supported:
069 // ssn.setExpiry(t.getTimeout());
070 }
071
072 @Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
073 {
074 RangeSet ranges = cmp.getCommands();
075 RangeSet known = null;
076 if (cmp.getTimelyReply())
077 {
078 known = new RangeSet();
079 }
080
081 if (ranges != null)
082 {
083 for (Range range : ranges)
084 {
085 boolean advanced = ssn.complete(range.getLower(), range.getUpper());
086 if (advanced && known != null)
087 {
088 known.add(range);
089 }
090 }
091 }
092
093 if (known != null)
094 {
095 ssn.sessionKnownCompleted(known);
096 }
097 }
098
099 @Override public void sessionKnownCompleted(Session ssn, SessionKnownCompleted kcmp)
100 {
101 RangeSet kc = kcmp.getCommands();
102 if (kc != null)
103 {
104 ssn.knownComplete(kc);
105 }
106 }
107
108 @Override public void sessionFlush(Session ssn, SessionFlush flush)
109 {
110 if (flush.getCompleted())
111 {
112 ssn.flushProcessed();
113 }
114 if (flush.getConfirmed())
115 {
116 ssn.flushProcessed();
117 }
118 if (flush.getExpected())
119 {
120 ssn.flushExpected();
121 }
122 }
123
124 @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp)
125 {
126 ssn.commandPoint(scp.getCommandId());
127 }
128
129 @Override public void executionSync(Session ssn, ExecutionSync sync)
130 {
131 ssn.syncPoint();
132 }
133
134 @Override public void executionResult(Session ssn, ExecutionResult result)
135 {
136 ssn.result(result.getCommandId(), result.getValue());
137 }
138
139 @Override public void executionException(Session ssn, ExecutionException exc)
140 {
141 ssn.setException(exc);
142 }
143
144 @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
145 {
146 ssn.getSessionListener().message(ssn, xfr);
147 }
148
149 @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
150 {
151 if ("".equals(sfm.getDestination()) &&
152 MessageFlowMode.CREDIT.equals(sfm.getFlowMode()))
153 {
154 ssn.setFlowControl(true);
155 }
156 else
157 {
158 super.messageSetFlowMode(ssn, sfm);
159 }
160 }
161
162 @Override public void messageFlow(Session ssn, MessageFlow flow)
163 {
164 if ("".equals(flow.getDestination()) &&
165 MessageCreditUnit.MESSAGE.equals(flow.getUnit()))
166 {
167 ssn.addCredit((int) flow.getValue());
168 }
169 else
170 {
171 super.messageFlow(ssn, flow);
172 }
173 }
174
175 @Override public void messageStop(Session ssn, MessageStop stop)
176 {
177 if ("".equals(stop.getDestination()))
178 {
179 ssn.drainCredit();
180 }
181 else
182 {
183 super.messageStop(ssn, stop);
184 }
185 }
186
187 }
|