001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport.network;
022
023 import java.util.ArrayList;
024 import java.util.HashMap;
025 import java.util.List;
026 import java.util.Map;
027
028 import java.nio.ByteBuffer;
029
030 import org.apache.qpid.transport.codec.BBDecoder;
031 import org.apache.qpid.transport.codec.Decoder;
032
033 import org.apache.qpid.transport.Header;
034 import org.apache.qpid.transport.Method;
035 import org.apache.qpid.transport.ProtocolError;
036 import org.apache.qpid.transport.ProtocolEvent;
037 import org.apache.qpid.transport.ProtocolHeader;
038 import org.apache.qpid.transport.Receiver;
039 import org.apache.qpid.transport.SegmentType;
040 import org.apache.qpid.transport.Struct;
041
042
043 /**
044 * Assembler
045 *
046 */
047
048 public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
049 {
050
051 private final Receiver<ProtocolEvent> receiver;
052 private final Map<Integer,List<Frame>> segments;
053 private final Method[] incomplete;
054 private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
055 {
056 public BBDecoder initialValue()
057 {
058 return new BBDecoder();
059 }
060 };
061
062 public Assembler(Receiver<ProtocolEvent> receiver)
063 {
064 this.receiver = receiver;
065 segments = new HashMap<Integer,List<Frame>>();
066 incomplete = new Method[64*1024];
067 }
068
069 private int segmentKey(Frame frame)
070 {
071 return (frame.getTrack() + 1) * frame.getChannel();
072 }
073
074 private List<Frame> getSegment(Frame frame)
075 {
076 return segments.get(segmentKey(frame));
077 }
078
079 private void setSegment(Frame frame, List<Frame> segment)
080 {
081 int key = segmentKey(frame);
082 if (segments.containsKey(key))
083 {
084 error(new ProtocolError(Frame.L2, "segment in progress: %s",
085 frame));
086 }
087 segments.put(segmentKey(frame), segment);
088 }
089
090 private void clearSegment(Frame frame)
091 {
092 segments.remove(segmentKey(frame));
093 }
094
095 private void emit(int channel, ProtocolEvent event)
096 {
097 event.setChannel(channel);
098 receiver.received(event);
099 }
100
101 public void received(NetworkEvent event)
102 {
103 event.delegate(this);
104 }
105
106 public void exception(Throwable t)
107 {
108 this.receiver.exception(t);
109 }
110
111 public void closed()
112 {
113 this.receiver.closed();
114 }
115
116 public void init(ProtocolHeader header)
117 {
118 emit(0, header);
119 }
120
121 public void error(ProtocolError error)
122 {
123 emit(0, error);
124 }
125
126 public void frame(Frame frame)
127 {
128 ByteBuffer segment;
129 if (frame.isFirstFrame() && frame.isLastFrame())
130 {
131 segment = frame.getBody();
132 assemble(frame, segment);
133 }
134 else
135 {
136 List<Frame> frames;
137 if (frame.isFirstFrame())
138 {
139 frames = new ArrayList<Frame>();
140 setSegment(frame, frames);
141 }
142 else
143 {
144 frames = getSegment(frame);
145 }
146
147 frames.add(frame);
148
149 if (frame.isLastFrame())
150 {
151 clearSegment(frame);
152
153 int size = 0;
154 for (Frame f : frames)
155 {
156 size += f.getSize();
157 }
158 segment = ByteBuffer.allocate(size);
159 for (Frame f : frames)
160 {
161 segment.put(f.getBody());
162 }
163 segment.flip();
164 assemble(frame, segment);
165 }
166 }
167
168 }
169
170 private void assemble(Frame frame, ByteBuffer segment)
171 {
172 BBDecoder dec = decoder.get();
173 dec.init(segment);
174
175 int channel = frame.getChannel();
176 Method command;
177
178 switch (frame.getType())
179 {
180 case CONTROL:
181 int controlType = dec.readUint16();
182 Method control = Method.create(controlType);
183 control.read(dec);
184 emit(channel, control);
185 break;
186 case COMMAND:
187 int commandType = dec.readUint16();
188 // read in the session header, right now we don't use it
189 dec.readUint16();
190 command = Method.create(commandType);
191 command.read(dec);
192 if (command.hasPayload())
193 {
194 incomplete[channel] = command;
195 }
196 else
197 {
198 emit(channel, command);
199 }
200 break;
201 case HEADER:
202 command = incomplete[channel];
203 List<Struct> structs = new ArrayList(2);
204 while (dec.hasRemaining())
205 {
206 structs.add(dec.readStruct32());
207 }
208 command.setHeader(new Header(structs));
209 if (frame.isLastSegment())
210 {
211 incomplete[channel] = null;
212 emit(channel, command);
213 }
214 break;
215 case BODY:
216 command = incomplete[channel];
217 command.setBody(segment);
218 incomplete[channel] = null;
219 emit(channel, command);
220 break;
221 default:
222 throw new IllegalStateException("unknown frame type: " + frame.getType());
223 }
224 }
225
226 }
|