MessageFactory.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.queue;
022 
023 import org.apache.qpid.server.transactionlog.TransactionLog;
024 
025 import java.util.concurrent.atomic.AtomicLong;
026 
027 public class MessageFactory
028 {
029     private AtomicLong _messageId;
030     private static MessageFactory INSTANCE;
031 
032     private enum State
033     {
034         RECOVER,
035         OPEN
036     }
037 
038     private State _state = State.RECOVER;
039 
040     private MessageFactory()
041     {
042         _messageId = new AtomicLong(0L);
043     }
044 
045     public void recoveryComplete()
046     {
047         _state = State.OPEN;
048     }
049 
050     /**
051      * Only to be used by tests as this will cause violate the principal that message IDs should not be reused.
052      */
053     public void reset()
054     {
055         _state = State.RECOVER;
056         _messageId = new AtomicLong(0L);
057     }
058     
059     /**
060      * Normal message creation path
061      @param transactionLog
062      @param persistent
063      @return
064      */
065     public AMQMessage createMessage(TransactionLog transactionLog, boolean persistent)
066     {
067         if (_state != State.OPEN)
068         {
069             _state = State.OPEN;
070         }
071 
072         return createNextMessage(_messageId.incrementAndGet(), transactionLog, persistent);
073     }
074 
075     /**
076      * Used for message recovery only and so only creates persistent messages.
077      @param messageId the id that this message must have
078      @param transactionLog
079      @return
080      */
081     public AMQMessage createMessage(Long messageId, TransactionLog transactionLog)
082     {
083         if (_state != State.RECOVER)
084         {
085             throw new RuntimeException("Unable to create message by ID when not recovering");
086         }
087 
088         long currentID = _messageId.get();
089         if (messageId <= currentID)
090         {
091             throw new RuntimeException("Message IDs can only increase current id is:"
092                                        + currentID + ". Requested:" + messageId);
093         }
094         else
095         {
096             _messageId.set(messageId);
097         }
098 
099         return createNextMessage(messageId, transactionLog, true);
100     }
101 
102     private AMQMessage createNextMessage(Long messageId, TransactionLog transactionLog, boolean persistent)
103     {
104         if (persistent)
105         {
106             return new PersistentAMQMessage(messageId, transactionLog);
107         }
108         else
109         {
110             return new TransientAMQMessage(messageId);
111         }
112     }
113 
114     public static MessageFactory getInstance()
115     {
116         if (INSTANCE == null)
117         {
118             INSTANCE = new MessageFactory();
119         }
120 
121         return INSTANCE;
122     }
123 }