001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.message;
022
023 import java.util.ArrayList;
024 import java.util.Collections;
025 import java.util.List;
026
027 import org.apache.qpid.framing.AMQShortString;
028 import org.apache.qpid.framing.BasicDeliverBody;
029 import org.apache.qpid.framing.ContentBody;
030 import org.apache.qpid.framing.ContentHeaderBody;
031
032 /**
033 * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
034 * the content body/ies.
035 *
036 * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
037 * thread in order to minimise the amount of work done in the MINA dispatcher thread.
038 */
039 public class UnprocessedMessage_0_8 extends UnprocessedMessage
040 {
041 private long _bytesReceived = 0;
042
043
044 private AMQShortString _exchange;
045 private AMQShortString _routingKey;
046 private final long _deliveryId;
047 protected boolean _redelivered;
048
049 private BasicDeliverBody _deliverBody;
050 private ContentHeaderBody _contentHeader;
051
052 /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
053 private List<ContentBody> _bodies;
054
055 public UnprocessedMessage_0_8(long deliveryId, int consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
056 {
057 super(consumerTag);
058 _exchange = exchange;
059 _routingKey = routingKey;
060
061 _redelivered = redelivered;
062 _deliveryId = deliveryId;
063 }
064
065
066 public AMQShortString getExchange()
067 {
068 return _exchange;
069 }
070
071 public AMQShortString getRoutingKey()
072 {
073 return _routingKey;
074 }
075
076 public long getDeliveryTag()
077 {
078 return _deliveryId;
079 }
080
081 public boolean isRedelivered()
082 {
083 return _redelivered;
084 }
085
086
087 public void receiveBody(ContentBody body)
088 {
089
090 if (body.payload != null)
091 {
092 final long payloadSize = body.payload.remaining();
093
094 if (_bodies == null)
095 {
096 if (payloadSize == getContentHeader().bodySize)
097 {
098 _bodies = Collections.singletonList(body);
099 }
100 else
101 {
102 _bodies = new ArrayList<ContentBody>();
103 _bodies.add(body);
104 }
105
106 }
107 else
108 {
109 _bodies.add(body);
110 }
111 _bytesReceived += payloadSize;
112 }
113 }
114
115 public void setMethodBody(BasicDeliverBody deliverBody)
116 {
117 _deliverBody = deliverBody;
118 }
119
120 public void setContentHeader(ContentHeaderBody contentHeader)
121 {
122 this._contentHeader = contentHeader;
123 }
124
125 public boolean isAllBodyDataReceived()
126 {
127 return _bytesReceived == getContentHeader().bodySize;
128 }
129
130 public BasicDeliverBody getDeliverBody()
131 {
132 return _deliverBody;
133 }
134
135 public ContentHeaderBody getContentHeader()
136 {
137 return _contentHeader;
138 }
139
140 public List<ContentBody> getBodies()
141 {
142 return _bodies;
143 }
144
145 public String toString()
146 {
147 StringBuilder buf = new StringBuilder();
148
149 if (_contentHeader != null)
150 {
151 buf.append("ContentHeader " + _contentHeader);
152 }
153 if(_deliverBody != null)
154 {
155 buf.append("Delivery tag " + _deliverBody.getDeliveryTag());
156 buf.append("Consumer tag " + _deliverBody.getConsumerTag());
157 buf.append("Deliver Body " + _deliverBody);
158 }
159
160 return buf.toString();
161 }
162
163 }
|