AbstractJMSMessageFactory.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.message;
022 
023 import org.apache.mina.common.ByteBuffer;
024 
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.framing.AMQShortString;
027 import org.apache.qpid.framing.ContentBody;
028 import org.apache.qpid.framing.ContentHeaderBody;
029 import org.apache.qpid.framing.BasicContentHeaderProperties;
030 import org.apache.qpid.transport.MessageProperties;
031 import org.apache.qpid.transport.DeliveryProperties;
032 
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035 
036 import javax.jms.JMSException;
037 
038 import java.util.Iterator;
039 import java.util.List;
040 
041 public abstract class AbstractJMSMessageFactory implements MessageFactory
042 {
043     private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
044 
045     protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
046                                                          AMQShortString exchange, AMQShortString routingKey,
047                                                          List bodiesthrows AMQException
048     {
049         ByteBuffer data;
050         final boolean debug = _logger.isDebugEnabled();
051 
052         // we optimise the non-fragmented case to avoid copying
053         if ((bodies != null&& (bodies.size() == 1))
054         {
055             if (debug)
056             {
057                 _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")");
058             }
059 
060             data = ((ContentBodybodies.get(0)).payload;
061         }
062         else if (bodies != null)
063         {
064             if (debug)
065             {
066                 _logger.debug("Fragmented message body (" + bodies
067                         .size() " frames, bodySize=" + contentHeader.bodySize + ")");
068             }
069 
070             data = ByteBuffer.allocate((intcontentHeader.bodySize)// XXX: Is cast a problem?
071             final Iterator it = bodies.iterator();
072             while (it.hasNext())
073             {
074                 ContentBody cb = (ContentBodyit.next();
075                 final ByteBuffer payload = cb.payload;
076                 if(payload.isDirect() || payload.isReadOnly())
077                 {
078                     data.put(payload);
079                 }
080                 else
081                 {
082                     data.put(payload.array(), payload.arrayOffset(), payload.limit());
083                 }
084 
085                 payload.release();
086             }
087 
088             data.flip();
089         }
090         else // bodies == null
091         {
092             data = ByteBuffer.allocate(0);
093         }
094 
095         if (debug)
096         {
097             _logger.debug("Creating message from buffer with position=" + data.position() " and remaining=" + data
098                     .remaining());
099         }
100 
101         AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
102                                                                  (BasicContentHeaderPropertiescontentHeader.properties,
103                                                                  exchange, routingKey);
104 
105         return createMessage(delegate, data);
106     }
107 
108     protected abstract AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer datathrows AMQException;
109 
110 
111     protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps,
112                                                           DeliveryProperties deliveryProps,  
113                                                           java.nio.ByteBuffer bodythrows AMQException
114     {
115         ByteBuffer data;
116         final boolean debug = _logger.isDebugEnabled();
117 
118 
119         if (body != null)
120         {
121             data = ByteBuffer.wrap(body);
122         }
123         else // body == null
124         {
125             data = ByteBuffer.allocate(0);
126         }
127 
128         if (debug)
129         {
130             _logger.debug("Creating message from buffer with position=" + data.position() " and remaining=" + data
131                     .remaining());
132         }
133         AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(msgProps, deliveryProps, messageNbr);
134 
135         AbstractJMSMessage message = createMessage(delegate, data);
136         return message;
137     }
138 
139     private static final String asString(byte[] bytes)
140     {
141         if (bytes == null)
142         {
143             return null;
144         }
145         else
146         {
147             return new String(bytes);
148         }
149     }
150 
151 
152     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
153                                             AMQShortString exchange, AMQShortString routingKey, List bodies)
154             throws JMSException, AMQException
155     {
156         final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
157         msg.setJMSRedelivered(redelivered);
158         msg.receivedFromServer();
159         return msg;
160     }
161 
162     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, MessageProperties msgProps,
163                                             DeliveryProperties deliveryProps, java.nio.ByteBuffer body)
164             throws JMSException, AMQException
165     {
166         final AbstractJMSMessage msg =
167                 create010MessageWithBody(messageNbr,msgProps,deliveryProps, body);
168         msg.setJMSRedelivered(redelivered);
169         msg.receivedFromServer();
170         return msg;
171     }
172 
173 }