AMQMessageDelegate_0_8.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 
022 package org.apache.qpid.client.message;
023 
024 import org.apache.commons.collections.map.ReferenceMap;
025 import org.apache.mina.common.ByteBuffer;
026 import org.apache.qpid.client.AMQSession;
027 import org.apache.qpid.client.CustomJMSXProperty;
028 import org.apache.qpid.client.AMQDestination;
029 import org.apache.qpid.client.AMQQueue;
030 import org.apache.qpid.client.AMQTopic;
031 import org.apache.qpid.client.AMQUndefinedDestination;
032 import org.apache.qpid.client.JMSAMQException;
033 import org.apache.qpid.framing.ContentHeaderProperties;
034 import org.apache.qpid.framing.BasicContentHeaderProperties;
035 import org.apache.qpid.framing.AMQShortString;
036 import org.apache.qpid.AMQException;
037 import org.apache.qpid.url.BindingURL;
038 import org.apache.qpid.url.AMQBindingURL;
039 
040 import javax.jms.Destination;
041 import javax.jms.JMSException;
042 import javax.jms.MessageNotWriteableException;
043 import java.util.Map;
044 import java.util.Collections;
045 import java.util.Enumeration;
046 import java.util.UUID;
047 import java.net.URISyntaxException;
048 
049 public class AMQMessageDelegate_0_8 implements AMQMessageDelegate
050 {
051     private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
052 
053     public static final String JMS_TYPE = "x-jms-type";
054 
055 
056     private boolean _readableProperties = false;
057 
058     private Destination _destination;
059     private JMSHeaderAdapter _headerAdapter;
060     private static final boolean STRICT_AMQP_COMPLIANCE =
061             Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
062 
063     private ContentHeaderProperties _contentHeaderProperties;
064     /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
065     private AMQSession _session;
066     private final long _deliveryTag;
067 
068     protected AMQMessageDelegate_0_8()
069     {
070         this(new BasicContentHeaderProperties(), -1);
071         _readableProperties = false;
072         _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties_contentHeaderProperties).getHeaders());
073 
074     }
075 
076     protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
077                                      AMQShortString routingKey
078     {
079         this(contentHeader, deliveryTag);
080 
081         Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
082 
083         if(type == null)
084         {
085             type = AMQDestination.UNKNOWN_TYPE;
086         }
087 
088         AMQDestination dest;
089 
090         switch(type.intValue())
091         {
092             case AMQDestination.QUEUE_TYPE:
093                 dest = new AMQQueue(exchange, routingKey, routingKey);
094                 break;
095             case  AMQDestination.TOPIC_TYPE:
096                 dest = new AMQTopic(exchange, routingKey, null);
097                 break;
098             default:
099                 dest = new AMQUndefinedDestination(exchange, routingKey, null);
100         }
101         
102 
103 
104         // Destination dest = AMQDestination.createDestination(url);
105         setJMSDestination(dest);
106 
107 
108 
109     }
110 
111     protected AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
112     {
113         _contentHeaderProperties = properties;
114         _deliveryTag = deliveryTag;
115         _readableProperties = (_contentHeaderProperties != null);
116         _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties_contentHeaderProperties).getHeaders());
117     }
118 
119 
120     public String getJMSMessageID() throws JMSException
121     {
122         return getContentHeaderProperties().getMessageIdAsString();
123     }
124 
125     public void setJMSMessageID(String messageIdthrows JMSException
126     {
127         getContentHeaderProperties().setMessageId(messageId);
128     }
129 
130     public void setJMSMessageID(UUID messageIdthrows JMSException
131     {
132         getContentHeaderProperties().setMessageId("ID:" + messageId);
133     }
134 
135 
136     public long getJMSTimestamp() throws JMSException
137     {
138         return getContentHeaderProperties().getTimestamp();
139     }
140 
141     public void setJMSTimestamp(long timestampthrows JMSException
142     {
143         getContentHeaderProperties().setTimestamp(timestamp);
144     }
145 
146     public byte[] getJMSCorrelationIDAsBytes() throws JMSException
147     {
148         return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
149     }
150 
151     public void setJMSCorrelationIDAsBytes(byte[] bytesthrows JMSException
152     {
153         getContentHeaderProperties().setCorrelationId(new String(bytes));
154     }
155 
156     public void setJMSCorrelationID(String correlationIdthrows JMSException
157     {
158         getContentHeaderProperties().setCorrelationId(correlationId);
159     }
160 
161     public String getJMSCorrelationID() throws JMSException
162     {
163         return getContentHeaderProperties().getCorrelationIdAsString();
164     }
165 
166     public Destination getJMSReplyTo() throws JMSException
167     {
168         String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
169         if (replyToEncoding == null)
170         {
171             return null;
172         }
173         else
174         {
175             Destination dest = (Destination_destinationCache.get(replyToEncoding);
176             if (dest == null)
177             {
178                 try
179                 {
180                     BindingURL binding = new AMQBindingURL(replyToEncoding);
181                     dest = AMQDestination.createDestination(binding);
182                 }
183                 catch (URISyntaxException e)
184                 {
185                     throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
186                 }
187 
188                 _destinationCache.put(replyToEncoding, dest);
189             }
190 
191             return dest;
192         }
193     }
194 
195     public void setJMSReplyTo(Destination destinationthrows JMSException
196     {
197         if (destination == null)
198         {
199             getContentHeaderProperties().setReplyTo((Stringnull);
200             return// We're done here
201         }
202 
203         if (!(destination instanceof AMQDestination))
204         {
205             throw new IllegalArgumentException(
206                 "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
207         }
208 
209         final AMQDestination amqd = (AMQDestinationdestination;
210 
211         final AMQShortString encodedDestination = amqd.getEncodedName();
212         _destinationCache.put(encodedDestination, destination);
213         getContentHeaderProperties().setReplyTo(encodedDestination);
214     }
215 
216     public Destination getJMSDestination() throws JMSException
217     {
218         return _destination;
219     }
220 
221     public void setJMSDestination(Destination destination)
222     {
223         _destination = destination;
224     }
225 
226     public void setContentType(String contentType)
227     {
228         getContentHeaderProperties().setContentType(contentType);
229     }
230 
231     public String getContentType()
232     {
233         return getContentHeaderProperties().getContentTypeAsString();
234     }
235 
236     public void setEncoding(String encoding)
237     {
238         getContentHeaderProperties().setEncoding(encoding);
239     }
240 
241     public String getEncoding()
242     {
243         return getContentHeaderProperties().getEncodingAsString();
244     }
245 
246     public String getReplyToString()
247     {
248         return getContentHeaderProperties().getReplyToAsString();
249     }
250 
251     public int getJMSDeliveryMode() throws JMSException
252     {
253         return getContentHeaderProperties().getDeliveryMode();
254     }
255 
256     public void setJMSDeliveryMode(int ithrows JMSException
257     {
258         getContentHeaderProperties().setDeliveryMode((bytei);
259     }
260 
261     public BasicContentHeaderProperties getContentHeaderProperties()
262     {
263         return (BasicContentHeaderProperties_contentHeaderProperties;
264     }
265 
266 
267     public String getJMSType() throws JMSException
268     {
269         return getContentHeaderProperties().getTypeAsString();
270     }
271 
272     public void setJMSType(String stringthrows JMSException
273     {
274         getContentHeaderProperties().setType(string);
275     }
276 
277     public long getJMSExpiration() throws JMSException
278     {
279         return getContentHeaderProperties().getExpiration();
280     }
281 
282     public void setJMSExpiration(long lthrows JMSException
283     {
284         getContentHeaderProperties().setExpiration(l);
285     }
286 
287 
288 
289     public boolean propertyExists(String propertyNamethrows JMSException
290     {
291         return getJmsHeaders().propertyExists(propertyName);
292     }
293 
294     public boolean getBooleanProperty(String propertyNamethrows JMSException
295     {
296         if (STRICT_AMQP_COMPLIANCE)
297         {
298             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
299         }
300 
301         return getJmsHeaders().getBoolean(propertyName);
302     }
303 
304     public byte getByteProperty(String propertyNamethrows JMSException
305     {
306         if (STRICT_AMQP_COMPLIANCE)
307         {
308             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
309         }
310 
311         return getJmsHeaders().getByte(propertyName);
312     }
313 
314     public short getShortProperty(String propertyNamethrows JMSException
315     {
316         if (STRICT_AMQP_COMPLIANCE)
317         {
318             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
319         }
320 
321         return getJmsHeaders().getShort(propertyName);
322     }
323 
324     public int getIntProperty(String propertyNamethrows JMSException
325     {
326         if (STRICT_AMQP_COMPLIANCE)
327         {
328             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
329         }
330 
331         return getJmsHeaders().getInteger(propertyName);
332     }
333 
334     public long getLongProperty(String propertyNamethrows JMSException
335     {
336         if (STRICT_AMQP_COMPLIANCE)
337         {
338             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
339         }
340 
341         return getJmsHeaders().getLong(propertyName);
342     }
343 
344     public float getFloatProperty(String propertyNamethrows JMSException
345     {
346         if (STRICT_AMQP_COMPLIANCE)
347         {
348             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
349         }
350 
351         return getJmsHeaders().getFloat(propertyName);
352     }
353 
354     public double getDoubleProperty(String propertyNamethrows JMSException
355     {
356         if (STRICT_AMQP_COMPLIANCE)
357         {
358             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
359         }
360 
361         return getJmsHeaders().getDouble(propertyName);
362     }
363 
364     public String getStringProperty(String propertyNamethrows JMSException
365     {
366         //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
367         if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
368         {
369             return ((BasicContentHeaderProperties_contentHeaderProperties).getUserIdAsString();
370         }
371         else
372         {
373             if (STRICT_AMQP_COMPLIANCE)
374             {
375                 throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
376             }
377 
378             return getJmsHeaders().getString(propertyName);
379         }
380     }
381 
382     public Object getObjectProperty(String propertyNamethrows JMSException
383     {
384         return getJmsHeaders().getObject(propertyName);
385     }
386 
387     public Enumeration getPropertyNames() throws JMSException
388     {
389         return getJmsHeaders().getPropertyNames();
390     }
391 
392     public void setBooleanProperty(String propertyName, boolean bthrows JMSException
393     {
394         if (STRICT_AMQP_COMPLIANCE)
395         {
396             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
397         }
398 
399         checkWritableProperties();
400         getJmsHeaders().setBoolean(propertyName, b);
401     }
402 
403     public void setByteProperty(String propertyName, byte bthrows JMSException
404     {
405         if (STRICT_AMQP_COMPLIANCE)
406         {
407             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
408         }
409 
410         checkWritableProperties();
411         getJmsHeaders().setByte(propertyName, new Byte(b));
412     }
413 
414     public void setShortProperty(String propertyName, short ithrows JMSException
415     {
416         if (STRICT_AMQP_COMPLIANCE)
417         {
418             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
419         }
420 
421         checkWritableProperties();
422         getJmsHeaders().setShort(propertyName, new Short(i));
423     }
424 
425     public void setIntProperty(String propertyName, int ithrows JMSException
426     {
427         checkWritableProperties();
428         getJmsHeaders().setInteger(propertyName, new Integer(i));
429     }
430 
431     public void setLongProperty(String propertyName, long lthrows JMSException
432     {
433         if (STRICT_AMQP_COMPLIANCE)
434         {
435             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
436         }
437 
438         checkWritableProperties();
439         getJmsHeaders().setLong(propertyName, new Long(l));
440     }
441 
442     public void setFloatProperty(String propertyName, float fthrows JMSException
443     {
444         if (STRICT_AMQP_COMPLIANCE)
445         {
446             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
447         }
448 
449         checkWritableProperties();
450         getJmsHeaders().setFloat(propertyName, new Float(f));
451     }
452 
453     public void setDoubleProperty(String propertyName, double vthrows JMSException
454     {
455         if (STRICT_AMQP_COMPLIANCE)
456         {
457             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
458         }
459 
460         checkWritableProperties();
461         getJmsHeaders().setDouble(propertyName, new Double(v));
462     }
463 
464     public void setStringProperty(String propertyName, String valuethrows JMSException
465     {
466         checkWritableProperties();
467         getJmsHeaders().setString(propertyName, value);
468     }
469 
470     public void setObjectProperty(String propertyName, Object objectthrows JMSException
471     {
472         checkWritableProperties();
473         getJmsHeaders().setObject(propertyName, object);
474     }
475 
476     public void removeProperty(String propertyNamethrows JMSException
477     {
478         getJmsHeaders().remove(propertyName);
479     }
480 
481 
482     private JMSHeaderAdapter getJmsHeaders()
483     {
484         return _headerAdapter;
485     }
486 
487     protected void checkWritableProperties() throws MessageNotWriteableException
488     {
489         if (_readableProperties)
490         {
491             throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
492         }
493         _contentHeaderProperties.updated();
494     }
495 
496 
497     public int getJMSPriority() throws JMSException
498     {
499         return getContentHeaderProperties().getPriority();
500     }
501 
502     public void setJMSPriority(int ithrows JMSException
503     {
504         getContentHeaderProperties().setPriority((bytei);
505     }
506 
507     public void clearProperties() throws JMSException
508     {
509         getJmsHeaders().clear();
510 
511         _readableProperties = false;
512     }
513 
514 
515     public void acknowledgeThis() throws JMSException
516     {
517         // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
518         // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
519         if (_session != null)
520         {
521             if (_session.getAMQConnection().isClosed())
522             {
523                 throw new javax.jms.IllegalStateException("Connection is already closed");
524             }
525 
526             // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
527             // received on the session
528             _session.acknowledgeMessage(_deliveryTag, true);
529         }
530     }
531 
532     public void acknowledge() throws JMSException
533     {
534         if (_session != null)
535         {
536             _session.acknowledge();
537         }
538     }
539     
540 
541      /**
542      * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
543      * acknowledge()
544      *
545      @param s the AMQ session that delivered this message
546      */
547     public void setAMQSession(AMQSession s)
548     {
549         _session = s;
550     }
551 
552     public AMQSession getAMQSession()
553     {
554         return _session;
555     }
556 
557     /**
558      * Get the AMQ message number assigned to this message
559      *
560      @return the message number
561      */
562     public long getDeliveryTag()
563     {
564         return _deliveryTag;
565     }
566 
567 
568 }