Assembler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport.network;
022 
023 import java.util.ArrayList;
024 import java.util.HashMap;
025 import java.util.List;
026 import java.util.Map;
027 
028 import java.nio.ByteBuffer;
029 
030 import org.apache.qpid.transport.codec.BBDecoder;
031 import org.apache.qpid.transport.codec.Decoder;
032 
033 import org.apache.qpid.transport.Header;
034 import org.apache.qpid.transport.Method;
035 import org.apache.qpid.transport.ProtocolError;
036 import org.apache.qpid.transport.ProtocolEvent;
037 import org.apache.qpid.transport.ProtocolHeader;
038 import org.apache.qpid.transport.Receiver;
039 import org.apache.qpid.transport.SegmentType;
040 import org.apache.qpid.transport.Struct;
041 
042 
043 /**
044  * Assembler
045  *
046  */
047 
048 public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
049 {
050 
051     private final Receiver<ProtocolEvent> receiver;
052     private final Map<Integer,List<Frame>> segments;
053     private final Method[] incomplete;
054     private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
055     {
056         public BBDecoder initialValue()
057         {
058             return new BBDecoder();
059         }
060     };
061 
062     public Assembler(Receiver<ProtocolEvent> receiver)
063     {
064         this.receiver = receiver;
065         segments = new HashMap<Integer,List<Frame>>();
066         incomplete = new Method[64*1024];
067     }
068 
069     private int segmentKey(Frame frame)
070     {
071         return (frame.getTrack() 1* frame.getChannel();
072     }
073 
074     private List<Frame> getSegment(Frame frame)
075     {
076         return segments.get(segmentKey(frame));
077     }
078 
079     private void setSegment(Frame frame, List<Frame> segment)
080     {
081         int key = segmentKey(frame);
082         if (segments.containsKey(key))
083         {
084             error(new ProtocolError(Frame.L2, "segment in progress: %s",
085                                     frame));
086         }
087         segments.put(segmentKey(frame), segment);
088     }
089 
090     private void clearSegment(Frame frame)
091     {
092         segments.remove(segmentKey(frame));
093     }
094 
095     private void emit(int channel, ProtocolEvent event)
096     {
097         event.setChannel(channel);
098         receiver.received(event);
099     }
100 
101     public void received(NetworkEvent event)
102     {
103         event.delegate(this);
104     }
105 
106     public void exception(Throwable t)
107     {
108         this.receiver.exception(t);
109     }
110 
111     public void closed()
112     {
113         this.receiver.closed();
114     }
115 
116     public void init(ProtocolHeader header)
117     {
118         emit(0, header);
119     }
120 
121     public void error(ProtocolError error)
122     {
123         emit(0, error);
124     }
125 
126     public void frame(Frame frame)
127     {
128         ByteBuffer segment;
129         if (frame.isFirstFrame() && frame.isLastFrame())
130         {
131             segment = frame.getBody();
132             assemble(frame, segment);
133         }
134         else
135         {
136             List<Frame> frames;
137             if (frame.isFirstFrame())
138             {
139                 frames = new ArrayList<Frame>();
140                 setSegment(frame, frames);
141             }
142             else
143             {
144                 frames = getSegment(frame);
145             }
146 
147             frames.add(frame);
148 
149             if (frame.isLastFrame())
150             {
151                 clearSegment(frame);
152 
153                 int size = 0;
154                 for (Frame f : frames)
155                 {
156                     size += f.getSize();
157                 }
158                 segment = ByteBuffer.allocate(size);
159                 for (Frame f : frames)
160                 {
161                     segment.put(f.getBody());
162                 }
163                 segment.flip();
164                 assemble(frame, segment);
165             }
166         }
167 
168     }
169 
170     private void assemble(Frame frame, ByteBuffer segment)
171     {
172         BBDecoder dec = decoder.get();
173         dec.init(segment);
174 
175         int channel = frame.getChannel();
176         Method command;
177 
178         switch (frame.getType())
179         {
180         case CONTROL:
181             int controlType = dec.readUint16();
182             Method control = Method.create(controlType);
183             control.read(dec);
184             emit(channel, control);
185             break;
186         case COMMAND:
187             int commandType = dec.readUint16();
188             // read in the session header, right now we don't use it
189             dec.readUint16();
190             command = Method.create(commandType);
191             command.read(dec);
192             if (command.hasPayload())
193             {
194                 incomplete[channel= command;
195             }
196             else
197             {
198                 emit(channel, command);
199             }
200             break;
201         case HEADER:
202             command = incomplete[channel];
203             List<Struct> structs = new ArrayList(2);
204             while (dec.hasRemaining())
205             {
206                 structs.add(dec.readStruct32());
207             }
208             command.setHeader(new Header(structs));
209             if (frame.isLastSegment())
210             {
211                 incomplete[channelnull;
212                 emit(channel, command);
213             }
214             break;
215         case BODY:
216             command = incomplete[channel];
217             command.setBody(segment);
218             incomplete[channelnull;
219             emit(channel, command);
220             break;
221         default:
222             throw new IllegalStateException("unknown frame type: " + frame.getType());
223         }
224     }
225 
226 }