001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.queue;
022
023 import org.apache.qpid.server.transactionlog.TransactionLog;
024
025 import java.util.concurrent.atomic.AtomicLong;
026
027 public class MessageFactory
028 {
029 private AtomicLong _messageId;
030 private static MessageFactory INSTANCE;
031
032 private enum State
033 {
034 RECOVER,
035 OPEN
036 }
037
038 private State _state = State.RECOVER;
039
040 private MessageFactory()
041 {
042 _messageId = new AtomicLong(0L);
043 }
044
045 public void recoveryComplete()
046 {
047 _state = State.OPEN;
048 }
049
050 /**
051 * Only to be used by tests as this will cause violate the principal that message IDs should not be reused.
052 */
053 public void reset()
054 {
055 _state = State.RECOVER;
056 _messageId = new AtomicLong(0L);
057 }
058
059 /**
060 * Normal message creation path
061 * @param transactionLog
062 * @param persistent
063 * @return
064 */
065 public AMQMessage createMessage(TransactionLog transactionLog, boolean persistent)
066 {
067 if (_state != State.OPEN)
068 {
069 _state = State.OPEN;
070 }
071
072 return createNextMessage(_messageId.incrementAndGet(), transactionLog, persistent);
073 }
074
075 /**
076 * Used for message recovery only and so only creates persistent messages.
077 * @param messageId the id that this message must have
078 * @param transactionLog
079 * @return
080 */
081 public AMQMessage createMessage(Long messageId, TransactionLog transactionLog)
082 {
083 if (_state != State.RECOVER)
084 {
085 throw new RuntimeException("Unable to create message by ID when not recovering");
086 }
087
088 long currentID = _messageId.get();
089 if (messageId <= currentID)
090 {
091 throw new RuntimeException("Message IDs can only increase current id is:"
092 + currentID + ". Requested:" + messageId);
093 }
094 else
095 {
096 _messageId.set(messageId);
097 }
098
099 return createNextMessage(messageId, transactionLog, true);
100 }
101
102 private AMQMessage createNextMessage(Long messageId, TransactionLog transactionLog, boolean persistent)
103 {
104 if (persistent)
105 {
106 return new PersistentAMQMessage(messageId, transactionLog);
107 }
108 else
109 {
110 return new TransientAMQMessage(messageId);
111 }
112 }
113
114 public static MessageFactory getInstance()
115 {
116 if (INSTANCE == null)
117 {
118 INSTANCE = new MessageFactory();
119 }
120
121 return INSTANCE;
122 }
123 }
|