MemoryMessageStore.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.store;
022 
023 import org.apache.commons.configuration.Configuration;
024 import org.apache.log4j.Logger;
025 import org.apache.qpid.AMQException;
026 import org.apache.qpid.framing.AMQShortString;
027 import org.apache.qpid.framing.FieldTable;
028 import org.apache.qpid.framing.abstraction.ContentChunk;
029 import org.apache.qpid.server.queue.MessageMetaData;
030 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
031 import org.apache.qpid.server.exchange.Exchange;
032 import org.apache.qpid.server.queue.AMQQueue;
033 import org.apache.qpid.server.queue.MessageMetaData;
034 import org.apache.qpid.server.routing.RoutingTable;
035 import org.apache.qpid.server.transactionlog.TransactionLog;
036 import org.apache.qpid.server.virtualhost.VirtualHost;
037 
038 import java.util.ArrayList;
039 import java.util.Collections;
040 import java.util.LinkedList;
041 import java.util.List;
042 import java.util.Map;
043 import java.util.HashMap;
044 import java.util.concurrent.ConcurrentHashMap;
045 import java.util.concurrent.ConcurrentMap;
046 import java.util.concurrent.atomic.AtomicBoolean;
047 import java.util.concurrent.atomic.AtomicLong;
048 
049 /**
050  * A simple message store that stores the messages in a threadsafe structure in memory.
051  *
052  * NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog
053  *
054  * This class really should have no storage unless we want to do inMemory Recovery.
055  */
056 public class MemoryMessageStore implements TransactionLog, RoutingTable
057 {
058     private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
059 
060     private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
061 
062     private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
063 
064     protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
065 
066     protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
067 
068     private final AtomicLong _messageId = new AtomicLong(1);
069     private AtomicBoolean _closed = new AtomicBoolean(false);
070     protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>();
071 
072     public void configure()
073     {
074         _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
075         _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
076         _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
077     }
078 
079     public void configure(String base, VirtualHostConfiguration config)
080     {
081         //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
082         if (base.equals("store"))
083         {
084             int hashtableCapacity = config.getStoreConfiguration().getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
085             _log.info("Using capacity " + hashtableCapacity + " for hash tables");
086             _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
087             _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
088         }
089     }
090 
091     public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration configthrows Exception
092     {
093         configure(base, config);
094     }
095 
096     public void close() throws Exception
097     {
098         _closed.getAndSet(true);
099         if (_metaDataMap != null)
100         {
101             _metaDataMap.clear();
102             _metaDataMap = null;
103         }
104         if (_contentBodyMap != null)
105         {
106             _contentBodyMap.clear();
107             _contentBodyMap = null;
108         }
109     }
110 
111     private void removeMessage(StoreContext context, Long messageIdthrows AMQException
112     {
113         checkNotClosed();
114         if (_log.isDebugEnabled())
115         {
116             _log.debug("Removing message with id " + messageId);
117         }
118         _metaDataMap.remove(messageId);
119         _contentBodyMap.remove(messageId);
120         _messageEnqueueMap.remove(messageId);
121     }
122 
123     public void createExchange(Exchange exchangethrows AMQException
124     {
125 
126     }
127 
128     public void removeExchange(Exchange exchangethrows AMQException
129     {
130 
131     }
132 
133     public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
134     {
135 
136     }
137 
138     public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
139     {
140 
141     }
142 
143     public void createQueue(AMQQueue queuethrows AMQException
144     {
145         // Not requred to do anything
146     }
147 
148     public void createQueue(AMQQueue queue, FieldTable argumentsthrows AMQException
149     {
150         // Not required to do anything
151     }
152 
153     public void removeQueue(final AMQQueue queuethrows AMQException
154     {
155         // Not required to do anything
156     }
157 
158     public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageIdthrows AMQException
159     {
160         synchronized (_messageEnqueueMap)
161         {
162             List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
163             if (queues == null)
164             {
165                 queues = new LinkedList<AMQQueue>();
166                 _messageEnqueueMap.put(messageId, queues);
167             }
168 
169             queues.add(queue);
170         }
171     }
172 
173     public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageIdthrows AMQException
174     {
175         synchronized (_messageEnqueueMap)
176         {
177             List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
178             if (queues == null || !queues.contains(queue))
179             {
180                 throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName()
181                                            " but it is not enqueued on that queue.");
182             }
183             else
184             {
185                 queues.remove(queue);
186                 if (queues.isEmpty())
187                 {
188                     removeMessage(context,messageId);
189                 }
190             }
191         }
192 
193     }
194 
195     public void beginTran(StoreContext contextthrows AMQException
196     {
197         // Not required to do anything
198     }
199 
200     public void commitTran(StoreContext contextthrows AMQException
201     {
202         // Not required to do anything
203     }
204 
205     public void abortTran(StoreContext contextthrows AMQException
206     {
207         // Not required to do anything
208     }
209 
210     public boolean inTran(StoreContext context)
211     {
212         return false;
213     }
214 
215     public List<AMQQueue> createQueues() throws AMQException
216     {
217         return null;
218     }
219 
220     public Long getNewMessageId()
221     {
222         return _messageId.getAndIncrement();
223     }
224 
225     public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
226             throws AMQException
227     {
228         checkNotClosed();
229         List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
230 
231         if (bodyList == null && lastContentBody)
232         {
233             _contentBodyMap.put(messageId, Collections.singletonList(contentBody));
234         }
235         else
236         {
237             if (bodyList == null)
238             {
239                 bodyList = new ArrayList<ContentChunk>();
240                 _contentBodyMap.put(messageId, bodyList);
241             }
242 
243             bodyList.add(index, contentBody);
244         }
245     }
246 
247     public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
248             throws AMQException
249     {
250         checkNotClosed();
251         _metaDataMap.put(messageId, messageMetaData);
252     }
253 
254     public MessageMetaData getMessageMetaData(StoreContext context, Long messageIdthrows AMQException
255     {
256         checkNotClosed();
257         return _metaDataMap.get(messageId);
258     }
259 
260     public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int indexthrows AMQException
261     {
262         checkNotClosed();
263         List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
264         return bodyList.get(index);
265     }
266 
267     public boolean isPersistent()
268     {
269         return false;
270     }
271 
272     private void checkNotClosed() throws MessageStoreClosedException
273     {
274         if (_closed.get())
275         {
276             throw new MessageStoreClosedException();
277         }
278     }
279 }