ProtocolOutputConverterImpl.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 
022 /*
023  * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
024  * Supported AMQP versions:
025  *   8-0
026  */
027 package org.apache.qpid.server.output.amqp0_8;
028 
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.queue.AMQMessage;
031 import org.apache.qpid.server.queue.QueueEntry;
032 import org.apache.qpid.server.store.StoreContext;
033 import org.apache.qpid.server.output.ProtocolOutputConverter;
034 import org.apache.qpid.framing.*;
035 import org.apache.qpid.framing.abstraction.ContentChunk;
036 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
037 import org.apache.qpid.AMQException;
038 
039 import java.util.Iterator;
040 
041 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
042 {
043 
044 
045     public static Factory getInstanceFactory()
046     {
047         return new Factory()
048         {
049     
050             public ProtocolOutputConverter newInstance(AMQProtocolSession session)
051             {
052                 return new ProtocolOutputConverterImpl(session);
053             }
054         };
055     }
056 
057     private final AMQProtocolSession _protocolSession;
058 
059     private ProtocolOutputConverterImpl(AMQProtocolSession session)
060     {
061         _protocolSession = session;
062     }
063 
064 
065     public AMQProtocolSession getProtocolSession()
066     {
067         return _protocolSession;
068     }
069 
070     public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
071             throws AMQException
072     {
073         final AMQMessage message = queueEntry.getMessage();
074 
075         AMQDataBlock deliver = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
076         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
077                                                                       message.getContentHeaderBody());
078 
079         final int bodyCount = message.getBodyCount();
080 
081         if(bodyCount == 0)
082         {
083             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
084                                                                              contentHeader);
085 
086             writeFrame(compositeBlock);
087         }
088         else
089         {
090 
091 
092             //
093             // Optimise the case where we have a single content body. In that case we create a composite block
094             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
095             //
096             ContentChunk cb = message.getContentChunk(0);
097 
098             AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
099             AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
100             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
101             writeFrame(compositeBlock);
102 
103             //
104             // Now start writing out the other content bodies
105             //
106             for(int i = 1; i < bodyCount; i++)
107             {
108                 cb = message.getContentChunk(i);
109                 writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
110             }
111 
112 
113         }
114 
115 
116     }
117 
118 
119     public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSizethrows AMQException
120     {
121         final AMQMessage message = queueEntry.getMessage();
122 
123         AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
124 
125 
126         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
127                                                                       message.getContentHeaderBody());
128 
129         final int bodyCount = message.getBodyCount();
130         if(bodyCount == 0)
131         {
132             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
133                                                                              contentHeader);
134             writeFrame(compositeBlock);
135         }
136         else
137         {
138 
139 
140             //
141             // Optimise the case where we have a single content body. In that case we create a composite block
142             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
143             //
144             ContentChunk cb = message.getContentChunk(0);
145 
146             AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
147             AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
148             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
149             writeFrame(compositeBlock);
150 
151             //
152             // Now start writing out the other content bodies
153             //
154             for(int i = 1; i < bodyCount; i++)
155             {
156                 cb = message.getContentChunk(i);
157                 writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
158             }
159 
160 
161         }
162 
163 
164     }
165 
166 
167     private AMQDataBlock createEncodedDeliverFrame(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
168             throws AMQException
169     {
170         final AMQMessage message = queueEntry.getMessage();
171 
172         final MessagePublishInfo pb = message.getMessagePublishInfo();
173 
174         MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
175         BasicDeliverBody deliverBody =
176                 methodRegistry.createBasicDeliverBody(consumerTag,
177                                                       deliveryTag,
178                                                       queueEntry.isRedelivered(),
179                                                       pb.getExchange(),
180                                                       pb.getRoutingKey());
181         return deliverBody.generateFrame(channelId);
182     }
183 
184     private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
185             throws AMQException
186     {
187         final AMQMessage message = queueEntry.getMessage();
188         final MessagePublishInfo pb = message.getMessagePublishInfo();        
189 
190         MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
191         BasicGetOkBody getOkBody =
192                 methodRegistry.createBasicGetOkBody(deliveryTag,
193                                                     queueEntry.isRedelivered(),
194                                                     pb.getExchange(),
195                                                     pb.getRoutingKey(),
196                                                     queueSize);
197         return getOkBody.generateFrame(channelId);
198     }
199 
200     public byte getProtocolMinorVersion()
201     {
202         return getProtocolSession().getProtocolMinorVersion();
203     }
204 
205     public byte getProtocolMajorVersion()
206     {
207         return getProtocolSession().getProtocolMajorVersion();
208     }
209 
210     private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyTextthrows AMQException
211     {
212         MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
213         BasicReturnBody basicReturnBody =
214                 methodRegistry.createBasicReturnBody(replyCode,
215                                                      replyText,
216                                                      message.getMessagePublishInfo().getExchange(),
217                                                      message.getMessagePublishInfo().getRoutingKey());
218         return basicReturnBody.generateFrame(channelId);
219         
220     }
221 
222     public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
223             throws AMQException
224     {
225         AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
226 
227         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
228                                                                       message.getContentHeaderBody());
229 
230         Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
231         //
232         // Optimise the case where we have a single content body. In that case we create a composite block
233         // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
234         //
235         if (bodyFrameIterator.hasNext())
236         {
237             AMQDataBlock firstContentBody = bodyFrameIterator.next();
238             AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
239             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
240             writeFrame(compositeBlock);
241         }
242         else
243         {
244             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
245 
246             writeFrame(compositeBlock);
247         }
248 
249         //
250         // Now start writing out the other content bodies
251         // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
252         //
253         while (bodyFrameIterator.hasNext())
254         {
255             writeFrame(bodyFrameIterator.next());
256         }
257     }
258 
259 
260     public void writeFrame(AMQDataBlock block)
261     {
262         getProtocolSession().writeFrame(block);
263     }
264 
265 
266     public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
267     {
268         MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
269         BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
270         writeFrame(basicCancelOkBody.generateFrame(channelId));
271 
272     }
273 }