JMSObjectMessage.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.message;
022 
023 import java.io.IOException;
024 import java.io.InputStream;
025 import java.io.ObjectInputStream;
026 import java.io.ObjectOutputStream;
027 import java.io.Serializable;
028 import java.nio.charset.CharacterCodingException;
029 import java.nio.charset.Charset;
030 
031 import javax.jms.JMSException;
032 import javax.jms.MessageFormatException;
033 import javax.jms.ObjectMessage;
034 
035 import org.apache.mina.common.ByteBuffer;
036 
037 import org.apache.qpid.AMQException;
038 import org.apache.qpid.framing.AMQShortString;
039 import org.apache.qpid.framing.BasicContentHeaderProperties;
040 
041 public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
042 {
043     public static final String MIME_TYPE = "application/java-object-stream";
044 
045 
046     private static final int DEFAULT_BUFFER_SIZE = 1024;
047 
048     /**
049      * Creates empty, writable message for use by producers
050      @param delegateFactory
051      */
052     public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory)
053     {
054         this(delegateFactory, null);
055     }
056 
057     private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
058     {
059         super(delegateFactory, data);
060         if (data == null)
061         {
062             _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
063             _data.setAutoExpand(true);
064         }
065 
066         setContentType(getMimeType());
067     }
068 
069     /**
070      * Creates read only message for delivery to consumers
071      */
072 
073       JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer datathrows AMQException
074       {
075           super(delegate, data);
076       }
077 
078 
079     public void clearBodyImpl() throws JMSException
080     {
081         if (_data != null)
082         {
083             _data.release();
084             _data = null;
085         }
086 
087 
088 
089     }
090 
091     public String toBodyString() throws JMSException
092     {
093         return String.valueOf(getObject());
094     }
095 
096     public String getMimeType()
097     {
098         return MIME_TYPE;
099     }
100 
101     public void setObject(Serializable serializablethrows JMSException
102     {
103         checkWritable();
104 
105         if (_data == null)
106         {
107             _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
108             _data.setAutoExpand(true);
109         }
110         else
111         {
112             _data.rewind();
113         }
114 
115         try
116         {
117             ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
118             out.writeObject(serializable);
119             out.flush();
120             out.close();
121         }
122         catch (IOException e)
123         {
124             MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
125             mfe.setLinkedException(e);
126             throw mfe;
127         }
128 
129     }
130 
131     public Serializable getObject() throws JMSException
132     {
133         ObjectInputStream in = null;
134         if (_data == null)
135         {
136             return null;
137         }
138 
139         try
140         {
141             _data.rewind();
142             in = new ObjectInputStream(_data.asInputStream());
143 
144             return (Serializablein.readObject();
145         }
146         catch (IOException e)
147         {
148             MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
149             mfe.setLinkedException(e);
150             throw mfe;
151         }
152         catch (ClassNotFoundException e)
153         {
154             MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
155             mfe.setLinkedException(e);
156             throw mfe;
157         }
158         finally
159         {
160             _data.rewind();
161             close(in);
162         }
163     }
164 
165     private static void close(InputStream in)
166     {
167         try
168         {
169             if (in != null)
170             {
171                 in.close();
172             }
173         }
174         catch (IOException ignore)
175         { }
176     }
177 }