001 /* Licensed to the Apache Software Foundation (ASF) under one
002 * or more contributor license agreements. See the NOTICE file
003 * distributed with this work for additional information
004 * regarding copyright ownership. The ASF licenses this file
005 * to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance
007 * with the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing,
012 * software distributed under the License is distributed on an
013 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
014 * KIND, either express or implied. See the License for the
015 * specific language governing permissions and limitations
016 * under the License.
017 */
018 package org.apache.qpid.client;
019
020 import java.io.IOException;
021 import java.net.URISyntaxException;
022 import java.util.HashMap;
023 import java.util.Map;
024 import java.util.UUID;
025 import java.nio.ByteBuffer;
026
027 import javax.jms.JMSException;
028 import javax.jms.Message;
029 import javax.jms.DeliveryMode;
030
031 import org.apache.qpid.client.message.AbstractJMSMessage;
032 import org.apache.qpid.client.message.FiledTableSupport;
033 import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
034 import org.apache.qpid.client.protocol.AMQProtocolHandler;
035 import org.apache.qpid.framing.AMQShortString;
036 import org.apache.qpid.framing.BasicContentHeaderProperties;
037 import org.apache.qpid.url.AMQBindingURL;
038 import org.apache.qpid.util.Strings;
039 import org.apache.qpid.njms.ExceptionHelper;
040 import org.apache.qpid.transport.*;
041 import static org.apache.qpid.transport.Option.*;
042
043 /**
044 * This is a 0_10 message producer.
045 */
046 public class BasicMessageProducer_0_10 extends BasicMessageProducer
047 {
048 private byte[] userIDBytes;
049
050 BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
051 AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
052 boolean immediate, boolean mandatory, boolean waitUntilSent)
053 {
054 super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
055 mandatory, waitUntilSent);
056
057 userIDBytes = Strings.toUTF8(_userID);
058 }
059
060 void declareDestination(AMQDestination destination)
061 {
062 ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(),
063 destination.getExchangeClass().toString(),
064 null, null);
065 }
066
067 //--- Overwritten methods
068
069 /**
070 * Sends a message to a given destination
071 */
072 void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
073 UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
074 boolean immediate, boolean wait) throws JMSException
075 {
076 message.prepareForSending();
077
078 AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate();
079
080 DeliveryProperties deliveryProp = delegate.getDeliveryProperties();
081 MessageProperties messageProps = delegate.getMessageProperties();
082
083 // On the receiving side, this will be read in to the JMSXUserID as well.
084 messageProps.setUserId(userIDBytes);
085
086 if (messageId != null)
087 {
088 messageProps.setMessageId(messageId);
089 }
090 else if (messageProps.hasMessageId())
091 {
092 messageProps.clearMessageId();
093 }
094
095 if (!_disableTimestamps)
096 {
097 final long currentTime = System.currentTimeMillis();
098 deliveryProp.setTimestamp(currentTime);
099 if (timeToLive > 0)
100 {
101 deliveryProp.setExpiration(currentTime + timeToLive);
102 message.setJMSExpiration(currentTime + timeToLive);
103 }
104 else
105 {
106 deliveryProp.setExpiration(0);
107 message.setJMSExpiration(0);
108 }
109 message.setJMSTimestamp(currentTime);
110 }
111
112 if (!deliveryProp.hasDeliveryMode() || deliveryProp.getDeliveryMode().getValue() != deliveryMode)
113 {
114 MessageDeliveryMode mode;
115 switch (deliveryMode)
116 {
117 case DeliveryMode.PERSISTENT:
118 mode = MessageDeliveryMode.PERSISTENT;
119 break;
120 case DeliveryMode.NON_PERSISTENT:
121 mode = MessageDeliveryMode.NON_PERSISTENT;
122 break;
123 default:
124 throw new IllegalArgumentException("illegal delivery mode: " + deliveryMode);
125 }
126 deliveryProp.setDeliveryMode(mode);
127 message.setJMSDeliveryMode(deliveryMode);
128 }
129 if (!deliveryProp.hasPriority() || deliveryProp.getPriority().getValue() != priority)
130 {
131 deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
132 message.setJMSPriority(priority);
133 }
134 String exchangeName = destination.getExchangeName().toString();
135 if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
136 {
137 deliveryProp.setExchange(exchangeName);
138 }
139 String routingKey = destination.getRoutingKey().toString();
140 if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey))
141 {
142 deliveryProp.setRoutingKey(routingKey);
143 }
144
145 messageProps.setContentLength(message.getContentLength());
146
147 // send the message
148 try
149 {
150 org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session)
151 ((AMQSession_0_10) getSession()).getQpidSession();
152
153 // if true, we need to sync the delivery of this message
154 boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
155 getSession().getAMQConnection().getSyncPersistence());
156
157 org.apache.mina.common.ByteBuffer data = message.getData();
158 ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
159
160 ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
161 MessageAcquireMode.PRE_ACQUIRED,
162 new Header(deliveryProp, messageProps),
163 buffer, sync ? SYNC : NONE);
164 if (sync)
165 {
166 ssn.sync();
167 }
168
169
170 }
171 catch (RuntimeException rte)
172 {
173 JMSException ex = new JMSException("Exception when sending message");
174 rte.printStackTrace();
175 ex.setLinkedException(rte);
176 throw ex;
177 }
178 }
179
180
181 public boolean isBound(AMQDestination destination) throws JMSException
182 {
183 return _session.isQueueBound(destination);
184 }
185 }
|