DerbyMessageStore.java
0001 /*
0002 *
0003 * Licensed to the Apache Software Foundation (ASF) under one
0004 * or more contributor license agreements.  See the NOTICE file
0005 * distributed with this work for additional information
0006 * regarding copyright ownership.  The ASF licenses this file
0007 * to you under the Apache License, Version 2.0 (the
0008 * "License"); you may not use this file except in compliance
0009 * with the License.  You may obtain a copy of the License at
0010 *
0011 *   http://www.apache.org/licenses/LICENSE-2.0
0012 *
0013 * Unless required by applicable law or agreed to in writing,
0014 * software distributed under the License is distributed on an
0015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016 * KIND, either express or implied.  See the License for the
0017 * specific language governing permissions and limitations
0018 * under the License.
0019 *
0020 */
0021 package org.apache.qpid.server.store;
0022 
0023 import org.apache.qpid.server.virtualhost.VirtualHost;
0024 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
0025 import org.apache.qpid.server.exchange.Exchange;
0026 import org.apache.qpid.server.queue.AMQQueue;
0027 import org.apache.qpid.server.queue.AMQQueueFactory;
0028 import org.apache.qpid.server.queue.MessageMetaData;
0029 import org.apache.qpid.server.queue.QueueRegistry;
0030 
0031 import org.apache.qpid.server.queue.MessageFactory;
0032 import org.apache.qpid.server.queue.AMQMessage;
0033 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
0034 import org.apache.qpid.server.txn.TransactionalContext;
0035 import org.apache.qpid.server.txn.NonTransactionalContext;
0036 import org.apache.qpid.server.transactionlog.TransactionLog;
0037 import org.apache.qpid.server.routing.RoutingTable;
0038 import org.apache.qpid.AMQException;
0039 import org.apache.qpid.framing.AMQShortString;
0040 import org.apache.qpid.framing.FieldTable;
0041 import org.apache.qpid.framing.ContentHeaderBody;
0042 import org.apache.qpid.framing.abstraction.ContentChunk;
0043 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
0044 import org.apache.commons.configuration.Configuration;
0045 import org.apache.log4j.Logger;
0046 import org.apache.mina.common.ByteBuffer;
0047 
0048 import java.io.File;
0049 import java.io.ByteArrayInputStream;
0050 import java.sql.DriverManager;
0051 import java.sql.Driver;
0052 import java.sql.Connection;
0053 import java.sql.SQLException;
0054 import java.sql.Statement;
0055 import java.sql.PreparedStatement;
0056 import java.sql.ResultSet;
0057 import java.sql.Blob;
0058 import java.sql.Types;
0059 import java.util.concurrent.atomic.AtomicLong;
0060 import java.util.concurrent.atomic.AtomicBoolean;
0061 import java.util.List;
0062 import java.util.ArrayList;
0063 import java.util.Map;
0064 import java.util.HashMap;
0065 import java.util.TreeMap;
0066 
0067 
0068 public class DerbyMessageStore implements TransactionLog, RoutingTable
0069 {
0070 
0071     private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
0072 
0073     private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
0074 
0075 
0076     private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
0077 
0078     private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
0079 
0080     private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
0081     private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
0082     private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
0083     private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
0084     private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA";
0085     private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
0086 
0087     private static final int DB_VERSION = 1;
0088 
0089 
0090 
0091     private VirtualHost _virtualHost;
0092     private static Class<Driver> DRIVER_CLASS;
0093 
0094     private final AtomicLong _messageId = new AtomicLong(1);
0095     private AtomicBoolean _closed = new AtomicBoolean(false);
0096 
0097     private String _connectionURL;
0098 
0099     MessageFactory _messageFactory;
0100 
0101     private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
0102     private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
0103     private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
0104     private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
0105     private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
0106     private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
0107     private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
0108     private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
0109     private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
0110     private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
0111     private static final String SELECT_FROM_BINDINGS =
0112             "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
0113     private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
0114     private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
0115     private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
0116     private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
0117     private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
0118     private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
0119     private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
0120     private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
0121     private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
0122     private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
0123     private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
0124     private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
0125     private static final String SELECT_FROM_MESSAGE_META_DATA =
0126             "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
0127     private static final String SELECT_FROM_MESSAGE_CONTENT =
0128             "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?";
0129     private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
0130     private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
0131 
0132 
0133     private enum State
0134     {
0135         INITIAL,
0136         CONFIGURING,
0137         RECOVERING,
0138         STARTED,
0139         CLOSING,
0140         CLOSED
0141     }
0142 
0143     private State _state = State.INITIAL;
0144 
0145 
0146     public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration configthrows Exception
0147     {
0148         //Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config
0149         if (base.equals("store"))
0150         {
0151             stateTransition(State.INITIAL, State.CONFIGURING);
0152 
0153             initialiseDriver();
0154 
0155             _virtualHost = virtualHost;
0156 
0157             _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
0158             QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
0159 
0160             final String databasePath = config.getStoreConfiguration().getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB");
0161 
0162             File environmentPath = new File(databasePath);
0163             if (!environmentPath.exists())
0164             {
0165                 if (!environmentPath.mkdirs())
0166                 {
0167                     throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
0168                                                        "Ensure the path is correct and that the permissions are correct.");
0169                 }
0170             }
0171 
0172             createOrOpenDatabase(databasePath);
0173 
0174             // this recovers durable queues and persistent messages
0175 
0176             _messageFactory = MessageFactory.getInstance();
0177 
0178             recover();
0179 
0180             stateTransition(State.RECOVERING, State.STARTED);
0181         }
0182     }
0183 
0184     private static synchronized void initialiseDriver() throws ClassNotFoundException
0185     {
0186         if(DRIVER_CLASS == null)
0187         {
0188             DRIVER_CLASS = (Class<Driver>Class.forName(SQL_DRIVER_NAME);
0189         }
0190     }
0191 
0192     private void createOrOpenDatabase(final String environmentPaththrows SQLException
0193     {
0194         _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() ";create=true";
0195 
0196         Connection conn = newConnection();
0197 
0198         createVersionTable(conn);
0199         createExchangeTable(conn);
0200         createQueueTable(conn);
0201         createBindingsTable(conn);
0202         createQueueEntryTable(conn);
0203         createMessageMetaDataTable(conn);
0204         createMessageContentTable(conn);
0205 
0206         conn.close();
0207     }
0208 
0209 
0210 
0211     private void createVersionTable(final Connection connthrows SQLException
0212     {
0213         if(!tableExists(DB_VERSION_TABLE_NAME, conn))
0214         {
0215             Statement stmt = conn.createStatement();
0216 
0217             stmt.execute(CREATE_DB_VERSION_TABLE);
0218             stmt.close();
0219 
0220             PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
0221             pstmt.setInt(1, DB_VERSION);
0222             pstmt.execute();
0223             pstmt.close();
0224         }
0225 
0226     }
0227 
0228 
0229     private void createExchangeTable(final Connection connthrows SQLException
0230     {
0231         if(!tableExists(EXCHANGE_TABLE_NAME, conn))
0232         {
0233             Statement stmt = conn.createStatement();
0234 
0235             stmt.execute(CREATE_EXCHANGE_TABLE);
0236             stmt.close();
0237         }
0238     }
0239 
0240     private void createQueueTable(final Connection connthrows SQLException
0241     {
0242         if(!tableExists(QUEUE_TABLE_NAME, conn))
0243         {
0244             Statement stmt = conn.createStatement();
0245             stmt.execute(CREATE_QUEUE_TABLE);
0246             stmt.close();
0247         }
0248     }
0249 
0250     private void createBindingsTable(final Connection connthrows SQLException
0251     {
0252         if(!tableExists(BINDINGS_TABLE_NAME, conn))
0253         {
0254             Statement stmt = conn.createStatement();
0255             stmt.execute(CREATE_BINDINGS_TABLE);
0256 
0257             stmt.close();
0258         }
0259 
0260     }
0261 
0262     private void createQueueEntryTable(final Connection connthrows SQLException
0263     {
0264         if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
0265         {
0266             Statement stmt = conn.createStatement();
0267             stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
0268 
0269             stmt.close();
0270         }
0271 
0272     }
0273 
0274     private void createMessageMetaDataTable(final Connection connthrows SQLException
0275     {
0276         if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
0277         {
0278             Statement stmt = conn.createStatement();
0279             stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
0280 
0281             stmt.close();
0282         }
0283 
0284     }
0285 
0286 
0287     private void createMessageContentTable(final Connection connthrows SQLException
0288     {
0289         if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
0290         {
0291             Statement stmt = conn.createStatement();
0292             stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
0293 
0294             stmt.close();
0295         }
0296 
0297     }
0298 
0299 
0300 
0301     private boolean tableExists(final String tableName, final Connection connthrows SQLException
0302     {
0303         PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
0304         stmt.setString(1, tableName);
0305         ResultSet rs = stmt.executeQuery();
0306         boolean exists = rs.next();
0307         rs.close();
0308         stmt.close();
0309         return exists;
0310     }
0311 
0312     public void recover() throws AMQException
0313     {
0314         stateTransition(State.CONFIGURING, State.RECOVERING);
0315 
0316         _logger.info("Recovering persistent state...");
0317         StoreContext context = new StoreContext();
0318 
0319         try
0320         {
0321             Map<AMQShortString, AMQQueue> queues = loadQueues();
0322 
0323             recoverExchanges();
0324 
0325             try
0326             {
0327 
0328                 beginTran(context);
0329 
0330                 deliverMessages(context, queues);
0331                 _logger.info("Persistent state recovered successfully");
0332                 commitTran(context);
0333 
0334             }
0335             finally
0336             {
0337                 if(inTran(context))
0338                 {
0339                     abortTran(context);
0340                 }
0341             }
0342         }
0343         catch (SQLException e)
0344         {
0345 
0346             throw new AMQException("Error recovering persistent state: " + e, e);
0347         }
0348 
0349     }
0350 
0351     private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
0352     {
0353         Connection conn = newConnection();
0354 
0355 
0356         Statement stmt = conn.createStatement();
0357         ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
0358         Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
0359         while(rs.next())
0360         {
0361             String queueName = rs.getString(1);
0362             String owner = rs.getString(2);
0363             AMQShortString queueNameShortString = new AMQShortString(queueName);
0364             AMQQueue q =  AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null null new AMQShortString(owner), false, _virtualHost,
0365                                                              null);
0366             _virtualHost.getQueueRegistry().registerQueue(q);
0367             queueMap.put(queueNameShortString,q);
0368 
0369         }
0370         return queueMap;
0371     }
0372 
0373     private void recoverExchanges() throws AMQException, SQLException
0374     {
0375         for (Exchange exchange : loadExchanges())
0376         {
0377             recoverExchange(exchange);
0378         }
0379     }
0380 
0381 
0382     private List<Exchange> loadExchanges() throws AMQException, SQLException
0383     {
0384 
0385         List<Exchange> exchanges = new ArrayList<Exchange>();
0386         Connection conn = null;
0387         try
0388         {
0389             conn = newConnection();
0390 
0391 
0392             Statement stmt = conn.createStatement();
0393             ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
0394 
0395             Exchange exchange;
0396             while(rs.next())
0397             {
0398                 String exchangeName = rs.getString(1);
0399                 String type = rs.getString(2);
0400                 boolean autoDelete = rs.getShort(3!= 0;
0401 
0402                 exchange = _virtualHost.getExchangeFactory().createExchange(new AMQShortString(exchangeName)new AMQShortString(type), true, autoDelete, 0);
0403                 _virtualHost.getExchangeRegistry().registerExchange(exchange);
0404                 exchanges.add(exchange);
0405 
0406             }
0407             return exchanges;
0408 
0409         }
0410         finally
0411         {
0412             if(conn != null)
0413             {
0414                 conn.close();
0415             }
0416         }
0417 
0418     }
0419 
0420     private void recoverExchange(Exchange exchangethrows AMQException, SQLException
0421     {
0422         _logger.info("Recovering durable exchange " + exchange.getName() " of type " + exchange.getType() "...");
0423 
0424         QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
0425 
0426         Connection conn = null;
0427         try
0428         {
0429             conn = newConnection();
0430 
0431             PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
0432             stmt.setString(1, exchange.getName().toString());
0433 
0434             ResultSet rs = stmt.executeQuery();
0435 
0436 
0437             while(rs.next())
0438             {
0439                 String queueName = rs.getString(1);
0440                 String bindingKey = rs.getString(2);
0441                 Blob arguments = rs.getBlob(3);
0442 
0443 
0444                 AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
0445                 if (queue == null)
0446                 {
0447                     _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
0448                         + exchange.getName());
0449                 }
0450                 else
0451                 {
0452                     _logger.info("Restoring binding: (Exchange: " + exchange.getName() ", Queue: " + queueName
0453                         ", Routing Key: " + bindingKey + ", Arguments: " + arguments
0454                         ")");
0455 
0456                     FieldTable argumentsFT = null;
0457                     if(arguments != null)
0458                     {
0459                         byte[] argumentBytes = arguments.getBytes(0(intarguments.length());
0460                         ByteBuffer buf = ByteBuffer.wrap(argumentBytes);
0461                         argumentsFT = new FieldTable(buf,arguments.length());
0462                     }
0463 
0464                     queue.bind(exchange, bindingKey == null null new AMQShortString(bindingKey), argumentsFT);
0465 
0466                 }
0467             }
0468         }
0469         finally
0470         {
0471             if(conn != null)
0472             {
0473                 conn.close();
0474             }
0475         }
0476     }
0477 
0478     public void close() throws Exception
0479     {
0480         _closed.getAndSet(true);
0481     }
0482 
0483     public void removeMessage(StoreContext storeContext, Long messageIdthrows AMQException
0484     {
0485 
0486         boolean localTx = getOrCreateTransaction(storeContext);
0487 
0488         Connection conn = getConnection(storeContext);
0489         ConnectionWrapper wrapper = (ConnectionWrapperstoreContext.getPayload();
0490 
0491 
0492         if (_logger.isDebugEnabled())
0493         {
0494             _logger.debug("Message Id: " + messageId + " Removing");
0495         }
0496 
0497         // first we need to look up the header to get the chunk count
0498         MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
0499         try
0500         {
0501             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
0502             stmt.setLong(1,messageId);
0503             wrapper.setRequiresCommit();
0504             int results = stmt.executeUpdate();
0505 
0506             if (results == 0)
0507             {
0508                 if (localTx)
0509                 {
0510                     abortTran(storeContext);
0511                 }
0512 
0513                 throw new AMQException("Message metadata not found for message id " + messageId);
0514             }
0515             stmt.close();
0516 
0517             if (_logger.isDebugEnabled())
0518             {
0519                 _logger.debug("Deleted metadata for message " + messageId);
0520             }
0521 
0522             stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
0523             stmt.setLong(1,messageId);
0524             results = stmt.executeUpdate();
0525 
0526             if(results != mmd.getContentChunkCount())
0527             {
0528                 if (localTx)
0529                 {
0530                     abortTran(storeContext);
0531                 }
0532                 throw new AMQException("Unexpected number of content chunks when deleting message.  Expected " + mmd.getContentChunkCount() " but found " + results);
0533 
0534             }
0535 
0536             if (localTx)
0537             {
0538                 commitTran(storeContext);
0539             }
0540         }
0541         catch (SQLException e)
0542         {
0543             if ((conn != null&& localTx)
0544             {
0545                 abortTran(storeContext);
0546             }
0547 
0548             throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
0549         }
0550 
0551     }
0552 
0553     public void createExchange(Exchange exchangethrows AMQException
0554     {
0555         if (_state != State.RECOVERING)
0556         {
0557             try
0558             {
0559                 Connection conn = null;
0560 
0561                 try
0562                 {
0563                     conn = newConnection();
0564 
0565                     PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
0566                     stmt.setString(1, exchange.getName().toString());
0567                     stmt.setString(2, exchange.getType().toString());
0568                     stmt.setShort(3, exchange.isAutoDelete() (short(short0);
0569                     stmt.execute();
0570                     stmt.close();
0571                     conn.commit();
0572 
0573                 }
0574                 finally
0575                 {
0576                     if(conn != null)
0577                     {
0578                         conn.close();
0579                     }
0580                 }
0581             }
0582             catch (SQLException e)
0583             {
0584                 throw new AMQException("Error writing Exchange with name " + exchange.getName() " to database: " + e, e);
0585             }
0586         }
0587 
0588     }
0589 
0590     public void removeExchange(Exchange exchangethrows AMQException
0591     {
0592         Connection conn = null;
0593 
0594         try
0595         {
0596             conn = newConnection();
0597             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
0598             stmt.setString(1, exchange.getName().toString());
0599             int results = stmt.executeUpdate();
0600             if(results == 0)
0601             {
0602                 throw new AMQException("Exchange " + exchange.getName() " not found");
0603             }
0604             else
0605             {
0606                 conn.commit();
0607                 stmt.close();
0608             }
0609         }
0610         catch (SQLException e)
0611         {
0612             throw new AMQException("Error writing deleting with name " + exchange.getName() " from database: " + e, e);
0613         }
0614         finally
0615         {
0616             if(conn != null)
0617             {
0618                try
0619                {
0620                    conn.close();
0621                }
0622                catch (SQLException e)
0623                {
0624                    _logger.error(e);
0625                }
0626             }
0627 
0628         }
0629     }
0630 
0631     public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
0632             throws AMQException
0633     {
0634         if (_state != State.RECOVERING)
0635         {
0636             Connection conn = null;
0637 
0638 
0639             try
0640             {
0641                 conn = newConnection();
0642                 PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
0643                 stmt.setString(1, exchange.getName().toString() );
0644                 stmt.setString(2, queue.getName().toString());
0645                 stmt.setString(3, routingKey == null null : routingKey.toString());
0646                 if(args != null)
0647                 {
0648                     /* This would be the Java 6 way of setting a Blob
0649                     Blob blobArgs = conn.createBlob();
0650                     blobArgs.setBytes(0, args.getDataAsBytes());
0651                     stmt.setBlob(4, blobArgs);
0652                     */
0653                     byte[] bytes = args.getDataAsBytes();
0654                     ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
0655                     stmt.setBinaryStream(4, bis, bytes.length);
0656                 }
0657                 else
0658                 {
0659                     stmt.setNull(4, Types.BLOB);
0660                 }
0661 
0662                 stmt.executeUpdate();
0663                 conn.commit();
0664                 stmt.close();
0665             }
0666             catch (SQLException e)
0667             {
0668                 throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() " to exchange "
0669                     + exchange.getName() " to database: " + e, e);
0670             }
0671             finally
0672             {
0673                 if(conn != null)
0674                 {
0675                    try
0676                    {
0677                        conn.close();
0678                    }
0679                    catch (SQLException e)
0680                    {
0681                        _logger.error(e);
0682                    }
0683                 }
0684 
0685             }
0686 
0687         }
0688 
0689 
0690     }
0691 
0692     public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
0693             throws AMQException
0694     {
0695         Connection conn = null;
0696 
0697 
0698         try
0699         {
0700             conn = newConnection();
0701             // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
0702             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
0703             stmt.setString(1, exchange.getName().toString() );
0704             stmt.setString(2, queue.getName().toString());
0705             stmt.setString(3, routingKey == null null : routingKey.toString());
0706 
0707 
0708             if(stmt.executeUpdate() != 1)
0709             {
0710                  throw new AMQException("Queue binding for queue with name " + queue.getName() " to exchange "
0711                 + exchange.getName() "  not found");
0712             }
0713             conn.commit();
0714             stmt.close();
0715         }
0716         catch (SQLException e)
0717         {
0718             throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() " to exchange "
0719                 + exchange.getName() " in database: " + e, e);
0720         }
0721         finally
0722         {
0723             if(conn != null)
0724             {
0725                try
0726                {
0727                    conn.close();
0728                }
0729                catch (SQLException e)
0730                {
0731                    _logger.error(e);
0732                }
0733             }
0734 
0735         }
0736 
0737 
0738     }
0739 
0740     public void createQueue(AMQQueue queuethrows AMQException
0741     {
0742         createQueue(queue, null);
0743     }
0744 
0745     public void createQueue(AMQQueue queue, FieldTable argumentsthrows AMQException
0746     {
0747         _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
0748 
0749         if (_state != State.RECOVERING)
0750         {
0751             try
0752             {
0753                 Connection conn = newConnection();
0754 
0755                 PreparedStatement stmt =
0756                         conn.prepareStatement(INSERT_INTO_QUEUE);
0757 
0758                 stmt.setString(1, queue.getName().toString());
0759                 stmt.setString(2, queue.getOwner() == null null : queue.getOwner().toString());
0760 
0761                 stmt.execute();
0762 
0763                 stmt.close();
0764 
0765                 conn.commit();
0766 
0767                 conn.close();
0768             }
0769             catch (SQLException e)
0770             {
0771                 throw new AMQException("Error writing AMQQueue with name " + queue.getName() " to database: " + e, e);
0772             }
0773         }
0774     }
0775 
0776     private Connection newConnection() throws SQLException
0777     {
0778         final Connection connection = DriverManager.getConnection(_connectionURL);
0779         return connection;
0780     }
0781 
0782     public void removeQueue(final AMQQueue queuethrows AMQException
0783     {
0784         AMQShortString name = queue.getName();
0785         _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
0786         Connection conn = null;
0787 
0788 
0789         try
0790         {
0791             conn = newConnection();
0792             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
0793             stmt.setString(1, name.toString());
0794             int results = stmt.executeUpdate();
0795 
0796 
0797             if (results == 0)
0798             {
0799                 throw new AMQException("Queue " + name + " not found");
0800             }
0801 
0802             conn.commit();
0803             stmt.close();
0804         }
0805         catch (SQLException e)
0806         {
0807             throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e);
0808         }
0809         finally
0810         {
0811             if(conn != null)
0812             {
0813                try
0814                {
0815                    conn.close();
0816                }
0817                catch (SQLException e)
0818                {
0819                    _logger.error(e);
0820                }
0821             }
0822 
0823         }
0824 
0825 
0826     }
0827 
0828     public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageIdthrows AMQException
0829     {
0830         AMQShortString name = queue.getName();
0831 
0832         boolean localTx = getOrCreateTransaction(context);
0833         Connection conn = getConnection(context);
0834         ConnectionWrapper connWrapper = (ConnectionWrappercontext.getPayload();
0835 
0836         try
0837         {
0838             PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
0839             stmt.setString(1,name.toString());
0840             stmt.setLong(2,messageId);
0841             stmt.executeUpdate();
0842             connWrapper.requiresCommit();
0843 
0844             if(localTx)
0845             {
0846                 commitTran(context);
0847             }
0848 
0849 
0850 
0851             if (_logger.isDebugEnabled())
0852             {
0853                 _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
0854             }
0855         }
0856         catch (SQLException e)
0857         {
0858             if(localTx)
0859             {
0860                 abortTran(context);
0861             }
0862             _logger.error("Failed to enqueue: " + e, e);
0863             throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
0864                 " to database", e);
0865         }
0866 
0867     }
0868 
0869     public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageIdthrows AMQException
0870     {
0871         AMQShortString name = queue.getName();
0872 
0873         boolean localTx = getOrCreateTransaction(context);
0874         Connection conn = getConnection(context);
0875         ConnectionWrapper connWrapper = (ConnectionWrappercontext.getPayload();
0876 
0877         try
0878         {
0879             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
0880             stmt.setString(1,name.toString());
0881             stmt.setLong(2,messageId);
0882             int results = stmt.executeUpdate();
0883 
0884             connWrapper.requiresCommit();
0885 
0886             if(results != 1)
0887             {
0888                 throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
0889             }
0890 
0891             if(localTx)
0892             {
0893                 commitTran(context);
0894             }
0895 
0896 
0897 
0898             if (_logger.isDebugEnabled())
0899             {
0900                 _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
0901             }
0902         }
0903         catch (SQLException e)
0904         {
0905             if(localTx)
0906             {
0907                 abortTran(context);
0908             }
0909             _logger.error("Failed to dequeue: " + e, e);
0910             throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
0911                 " from database", e);
0912         }
0913 
0914     }
0915 
0916     private static final class ConnectionWrapper
0917     {
0918         private final Connection _connection;
0919         private boolean _requiresCommit;
0920 
0921         public ConnectionWrapper(Connection conn)
0922         {
0923             _connection = conn;
0924         }
0925 
0926         public void setRequiresCommit()
0927         {
0928             _requiresCommit = true;
0929         }
0930 
0931         public boolean requiresCommit()
0932         {
0933             return _requiresCommit;
0934         }
0935 
0936         public Connection getConnection()
0937         {
0938             return _connection;
0939         }
0940     }
0941 
0942     public void beginTran(StoreContext contextthrows AMQException
0943     {
0944         if (context.getPayload() != null)
0945         {
0946             throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
0947                 + context.getPayload());
0948         }
0949         else
0950         {
0951             try
0952             {
0953                 Connection conn = newConnection();
0954 
0955 
0956                 context.setPayload(new ConnectionWrapper(conn));
0957             }
0958             catch (SQLException e)
0959             {
0960                 throw new AMQException("Error starting transaction: " + e, e);
0961             }
0962         }
0963     }
0964 
0965     public void commitTran(StoreContext contextthrows AMQException
0966     {
0967         ConnectionWrapper connWrapper = (ConnectionWrappercontext.getPayload();
0968 
0969         if (connWrapper == null)
0970         {
0971             throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
0972         }
0973 
0974         try
0975         {
0976             Connection conn = connWrapper.getConnection();
0977             if(connWrapper.requiresCommit())
0978             {
0979                 conn.commit();
0980 
0981                 if (_logger.isDebugEnabled())
0982                 {
0983                     _logger.debug("commit tran completed");
0984                 }
0985 
0986             }
0987             conn.close();
0988         }
0989         catch (SQLException e)
0990         {
0991             throw new AMQException("Error commit tx: " + e, e);
0992         }
0993         finally
0994         {
0995             context.setPayload(null);
0996         }
0997     }
0998 
0999     public void abortTran(StoreContext contextthrows AMQException
1000     {
1001         ConnectionWrapper connWrapper = (ConnectionWrappercontext.getPayload();
1002 
1003         if (connWrapper == null)
1004         {
1005             throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
1006         }
1007 
1008         if (_logger.isDebugEnabled())
1009         {
1010             _logger.debug("abort tran called: " + connWrapper.getConnection());
1011         }
1012 
1013         try
1014         {
1015             Connection conn = connWrapper.getConnection();
1016             if(connWrapper.requiresCommit())
1017             {
1018                 conn.rollback();
1019             }
1020 
1021             conn.close();
1022         }
1023         catch (SQLException e)
1024         {
1025             throw new AMQException("Error aborting transaction: " + e, e);
1026         }
1027         finally
1028         {
1029             context.setPayload(null);
1030         }
1031     }
1032 
1033     public boolean inTran(StoreContext context)
1034     {
1035         return context.getPayload() != null;
1036     }
1037 
1038     public Long getNewMessageId()
1039     {
1040         return _messageId.getAndIncrement();
1041     }
1042 
1043     public void storeContentBodyChunk(StoreContext context,
1044                                       Long messageId,
1045                                       int index,
1046                                       ContentChunk contentBody,
1047                                       boolean lastContentBodythrows AMQException
1048     {
1049         boolean localTx = getOrCreateTransaction(context);
1050         Connection conn = getConnection(context);
1051         ConnectionWrapper connWrapper = (ConnectionWrappercontext.getPayload();
1052 
1053         try
1054         {
1055             PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
1056             stmt.setLong(1,messageId);
1057             stmt.setInt(2, index);
1058             byte[] chunkData = new byte[contentBody.getSize()];
1059             contentBody.getData().duplicate().get(chunkData);
1060             /* this would be the Java 6 way of doing things
1061             Blob dataAsBlob = conn.createBlob();
1062             dataAsBlob.setBytes(1L, chunkData);
1063             stmt.setBlob(3, dataAsBlob);
1064             */
1065             ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
1066             stmt.setBinaryStream(3, bis, chunkData.length);
1067             stmt.executeUpdate();
1068             connWrapper.requiresCommit();
1069 
1070             if(localTx)
1071             {
1072                 commitTran(context);
1073             }
1074         }
1075         catch (SQLException e)
1076         {
1077             if(localTx)
1078             {
1079                 abortTran(context);
1080             }
1081 
1082             throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
1083         }
1084 
1085     }
1086 
1087     public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd)
1088             throws AMQException
1089     {
1090 
1091         boolean localTx = getOrCreateTransaction(context);
1092         Connection conn = getConnection(context);
1093         ConnectionWrapper connWrapper = (ConnectionWrappercontext.getPayload();
1094 
1095         try
1096         {
1097 
1098             PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
1099             stmt.setLong(1,messageId);
1100             stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
1101             stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
1102             stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() (short(short0);
1103             stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() (short(short0);
1104 
1105             ContentHeaderBody headerBody = mmd.getContentHeaderBody();
1106             final int bodySize = headerBody.getSize();
1107             byte[] underlying = new byte[bodySize];
1108             ByteBuffer buf = ByteBuffer.wrap(underlying);
1109             headerBody.writePayload(buf);
1110 /*
1111             Blob dataAsBlob = conn.createBlob();
1112             dataAsBlob.setBytes(1L, underlying);
1113             stmt.setBlob(6, dataAsBlob);
1114 */
1115             ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
1116             stmt.setBinaryStream(6,bis,underlying.length);
1117 
1118             stmt.setInt(7, mmd.getContentChunkCount());
1119 
1120             stmt.executeUpdate();
1121             connWrapper.requiresCommit();
1122 
1123             if(localTx)
1124             {
1125                 commitTran(context);
1126             }
1127         }
1128         catch (SQLException e)
1129         {
1130             if(localTx)
1131             {
1132                 abortTran(context);
1133             }
1134 
1135             throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
1136         }
1137 
1138 
1139     }
1140 
1141     public MessageMetaData getMessageMetaData(StoreContext context, Long messageIdthrows AMQException
1142     {
1143         boolean localTx = getOrCreateTransaction(context);
1144         Connection conn = getConnection(context);
1145 
1146 
1147         try
1148         {
1149 
1150             PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
1151             stmt.setLong(1,messageId);
1152             ResultSet rs = stmt.executeQuery();
1153 
1154             if(rs.next())
1155             {
1156                 final AMQShortString exchange = new AMQShortString(rs.getString(1));
1157                 final AMQShortString routingKey = rs.getString(2== null null new AMQShortString(rs.getString(2));
1158                 final boolean mandatory = (rs.getShort(3!= (short)0);
1159                 final boolean immediate = (rs.getShort(4!= (short)0);
1160                 MessagePublishInfo info = new MessagePublishInfoImpl(exchange,immediate,mandatory,routingKey);
1161 
1162                 Blob dataAsBlob = rs.getBlob(5);
1163 
1164                 byte[] dataAsBytes = dataAsBlob.getBytes(1,(intdataAsBlob.length());
1165                 ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
1166 
1167                 ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length);
1168 
1169                 if(localTx)
1170                 {
1171                     commitTran(context);
1172                 }
1173 
1174                 return new MessageMetaData(info, chb, rs.getInt(6));
1175 
1176             }
1177             else
1178             {
1179                 if(localTx)
1180                 {
1181                     abortTran(context);
1182                 }
1183                 throw new AMQException("Metadata not found for message with id " + messageId);
1184             }
1185         }
1186         catch (SQLException e)
1187         {
1188             if(localTx)
1189             {
1190                 abortTran(context);
1191             }
1192 
1193             throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
1194         }
1195 
1196 
1197     }
1198 
1199     public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int indexthrows AMQException
1200     {
1201         boolean localTx = getOrCreateTransaction(context);
1202                 Connection conn = getConnection(context);
1203 
1204 
1205                 try
1206                 {
1207 
1208                     PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
1209                     stmt.setLong(1,messageId);
1210                     stmt.setInt(2, index);
1211                     ResultSet rs = stmt.executeQuery();
1212 
1213                     if(rs.next())
1214                     {
1215                         Blob dataAsBlob = rs.getBlob(1);
1216 
1217                         final int size = (intdataAsBlob.length();
1218                         byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
1219                         final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
1220 
1221                         ContentChunk cb = new ContentChunk()
1222                         {
1223 
1224                             public int getSize()
1225                             {
1226                                 return size;
1227                             }
1228 
1229                             public ByteBuffer getData()
1230                             {
1231                                 return buf;
1232                             }
1233 
1234                             public void reduceToFit()
1235                             {
1236 
1237                             }
1238                         };
1239 
1240                         if(localTx)
1241                         {
1242                             commitTran(context);
1243                         }
1244 
1245                         return cb;
1246 
1247                     }
1248                     else
1249                     {
1250                         if(localTx)
1251                         {
1252                             abortTran(context);
1253                         }
1254                         throw new AMQException("Message not found for message with id " + messageId);
1255                     }
1256                 }
1257                 catch (SQLException e)
1258                 {
1259                     if(localTx)
1260                     {
1261                         abortTran(context);
1262                     }
1263 
1264                     throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
1265                 }
1266 
1267 
1268 
1269     }
1270 
1271     public boolean isPersistent()
1272     {
1273         return true;
1274     }
1275 
1276     private void checkNotClosed() throws MessageStoreClosedException
1277     {
1278         if (_closed.get())
1279         {
1280             throw new MessageStoreClosedException();
1281         }
1282     }
1283 
1284 
1285     private static final class ProcessAction
1286     {
1287         private final AMQQueue _queue;
1288         private final StoreContext _context;
1289         private final AMQMessage _message;
1290 
1291         public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
1292         {
1293             _queue = queue;
1294             _context = context;
1295             _message = message;
1296         }
1297 
1298         public void process() throws AMQException
1299         {
1300             _queue.enqueue(_context, _message);
1301 
1302         }
1303 
1304     }
1305 
1306 
1307     private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
1308         throws SQLException, AMQException
1309     {
1310         Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
1311         List<ProcessAction> actions = new ArrayList<ProcessAction>();
1312 
1313         Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
1314 
1315         final boolean inLocaltran = inTran(context);
1316         Connection conn = null;
1317         try
1318         {
1319 
1320             if(inLocaltran)
1321             {
1322                 conn = getConnection(context);
1323             }
1324             else
1325             {
1326                 conn = newConnection();
1327             }
1328 
1329             long maxId = 1;
1330 
1331             TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
1332 
1333             Statement stmt = conn.createStatement();
1334             ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
1335 
1336 
1337             while (rs.next())
1338             {
1339 
1340 
1341 
1342                 AMQShortString queueName = new AMQShortString(rs.getString(1));
1343 
1344 
1345                 AMQQueue queue = queues.get(queueName);
1346                 if (queue == null)
1347                 {
1348                     queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
1349 
1350                     _virtualHost.getQueueRegistry().registerQueue(queue);
1351                     queues.put(queueName, queue);
1352                 }
1353 
1354                 long messageId = rs.getLong(2);
1355                 maxId = Math.max(maxId, messageId);
1356                 AMQMessage message = msgMap.get(messageId);
1357 
1358                 if(message != null)
1359                 {
1360                     //todo must enqueue message to build reference table
1361 //                    message.incrementReference(1);
1362                 }
1363                 else
1364                 {
1365                     message = _messageFactory.createMessage(messageId, this);
1366 
1367                     _logger.error("todo must do message recovery now.");
1368                     //todo must do message recovery now.
1369 
1370                     msgMap.put(messageId,message);
1371                 }
1372 
1373                 if (_logger.isDebugEnabled())
1374                 {
1375                     _logger.debug("On recovery, delivering " + message.getMessageId() " to " + queue.getName());
1376                 }
1377 
1378                 if (_logger.isInfoEnabled())
1379                 {
1380                     Integer count = queueRecoveries.get(queueName);
1381                     if (count == null)
1382                     {
1383                         count = 0;
1384                     }
1385 
1386                     queueRecoveries.put(queueName, ++count);
1387 
1388                 }
1389 
1390                 actions.add(new ProcessAction(queue, context, message));
1391 
1392             }
1393 
1394             for(ProcessAction action : actions)
1395             {
1396                 action.process();
1397             }
1398 
1399             _messageId.set(maxId + 1);
1400         }
1401         catch (SQLException e)
1402         {
1403             _logger.error("Error: " + e, e);
1404             throw e;
1405         }
1406         finally
1407         {
1408             if (inLocaltran && conn != null)
1409             {
1410                 conn.close();
1411             }
1412         }
1413 
1414         if (_logger.isInfoEnabled())
1415         {
1416             _logger.info("Recovered message counts: " + queueRecoveries);
1417         }
1418     }
1419 
1420     private Connection getConnection(final StoreContext context)
1421     {
1422         return ((ConnectionWrapper)context.getPayload()).getConnection();
1423     }
1424 
1425     private boolean getOrCreateTransaction(StoreContext contextthrows AMQException
1426     {
1427 
1428         ConnectionWrapper tx = (ConnectionWrappercontext.getPayload();
1429         if (tx == null)
1430         {
1431             beginTran(context);
1432             return true;
1433         }
1434 
1435         return false;
1436     }
1437 
1438     private synchronized void stateTransition(State requiredState, State newStatethrows AMQException
1439     {
1440         if (_state != requiredState)
1441         {
1442             throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
1443                 "; currently in state: " + _state);
1444         }
1445 
1446         _state = newState;
1447     }
1448 }