BasicMessageProducer_0_10.java
/* Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
018 package org.apache.qpid.client;
019 
020 import java.io.IOException;
021 import java.net.URISyntaxException;
022 import java.util.HashMap;
023 import java.util.Map;
024 import java.util.UUID;
025 import java.nio.ByteBuffer;
026 
027 import javax.jms.JMSException;
028 import javax.jms.Message;
029 import javax.jms.DeliveryMode;
030 
031 import org.apache.qpid.client.message.AbstractJMSMessage;
032 import org.apache.qpid.client.message.FiledTableSupport;
033 import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
034 import org.apache.qpid.client.protocol.AMQProtocolHandler;
035 import org.apache.qpid.framing.AMQShortString;
036 import org.apache.qpid.framing.BasicContentHeaderProperties;
037 import org.apache.qpid.url.AMQBindingURL;
038 import org.apache.qpid.util.Strings;
039 import org.apache.qpid.njms.ExceptionHelper;
040 import org.apache.qpid.transport.*;
041 import static org.apache.qpid.transport.Option.*;
042 
043 /**
044  * This is a 0_10 message producer.
045  */
046 public class BasicMessageProducer_0_10 extends BasicMessageProducer
047 {
048     private byte[] userIDBytes;
049 
050     BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
051                               AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
052                               boolean immediate, boolean mandatory, boolean waitUntilSent)
053     {
054         super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
055               mandatory, waitUntilSent);
056         
057         userIDBytes = Strings.toUTF8(_userID);
058     }
059 
060     void declareDestination(AMQDestination destination)
061     {
062         ((AMQSession_0_10getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(),
063                                                                           destination.getExchangeClass().toString(),
064                                                                           null, null);
065     }
066 
067     //--- Overwritten methods
068 
069     /**
070      * Sends a message to a given destination
071      */
072     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
073                      UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
074                      boolean immediate, boolean waitthrows JMSException
075     {
076         message.prepareForSending();
077 
078         AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10message.getDelegate();
079 
080         DeliveryProperties deliveryProp = delegate.getDeliveryProperties();
081         MessageProperties messageProps = delegate.getMessageProperties();
082 
083         // On the receiving side, this will be read in to the JMSXUserID as well.
084         messageProps.setUserId(userIDBytes);
085                 
086         if (messageId != null)
087         {
088             messageProps.setMessageId(messageId);
089         }
090         else if (messageProps.hasMessageId())
091         {
092             messageProps.clearMessageId();
093         }
094 
095         if (!_disableTimestamps)
096         {
097             final long currentTime = System.currentTimeMillis();
098             deliveryProp.setTimestamp(currentTime);
099             if (timeToLive > 0)
100             {
101                 deliveryProp.setExpiration(currentTime + timeToLive);
102                 message.setJMSExpiration(currentTime + timeToLive);
103             }
104             else
105             {
106                deliveryProp.setExpiration(0);
107                message.setJMSExpiration(0);
108             }
109             message.setJMSTimestamp(currentTime);
110         }
111 
112         if (!deliveryProp.hasDeliveryMode() || deliveryProp.getDeliveryMode().getValue() != deliveryMode)
113         {
114             MessageDeliveryMode mode;
115             switch (deliveryMode)
116             {
117             case DeliveryMode.PERSISTENT:
118                 mode = MessageDeliveryMode.PERSISTENT;
119                 break;
120             case DeliveryMode.NON_PERSISTENT:
121                 mode = MessageDeliveryMode.NON_PERSISTENT;
122                 break;
123             default:
124                 throw new IllegalArgumentException("illegal delivery mode: " + deliveryMode);
125             }
126             deliveryProp.setDeliveryMode(mode);
127             message.setJMSDeliveryMode(deliveryMode);
128         }
129         if (!deliveryProp.hasPriority() || deliveryProp.getPriority().getValue() != priority)
130         {
131             deliveryProp.setPriority(MessageDeliveryPriority.get((shortpriority));
132             message.setJMSPriority(priority);
133         }
134         String exchangeName = destination.getExchangeName().toString();
135         if deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
136         {
137             deliveryProp.setExchange(exchangeName);
138         }
139         String routingKey = destination.getRoutingKey().toString();
140         if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey))
141         {
142             deliveryProp.setRoutingKey(routingKey);
143         }
144 
145         messageProps.setContentLength(message.getContentLength());
146 
147         // send the message
148         try
149         {
150             org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session)
151                 ((AMQSession_0_10getSession()).getQpidSession();
152 
153             // if true, we need to sync the delivery of this message
154             boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
155                             getSession().getAMQConnection().getSyncPersistence());
156 
157             org.apache.mina.common.ByteBuffer data = message.getData();
158             ByteBuffer buffer = data == null ? ByteBuffer.allocate(0: data.buf().slice();
159             
160             ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
161                                 MessageAcquireMode.PRE_ACQUIRED,
162                                 new Header(deliveryProp, messageProps),
163                     buffer, sync ? SYNC : NONE);
164             if (sync)
165             {
166                 ssn.sync();
167             }
168             
169             
170         }
171         catch (RuntimeException rte)
172         {
173             JMSException ex = new JMSException("Exception when sending message");
174             rte.printStackTrace();
175             ex.setLinkedException(rte);
176             throw ex;
177         }
178     }
179 
180 
181     public boolean isBound(AMQDestination destinationthrows JMSException
182     {
183         return _session.isQueueBound(destination);
184     }
185 }