0001 /*
0002 *
0003 * Licensed to the Apache Software Foundation (ASF) under one
0004 * or more contributor license agreements. See the NOTICE file
0005 * distributed with this work for additional information
0006 * regarding copyright ownership. The ASF licenses this file
0007 * to you under the Apache License, Version 2.0 (the
0008 * "License"); you may not use this file except in compliance
0009 * with the License. You may obtain a copy of the License at
0010 *
0011 * http://www.apache.org/licenses/LICENSE-2.0
0012 *
0013 * Unless required by applicable law or agreed to in writing,
0014 * software distributed under the License is distributed on an
0015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0016 * KIND, either express or implied. See the License for the
0017 * specific language governing permissions and limitations
0018 * under the License.
0019 *
0020 */
0021 package org.apache.qpid.server.store;
0022
0023 import org.apache.qpid.server.virtualhost.VirtualHost;
0024 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
0025 import org.apache.qpid.server.exchange.Exchange;
0026 import org.apache.qpid.server.queue.AMQQueue;
0027 import org.apache.qpid.server.queue.AMQQueueFactory;
0028 import org.apache.qpid.server.queue.MessageMetaData;
0029 import org.apache.qpid.server.queue.QueueRegistry;
0030
0031 import org.apache.qpid.server.queue.MessageFactory;
0032 import org.apache.qpid.server.queue.AMQMessage;
0033 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
0034 import org.apache.qpid.server.txn.TransactionalContext;
0035 import org.apache.qpid.server.txn.NonTransactionalContext;
0036 import org.apache.qpid.server.transactionlog.TransactionLog;
0037 import org.apache.qpid.server.routing.RoutingTable;
0038 import org.apache.qpid.AMQException;
0039 import org.apache.qpid.framing.AMQShortString;
0040 import org.apache.qpid.framing.FieldTable;
0041 import org.apache.qpid.framing.ContentHeaderBody;
0042 import org.apache.qpid.framing.abstraction.ContentChunk;
0043 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
0044 import org.apache.commons.configuration.Configuration;
0045 import org.apache.log4j.Logger;
0046 import org.apache.mina.common.ByteBuffer;
0047
0048 import java.io.File;
0049 import java.io.ByteArrayInputStream;
0050 import java.sql.DriverManager;
0051 import java.sql.Driver;
0052 import java.sql.Connection;
0053 import java.sql.SQLException;
0054 import java.sql.Statement;
0055 import java.sql.PreparedStatement;
0056 import java.sql.ResultSet;
0057 import java.sql.Blob;
0058 import java.sql.Types;
0059 import java.util.concurrent.atomic.AtomicLong;
0060 import java.util.concurrent.atomic.AtomicBoolean;
0061 import java.util.List;
0062 import java.util.ArrayList;
0063 import java.util.Map;
0064 import java.util.HashMap;
0065 import java.util.TreeMap;
0066
0067
0068 public class DerbyMessageStore implements TransactionLog, RoutingTable
0069 {
0070
0071 private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
0072
0073 private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
0074
0075
0076 private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
0077
0078 private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
0079
0080 private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
0081 private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
0082 private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
0083 private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
0084 private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA";
0085 private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
0086
0087 private static final int DB_VERSION = 1;
0088
0089
0090
0091 private VirtualHost _virtualHost;
0092 private static Class<Driver> DRIVER_CLASS;
0093
0094 private final AtomicLong _messageId = new AtomicLong(1);
0095 private AtomicBoolean _closed = new AtomicBoolean(false);
0096
0097 private String _connectionURL;
0098
0099 MessageFactory _messageFactory;
0100
0101 private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
0102 private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
0103 private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
0104 private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
0105 private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
0106 private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
0107 private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
0108 private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
0109 private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
0110 private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
0111 private static final String SELECT_FROM_BINDINGS =
0112 "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
0113 private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
0114 private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
0115 private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
0116 private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
0117 private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
0118 private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
0119 private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
0120 private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
0121 private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
0122 private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
0123 private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
0124 private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
0125 private static final String SELECT_FROM_MESSAGE_META_DATA =
0126 "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
0127 private static final String SELECT_FROM_MESSAGE_CONTENT =
0128 "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?";
0129 private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
0130 private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
0131
0132
0133 private enum State
0134 {
0135 INITIAL,
0136 CONFIGURING,
0137 RECOVERING,
0138 STARTED,
0139 CLOSING,
0140 CLOSED
0141 }
0142
0143 private State _state = State.INITIAL;
0144
0145
0146 public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
0147 {
0148 //Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config
0149 if (base.equals("store"))
0150 {
0151 stateTransition(State.INITIAL, State.CONFIGURING);
0152
0153 initialiseDriver();
0154
0155 _virtualHost = virtualHost;
0156
0157 _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
0158 QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
0159
0160 final String databasePath = config.getStoreConfiguration().getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB");
0161
0162 File environmentPath = new File(databasePath);
0163 if (!environmentPath.exists())
0164 {
0165 if (!environmentPath.mkdirs())
0166 {
0167 throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
0168 + "Ensure the path is correct and that the permissions are correct.");
0169 }
0170 }
0171
0172 createOrOpenDatabase(databasePath);
0173
0174 // this recovers durable queues and persistent messages
0175
0176 _messageFactory = MessageFactory.getInstance();
0177
0178 recover();
0179
0180 stateTransition(State.RECOVERING, State.STARTED);
0181 }
0182 }
0183
0184 private static synchronized void initialiseDriver() throws ClassNotFoundException
0185 {
0186 if(DRIVER_CLASS == null)
0187 {
0188 DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
0189 }
0190 }
0191
0192 private void createOrOpenDatabase(final String environmentPath) throws SQLException
0193 {
0194 _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true";
0195
0196 Connection conn = newConnection();
0197
0198 createVersionTable(conn);
0199 createExchangeTable(conn);
0200 createQueueTable(conn);
0201 createBindingsTable(conn);
0202 createQueueEntryTable(conn);
0203 createMessageMetaDataTable(conn);
0204 createMessageContentTable(conn);
0205
0206 conn.close();
0207 }
0208
0209
0210
0211 private void createVersionTable(final Connection conn) throws SQLException
0212 {
0213 if(!tableExists(DB_VERSION_TABLE_NAME, conn))
0214 {
0215 Statement stmt = conn.createStatement();
0216
0217 stmt.execute(CREATE_DB_VERSION_TABLE);
0218 stmt.close();
0219
0220 PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
0221 pstmt.setInt(1, DB_VERSION);
0222 pstmt.execute();
0223 pstmt.close();
0224 }
0225
0226 }
0227
0228
0229 private void createExchangeTable(final Connection conn) throws SQLException
0230 {
0231 if(!tableExists(EXCHANGE_TABLE_NAME, conn))
0232 {
0233 Statement stmt = conn.createStatement();
0234
0235 stmt.execute(CREATE_EXCHANGE_TABLE);
0236 stmt.close();
0237 }
0238 }
0239
0240 private void createQueueTable(final Connection conn) throws SQLException
0241 {
0242 if(!tableExists(QUEUE_TABLE_NAME, conn))
0243 {
0244 Statement stmt = conn.createStatement();
0245 stmt.execute(CREATE_QUEUE_TABLE);
0246 stmt.close();
0247 }
0248 }
0249
0250 private void createBindingsTable(final Connection conn) throws SQLException
0251 {
0252 if(!tableExists(BINDINGS_TABLE_NAME, conn))
0253 {
0254 Statement stmt = conn.createStatement();
0255 stmt.execute(CREATE_BINDINGS_TABLE);
0256
0257 stmt.close();
0258 }
0259
0260 }
0261
0262 private void createQueueEntryTable(final Connection conn) throws SQLException
0263 {
0264 if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
0265 {
0266 Statement stmt = conn.createStatement();
0267 stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
0268
0269 stmt.close();
0270 }
0271
0272 }
0273
0274 private void createMessageMetaDataTable(final Connection conn) throws SQLException
0275 {
0276 if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
0277 {
0278 Statement stmt = conn.createStatement();
0279 stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
0280
0281 stmt.close();
0282 }
0283
0284 }
0285
0286
0287 private void createMessageContentTable(final Connection conn) throws SQLException
0288 {
0289 if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
0290 {
0291 Statement stmt = conn.createStatement();
0292 stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
0293
0294 stmt.close();
0295 }
0296
0297 }
0298
0299
0300
0301 private boolean tableExists(final String tableName, final Connection conn) throws SQLException
0302 {
0303 PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
0304 stmt.setString(1, tableName);
0305 ResultSet rs = stmt.executeQuery();
0306 boolean exists = rs.next();
0307 rs.close();
0308 stmt.close();
0309 return exists;
0310 }
0311
0312 public void recover() throws AMQException
0313 {
0314 stateTransition(State.CONFIGURING, State.RECOVERING);
0315
0316 _logger.info("Recovering persistent state...");
0317 StoreContext context = new StoreContext();
0318
0319 try
0320 {
0321 Map<AMQShortString, AMQQueue> queues = loadQueues();
0322
0323 recoverExchanges();
0324
0325 try
0326 {
0327
0328 beginTran(context);
0329
0330 deliverMessages(context, queues);
0331 _logger.info("Persistent state recovered successfully");
0332 commitTran(context);
0333
0334 }
0335 finally
0336 {
0337 if(inTran(context))
0338 {
0339 abortTran(context);
0340 }
0341 }
0342 }
0343 catch (SQLException e)
0344 {
0345
0346 throw new AMQException("Error recovering persistent state: " + e, e);
0347 }
0348
0349 }
0350
0351 private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
0352 {
0353 Connection conn = newConnection();
0354
0355
0356 Statement stmt = conn.createStatement();
0357 ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
0358 Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
0359 while(rs.next())
0360 {
0361 String queueName = rs.getString(1);
0362 String owner = rs.getString(2);
0363 AMQShortString queueNameShortString = new AMQShortString(queueName);
0364 AMQQueue q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
0365 null);
0366 _virtualHost.getQueueRegistry().registerQueue(q);
0367 queueMap.put(queueNameShortString,q);
0368
0369 }
0370 return queueMap;
0371 }
0372
0373 private void recoverExchanges() throws AMQException, SQLException
0374 {
0375 for (Exchange exchange : loadExchanges())
0376 {
0377 recoverExchange(exchange);
0378 }
0379 }
0380
0381
0382 private List<Exchange> loadExchanges() throws AMQException, SQLException
0383 {
0384
0385 List<Exchange> exchanges = new ArrayList<Exchange>();
0386 Connection conn = null;
0387 try
0388 {
0389 conn = newConnection();
0390
0391
0392 Statement stmt = conn.createStatement();
0393 ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
0394
0395 Exchange exchange;
0396 while(rs.next())
0397 {
0398 String exchangeName = rs.getString(1);
0399 String type = rs.getString(2);
0400 boolean autoDelete = rs.getShort(3) != 0;
0401
0402 exchange = _virtualHost.getExchangeFactory().createExchange(new AMQShortString(exchangeName), new AMQShortString(type), true, autoDelete, 0);
0403 _virtualHost.getExchangeRegistry().registerExchange(exchange);
0404 exchanges.add(exchange);
0405
0406 }
0407 return exchanges;
0408
0409 }
0410 finally
0411 {
0412 if(conn != null)
0413 {
0414 conn.close();
0415 }
0416 }
0417
0418 }
0419
0420 private void recoverExchange(Exchange exchange) throws AMQException, SQLException
0421 {
0422 _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
0423
0424 QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
0425
0426 Connection conn = null;
0427 try
0428 {
0429 conn = newConnection();
0430
0431 PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
0432 stmt.setString(1, exchange.getName().toString());
0433
0434 ResultSet rs = stmt.executeQuery();
0435
0436
0437 while(rs.next())
0438 {
0439 String queueName = rs.getString(1);
0440 String bindingKey = rs.getString(2);
0441 Blob arguments = rs.getBlob(3);
0442
0443
0444 AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
0445 if (queue == null)
0446 {
0447 _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
0448 + exchange.getName());
0449 }
0450 else
0451 {
0452 _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
0453 + ", Routing Key: " + bindingKey + ", Arguments: " + arguments
0454 + ")");
0455
0456 FieldTable argumentsFT = null;
0457 if(arguments != null)
0458 {
0459 byte[] argumentBytes = arguments.getBytes(0, (int) arguments.length());
0460 ByteBuffer buf = ByteBuffer.wrap(argumentBytes);
0461 argumentsFT = new FieldTable(buf,arguments.length());
0462 }
0463
0464 queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
0465
0466 }
0467 }
0468 }
0469 finally
0470 {
0471 if(conn != null)
0472 {
0473 conn.close();
0474 }
0475 }
0476 }
0477
0478 public void close() throws Exception
0479 {
0480 _closed.getAndSet(true);
0481 }
0482
0483 public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
0484 {
0485
0486 boolean localTx = getOrCreateTransaction(storeContext);
0487
0488 Connection conn = getConnection(storeContext);
0489 ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload();
0490
0491
0492 if (_logger.isDebugEnabled())
0493 {
0494 _logger.debug("Message Id: " + messageId + " Removing");
0495 }
0496
0497 // first we need to look up the header to get the chunk count
0498 MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
0499 try
0500 {
0501 PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
0502 stmt.setLong(1,messageId);
0503 wrapper.setRequiresCommit();
0504 int results = stmt.executeUpdate();
0505
0506 if (results == 0)
0507 {
0508 if (localTx)
0509 {
0510 abortTran(storeContext);
0511 }
0512
0513 throw new AMQException("Message metadata not found for message id " + messageId);
0514 }
0515 stmt.close();
0516
0517 if (_logger.isDebugEnabled())
0518 {
0519 _logger.debug("Deleted metadata for message " + messageId);
0520 }
0521
0522 stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
0523 stmt.setLong(1,messageId);
0524 results = stmt.executeUpdate();
0525
0526 if(results != mmd.getContentChunkCount())
0527 {
0528 if (localTx)
0529 {
0530 abortTran(storeContext);
0531 }
0532 throw new AMQException("Unexpected number of content chunks when deleting message. Expected " + mmd.getContentChunkCount() + " but found " + results);
0533
0534 }
0535
0536 if (localTx)
0537 {
0538 commitTran(storeContext);
0539 }
0540 }
0541 catch (SQLException e)
0542 {
0543 if ((conn != null) && localTx)
0544 {
0545 abortTran(storeContext);
0546 }
0547
0548 throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
0549 }
0550
0551 }
0552
0553 public void createExchange(Exchange exchange) throws AMQException
0554 {
0555 if (_state != State.RECOVERING)
0556 {
0557 try
0558 {
0559 Connection conn = null;
0560
0561 try
0562 {
0563 conn = newConnection();
0564
0565 PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
0566 stmt.setString(1, exchange.getName().toString());
0567 stmt.setString(2, exchange.getType().toString());
0568 stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
0569 stmt.execute();
0570 stmt.close();
0571 conn.commit();
0572
0573 }
0574 finally
0575 {
0576 if(conn != null)
0577 {
0578 conn.close();
0579 }
0580 }
0581 }
0582 catch (SQLException e)
0583 {
0584 throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
0585 }
0586 }
0587
0588 }
0589
0590 public void removeExchange(Exchange exchange) throws AMQException
0591 {
0592 Connection conn = null;
0593
0594 try
0595 {
0596 conn = newConnection();
0597 PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
0598 stmt.setString(1, exchange.getName().toString());
0599 int results = stmt.executeUpdate();
0600 if(results == 0)
0601 {
0602 throw new AMQException("Exchange " + exchange.getName() + " not found");
0603 }
0604 else
0605 {
0606 conn.commit();
0607 stmt.close();
0608 }
0609 }
0610 catch (SQLException e)
0611 {
0612 throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e, e);
0613 }
0614 finally
0615 {
0616 if(conn != null)
0617 {
0618 try
0619 {
0620 conn.close();
0621 }
0622 catch (SQLException e)
0623 {
0624 _logger.error(e);
0625 }
0626 }
0627
0628 }
0629 }
0630
0631 public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
0632 throws AMQException
0633 {
0634 if (_state != State.RECOVERING)
0635 {
0636 Connection conn = null;
0637
0638
0639 try
0640 {
0641 conn = newConnection();
0642 PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
0643 stmt.setString(1, exchange.getName().toString() );
0644 stmt.setString(2, queue.getName().toString());
0645 stmt.setString(3, routingKey == null ? null : routingKey.toString());
0646 if(args != null)
0647 {
0648 /* This would be the Java 6 way of setting a Blob
0649 Blob blobArgs = conn.createBlob();
0650 blobArgs.setBytes(0, args.getDataAsBytes());
0651 stmt.setBlob(4, blobArgs);
0652 */
0653 byte[] bytes = args.getDataAsBytes();
0654 ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
0655 stmt.setBinaryStream(4, bis, bytes.length);
0656 }
0657 else
0658 {
0659 stmt.setNull(4, Types.BLOB);
0660 }
0661
0662 stmt.executeUpdate();
0663 conn.commit();
0664 stmt.close();
0665 }
0666 catch (SQLException e)
0667 {
0668 throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
0669 + exchange.getName() + " to database: " + e, e);
0670 }
0671 finally
0672 {
0673 if(conn != null)
0674 {
0675 try
0676 {
0677 conn.close();
0678 }
0679 catch (SQLException e)
0680 {
0681 _logger.error(e);
0682 }
0683 }
0684
0685 }
0686
0687 }
0688
0689
0690 }
0691
0692 public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
0693 throws AMQException
0694 {
0695 Connection conn = null;
0696
0697
0698 try
0699 {
0700 conn = newConnection();
0701 // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
0702 PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
0703 stmt.setString(1, exchange.getName().toString() );
0704 stmt.setString(2, queue.getName().toString());
0705 stmt.setString(3, routingKey == null ? null : routingKey.toString());
0706
0707
0708 if(stmt.executeUpdate() != 1)
0709 {
0710 throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
0711 + exchange.getName() + " not found");
0712 }
0713 conn.commit();
0714 stmt.close();
0715 }
0716 catch (SQLException e)
0717 {
0718 throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() + " to exchange "
0719 + exchange.getName() + " in database: " + e, e);
0720 }
0721 finally
0722 {
0723 if(conn != null)
0724 {
0725 try
0726 {
0727 conn.close();
0728 }
0729 catch (SQLException e)
0730 {
0731 _logger.error(e);
0732 }
0733 }
0734
0735 }
0736
0737
0738 }
0739
0740 public void createQueue(AMQQueue queue) throws AMQException
0741 {
0742 createQueue(queue, null);
0743 }
0744
0745 public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
0746 {
0747 _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
0748
0749 if (_state != State.RECOVERING)
0750 {
0751 try
0752 {
0753 Connection conn = newConnection();
0754
0755 PreparedStatement stmt =
0756 conn.prepareStatement(INSERT_INTO_QUEUE);
0757
0758 stmt.setString(1, queue.getName().toString());
0759 stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
0760
0761 stmt.execute();
0762
0763 stmt.close();
0764
0765 conn.commit();
0766
0767 conn.close();
0768 }
0769 catch (SQLException e)
0770 {
0771 throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
0772 }
0773 }
0774 }
0775
0776 private Connection newConnection() throws SQLException
0777 {
0778 final Connection connection = DriverManager.getConnection(_connectionURL);
0779 return connection;
0780 }
0781
0782 public void removeQueue(final AMQQueue queue) throws AMQException
0783 {
0784 AMQShortString name = queue.getName();
0785 _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
0786 Connection conn = null;
0787
0788
0789 try
0790 {
0791 conn = newConnection();
0792 PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
0793 stmt.setString(1, name.toString());
0794 int results = stmt.executeUpdate();
0795
0796
0797 if (results == 0)
0798 {
0799 throw new AMQException("Queue " + name + " not found");
0800 }
0801
0802 conn.commit();
0803 stmt.close();
0804 }
0805 catch (SQLException e)
0806 {
0807 throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e);
0808 }
0809 finally
0810 {
0811 if(conn != null)
0812 {
0813 try
0814 {
0815 conn.close();
0816 }
0817 catch (SQLException e)
0818 {
0819 _logger.error(e);
0820 }
0821 }
0822
0823 }
0824
0825
0826 }
0827
0828 public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
0829 {
0830 AMQShortString name = queue.getName();
0831
0832 boolean localTx = getOrCreateTransaction(context);
0833 Connection conn = getConnection(context);
0834 ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
0835
0836 try
0837 {
0838 PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
0839 stmt.setString(1,name.toString());
0840 stmt.setLong(2,messageId);
0841 stmt.executeUpdate();
0842 connWrapper.requiresCommit();
0843
0844 if(localTx)
0845 {
0846 commitTran(context);
0847 }
0848
0849
0850
0851 if (_logger.isDebugEnabled())
0852 {
0853 _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
0854 }
0855 }
0856 catch (SQLException e)
0857 {
0858 if(localTx)
0859 {
0860 abortTran(context);
0861 }
0862 _logger.error("Failed to enqueue: " + e, e);
0863 throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
0864 + " to database", e);
0865 }
0866
0867 }
0868
0869 public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
0870 {
0871 AMQShortString name = queue.getName();
0872
0873 boolean localTx = getOrCreateTransaction(context);
0874 Connection conn = getConnection(context);
0875 ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
0876
0877 try
0878 {
0879 PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
0880 stmt.setString(1,name.toString());
0881 stmt.setLong(2,messageId);
0882 int results = stmt.executeUpdate();
0883
0884 connWrapper.requiresCommit();
0885
0886 if(results != 1)
0887 {
0888 throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
0889 }
0890
0891 if(localTx)
0892 {
0893 commitTran(context);
0894 }
0895
0896
0897
0898 if (_logger.isDebugEnabled())
0899 {
0900 _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
0901 }
0902 }
0903 catch (SQLException e)
0904 {
0905 if(localTx)
0906 {
0907 abortTran(context);
0908 }
0909 _logger.error("Failed to dequeue: " + e, e);
0910 throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
0911 + " from database", e);
0912 }
0913
0914 }
0915
0916 private static final class ConnectionWrapper
0917 {
0918 private final Connection _connection;
0919 private boolean _requiresCommit;
0920
0921 public ConnectionWrapper(Connection conn)
0922 {
0923 _connection = conn;
0924 }
0925
0926 public void setRequiresCommit()
0927 {
0928 _requiresCommit = true;
0929 }
0930
0931 public boolean requiresCommit()
0932 {
0933 return _requiresCommit;
0934 }
0935
0936 public Connection getConnection()
0937 {
0938 return _connection;
0939 }
0940 }
0941
0942 public void beginTran(StoreContext context) throws AMQException
0943 {
0944 if (context.getPayload() != null)
0945 {
0946 throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
0947 + context.getPayload());
0948 }
0949 else
0950 {
0951 try
0952 {
0953 Connection conn = newConnection();
0954
0955
0956 context.setPayload(new ConnectionWrapper(conn));
0957 }
0958 catch (SQLException e)
0959 {
0960 throw new AMQException("Error starting transaction: " + e, e);
0961 }
0962 }
0963 }
0964
0965 public void commitTran(StoreContext context) throws AMQException
0966 {
0967 ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
0968
0969 if (connWrapper == null)
0970 {
0971 throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
0972 }
0973
0974 try
0975 {
0976 Connection conn = connWrapper.getConnection();
0977 if(connWrapper.requiresCommit())
0978 {
0979 conn.commit();
0980
0981 if (_logger.isDebugEnabled())
0982 {
0983 _logger.debug("commit tran completed");
0984 }
0985
0986 }
0987 conn.close();
0988 }
0989 catch (SQLException e)
0990 {
0991 throw new AMQException("Error commit tx: " + e, e);
0992 }
0993 finally
0994 {
0995 context.setPayload(null);
0996 }
0997 }
0998
0999 public void abortTran(StoreContext context) throws AMQException
1000 {
1001 ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
1002
1003 if (connWrapper == null)
1004 {
1005 throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
1006 }
1007
1008 if (_logger.isDebugEnabled())
1009 {
1010 _logger.debug("abort tran called: " + connWrapper.getConnection());
1011 }
1012
1013 try
1014 {
1015 Connection conn = connWrapper.getConnection();
1016 if(connWrapper.requiresCommit())
1017 {
1018 conn.rollback();
1019 }
1020
1021 conn.close();
1022 }
1023 catch (SQLException e)
1024 {
1025 throw new AMQException("Error aborting transaction: " + e, e);
1026 }
1027 finally
1028 {
1029 context.setPayload(null);
1030 }
1031 }
1032
1033 public boolean inTran(StoreContext context)
1034 {
1035 return context.getPayload() != null;
1036 }
1037
1038 public Long getNewMessageId()
1039 {
1040 return _messageId.getAndIncrement();
1041 }
1042
1043 public void storeContentBodyChunk(StoreContext context,
1044 Long messageId,
1045 int index,
1046 ContentChunk contentBody,
1047 boolean lastContentBody) throws AMQException
1048 {
1049 boolean localTx = getOrCreateTransaction(context);
1050 Connection conn = getConnection(context);
1051 ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
1052
1053 try
1054 {
1055 PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
1056 stmt.setLong(1,messageId);
1057 stmt.setInt(2, index);
1058 byte[] chunkData = new byte[contentBody.getSize()];
1059 contentBody.getData().duplicate().get(chunkData);
1060 /* this would be the Java 6 way of doing things
1061 Blob dataAsBlob = conn.createBlob();
1062 dataAsBlob.setBytes(1L, chunkData);
1063 stmt.setBlob(3, dataAsBlob);
1064 */
1065 ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
1066 stmt.setBinaryStream(3, bis, chunkData.length);
1067 stmt.executeUpdate();
1068 connWrapper.requiresCommit();
1069
1070 if(localTx)
1071 {
1072 commitTran(context);
1073 }
1074 }
1075 catch (SQLException e)
1076 {
1077 if(localTx)
1078 {
1079 abortTran(context);
1080 }
1081
1082 throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
1083 }
1084
1085 }
1086
1087 public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd)
1088 throws AMQException
1089 {
1090
1091 boolean localTx = getOrCreateTransaction(context);
1092 Connection conn = getConnection(context);
1093 ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
1094
1095 try
1096 {
1097
1098 PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
1099 stmt.setLong(1,messageId);
1100 stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
1101 stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
1102 stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0);
1103 stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0);
1104
1105 ContentHeaderBody headerBody = mmd.getContentHeaderBody();
1106 final int bodySize = headerBody.getSize();
1107 byte[] underlying = new byte[bodySize];
1108 ByteBuffer buf = ByteBuffer.wrap(underlying);
1109 headerBody.writePayload(buf);
1110 /*
1111 Blob dataAsBlob = conn.createBlob();
1112 dataAsBlob.setBytes(1L, underlying);
1113 stmt.setBlob(6, dataAsBlob);
1114 */
1115 ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
1116 stmt.setBinaryStream(6,bis,underlying.length);
1117
1118 stmt.setInt(7, mmd.getContentChunkCount());
1119
1120 stmt.executeUpdate();
1121 connWrapper.requiresCommit();
1122
1123 if(localTx)
1124 {
1125 commitTran(context);
1126 }
1127 }
1128 catch (SQLException e)
1129 {
1130 if(localTx)
1131 {
1132 abortTran(context);
1133 }
1134
1135 throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
1136 }
1137
1138
1139 }
1140
1141 public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
1142 {
1143 boolean localTx = getOrCreateTransaction(context);
1144 Connection conn = getConnection(context);
1145
1146
1147 try
1148 {
1149
1150 PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
1151 stmt.setLong(1,messageId);
1152 ResultSet rs = stmt.executeQuery();
1153
1154 if(rs.next())
1155 {
1156 final AMQShortString exchange = new AMQShortString(rs.getString(1));
1157 final AMQShortString routingKey = rs.getString(2) == null ? null : new AMQShortString(rs.getString(2));
1158 final boolean mandatory = (rs.getShort(3) != (short)0);
1159 final boolean immediate = (rs.getShort(4) != (short)0);
1160 MessagePublishInfo info = new MessagePublishInfoImpl(exchange,immediate,mandatory,routingKey);
1161
1162 Blob dataAsBlob = rs.getBlob(5);
1163
1164 byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
1165 ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
1166
1167 ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length);
1168
1169 if(localTx)
1170 {
1171 commitTran(context);
1172 }
1173
1174 return new MessageMetaData(info, chb, rs.getInt(6));
1175
1176 }
1177 else
1178 {
1179 if(localTx)
1180 {
1181 abortTran(context);
1182 }
1183 throw new AMQException("Metadata not found for message with id " + messageId);
1184 }
1185 }
1186 catch (SQLException e)
1187 {
1188 if(localTx)
1189 {
1190 abortTran(context);
1191 }
1192
1193 throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
1194 }
1195
1196
1197 }
1198
1199 public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
1200 {
1201 boolean localTx = getOrCreateTransaction(context);
1202 Connection conn = getConnection(context);
1203
1204
1205 try
1206 {
1207
1208 PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
1209 stmt.setLong(1,messageId);
1210 stmt.setInt(2, index);
1211 ResultSet rs = stmt.executeQuery();
1212
1213 if(rs.next())
1214 {
1215 Blob dataAsBlob = rs.getBlob(1);
1216
1217 final int size = (int) dataAsBlob.length();
1218 byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
1219 final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
1220
1221 ContentChunk cb = new ContentChunk()
1222 {
1223
1224 public int getSize()
1225 {
1226 return size;
1227 }
1228
1229 public ByteBuffer getData()
1230 {
1231 return buf;
1232 }
1233
1234 public void reduceToFit()
1235 {
1236
1237 }
1238 };
1239
1240 if(localTx)
1241 {
1242 commitTran(context);
1243 }
1244
1245 return cb;
1246
1247 }
1248 else
1249 {
1250 if(localTx)
1251 {
1252 abortTran(context);
1253 }
1254 throw new AMQException("Message not found for message with id " + messageId);
1255 }
1256 }
1257 catch (SQLException e)
1258 {
1259 if(localTx)
1260 {
1261 abortTran(context);
1262 }
1263
1264 throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
1265 }
1266
1267
1268
1269 }
1270
1271 public boolean isPersistent()
1272 {
1273 return true;
1274 }
1275
1276 private void checkNotClosed() throws MessageStoreClosedException
1277 {
1278 if (_closed.get())
1279 {
1280 throw new MessageStoreClosedException();
1281 }
1282 }
1283
1284
1285 private static final class ProcessAction
1286 {
1287 private final AMQQueue _queue;
1288 private final StoreContext _context;
1289 private final AMQMessage _message;
1290
1291 public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
1292 {
1293 _queue = queue;
1294 _context = context;
1295 _message = message;
1296 }
1297
1298 public void process() throws AMQException
1299 {
1300 _queue.enqueue(_context, _message);
1301
1302 }
1303
1304 }
1305
1306
1307 private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
1308 throws SQLException, AMQException
1309 {
1310 Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
1311 List<ProcessAction> actions = new ArrayList<ProcessAction>();
1312
1313 Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
1314
1315 final boolean inLocaltran = inTran(context);
1316 Connection conn = null;
1317 try
1318 {
1319
1320 if(inLocaltran)
1321 {
1322 conn = getConnection(context);
1323 }
1324 else
1325 {
1326 conn = newConnection();
1327 }
1328
1329 long maxId = 1;
1330
1331 TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
1332
1333 Statement stmt = conn.createStatement();
1334 ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
1335
1336
1337 while (rs.next())
1338 {
1339
1340
1341
1342 AMQShortString queueName = new AMQShortString(rs.getString(1));
1343
1344
1345 AMQQueue queue = queues.get(queueName);
1346 if (queue == null)
1347 {
1348 queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
1349
1350 _virtualHost.getQueueRegistry().registerQueue(queue);
1351 queues.put(queueName, queue);
1352 }
1353
1354 long messageId = rs.getLong(2);
1355 maxId = Math.max(maxId, messageId);
1356 AMQMessage message = msgMap.get(messageId);
1357
1358 if(message != null)
1359 {
1360 //todo must enqueue message to build reference table
1361 // message.incrementReference(1);
1362 }
1363 else
1364 {
1365 message = _messageFactory.createMessage(messageId, this);
1366
1367 _logger.error("todo must do message recovery now.");
1368 //todo must do message recovery now.
1369
1370 msgMap.put(messageId,message);
1371 }
1372
1373 if (_logger.isDebugEnabled())
1374 {
1375 _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
1376 }
1377
1378 if (_logger.isInfoEnabled())
1379 {
1380 Integer count = queueRecoveries.get(queueName);
1381 if (count == null)
1382 {
1383 count = 0;
1384 }
1385
1386 queueRecoveries.put(queueName, ++count);
1387
1388 }
1389
1390 actions.add(new ProcessAction(queue, context, message));
1391
1392 }
1393
1394 for(ProcessAction action : actions)
1395 {
1396 action.process();
1397 }
1398
1399 _messageId.set(maxId + 1);
1400 }
1401 catch (SQLException e)
1402 {
1403 _logger.error("Error: " + e, e);
1404 throw e;
1405 }
1406 finally
1407 {
1408 if (inLocaltran && conn != null)
1409 {
1410 conn.close();
1411 }
1412 }
1413
1414 if (_logger.isInfoEnabled())
1415 {
1416 _logger.info("Recovered message counts: " + queueRecoveries);
1417 }
1418 }
1419
1420 private Connection getConnection(final StoreContext context)
1421 {
1422 return ((ConnectionWrapper)context.getPayload()).getConnection();
1423 }
1424
1425 private boolean getOrCreateTransaction(StoreContext context) throws AMQException
1426 {
1427
1428 ConnectionWrapper tx = (ConnectionWrapper) context.getPayload();
1429 if (tx == null)
1430 {
1431 beginTran(context);
1432 return true;
1433 }
1434
1435 return false;
1436 }
1437
1438 private synchronized void stateTransition(State requiredState, State newState) throws AMQException
1439 {
1440 if (_state != requiredState)
1441 {
1442 throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
1443 + "; currently in state: " + _state);
1444 }
1445
1446 _state = newState;
1447 }
1448 }
|