001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021
022 /*
023 * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
024 * Supported AMQP versions:
025 * 8-0
026 */
027 package org.apache.qpid.server.output.amqp0_8;
028
029 import org.apache.qpid.server.protocol.AMQProtocolSession;
030 import org.apache.qpid.server.queue.AMQMessage;
031 import org.apache.qpid.server.queue.QueueEntry;
032 import org.apache.qpid.server.store.StoreContext;
033 import org.apache.qpid.server.output.ProtocolOutputConverter;
034 import org.apache.qpid.framing.*;
035 import org.apache.qpid.framing.abstraction.ContentChunk;
036 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
037 import org.apache.qpid.AMQException;
038
039 import java.util.Iterator;
040
041 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
042 {
043
044
045 public static Factory getInstanceFactory()
046 {
047 return new Factory()
048 {
049
050 public ProtocolOutputConverter newInstance(AMQProtocolSession session)
051 {
052 return new ProtocolOutputConverterImpl(session);
053 }
054 };
055 }
056
057 private final AMQProtocolSession _protocolSession;
058
059 private ProtocolOutputConverterImpl(AMQProtocolSession session)
060 {
061 _protocolSession = session;
062 }
063
064
065 public AMQProtocolSession getProtocolSession()
066 {
067 return _protocolSession;
068 }
069
070 public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
071 throws AMQException
072 {
073 final AMQMessage message = queueEntry.getMessage();
074
075 AMQDataBlock deliver = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
076 AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
077 message.getContentHeaderBody());
078
079 final int bodyCount = message.getBodyCount();
080
081 if(bodyCount == 0)
082 {
083 SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
084 contentHeader);
085
086 writeFrame(compositeBlock);
087 }
088 else
089 {
090
091
092 //
093 // Optimise the case where we have a single content body. In that case we create a composite block
094 // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
095 //
096 ContentChunk cb = message.getContentChunk(0);
097
098 AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
099 AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
100 CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
101 writeFrame(compositeBlock);
102
103 //
104 // Now start writing out the other content bodies
105 //
106 for(int i = 1; i < bodyCount; i++)
107 {
108 cb = message.getContentChunk(i);
109 writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
110 }
111
112
113 }
114
115
116 }
117
118
119 public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
120 {
121 final AMQMessage message = queueEntry.getMessage();
122
123 AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
124
125
126 AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
127 message.getContentHeaderBody());
128
129 final int bodyCount = message.getBodyCount();
130 if(bodyCount == 0)
131 {
132 SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
133 contentHeader);
134 writeFrame(compositeBlock);
135 }
136 else
137 {
138
139
140 //
141 // Optimise the case where we have a single content body. In that case we create a composite block
142 // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
143 //
144 ContentChunk cb = message.getContentChunk(0);
145
146 AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
147 AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
148 CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
149 writeFrame(compositeBlock);
150
151 //
152 // Now start writing out the other content bodies
153 //
154 for(int i = 1; i < bodyCount; i++)
155 {
156 cb = message.getContentChunk(i);
157 writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
158 }
159
160
161 }
162
163
164 }
165
166
167 private AMQDataBlock createEncodedDeliverFrame(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag)
168 throws AMQException
169 {
170 final AMQMessage message = queueEntry.getMessage();
171
172 final MessagePublishInfo pb = message.getMessagePublishInfo();
173
174 MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
175 BasicDeliverBody deliverBody =
176 methodRegistry.createBasicDeliverBody(consumerTag,
177 deliveryTag,
178 queueEntry.isRedelivered(),
179 pb.getExchange(),
180 pb.getRoutingKey());
181 return deliverBody.generateFrame(channelId);
182 }
183
184 private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
185 throws AMQException
186 {
187 final AMQMessage message = queueEntry.getMessage();
188 final MessagePublishInfo pb = message.getMessagePublishInfo();
189
190 MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
191 BasicGetOkBody getOkBody =
192 methodRegistry.createBasicGetOkBody(deliveryTag,
193 queueEntry.isRedelivered(),
194 pb.getExchange(),
195 pb.getRoutingKey(),
196 queueSize);
197 return getOkBody.generateFrame(channelId);
198 }
199
200 public byte getProtocolMinorVersion()
201 {
202 return getProtocolSession().getProtocolMinorVersion();
203 }
204
205 public byte getProtocolMajorVersion()
206 {
207 return getProtocolSession().getProtocolMajorVersion();
208 }
209
210 private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
211 {
212 MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
213 BasicReturnBody basicReturnBody =
214 methodRegistry.createBasicReturnBody(replyCode,
215 replyText,
216 message.getMessagePublishInfo().getExchange(),
217 message.getMessagePublishInfo().getRoutingKey());
218 return basicReturnBody.generateFrame(channelId);
219
220 }
221
222 public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
223 throws AMQException
224 {
225 AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
226
227 AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
228 message.getContentHeaderBody());
229
230 Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
231 //
232 // Optimise the case where we have a single content body. In that case we create a composite block
233 // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
234 //
235 if (bodyFrameIterator.hasNext())
236 {
237 AMQDataBlock firstContentBody = bodyFrameIterator.next();
238 AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
239 CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
240 writeFrame(compositeBlock);
241 }
242 else
243 {
244 CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
245
246 writeFrame(compositeBlock);
247 }
248
249 //
250 // Now start writing out the other content bodies
251 // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
252 //
253 while (bodyFrameIterator.hasNext())
254 {
255 writeFrame(bodyFrameIterator.next());
256 }
257 }
258
259
260 public void writeFrame(AMQDataBlock block)
261 {
262 getProtocolSession().writeFrame(block);
263 }
264
265
266 public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
267 {
268 MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
269 BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
270 writeFrame(basicCancelOkBody.generateFrame(channelId));
271
272 }
273 }
|