InputHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.transport.network;
022 
023 import java.nio.ByteBuffer;
024 import java.nio.ByteOrder;
025 
026 import org.apache.qpid.transport.ProtocolError;
027 import org.apache.qpid.transport.ProtocolHeader;
028 import org.apache.qpid.transport.Receiver;
029 import org.apache.qpid.transport.SegmentType;
030 
031 import static org.apache.qpid.transport.util.Functions.*;
032 
033 import static org.apache.qpid.transport.network.InputHandler.State.*;
034 
035 
036 /**
037  * InputHandler
038  *
039  @author Rafael H. Schloming
040  */
041 
042 public final class InputHandler implements Receiver<ByteBuffer>
043 {
044 
045     public enum State
046     {
047         PROTO_HDR,
048         FRAME_HDR,
049         FRAME_BODY,
050         ERROR;
051     }
052 
053     private final Receiver<NetworkEvent> receiver;
054     private State state;
055     private ByteBuffer input = null;
056     private int needed;
057 
058     private byte flags;
059     private SegmentType type;
060     private byte track;
061     private int channel;
062 
063     public InputHandler(Receiver<NetworkEvent> receiver, State state)
064     {
065         this.receiver = receiver;
066         this.state = state;
067 
068         switch (state)
069         {
070         case PROTO_HDR:
071             needed = 8;
072             break;
073         case FRAME_HDR:
074             needed = Frame.HEADER_SIZE;
075             break;
076         }
077     }
078 
079     public InputHandler(Receiver<NetworkEvent> receiver)
080     {
081         this(receiver, PROTO_HDR);
082     }
083 
084     private void error(String fmt, Object ... args)
085     {
086         receiver.received(new ProtocolError(Frame.L1, fmt, args));
087     }
088 
089     public void received(ByteBuffer buf)
090     {
091         int limit = buf.limit();
092         int remaining = buf.remaining();
093         while (remaining > 0)
094         {
095             if (remaining >= needed)
096             {
097                 int consumed = needed;
098                 int pos = buf.position();
099                 if (input == null)
100                 {
101                     buf.limit(pos + needed);
102                     input = buf;
103                     state = next(pos);
104                     buf.limit(limit);
105                     buf.position(pos + consumed);
106                 }
107                 else
108                 {
109                     buf.limit(pos + needed);
110                     input.put(buf);
111                     buf.limit(limit);
112                     input.flip();
113                     state = next(0);
114                 }
115 
116                 remaining -= consumed;
117                 input = null;
118             }
119             else
120             {
121                 if (input == null)
122                 {
123                     input = ByteBuffer.allocate(needed);
124                 }
125                 input.put(buf);
126                 needed -= remaining;
127                 remaining = 0;
128             }
129         }
130     }
131 
132     private State next(int pos)
133     {
134         input.order(ByteOrder.BIG_ENDIAN);
135 
136         switch (state) {
137         case PROTO_HDR:
138             if (input.get(pos!= 'A' &&
139                 input.get(pos + 1!= 'M' &&
140                 input.get(pos + 2!= 'Q' &&
141                 input.get(pos + 3!= 'P')
142             {
143                 error("bad protocol header: %s", str(input));
144                 return ERROR;
145             }
146 
147             byte instance = input.get(pos + 5);
148             byte major = input.get(pos + 6);
149             byte minor = input.get(pos + 7);
150             receiver.received(new ProtocolHeader(instance, major, minor));
151             needed = Frame.HEADER_SIZE;
152             return FRAME_HDR;
153         case FRAME_HDR:
154             flags = input.get(pos);
155             type = SegmentType.get(input.get(pos + 1));
156             int size = (0xFFFF & input.getShort(pos + 2));
157             size -= Frame.HEADER_SIZE;
158             if (size < || size > (64*1024 12))
159             {
160                 error("bad frame size: %d", size);
161                 return ERROR;
162             }
163             byte b = input.get(pos + 5);
164             if ((b & 0xF0!= 0) {
165                 error("non-zero reserved bits in upper nibble of " +
166                       "frame header byte 5: '%x'", b);
167                 return ERROR;
168             else {
169                 track = (byte) (b & 0xF);
170             }
171             channel = (0xFFFF & input.getShort(pos + 6));
172             if (size == 0)
173             {
174                 Frame frame = new Frame(flags, type, track, channel, ByteBuffer.allocate(0));
175                 receiver.received(frame);
176                 needed = Frame.HEADER_SIZE;
177                 return FRAME_HDR;
178             }
179             else
180             {
181                 needed = size;
182                 return FRAME_BODY;
183             }
184         case FRAME_BODY:
185             Frame frame = new Frame(flags, type, track, channel, input.slice());
186             receiver.received(frame);
187             needed = Frame.HEADER_SIZE;
188             return FRAME_HDR;
189         default:
190             throw new IllegalStateException();
191         }
192     }
193 
194     public void exception(Throwable t)
195     {
196         receiver.exception(t);
197     }
198 
199     public void closed()
200     {
201         receiver.closed();
202     }
203 
204 }