001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.txn;
022
023 import java.util.ArrayList;
024 import java.util.List;
025
026 import org.apache.log4j.Logger;
027 import org.apache.qpid.AMQException;
028 import org.apache.qpid.server.store.StoreContext;
029
030 /** Holds a list of TxnOp instance representing transactional operations. */
031 public class TxnBuffer
032 {
033 private final List<TxnOp> _ops = new ArrayList<TxnOp>();
034 private static final Logger _log = Logger.getLogger(TxnBuffer.class);
035
036 public TxnBuffer()
037 {
038 }
039
040 public void commit(StoreContext context) throws AMQException
041 {
042 if (_log.isDebugEnabled())
043 {
044 _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops);
045 }
046
047 if (prepare(context))
048 {
049 for (TxnOp op : _ops)
050 {
051 op.commit(context);
052 }
053 }
054 _ops.clear();
055 }
056
057 private boolean prepare(StoreContext context) throws AMQException
058 {
059 for (int i = 0; i < _ops.size(); i++)
060 {
061 TxnOp op = _ops.get(i);
062 try
063 {
064 op.prepare(context);
065 }
066 catch (AMQException e)
067 {
068 undoPrepare(i);
069 throw e;
070 }
071 catch (RuntimeException e)
072 {
073 undoPrepare(i);
074 throw e;
075 }
076 }
077 return true;
078 }
079
080 private void undoPrepare(int lastPrepared)
081 {
082 //compensate previously prepared ops
083 for (int j = 0; j < lastPrepared; j++)
084 {
085 _ops.get(j).undoPrepare();
086 }
087 }
088
089
090
091 public void rollback(StoreContext context) throws AMQException
092 {
093 for (TxnOp op : _ops)
094 {
095 op.rollback(context);
096 }
097 _ops.clear();
098 }
099
100 public void enlist(TxnOp op)
101 {
102 _ops.add(op);
103 }
104
105 public void cancel(TxnOp op)
106 {
107 _ops.remove(op);
108 }
109 }
|