BasicMessageProducer_0_8.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client;
022 
023 import java.util.UUID;
024 
025 import javax.jms.JMSException;
026 import javax.jms.Message;
027 
028 import org.apache.mina.common.ByteBuffer;
029 import org.apache.qpid.client.message.AbstractJMSMessage;
030 import org.apache.qpid.client.message.AMQMessageDelegate;
031 import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
032 import org.apache.qpid.client.protocol.AMQProtocolHandler;
033 import org.apache.qpid.framing.AMQFrame;
034 import org.apache.qpid.framing.BasicConsumeBody;
035 import org.apache.qpid.framing.BasicContentHeaderProperties;
036 import org.apache.qpid.framing.BasicPublishBody;
037 import org.apache.qpid.framing.CompositeAMQDataBlock;
038 import org.apache.qpid.framing.ContentBody;
039 import org.apache.qpid.framing.ContentHeaderBody;
040 import org.apache.qpid.framing.ExchangeDeclareBody;
041 
042 public class BasicMessageProducer_0_8 extends BasicMessageProducer
043 {
044 
045     BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
046             AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
047             boolean waitUntilSent)
048     {
049         super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent);
050     }
051 
052     void declareDestination(AMQDestination destination)
053     {
054 
055         ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
056                                                                                               destination.getExchangeName(),
057                                                                                               destination.getExchangeClass(),
058                                                                                               false,
059                                                                                               false,
060                                                                                               false,
061                                                                                               false,
062                                                                                               true,
063                                                                                               null);
064         // Declare the exchange
065         // Note that the durable and internal arguments are ignored since passive is set to false
066 
067         AMQFrame declare = body.generateFrame(_channelId);
068 
069         _protocolHandler.writeFrame(declare);
070     }
071 
072     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
073                      UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
074                      boolean immediate, boolean waitthrows JMSException
075     {
076         BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
077                                                                                         destination.getExchangeName(),
078                                                                                         destination.getRoutingKey(),
079                                                                                         mandatory,
080                                                                                         immediate);
081 
082         AMQFrame publishFrame = body.generateFrame(_channelId);
083 
084         message.prepareForSending();
085         ByteBuffer payload = message.getData();
086         AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8message.getDelegate();
087         BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
088 
089         contentHeaderProperties.setUserId(_userID);
090         
091         if (!_disableTimestamps)
092         {
093             final long currentTime = System.currentTimeMillis();
094             contentHeaderProperties.setTimestamp(currentTime);
095 
096             if (timeToLive > 0)
097             {
098                 contentHeaderProperties.setExpiration(currentTime + timeToLive);
099             }
100             else
101             {
102                 contentHeaderProperties.setExpiration(0);
103             }
104         }
105 
106         contentHeaderProperties.setDeliveryMode((bytedeliveryMode);
107         contentHeaderProperties.setPriority((bytepriority);
108 
109         final int size = (payload != null? payload.limit() 0;
110         final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
111         final AMQFrame[] frames = new AMQFrame[+ contentBodyFrameCount];
112 
113         if (payload != null)
114         {
115             createContentBodies(payload, frames, 2, _channelId);
116         }
117 
118         if ((contentBodyFrameCount != 0&& _logger.isDebugEnabled())
119         {
120             _logger.debug("Sending content body frames to " + destination);
121         }
122 
123 
124         // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
125         int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
126 
127         AMQFrame contentHeaderFrame =
128             ContentHeaderBody.createAMQFrame(_channelId,
129                                              classIfForBasic, 0, contentHeaderProperties, size);
130         if (_logger.isDebugEnabled())
131         {
132             _logger.debug("Sending content header frame to " + destination);
133         }
134 
135         frames[0= publishFrame;
136         frames[1= contentHeaderFrame;
137         CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
138 
139         try
140         {
141             _session.checkFlowControl();
142         }
143         catch (InterruptedException e)
144         {
145             JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
146             jmsEx.setLinkedException(e);
147             throw jmsEx;
148         }
149 
150         _protocolHandler.writeFrame(compositeFrame, wait);
151     }
152 
153     /**
154      * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
155      * maximum frame size.
156      *
157      @param payload
158      @param frames
159      @param offset
160      @param channelId @return the array of content bodies
161      */
162     private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
163     {
164 
165         if (frames.length == (offset + 1))
166         {
167             frames[offset= ContentBody.createAMQFrame(channelId, new ContentBody(payload));
168         }
169         else
170         {
171 
172             final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() 1;
173             long remaining = payload.remaining();
174             for (int i = offset; i < frames.length; i++)
175             {
176                 payload.position((intframePayloadMax * (i - offset));
177                 int length = (remaining >= framePayloadMax(intframePayloadMax : (intremaining;
178                 payload.limit(payload.position() + length);
179                 frames[i= ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
180 
181                 remaining -= length;
182             }
183         }
184 
185     }
186 
187     private int calculateContentBodyFrameCount(ByteBuffer payload)
188     {
189         // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
190         // (0xCE byte).
191         int frameCount;
192         if ((payload == null|| (payload.remaining() == 0))
193         {
194             frameCount = 0;
195         }
196         else
197         {
198             int dataLength = payload.remaining();
199             final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() 1;
200             int lastFrame = ((dataLength % framePayloadMax00;
201             frameCount = (int) (dataLength / framePayloadMax+ lastFrame;
202         }
203 
204         return frameCount;
205     }
206 
207 }