From 447b41d936c7c50fae0195c8a9f58f56340a3482 Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Tue, 27 Mar 2018 17:05:06 +0100 Subject: [PATCH] MySQL notary service: improve database failover and exception handling. Transactions are now retried if a communication failure occurs during execute() or commit(). Note that this may result in duplicate entries in the notary request log, and false state commit conflicts (the notary client would still receive a successful response). We need to tidy this up in the future. --- .../transactions/MySQLUniquenessProvider.kt | 91 +++++++++++-------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt index 1f4ec80682..7473b89778 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt @@ -12,6 +12,7 @@ package net.corda.node.services.transactions import com.codahale.metrics.MetricRegistry import com.google.common.base.Stopwatch +import com.mysql.cj.jdbc.exceptions.CommunicationsException import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariDataSource @@ -24,13 +25,13 @@ import net.corda.core.flows.NotaryInternalException import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party import net.corda.core.node.services.UniquenessProvider +import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.loggerFor import net.corda.node.services.config.MySQLConfiguration -import java.sql.BatchUpdateException -import java.sql.Connection -import java.sql.SQLTransientConnectionException 
+import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding.SNAPPY +import java.sql.* import java.util.concurrent.TimeUnit /** @@ -43,7 +44,6 @@ class MySQLUniquenessProvider( metrics: MetricRegistry, configuration: MySQLConfiguration ) : UniquenessProvider, SingletonSerializeAsToken() { - companion object { private val log = loggerFor() @@ -91,18 +91,24 @@ class MySQLUniquenessProvider( private val connection: Connection get() = getConnection() - private fun getConnection(nRetries: Int = 0): Connection = - try { - dataSource.connection - } catch (e: SQLTransientConnectionException) { - if (nRetries == connectionRetries) { - log.warn("Couldn't obtain connection with {} retries, giving up, {}", nRetries, e) - throw e - } - log.warn("Connection exception, retrying", nRetries+1) - connectionExceptionCounter.inc() - getConnection(nRetries + 1) + /** + * Attempts to obtain a database connection with the number of retries specified in [connectionRetries]. + * No backoff strategy is used since it's expected that the service can immediately fail over to a different + * database server in the replicated MySQL cluster. 
+ */ + private fun getConnection(nRetries: Int = 0): Connection { + return try { + dataSource.connection + } catch (e: SQLTransientConnectionException) { + if (nRetries == connectionRetries) { + log.warn("Couldn't obtain connection with {} retries, giving up, {}", nRetries, e) + throw e } + log.warn("Error trying to obtain a database connection, retrying", nRetries + 1) + connectionExceptionCounter.inc() + getConnection(nRetries + 1) + } + } fun createTable() { log.debug("Attempting to create DB table if it does not yet exist: $createCommittedStateTable") @@ -120,12 +126,14 @@ class MySQLUniquenessProvider( override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { val s = Stopwatch.createStarted() try { - retryTransaction(CommitAll(states, txId, callerIdentity, requestSignature)) + runWithRetry(CommitAll(states, txId, callerIdentity, requestSignature)) nrInputStates.update(states.size) } catch (e: BatchUpdateException) { log.info("Unable to commit input states, finding conflicts, txId: $txId", e) + // TODO: do not increment the conflict counter if the conflict was caused by the service retrying + // db transaction. E.g. when failing over to a different MySQL server. 
conflictCounter.inc() - retryTransaction(FindConflicts(txId, states)) + runWithRetry(FindConflicts(txId, states)) } finally { val dt = s.stop().elapsed(TimeUnit.MILLISECONDS) commitTimer.update(dt, TimeUnit.MILLISECONDS) @@ -133,44 +141,54 @@ class MySQLUniquenessProvider( } } - private fun retryTransaction(tx: RetryableTransaction) { + private fun runWithRetry(action: DBTransaction) { connection.use { - while (true) { + loop@ while (true) { try { - tx.run(it) + action.run(it) + break } catch (e: Exception) { - if (e is MySQLTransactionRollbackException) { - log.warn("Rollback exception occurred, retrying", e) - rollbackCounter.inc() - continue - } else { - log.warn("Attempting to rollback", e) - it.rollback() - throw e + when (e) { + is MySQLTransactionRollbackException -> { + log.warn("Rollback exception occurred, retrying", e) + rollbackCounter.inc() + continue@loop // Retrying using the same connection + } + is SQLRecoverableException, is CommunicationsException, // Occurs when an issue is encountered during execute() (e.g. connection lost) + is SQLNonTransientConnectionException -> { // Occurs when an issue is encountered during commit() (e.g. 
connection lost) + log.warn("Lost connection to the database, retrying", e) + runWithRetry(action) // Retrying using a new connection + // TODO: don't reinsert notarisation request on retry + break@loop + } + else -> { + log.warn("Unexpected error occurred, attempting to rollback", e) + it.rollback() + throw e + } } } - break } - it.commit() } } - interface RetryableTransaction { + interface DBTransaction { fun run(conn: Connection) } - private class CommitAll(val states: List, val txId: SecureHash, val callerIdentity: Party, val requestSignature: NotarisationRequestSignature) : RetryableTransaction { + private class CommitAll(val states: List, val txId: SecureHash, val callerIdentity: Party, val requestSignature: NotarisationRequestSignature) : DBTransaction { override fun run(conn: Connection) { conn.prepareStatement(insertRequestStatement).apply { setBytes(1, txId.bytes) setString(2, callerIdentity.name.toString()) - setBytes(3, requestSignature.serialize().bytes) + setBytes(3, requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes) execute() close() } - // We commit here, since we want to make sure it doesn't get rolled back in case of a conflict - // when committing inputs + // We commit here, since we want to make sure the notarisation request insertion + // doesn't get rolled back in case of a conflict when committing inputs conn.commit() + conn.setSavepoint() conn.prepareStatement(insertStateStatement).apply { states.forEach { stateRef -> // StateRef @@ -184,10 +202,11 @@ class MySQLUniquenessProvider( executeBatch() close() } + conn.commit() } } - private class FindConflicts(val txId: SecureHash, val states: List) : RetryableTransaction { + private class FindConflicts(val txId: SecureHash, val states: List) : DBTransaction { override fun run(conn: Connection) { val conflicts = mutableMapOf() states.forEach {