001 package org.apache.qpid.server.output.amqp0_9;
002 /*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 *
021 */
022
023
024 import org.apache.mina.common.ByteBuffer;
025
026 import java.util.Iterator;
027
028 import org.apache.qpid.server.output.ProtocolOutputConverter;
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.queue.AMQMessage;
031 import org.apache.qpid.server.queue.QueueEntry;
032 import org.apache.qpid.framing.*;
033 import org.apache.qpid.framing.abstraction.ContentChunk;
034 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
035 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
036 import org.apache.qpid.AMQException;
037 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
038
039 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
040 {
041 private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
042 private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
043
044
045 public static Factory getInstanceFactory()
046 {
047 return new Factory()
048 {
049
050 public ProtocolOutputConverter newInstance(AMQProtocolSession session)
051 {
052 return new ProtocolOutputConverterImpl(session);
053 }
054 };
055 }
056
/** Session that all frames produced by this converter are written to. */
private final AMQProtocolSession _protocolSession;

/**
 * Creates a converter bound to the given session. The constructor is private; within this
 * class it is invoked only by the factory returned from {@code getInstanceFactory()}.
 *
 * @param session the protocol session that frames will be written to
 */
private ProtocolOutputConverterImpl(AMQProtocolSession session)
{
    this._protocolSession = session;
}
063
064
065 public AMQProtocolSession getProtocolSession()
066 {
067 return _protocolSession;
068 }
069
070 public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
071 throws AMQException
072 {
073 AMQMessage message = queueEntry.getMessage();
074
075 AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
076 final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
077
078 final int bodyCount = message.getBodyCount();
079
080 if(bodyCount == 0)
081 {
082 SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
083 contentHeaderBody);
084
085 writeFrame(compositeBlock);
086 }
087 else
088 {
089
090
091 //
092 // Optimise the case where we have a single content body. In that case we create a composite block
093 // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
094 //
095 ContentChunk cb = message.getContentChunk(0);
096
097 AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
098
099 CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
100 writeFrame(compositeBlock);
101
102 //
103 // Now start writing out the other content bodies
104 //
105 for(int i = 1; i < bodyCount; i++)
106 {
107 cb = message.getContentChunk(i);
108 writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
109 }
110
111
112 }
113
114 }
115
116 private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
117 {
118
119 return ContentHeaderBody.createAMQFrame(channelId, contentHeaderBody);
120 }
121
122
123 public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
124 {
125
126 final AMQMessage message = queueEntry.getMessage();
127
128 AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
129
130
131 AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
132
133 final int bodyCount = message.getBodyCount();
134 if(bodyCount == 0)
135 {
136 SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
137 contentHeader);
138 writeFrame(compositeBlock);
139 }
140 else
141 {
142
143
144 //
145 // Optimise the case where we have a single content body. In that case we create a composite block
146 // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
147 //
148 ContentChunk cb = message.getContentChunk(0);
149
150 AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
151 AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
152 CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
153 writeFrame(compositeBlock);
154
155 //
156 // Now start writing out the other content bodies
157 //
158 for(int i = 1; i < bodyCount; i++)
159 {
160 cb = message.getContentChunk(i);
161 writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
162 }
163
164
165 }
166
167
168 }
169
170
171 private AMQBody createEncodedDeliverFrame(QueueEntry queueEntry, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
172 throws AMQException
173 {
174 AMQMessage message= queueEntry.getMessage();
175
176 final MessagePublishInfo pb = message.getMessagePublishInfo();
177
178 final boolean isRedelivered = queueEntry.isRedelivered();
179 final AMQShortString exchangeName = pb.getExchange();
180 final AMQShortString routingKey = pb.getRoutingKey();
181
182 return new AMQBody()
183 {
184
185 public AMQBody _underlyingBody;
186
187 public AMQBody createAMQBody()
188 {
189 return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
190 deliveryTag,
191 isRedelivered,
192 exchangeName,
193 routingKey);
194
195
196
197
198
199 }
200
201 public byte getFrameType()
202 {
203 return AMQMethodBody.TYPE;
204 }
205
206 public int getSize()
207 {
208 if(_underlyingBody == null)
209 {
210 _underlyingBody = createAMQBody();
211 }
212 return _underlyingBody.getSize();
213 }
214
215 public void writePayload(ByteBuffer buffer)
216 {
217 if(_underlyingBody == null)
218 {
219 _underlyingBody = createAMQBody();
220 }
221 _underlyingBody.writePayload(buffer);
222 }
223
224 public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
225 throws AMQException
226 {
227 throw new AMQException("This block should never be dispatched!");
228 }
229 };
230 }
231
232 private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
233 throws AMQException
234 {
235 final AMQMessage message = queueEntry.getMessage();
236 final MessagePublishInfo pb = message.getMessagePublishInfo();
237
238 BasicGetOkBody getOkBody =
239 METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
240 queueEntry.isRedelivered(),
241 pb.getExchange(),
242 pb.getRoutingKey(),
243 queueSize);
244 return getOkBody.generateFrame(channelId);
245 }
246
247 public byte getProtocolMinorVersion()
248 {
249 return getProtocolSession().getProtocolMinorVersion();
250 }
251
252 public byte getProtocolMajorVersion()
253 {
254 return getProtocolSession().getProtocolMajorVersion();
255 }
256
257 private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
258 {
259
260 BasicReturnBody basicReturnBody =
261 METHOD_REGISTRY.createBasicReturnBody(replyCode,
262 replyText,
263 message.getMessagePublishInfo().getExchange(),
264 message.getMessagePublishInfo().getRoutingKey());
265 return basicReturnBody.generateFrame(channelId);
266 }
267
268 public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
269 throws AMQException
270 {
271 AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
272
273 AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
274
275 Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
276 //
277 // Optimise the case where we have a single content body. In that case we create a composite block
278 // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
279 //
280 if (bodyFrameIterator.hasNext())
281 {
282 AMQDataBlock firstContentBody = bodyFrameIterator.next();
283 AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
284 CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
285 writeFrame(compositeBlock);
286 }
287 else
288 {
289 CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
290
291 writeFrame(compositeBlock);
292 }
293
294 //
295 // Now start writing out the other content bodies
296 // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
297 //
298 while (bodyFrameIterator.hasNext())
299 {
300 writeFrame(bodyFrameIterator.next());
301 }
302 }
303
304
305 public void writeFrame(AMQDataBlock block)
306 {
307 getProtocolSession().writeFrame(block);
308 }
309
310
311 public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
312 {
313
314 BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
315 writeFrame(basicCancelOkBody.generateFrame(channelId));
316
317 }
318
319
320 public static final class CompositeAMQBodyBlock extends AMQDataBlock
321 {
322 public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
323
324 private final AMQBody _methodBody;
325 private final AMQBody _headerBody;
326 private final AMQBody _contentBody;
327 private final int _channel;
328
329
330 public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
331 {
332 _channel = channel;
333 _methodBody = methodBody;
334 _headerBody = headerBody;
335 _contentBody = contentBody;
336
337 }
338
339 public long getSize()
340 {
341 return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
342 }
343
344 public void writePayload(ByteBuffer buffer)
345 {
346 AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
347 }
348 }
349
350 public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
351 {
352 public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
353
354 private final AMQBody _methodBody;
355 private final AMQBody _headerBody;
356 private final int _channel;
357
358
359 public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
360 {
361 _channel = channel;
362 _methodBody = methodBody;
363 _headerBody = headerBody;
364
365 }
366
367 public long getSize()
368 {
369 return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
370 }
371
372 public void writePayload(ByteBuffer buffer)
373 {
374 AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
375 }
376 }
377
378 }
|