001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.store;
022
023 import org.apache.commons.configuration.Configuration;
024 import org.apache.log4j.Logger;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.framing.AMQShortString;
027 import org.apache.qpid.framing.FieldTable;
028 import org.apache.qpid.framing.abstraction.ContentChunk;
029 import org.apache.qpid.server.queue.MessageMetaData;
030 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
031 import org.apache.qpid.server.exchange.Exchange;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.server.queue.MessageMetaData;
034 import org.apache.qpid.server.routing.RoutingTable;
035 import org.apache.qpid.server.transactionlog.TransactionLog;
036 import org.apache.qpid.server.virtualhost.VirtualHost;
037
038 import java.util.ArrayList;
039 import java.util.Collections;
040 import java.util.LinkedList;
041 import java.util.List;
042 import java.util.Map;
043 import java.util.HashMap;
044 import java.util.concurrent.ConcurrentHashMap;
045 import java.util.concurrent.ConcurrentMap;
046 import java.util.concurrent.atomic.AtomicBoolean;
047 import java.util.concurrent.atomic.AtomicLong;
048
049 /**
050 * A simple message store that stores the messages in a threadsafe structure in memory.
051 *
052 * NOTE: Now that we have removed the MessageStore interface and are using a
053 * TransactionLog, this class really should have no storage unless we want to
054 * do in-memory recovery.
055 */
056 public class MemoryMessageStore implements TransactionLog, RoutingTable
057 {
058 private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
059
060 private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
061
062 private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
063
064 protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
065
066 protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
067
068 private final AtomicLong _messageId = new AtomicLong(1);
069 private AtomicBoolean _closed = new AtomicBoolean(false);
070 protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>();
071
072 public void configure()
073 {
074 _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
075 _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
076 _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
077 }
078
079 public void configure(String base, VirtualHostConfiguration config)
080 {
081 //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
082 if (base.equals("store"))
083 {
084 int hashtableCapacity = config.getStoreConfiguration().getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
085 _log.info("Using capacity " + hashtableCapacity + " for hash tables");
086 _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
087 _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
088 }
089 }
090
091 public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
092 {
093 configure(base, config);
094 }
095
096 public void close() throws Exception
097 {
098 _closed.getAndSet(true);
099 if (_metaDataMap != null)
100 {
101 _metaDataMap.clear();
102 _metaDataMap = null;
103 }
104 if (_contentBodyMap != null)
105 {
106 _contentBodyMap.clear();
107 _contentBodyMap = null;
108 }
109 }
110
111 private void removeMessage(StoreContext context, Long messageId) throws AMQException
112 {
113 checkNotClosed();
114 if (_log.isDebugEnabled())
115 {
116 _log.debug("Removing message with id " + messageId);
117 }
118 _metaDataMap.remove(messageId);
119 _contentBodyMap.remove(messageId);
120 _messageEnqueueMap.remove(messageId);
121 }
122
123 public void createExchange(Exchange exchange) throws AMQException
124 {
125
126 }
127
128 public void removeExchange(Exchange exchange) throws AMQException
129 {
130
131 }
132
133 public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
134 {
135
136 }
137
138 public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
139 {
140
141 }
142
143 public void createQueue(AMQQueue queue) throws AMQException
144 {
145 // Not requred to do anything
146 }
147
148 public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
149 {
150 // Not required to do anything
151 }
152
153 public void removeQueue(final AMQQueue queue) throws AMQException
154 {
155 // Not required to do anything
156 }
157
158 public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
159 {
160 synchronized (_messageEnqueueMap)
161 {
162 List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
163 if (queues == null)
164 {
165 queues = new LinkedList<AMQQueue>();
166 _messageEnqueueMap.put(messageId, queues);
167 }
168
169 queues.add(queue);
170 }
171 }
172
173 public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
174 {
175 synchronized (_messageEnqueueMap)
176 {
177 List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
178 if (queues == null || !queues.contains(queue))
179 {
180 throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName()
181 + " but it is not enqueued on that queue.");
182 }
183 else
184 {
185 queues.remove(queue);
186 if (queues.isEmpty())
187 {
188 removeMessage(context,messageId);
189 }
190 }
191 }
192
193 }
194
195 public void beginTran(StoreContext context) throws AMQException
196 {
197 // Not required to do anything
198 }
199
200 public void commitTran(StoreContext context) throws AMQException
201 {
202 // Not required to do anything
203 }
204
205 public void abortTran(StoreContext context) throws AMQException
206 {
207 // Not required to do anything
208 }
209
210 public boolean inTran(StoreContext context)
211 {
212 return false;
213 }
214
215 public List<AMQQueue> createQueues() throws AMQException
216 {
217 return null;
218 }
219
220 public Long getNewMessageId()
221 {
222 return _messageId.getAndIncrement();
223 }
224
225 public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
226 throws AMQException
227 {
228 checkNotClosed();
229 List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
230
231 if (bodyList == null && lastContentBody)
232 {
233 _contentBodyMap.put(messageId, Collections.singletonList(contentBody));
234 }
235 else
236 {
237 if (bodyList == null)
238 {
239 bodyList = new ArrayList<ContentChunk>();
240 _contentBodyMap.put(messageId, bodyList);
241 }
242
243 bodyList.add(index, contentBody);
244 }
245 }
246
247 public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
248 throws AMQException
249 {
250 checkNotClosed();
251 _metaDataMap.put(messageId, messageMetaData);
252 }
253
254 public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
255 {
256 checkNotClosed();
257 return _metaDataMap.get(messageId);
258 }
259
260 public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
261 {
262 checkNotClosed();
263 List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
264 return bodyList.get(index);
265 }
266
267 public boolean isPersistent()
268 {
269 return false;
270 }
271
272 private void checkNotClosed() throws MessageStoreClosedException
273 {
274 if (_closed.get())
275 {
276 throw new MessageStoreClosedException();
277 }
278 }
279 }
|