001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021
022 package org.apache.qpid.client.message;
023
024 import org.apache.commons.collections.map.ReferenceMap;
025 import org.apache.mina.common.ByteBuffer;
026 import org.apache.qpid.client.AMQSession;
027 import org.apache.qpid.client.CustomJMSXProperty;
028 import org.apache.qpid.client.AMQDestination;
029 import org.apache.qpid.client.AMQQueue;
030 import org.apache.qpid.client.AMQTopic;
031 import org.apache.qpid.client.AMQUndefinedDestination;
032 import org.apache.qpid.client.JMSAMQException;
033 import org.apache.qpid.framing.ContentHeaderProperties;
034 import org.apache.qpid.framing.BasicContentHeaderProperties;
035 import org.apache.qpid.framing.AMQShortString;
036 import org.apache.qpid.AMQException;
037 import org.apache.qpid.url.BindingURL;
038 import org.apache.qpid.url.AMQBindingURL;
039
040 import javax.jms.Destination;
041 import javax.jms.JMSException;
042 import javax.jms.MessageNotWriteableException;
043 import java.util.Map;
044 import java.util.Collections;
045 import java.util.Enumeration;
046 import java.util.UUID;
047 import java.net.URISyntaxException;
048
049 public class AMQMessageDelegate_0_8 implements AMQMessageDelegate
050 {
051 private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
052
053 public static final String JMS_TYPE = "x-jms-type";
054
055
056 private boolean _readableProperties = false;
057
058 private Destination _destination;
059 private JMSHeaderAdapter _headerAdapter;
060 private static final boolean STRICT_AMQP_COMPLIANCE =
061 Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
062
063 private ContentHeaderProperties _contentHeaderProperties;
064 /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
065 private AMQSession _session;
066 private final long _deliveryTag;
067
068 protected AMQMessageDelegate_0_8()
069 {
070 this(new BasicContentHeaderProperties(), -1);
071 _readableProperties = false;
072 _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
073
074 }
075
076 protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
077 AMQShortString routingKey)
078 {
079 this(contentHeader, deliveryTag);
080
081 Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
082
083 if(type == null)
084 {
085 type = AMQDestination.UNKNOWN_TYPE;
086 }
087
088 AMQDestination dest;
089
090 switch(type.intValue())
091 {
092 case AMQDestination.QUEUE_TYPE:
093 dest = new AMQQueue(exchange, routingKey, routingKey);
094 break;
095 case AMQDestination.TOPIC_TYPE:
096 dest = new AMQTopic(exchange, routingKey, null);
097 break;
098 default:
099 dest = new AMQUndefinedDestination(exchange, routingKey, null);
100 }
101
102
103
104 // Destination dest = AMQDestination.createDestination(url);
105 setJMSDestination(dest);
106
107
108
109 }
110
111 protected AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
112 {
113 _contentHeaderProperties = properties;
114 _deliveryTag = deliveryTag;
115 _readableProperties = (_contentHeaderProperties != null);
116 _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
117 }
118
119
120 public String getJMSMessageID() throws JMSException
121 {
122 return getContentHeaderProperties().getMessageIdAsString();
123 }
124
125 public void setJMSMessageID(String messageId) throws JMSException
126 {
127 getContentHeaderProperties().setMessageId(messageId);
128 }
129
130 public void setJMSMessageID(UUID messageId) throws JMSException
131 {
132 getContentHeaderProperties().setMessageId("ID:" + messageId);
133 }
134
135
136 public long getJMSTimestamp() throws JMSException
137 {
138 return getContentHeaderProperties().getTimestamp();
139 }
140
141 public void setJMSTimestamp(long timestamp) throws JMSException
142 {
143 getContentHeaderProperties().setTimestamp(timestamp);
144 }
145
146 public byte[] getJMSCorrelationIDAsBytes() throws JMSException
147 {
148 return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
149 }
150
151 public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
152 {
153 getContentHeaderProperties().setCorrelationId(new String(bytes));
154 }
155
156 public void setJMSCorrelationID(String correlationId) throws JMSException
157 {
158 getContentHeaderProperties().setCorrelationId(correlationId);
159 }
160
161 public String getJMSCorrelationID() throws JMSException
162 {
163 return getContentHeaderProperties().getCorrelationIdAsString();
164 }
165
166 public Destination getJMSReplyTo() throws JMSException
167 {
168 String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
169 if (replyToEncoding == null)
170 {
171 return null;
172 }
173 else
174 {
175 Destination dest = (Destination) _destinationCache.get(replyToEncoding);
176 if (dest == null)
177 {
178 try
179 {
180 BindingURL binding = new AMQBindingURL(replyToEncoding);
181 dest = AMQDestination.createDestination(binding);
182 }
183 catch (URISyntaxException e)
184 {
185 throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
186 }
187
188 _destinationCache.put(replyToEncoding, dest);
189 }
190
191 return dest;
192 }
193 }
194
195 public void setJMSReplyTo(Destination destination) throws JMSException
196 {
197 if (destination == null)
198 {
199 getContentHeaderProperties().setReplyTo((String) null);
200 return; // We're done here
201 }
202
203 if (!(destination instanceof AMQDestination))
204 {
205 throw new IllegalArgumentException(
206 "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
207 }
208
209 final AMQDestination amqd = (AMQDestination) destination;
210
211 final AMQShortString encodedDestination = amqd.getEncodedName();
212 _destinationCache.put(encodedDestination, destination);
213 getContentHeaderProperties().setReplyTo(encodedDestination);
214 }
215
216 public Destination getJMSDestination() throws JMSException
217 {
218 return _destination;
219 }
220
221 public void setJMSDestination(Destination destination)
222 {
223 _destination = destination;
224 }
225
226 public void setContentType(String contentType)
227 {
228 getContentHeaderProperties().setContentType(contentType);
229 }
230
231 public String getContentType()
232 {
233 return getContentHeaderProperties().getContentTypeAsString();
234 }
235
236 public void setEncoding(String encoding)
237 {
238 getContentHeaderProperties().setEncoding(encoding);
239 }
240
241 public String getEncoding()
242 {
243 return getContentHeaderProperties().getEncodingAsString();
244 }
245
246 public String getReplyToString()
247 {
248 return getContentHeaderProperties().getReplyToAsString();
249 }
250
251 public int getJMSDeliveryMode() throws JMSException
252 {
253 return getContentHeaderProperties().getDeliveryMode();
254 }
255
256 public void setJMSDeliveryMode(int i) throws JMSException
257 {
258 getContentHeaderProperties().setDeliveryMode((byte) i);
259 }
260
261 public BasicContentHeaderProperties getContentHeaderProperties()
262 {
263 return (BasicContentHeaderProperties) _contentHeaderProperties;
264 }
265
266
267 public String getJMSType() throws JMSException
268 {
269 return getContentHeaderProperties().getTypeAsString();
270 }
271
272 public void setJMSType(String string) throws JMSException
273 {
274 getContentHeaderProperties().setType(string);
275 }
276
277 public long getJMSExpiration() throws JMSException
278 {
279 return getContentHeaderProperties().getExpiration();
280 }
281
282 public void setJMSExpiration(long l) throws JMSException
283 {
284 getContentHeaderProperties().setExpiration(l);
285 }
286
287
288
289 public boolean propertyExists(String propertyName) throws JMSException
290 {
291 return getJmsHeaders().propertyExists(propertyName);
292 }
293
294 public boolean getBooleanProperty(String propertyName) throws JMSException
295 {
296 if (STRICT_AMQP_COMPLIANCE)
297 {
298 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
299 }
300
301 return getJmsHeaders().getBoolean(propertyName);
302 }
303
304 public byte getByteProperty(String propertyName) throws JMSException
305 {
306 if (STRICT_AMQP_COMPLIANCE)
307 {
308 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
309 }
310
311 return getJmsHeaders().getByte(propertyName);
312 }
313
314 public short getShortProperty(String propertyName) throws JMSException
315 {
316 if (STRICT_AMQP_COMPLIANCE)
317 {
318 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
319 }
320
321 return getJmsHeaders().getShort(propertyName);
322 }
323
324 public int getIntProperty(String propertyName) throws JMSException
325 {
326 if (STRICT_AMQP_COMPLIANCE)
327 {
328 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
329 }
330
331 return getJmsHeaders().getInteger(propertyName);
332 }
333
334 public long getLongProperty(String propertyName) throws JMSException
335 {
336 if (STRICT_AMQP_COMPLIANCE)
337 {
338 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
339 }
340
341 return getJmsHeaders().getLong(propertyName);
342 }
343
344 public float getFloatProperty(String propertyName) throws JMSException
345 {
346 if (STRICT_AMQP_COMPLIANCE)
347 {
348 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
349 }
350
351 return getJmsHeaders().getFloat(propertyName);
352 }
353
354 public double getDoubleProperty(String propertyName) throws JMSException
355 {
356 if (STRICT_AMQP_COMPLIANCE)
357 {
358 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
359 }
360
361 return getJmsHeaders().getDouble(propertyName);
362 }
363
364 public String getStringProperty(String propertyName) throws JMSException
365 {
366 //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
367 if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
368 {
369 return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
370 }
371 else
372 {
373 if (STRICT_AMQP_COMPLIANCE)
374 {
375 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
376 }
377
378 return getJmsHeaders().getString(propertyName);
379 }
380 }
381
382 public Object getObjectProperty(String propertyName) throws JMSException
383 {
384 return getJmsHeaders().getObject(propertyName);
385 }
386
387 public Enumeration getPropertyNames() throws JMSException
388 {
389 return getJmsHeaders().getPropertyNames();
390 }
391
392 public void setBooleanProperty(String propertyName, boolean b) throws JMSException
393 {
394 if (STRICT_AMQP_COMPLIANCE)
395 {
396 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
397 }
398
399 checkWritableProperties();
400 getJmsHeaders().setBoolean(propertyName, b);
401 }
402
403 public void setByteProperty(String propertyName, byte b) throws JMSException
404 {
405 if (STRICT_AMQP_COMPLIANCE)
406 {
407 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
408 }
409
410 checkWritableProperties();
411 getJmsHeaders().setByte(propertyName, new Byte(b));
412 }
413
414 public void setShortProperty(String propertyName, short i) throws JMSException
415 {
416 if (STRICT_AMQP_COMPLIANCE)
417 {
418 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
419 }
420
421 checkWritableProperties();
422 getJmsHeaders().setShort(propertyName, new Short(i));
423 }
424
425 public void setIntProperty(String propertyName, int i) throws JMSException
426 {
427 checkWritableProperties();
428 getJmsHeaders().setInteger(propertyName, new Integer(i));
429 }
430
431 public void setLongProperty(String propertyName, long l) throws JMSException
432 {
433 if (STRICT_AMQP_COMPLIANCE)
434 {
435 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
436 }
437
438 checkWritableProperties();
439 getJmsHeaders().setLong(propertyName, new Long(l));
440 }
441
442 public void setFloatProperty(String propertyName, float f) throws JMSException
443 {
444 if (STRICT_AMQP_COMPLIANCE)
445 {
446 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
447 }
448
449 checkWritableProperties();
450 getJmsHeaders().setFloat(propertyName, new Float(f));
451 }
452
453 public void setDoubleProperty(String propertyName, double v) throws JMSException
454 {
455 if (STRICT_AMQP_COMPLIANCE)
456 {
457 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
458 }
459
460 checkWritableProperties();
461 getJmsHeaders().setDouble(propertyName, new Double(v));
462 }
463
464 public void setStringProperty(String propertyName, String value) throws JMSException
465 {
466 checkWritableProperties();
467 getJmsHeaders().setString(propertyName, value);
468 }
469
470 public void setObjectProperty(String propertyName, Object object) throws JMSException
471 {
472 checkWritableProperties();
473 getJmsHeaders().setObject(propertyName, object);
474 }
475
476 public void removeProperty(String propertyName) throws JMSException
477 {
478 getJmsHeaders().remove(propertyName);
479 }
480
481
482 private JMSHeaderAdapter getJmsHeaders()
483 {
484 return _headerAdapter;
485 }
486
487 protected void checkWritableProperties() throws MessageNotWriteableException
488 {
489 if (_readableProperties)
490 {
491 throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
492 }
493 _contentHeaderProperties.updated();
494 }
495
496
497 public int getJMSPriority() throws JMSException
498 {
499 return getContentHeaderProperties().getPriority();
500 }
501
502 public void setJMSPriority(int i) throws JMSException
503 {
504 getContentHeaderProperties().setPriority((byte) i);
505 }
506
507 public void clearProperties() throws JMSException
508 {
509 getJmsHeaders().clear();
510
511 _readableProperties = false;
512 }
513
514
515 public void acknowledgeThis() throws JMSException
516 {
517 // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
518 // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
519 if (_session != null)
520 {
521 if (_session.getAMQConnection().isClosed())
522 {
523 throw new javax.jms.IllegalStateException("Connection is already closed");
524 }
525
526 // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
527 // received on the session
528 _session.acknowledgeMessage(_deliveryTag, true);
529 }
530 }
531
532 public void acknowledge() throws JMSException
533 {
534 if (_session != null)
535 {
536 _session.acknowledge();
537 }
538 }
539
540
541 /**
542 * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
543 * acknowledge()
544 *
545 * @param s the AMQ session that delivered this message
546 */
547 public void setAMQSession(AMQSession s)
548 {
549 _session = s;
550 }
551
552 public AMQSession getAMQSession()
553 {
554 return _session;
555 }
556
557 /**
558 * Get the AMQ message number assigned to this message
559 *
560 * @return the message number
561 */
562 public long getDeliveryTag()
563 {
564 return _deliveryTag;
565 }
566
567
568 }
|