001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client;
022
023 import java.util.UUID;
024
025 import javax.jms.JMSException;
026 import javax.jms.Message;
027
028 import org.apache.mina.common.ByteBuffer;
029 import org.apache.qpid.client.message.AbstractJMSMessage;
030 import org.apache.qpid.client.message.AMQMessageDelegate;
031 import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
032 import org.apache.qpid.client.protocol.AMQProtocolHandler;
033 import org.apache.qpid.framing.AMQFrame;
034 import org.apache.qpid.framing.BasicConsumeBody;
035 import org.apache.qpid.framing.BasicContentHeaderProperties;
036 import org.apache.qpid.framing.BasicPublishBody;
037 import org.apache.qpid.framing.CompositeAMQDataBlock;
038 import org.apache.qpid.framing.ContentBody;
039 import org.apache.qpid.framing.ContentHeaderBody;
040 import org.apache.qpid.framing.ExchangeDeclareBody;
041
042 public class BasicMessageProducer_0_8 extends BasicMessageProducer
043 {
044
045 BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
046 AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
047 boolean waitUntilSent)
048 {
049 super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent);
050 }
051
052 void declareDestination(AMQDestination destination)
053 {
054
055 ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
056 destination.getExchangeName(),
057 destination.getExchangeClass(),
058 false,
059 false,
060 false,
061 false,
062 true,
063 null);
064 // Declare the exchange
065 // Note that the durable and internal arguments are ignored since passive is set to false
066
067 AMQFrame declare = body.generateFrame(_channelId);
068
069 _protocolHandler.writeFrame(declare);
070 }
071
072 void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
073 UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
074 boolean immediate, boolean wait) throws JMSException
075 {
076 BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
077 destination.getExchangeName(),
078 destination.getRoutingKey(),
079 mandatory,
080 immediate);
081
082 AMQFrame publishFrame = body.generateFrame(_channelId);
083
084 message.prepareForSending();
085 ByteBuffer payload = message.getData();
086 AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
087 BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
088
089 contentHeaderProperties.setUserId(_userID);
090
091 if (!_disableTimestamps)
092 {
093 final long currentTime = System.currentTimeMillis();
094 contentHeaderProperties.setTimestamp(currentTime);
095
096 if (timeToLive > 0)
097 {
098 contentHeaderProperties.setExpiration(currentTime + timeToLive);
099 }
100 else
101 {
102 contentHeaderProperties.setExpiration(0);
103 }
104 }
105
106 contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
107 contentHeaderProperties.setPriority((byte) priority);
108
109 final int size = (payload != null) ? payload.limit() : 0;
110 final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
111 final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
112
113 if (payload != null)
114 {
115 createContentBodies(payload, frames, 2, _channelId);
116 }
117
118 if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
119 {
120 _logger.debug("Sending content body frames to " + destination);
121 }
122
123
124 // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
125 int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
126
127 AMQFrame contentHeaderFrame =
128 ContentHeaderBody.createAMQFrame(_channelId,
129 classIfForBasic, 0, contentHeaderProperties, size);
130 if (_logger.isDebugEnabled())
131 {
132 _logger.debug("Sending content header frame to " + destination);
133 }
134
135 frames[0] = publishFrame;
136 frames[1] = contentHeaderFrame;
137 CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
138
139 try
140 {
141 _session.checkFlowControl();
142 }
143 catch (InterruptedException e)
144 {
145 JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
146 jmsEx.setLinkedException(e);
147 throw jmsEx;
148 }
149
150 _protocolHandler.writeFrame(compositeFrame, wait);
151 }
152
153 /**
154 * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
155 * maximum frame size.
156 *
157 * @param payload
158 * @param frames
159 * @param offset
160 * @param channelId @return the array of content bodies
161 */
162 private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
163 {
164
165 if (frames.length == (offset + 1))
166 {
167 frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
168 }
169 else
170 {
171
172 final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
173 long remaining = payload.remaining();
174 for (int i = offset; i < frames.length; i++)
175 {
176 payload.position((int) framePayloadMax * (i - offset));
177 int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
178 payload.limit(payload.position() + length);
179 frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
180
181 remaining -= length;
182 }
183 }
184
185 }
186
187 private int calculateContentBodyFrameCount(ByteBuffer payload)
188 {
189 // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
190 // (0xCE byte).
191 int frameCount;
192 if ((payload == null) || (payload.remaining() == 0))
193 {
194 frameCount = 0;
195 }
196 else
197 {
198 int dataLength = payload.remaining();
199 final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
200 int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
201 frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
202 }
203
204 return frameCount;
205 }
206
207 }
|