ProtocolOutputConverterImpl.java
001 package org.apache.qpid.server.output.amqp0_9;
002 /*
003  
004  * Licensed to the Apache Software Foundation (ASF) under one
005  * or more contributor license agreements.  See the NOTICE file
006  * distributed with this work for additional information
007  * regarding copyright ownership.  The ASF licenses this file
008  * to you under the Apache License, Version 2.0 (the
009  * "License"); you may not use this file except in compliance
010  * with the License.  You may obtain a copy of the License at
011  
012  *   http://www.apache.org/licenses/LICENSE-2.0
013  
014  * Unless required by applicable law or agreed to in writing,
015  * software distributed under the License is distributed on an
016  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017  * KIND, either express or implied.  See the License for the
018  * specific language governing permissions and limitations
019  * under the License.
020  
021  */
022 
023 
024 import org.apache.mina.common.ByteBuffer;
025 
026 import java.util.Iterator;
027 
028 import org.apache.qpid.server.output.ProtocolOutputConverter;
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.queue.AMQMessage;
031 import org.apache.qpid.server.queue.QueueEntry;
032 import org.apache.qpid.framing.*;
033 import org.apache.qpid.framing.abstraction.ContentChunk;
034 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
035 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
036 import org.apache.qpid.AMQException;
037 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
038 
039 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
040 {
041     private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
042     private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
043 
044 
045     public static Factory getInstanceFactory()
046     {
047         return new Factory()
048         {
049 
050             public ProtocolOutputConverter newInstance(AMQProtocolSession session)
051             {
052                 return new ProtocolOutputConverterImpl(session);
053             }
054         };
055     }
056 
057     private final AMQProtocolSession _protocolSession;
058 
059     private ProtocolOutputConverterImpl(AMQProtocolSession session)
060     {
061         _protocolSession = session;
062     }
063 
064 
065     public AMQProtocolSession getProtocolSession()
066     {
067         return _protocolSession;
068     }
069 
070     public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
071             throws AMQException
072     {
073         AMQMessage message = queueEntry.getMessage();
074 
075         AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
076         final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
077 
078         final int bodyCount = message.getBodyCount();
079 
080         if(bodyCount == 0)
081         {
082             SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
083                                                                              contentHeaderBody);
084 
085             writeFrame(compositeBlock);
086         }
087         else
088         {
089 
090 
091             //
092             // Optimise the case where we have a single content body. In that case we create a composite block
093             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
094             //
095             ContentChunk cb = message.getContentChunk(0);
096 
097             AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
098 
099             CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
100             writeFrame(compositeBlock);
101 
102             //
103             // Now start writing out the other content bodies
104             //
105             for(int i = 1; i < bodyCount; i++)
106             {
107                 cb = message.getContentChunk(i);
108                 writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
109             }
110 
111 
112         }
113         
114     }
115 
116     private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
117     {
118         
119         return ContentHeaderBody.createAMQFrame(channelId, contentHeaderBody);
120     }
121 
122 
123     public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSizethrows AMQException
124     {
125 
126         final AMQMessage message = queueEntry.getMessage();
127 
128         AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
129 
130 
131         AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
132 
133         final int bodyCount = message.getBodyCount();
134         if(bodyCount == 0)
135         {
136             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
137                                                                              contentHeader);
138             writeFrame(compositeBlock);
139         }
140         else
141         {
142 
143 
144             //
145             // Optimise the case where we have a single content body. In that case we create a composite block
146             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
147             //
148             ContentChunk cb = message.getContentChunk(0);
149 
150             AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
151             AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
152             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
153             writeFrame(compositeBlock);
154 
155             //
156             // Now start writing out the other content bodies
157             //
158             for(int i = 1; i < bodyCount; i++)
159             {
160                 cb = message.getContentChunk(i);
161                 writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
162             }
163 
164 
165         }
166 
167 
168     }
169 
170 
171     private AMQBody createEncodedDeliverFrame(QueueEntry queueEntry, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
172             throws AMQException
173     {
174         AMQMessage message= queueEntry.getMessage();
175 
176         final MessagePublishInfo pb = message.getMessagePublishInfo();
177 
178         final boolean isRedelivered = queueEntry.isRedelivered();
179         final AMQShortString exchangeName = pb.getExchange();
180         final AMQShortString routingKey = pb.getRoutingKey();
181 
182         return new AMQBody()
183         {
184 
185             public AMQBody _underlyingBody;
186 
187             public AMQBody createAMQBody()
188             {
189                 return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
190                                                               deliveryTag,
191                                                               isRedelivered,
192                                                               exchangeName,
193                                                               routingKey);
194 
195 
196 
197 
198 
199             }
200 
201             public byte getFrameType()
202             {
203                 return AMQMethodBody.TYPE;
204             }
205 
206             public int getSize()
207             {
208                 if(_underlyingBody == null)
209                 {
210                     _underlyingBody = createAMQBody();
211                 }
212                 return _underlyingBody.getSize();
213             }
214 
215             public void writePayload(ByteBuffer buffer)
216             {
217                 if(_underlyingBody == null)
218                 {
219                     _underlyingBody = createAMQBody();
220                 }
221                 _underlyingBody.writePayload(buffer);
222             }
223 
224             public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
225                 throws AMQException
226             {
227                 throw new AMQException("This block should never be dispatched!");
228             }
229         };
230     }
231 
232     private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
233             throws AMQException
234     {
235         final AMQMessage message = queueEntry.getMessage();
236         final MessagePublishInfo pb = message.getMessagePublishInfo();
237 
238         BasicGetOkBody getOkBody =
239                 METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
240                                                     queueEntry.isRedelivered(),
241                                                     pb.getExchange(),
242                                                     pb.getRoutingKey(),
243                                                     queueSize);
244         return getOkBody.generateFrame(channelId);
245     }
246 
247     public byte getProtocolMinorVersion()
248     {
249         return getProtocolSession().getProtocolMinorVersion();
250     }
251 
252     public byte getProtocolMajorVersion()
253     {
254         return getProtocolSession().getProtocolMajorVersion();
255     }
256 
257     private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyTextthrows AMQException
258     {
259 
260         BasicReturnBody basicReturnBody =
261                 METHOD_REGISTRY.createBasicReturnBody(replyCode,
262                                                      replyText,
263                                                      message.getMessagePublishInfo().getExchange(),
264                                                      message.getMessagePublishInfo().getRoutingKey());
265         return basicReturnBody.generateFrame(channelId);       
266     }
267 
268     public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
269             throws AMQException
270     {
271         AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
272 
273         AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
274 
275         Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
276         //
277         // Optimise the case where we have a single content body. In that case we create a composite block
278         // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
279         //
280         if (bodyFrameIterator.hasNext())
281         {
282             AMQDataBlock firstContentBody = bodyFrameIterator.next();
283             AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
284             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
285             writeFrame(compositeBlock);
286         }
287         else
288         {
289             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
290 
291             writeFrame(compositeBlock);
292         }
293 
294         //
295         // Now start writing out the other content bodies
296         // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
297         //
298         while (bodyFrameIterator.hasNext())
299         {
300             writeFrame(bodyFrameIterator.next());
301         }
302     }
303 
304 
305     public void writeFrame(AMQDataBlock block)
306     {
307         getProtocolSession().writeFrame(block);
308     }
309 
310 
311     public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
312     {
313 
314         BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
315         writeFrame(basicCancelOkBody.generateFrame(channelId));
316 
317     }
318 
319 
320     public static final class CompositeAMQBodyBlock extends AMQDataBlock
321     {
322         public static final int OVERHEAD = * AMQFrame.getFrameOverhead();
323 
324         private final AMQBody _methodBody;
325         private final AMQBody _headerBody;
326         private final AMQBody _contentBody;
327         private final int _channel;
328 
329 
330         public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
331         {
332             _channel = channel;
333             _methodBody = methodBody;
334             _headerBody = headerBody;
335             _contentBody = contentBody;
336 
337         }
338 
339         public long getSize()
340         {
341             return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
342         }
343 
344         public void writePayload(ByteBuffer buffer)
345         {
346             AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
347         }
348     }
349 
350     public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
351     {
352         public static final int OVERHEAD = * AMQFrame.getFrameOverhead();
353 
354         private final AMQBody _methodBody;
355         private final AMQBody _headerBody;
356         private final int _channel;
357 
358 
359         public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
360         {
361             _channel = channel;
362             _methodBody = methodBody;
363             _headerBody = headerBody;
364 
365         }
366 
367         public long getSize()
368         {
369             return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
370         }
371 
372         public void writePayload(ByteBuffer buffer)
373         {
374             AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
375         }
376     }
377 
378 }