001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.transport.network;
022
023 import java.nio.ByteBuffer;
024 import java.nio.ByteOrder;
025
026 import org.apache.qpid.transport.ProtocolError;
027 import org.apache.qpid.transport.ProtocolHeader;
028 import org.apache.qpid.transport.Receiver;
029 import org.apache.qpid.transport.SegmentType;
030
031 import static org.apache.qpid.transport.util.Functions.*;
032
033 import static org.apache.qpid.transport.network.InputHandler.State.*;
034
035
036 /**
037 * InputHandler
038 *
039 * @author Rafael H. Schloming
040 */
041
042 public final class InputHandler implements Receiver<ByteBuffer>
043 {
044
045 public enum State
046 {
047 PROTO_HDR,
048 FRAME_HDR,
049 FRAME_BODY,
050 ERROR;
051 }
052
053 private final Receiver<NetworkEvent> receiver;
054 private State state;
055 private ByteBuffer input = null;
056 private int needed;
057
058 private byte flags;
059 private SegmentType type;
060 private byte track;
061 private int channel;
062
063 public InputHandler(Receiver<NetworkEvent> receiver, State state)
064 {
065 this.receiver = receiver;
066 this.state = state;
067
068 switch (state)
069 {
070 case PROTO_HDR:
071 needed = 8;
072 break;
073 case FRAME_HDR:
074 needed = Frame.HEADER_SIZE;
075 break;
076 }
077 }
078
079 public InputHandler(Receiver<NetworkEvent> receiver)
080 {
081 this(receiver, PROTO_HDR);
082 }
083
084 private void error(String fmt, Object ... args)
085 {
086 receiver.received(new ProtocolError(Frame.L1, fmt, args));
087 }
088
089 public void received(ByteBuffer buf)
090 {
091 int limit = buf.limit();
092 int remaining = buf.remaining();
093 while (remaining > 0)
094 {
095 if (remaining >= needed)
096 {
097 int consumed = needed;
098 int pos = buf.position();
099 if (input == null)
100 {
101 buf.limit(pos + needed);
102 input = buf;
103 state = next(pos);
104 buf.limit(limit);
105 buf.position(pos + consumed);
106 }
107 else
108 {
109 buf.limit(pos + needed);
110 input.put(buf);
111 buf.limit(limit);
112 input.flip();
113 state = next(0);
114 }
115
116 remaining -= consumed;
117 input = null;
118 }
119 else
120 {
121 if (input == null)
122 {
123 input = ByteBuffer.allocate(needed);
124 }
125 input.put(buf);
126 needed -= remaining;
127 remaining = 0;
128 }
129 }
130 }
131
132 private State next(int pos)
133 {
134 input.order(ByteOrder.BIG_ENDIAN);
135
136 switch (state) {
137 case PROTO_HDR:
138 if (input.get(pos) != 'A' &&
139 input.get(pos + 1) != 'M' &&
140 input.get(pos + 2) != 'Q' &&
141 input.get(pos + 3) != 'P')
142 {
143 error("bad protocol header: %s", str(input));
144 return ERROR;
145 }
146
147 byte instance = input.get(pos + 5);
148 byte major = input.get(pos + 6);
149 byte minor = input.get(pos + 7);
150 receiver.received(new ProtocolHeader(instance, major, minor));
151 needed = Frame.HEADER_SIZE;
152 return FRAME_HDR;
153 case FRAME_HDR:
154 flags = input.get(pos);
155 type = SegmentType.get(input.get(pos + 1));
156 int size = (0xFFFF & input.getShort(pos + 2));
157 size -= Frame.HEADER_SIZE;
158 if (size < 0 || size > (64*1024 - 12))
159 {
160 error("bad frame size: %d", size);
161 return ERROR;
162 }
163 byte b = input.get(pos + 5);
164 if ((b & 0xF0) != 0) {
165 error("non-zero reserved bits in upper nibble of " +
166 "frame header byte 5: '%x'", b);
167 return ERROR;
168 } else {
169 track = (byte) (b & 0xF);
170 }
171 channel = (0xFFFF & input.getShort(pos + 6));
172 if (size == 0)
173 {
174 Frame frame = new Frame(flags, type, track, channel, ByteBuffer.allocate(0));
175 receiver.received(frame);
176 needed = Frame.HEADER_SIZE;
177 return FRAME_HDR;
178 }
179 else
180 {
181 needed = size;
182 return FRAME_BODY;
183 }
184 case FRAME_BODY:
185 Frame frame = new Frame(flags, type, track, channel, input.slice());
186 receiver.received(frame);
187 needed = Frame.HEADER_SIZE;
188 return FRAME_HDR;
189 default:
190 throw new IllegalStateException();
191 }
192 }
193
194 public void exception(Throwable t)
195 {
196 receiver.exception(t);
197 }
198
199 public void closed()
200 {
201 receiver.closed();
202 }
203
204 }
|