Disassembler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport.network;
022 
023 import static java.lang.Math.min;
024 import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
025 import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
026 import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
027 import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
028 import static org.apache.qpid.transport.network.Frame.LAST_SEG;
029 
030 import java.nio.ByteBuffer;
031 import java.nio.ByteOrder;
032 
033 import org.apache.qpid.transport.Header;
034 import org.apache.qpid.transport.Method;
035 import org.apache.qpid.transport.ProtocolDelegate;
036 import org.apache.qpid.transport.ProtocolError;
037 import org.apache.qpid.transport.ProtocolEvent;
038 import org.apache.qpid.transport.ProtocolHeader;
039 import org.apache.qpid.transport.SegmentType;
040 import org.apache.qpid.transport.Sender;
041 import org.apache.qpid.transport.Struct;
042 import org.apache.qpid.transport.codec.BBEncoder;
043 
044 
045 /**
046  * Disassembler
047  *
048  */
049 
050 public final class Disassembler implements Sender<ProtocolEvent>,
051                                            ProtocolDelegate<Void>
052 {
053 
054     private final Sender<ByteBuffer> sender;
055     private final int maxPayload;
056     private final ByteBuffer header;
057     private final Object sendlock = new Object();
058     private final ThreadLocal<BBEncoder> encoder = new ThreadLocal()
059     {
060         public BBEncoder initialValue()
061         {
062             return new BBEncoder(4*1024);
063         }
064     };
065 
066     public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
067     {
068         if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
069         {
070             throw new IllegalArgumentException
071                 ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
072         }
073         this.sender = sender;
074         this.maxPayload  = maxFrame - HEADER_SIZE;
075         this.header =  ByteBuffer.allocate(HEADER_SIZE);
076         this.header.order(ByteOrder.BIG_ENDIAN);
077 
078     }
079 
080     public void send(ProtocolEvent event)
081     {
082         event.delegate(null, this);
083     }
084 
085     public void flush()
086     {
087         synchronized (sendlock)
088         {
089             sender.flush();
090         }
091     }
092 
093     public void close()
094     {
095         synchronized (sendlock)
096         {
097             sender.close();
098         }
099     }
100 
101     private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
102     {
103         synchronized (sendlock)
104         {
105             header.put(0, flags);
106             header.put(1, type);
107             header.putShort(2(short) (size + HEADER_SIZE));
108             header.put(5, track);
109             header.putShort(6(shortchannel);
110 
111             header.rewind();
112 
113             sender.send(header);
114 
115             int limit = buf.limit();
116             buf.limit(buf.position() + size);
117             sender.send(buf);
118             buf.limit(limit);
119         }
120     }
121 
122     private void fragment(byte flags, SegmentType type, ProtocolEvent event,
123                           ByteBuffer buf)
124     {
125         byte typeb = (bytetype.getValue();
126         byte track = event.getEncodedTrack() == Frame.L4 ? (byte(byte0;
127 
128         int remaining = buf.remaining();
129         boolean first = true;
130         while (true)
131         {
132             int size = min(maxPayload, remaining);
133             remaining -= size;
134 
135             byte newflags = flags;
136             if (first)
137             {
138                 newflags |= FIRST_FRAME;
139                 first = false;
140             }
141             if (remaining == 0)
142             {
143                 newflags |= LAST_FRAME;
144             }
145 
146             frame(newflags, typeb, track, event.getChannel(), size, buf);
147 
148             if (remaining == 0)
149             {
150                 break;
151             }
152         }
153     }
154 
155     public void init(Void v, ProtocolHeader header)
156     {
157         synchronized (sendlock)
158         {
159             sender.send(header.toByteBuffer());
160             sender.flush();
161         }
162     }
163 
164     public void control(Void v, Method method)
165     {
166         method(method, SegmentType.CONTROL);
167     }
168 
169     public void command(Void v, Method method)
170     {
171         method(method, SegmentType.COMMAND);
172     }
173 
174     private ByteBuffer copy(ByteBuffer src)
175     {
176         ByteBuffer buf = ByteBuffer.allocate(src.remaining());
177         buf.put(src);
178         buf.flip();
179         return buf;
180     }
181 
182     private void method(Method method, SegmentType type)
183     {
184         BBEncoder enc = encoder.get();
185         enc.init();
186         enc.writeUint16(method.getEncodedType());
187         if (type == SegmentType.COMMAND)
188         {
189             if (method.isSync())
190             {
191                 enc.writeUint16(0x0101);
192             }
193             else
194             {
195                 enc.writeUint16(0x0100);
196             }
197         }
198         method.write(enc);
199         ByteBuffer methodSeg = enc.segment();
200 
201         byte flags = FIRST_SEG;
202 
203         boolean payload = method.hasPayload();
204         if (!payload)
205         {
206             flags |= LAST_SEG;
207         }
208 
209         ByteBuffer headerSeg = null;
210         if (payload)
211         {
212             final Header hdr = method.getHeader();
213             if (hdr != null)
214             {
215                 final Struct[] structs = hdr.getStructs();
216 
217                 for (Struct st : structs)
218                 {
219                     enc.writeStruct32(st);
220                 }
221             }
222             headerSeg = enc.segment();
223         }
224 
225         synchronized (sendlock)
226         {
227             fragment(flags, type, method, methodSeg);
228             if (payload)
229             {
230                 fragment((byte0x0, SegmentType.HEADER, method, headerSeg);
231                 fragment(LAST_SEG, SegmentType.BODY, method, method.getBody());
232             }
233         }
234     }
235 
236     public void error(Void v, ProtocolError error)
237     {
238         throw new IllegalArgumentException("" + error);
239     }
240     
241     public void setIdleTimeout(long l)
242     {
243         sender.setIdleTimeout(l);
244     }
245 }