001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport.network;
022
023 import static java.lang.Math.min;
024 import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
025 import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
026 import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
027 import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
028 import static org.apache.qpid.transport.network.Frame.LAST_SEG;
029
030 import java.nio.ByteBuffer;
031 import java.nio.ByteOrder;
032
033 import org.apache.qpid.transport.Header;
034 import org.apache.qpid.transport.Method;
035 import org.apache.qpid.transport.ProtocolDelegate;
036 import org.apache.qpid.transport.ProtocolError;
037 import org.apache.qpid.transport.ProtocolEvent;
038 import org.apache.qpid.transport.ProtocolHeader;
039 import org.apache.qpid.transport.SegmentType;
040 import org.apache.qpid.transport.Sender;
041 import org.apache.qpid.transport.Struct;
042 import org.apache.qpid.transport.codec.BBEncoder;
043
044
045 /**
046 * Disassembler
047 *
048 */
049
050 public final class Disassembler implements Sender<ProtocolEvent>,
051 ProtocolDelegate<Void>
052 {
053
054 private final Sender<ByteBuffer> sender;
055 private final int maxPayload;
056 private final ByteBuffer header;
057 private final Object sendlock = new Object();
058 private final ThreadLocal<BBEncoder> encoder = new ThreadLocal()
059 {
060 public BBEncoder initialValue()
061 {
062 return new BBEncoder(4*1024);
063 }
064 };
065
066 public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
067 {
068 if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
069 {
070 throw new IllegalArgumentException
071 ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
072 }
073 this.sender = sender;
074 this.maxPayload = maxFrame - HEADER_SIZE;
075 this.header = ByteBuffer.allocate(HEADER_SIZE);
076 this.header.order(ByteOrder.BIG_ENDIAN);
077
078 }
079
080 public void send(ProtocolEvent event)
081 {
082 event.delegate(null, this);
083 }
084
085 public void flush()
086 {
087 synchronized (sendlock)
088 {
089 sender.flush();
090 }
091 }
092
093 public void close()
094 {
095 synchronized (sendlock)
096 {
097 sender.close();
098 }
099 }
100
101 private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
102 {
103 synchronized (sendlock)
104 {
105 header.put(0, flags);
106 header.put(1, type);
107 header.putShort(2, (short) (size + HEADER_SIZE));
108 header.put(5, track);
109 header.putShort(6, (short) channel);
110
111 header.rewind();
112
113 sender.send(header);
114
115 int limit = buf.limit();
116 buf.limit(buf.position() + size);
117 sender.send(buf);
118 buf.limit(limit);
119 }
120 }
121
122 private void fragment(byte flags, SegmentType type, ProtocolEvent event,
123 ByteBuffer buf)
124 {
125 byte typeb = (byte) type.getValue();
126 byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
127
128 int remaining = buf.remaining();
129 boolean first = true;
130 while (true)
131 {
132 int size = min(maxPayload, remaining);
133 remaining -= size;
134
135 byte newflags = flags;
136 if (first)
137 {
138 newflags |= FIRST_FRAME;
139 first = false;
140 }
141 if (remaining == 0)
142 {
143 newflags |= LAST_FRAME;
144 }
145
146 frame(newflags, typeb, track, event.getChannel(), size, buf);
147
148 if (remaining == 0)
149 {
150 break;
151 }
152 }
153 }
154
155 public void init(Void v, ProtocolHeader header)
156 {
157 synchronized (sendlock)
158 {
159 sender.send(header.toByteBuffer());
160 sender.flush();
161 }
162 }
163
164 public void control(Void v, Method method)
165 {
166 method(method, SegmentType.CONTROL);
167 }
168
169 public void command(Void v, Method method)
170 {
171 method(method, SegmentType.COMMAND);
172 }
173
174 private ByteBuffer copy(ByteBuffer src)
175 {
176 ByteBuffer buf = ByteBuffer.allocate(src.remaining());
177 buf.put(src);
178 buf.flip();
179 return buf;
180 }
181
182 private void method(Method method, SegmentType type)
183 {
184 BBEncoder enc = encoder.get();
185 enc.init();
186 enc.writeUint16(method.getEncodedType());
187 if (type == SegmentType.COMMAND)
188 {
189 if (method.isSync())
190 {
191 enc.writeUint16(0x0101);
192 }
193 else
194 {
195 enc.writeUint16(0x0100);
196 }
197 }
198 method.write(enc);
199 ByteBuffer methodSeg = enc.segment();
200
201 byte flags = FIRST_SEG;
202
203 boolean payload = method.hasPayload();
204 if (!payload)
205 {
206 flags |= LAST_SEG;
207 }
208
209 ByteBuffer headerSeg = null;
210 if (payload)
211 {
212 final Header hdr = method.getHeader();
213 if (hdr != null)
214 {
215 final Struct[] structs = hdr.getStructs();
216
217 for (Struct st : structs)
218 {
219 enc.writeStruct32(st);
220 }
221 }
222 headerSeg = enc.segment();
223 }
224
225 synchronized (sendlock)
226 {
227 fragment(flags, type, method, methodSeg);
228 if (payload)
229 {
230 fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg);
231 fragment(LAST_SEG, SegmentType.BODY, method, method.getBody());
232 }
233 }
234 }
235
236 public void error(Void v, ProtocolError error)
237 {
238 throw new IllegalArgumentException("" + error);
239 }
240
241 public void setIdleTimeout(long l)
242 {
243 sender.setIdleTimeout(l);
244 }
245 }
|