001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.message;
022
023 import org.slf4j.Logger;
024 import org.slf4j.LoggerFactory;
025 import org.apache.qpid.client.AMQSession;
026
027 import javax.jms.*;
028
029 import java.util.Enumeration;
030
031 public class MessageConverter
032 {
033
034 /**
035 * Log4J logger
036 */
037 protected final Logger _logger = LoggerFactory.getLogger(getClass());
038
039 /**
040 * AbstractJMSMessage which will hold the converted message
041 */
042 private AbstractJMSMessage _newMessage;
043
044 public MessageConverter(AbstractJMSMessage message) throws JMSException
045 {
046 _newMessage = message;
047 }
048
049 public MessageConverter(AMQSession session, BytesMessage bytesMessage) throws JMSException
050 {
051 bytesMessage.reset();
052
053 JMSBytesMessage nativeMsg = (JMSBytesMessage) session.createBytesMessage();
054
055 byte[] buf = new byte[1024];
056
057 int len;
058
059 while ((len = bytesMessage.readBytes(buf)) != -1)
060 {
061 nativeMsg.writeBytes(buf, 0, len);
062 }
063
064 _newMessage = nativeMsg;
065 setMessageProperties(bytesMessage);
066 }
067
068 public MessageConverter(AMQSession session, MapMessage message) throws JMSException
069 {
070 MapMessage nativeMessage = session.createMapMessage();
071
072 Enumeration mapNames = message.getMapNames();
073 while (mapNames.hasMoreElements())
074 {
075 String name = (String) mapNames.nextElement();
076 nativeMessage.setObject(name, message.getObject(name));
077 }
078
079 _newMessage = (AbstractJMSMessage) nativeMessage;
080 setMessageProperties(message);
081 }
082
083 public MessageConverter(AMQSession session, ObjectMessage origMessage) throws JMSException
084 {
085
086 ObjectMessage nativeMessage = session.createObjectMessage();
087
088 nativeMessage.setObject(origMessage.getObject());
089
090 _newMessage = (AbstractJMSMessage) nativeMessage;
091 setMessageProperties(origMessage);
092
093 }
094
095 public MessageConverter(AMQSession session, TextMessage message) throws JMSException
096 {
097 TextMessage nativeMessage = session.createTextMessage();
098
099 nativeMessage.setText(message.getText());
100
101 _newMessage = (AbstractJMSMessage) nativeMessage;
102 setMessageProperties(message);
103 }
104
105 public MessageConverter(AMQSession session, StreamMessage message) throws JMSException
106 {
107 StreamMessage nativeMessage = session.createStreamMessage();
108
109 try
110 {
111 message.reset();
112 while (true)
113 {
114 nativeMessage.writeObject(message.readObject());
115 }
116 }
117 catch (MessageEOFException e)
118 {
119 // we're at the end so don't mind the exception
120 }
121
122 _newMessage = (AbstractJMSMessage) nativeMessage;
123 setMessageProperties(message);
124 }
125
126 public MessageConverter(AMQSession session, Message message) throws JMSException
127 {
128 // Send a message with just properties.
129 // Throwing away content
130 Message nativeMessage = session.createMessage();
131
132 _newMessage = (AbstractJMSMessage) nativeMessage;
133 setMessageProperties(message);
134 }
135
136 public AbstractJMSMessage getConvertedMessage()
137 {
138 return _newMessage;
139 }
140
141 /**
142 * Sets all message properties
143 */
144 protected void setMessageProperties(Message message) throws JMSException
145 {
146 setNonJMSProperties(message);
147 setJMSProperties(message);
148 }
149
150 /**
151 * Sets all non-JMS defined properties on converted message
152 */
153 protected void setNonJMSProperties(Message message) throws JMSException
154 {
155 Enumeration propertyNames = message.getPropertyNames();
156 while (propertyNames.hasMoreElements())
157 {
158 String propertyName = String.valueOf(propertyNames.nextElement());
159 // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
160 if (!propertyName.startsWith("JMSX_"))
161 {
162 Object value = message.getObjectProperty(propertyName);
163 _newMessage.setObjectProperty(propertyName, value);
164 }
165 }
166 }
167
168 /**
169 * Exposed JMS defined properties on converted message:
170 * JMSDestination - we don't set here
171 * JMSDeliveryMode - set
172 * JMSExpiration - we don't set here
173 * JMSPriority - we don't set here
174 * JMSMessageID - we don't set here
175 * JMSTimestamp - we don't set here
176 * JMSCorrelationID - set
177 * JMSReplyTo - set
178 * JMSType - set
179 * JMSRedlivered - we don't set here
180 */
181 protected void setJMSProperties(Message message) throws JMSException
182 {
183 _newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
184
185 if (message.getJMSReplyTo() != null)
186 {
187 _newMessage.setJMSReplyTo(message.getJMSReplyTo());
188 }
189
190 _newMessage.setJMSType(message.getJMSType());
191
192 _newMessage.setJMSCorrelationID(message.getJMSCorrelationID());
193 }
194
195 }
|