diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt index 1f4ec80682..7473b89778 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt @@ -12,6 +12,7 @@ package net.corda.node.services.transactions import com.codahale.metrics.MetricRegistry import com.google.common.base.Stopwatch +import com.mysql.cj.jdbc.exceptions.CommunicationsException import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariDataSource @@ -24,13 +25,13 @@ import net.corda.core.flows.NotaryInternalException import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party import net.corda.core.node.services.UniquenessProvider +import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.loggerFor import net.corda.node.services.config.MySQLConfiguration -import java.sql.BatchUpdateException -import java.sql.Connection -import java.sql.SQLTransientConnectionException +import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding.SNAPPY +import java.sql.* import java.util.concurrent.TimeUnit /** @@ -43,7 +44,6 @@ class MySQLUniquenessProvider( metrics: MetricRegistry, configuration: MySQLConfiguration ) : UniquenessProvider, SingletonSerializeAsToken() { - companion object { private val log = loggerFor() @@ -91,18 +91,24 @@ class MySQLUniquenessProvider( private val connection: Connection get() = getConnection() - private fun getConnection(nRetries: Int = 0): Connection = - try { - dataSource.connection - } catch (e: SQLTransientConnectionException) { - if (nRetries == connectionRetries) { - log.warn("Couldn't obtain connection with {} retries, giving up, {}", nRetries, e) - throw e - } - log.warn("Connection exception, retrying", nRetries+1) - connectionExceptionCounter.inc() - getConnection(nRetries + 1) + /** + * Attempts to obtain a database connection with number of retries specified in [connectionRetries]. + * No backoff strategy is used since it's expected that the service can immediately fail over to a different + * database server in the replicated MySQL cluster. + */ + private fun getConnection(nRetries: Int = 0): Connection { + return try { + dataSource.connection + } catch (e: SQLTransientConnectionException) { + if (nRetries == connectionRetries) { + log.warn("Couldn't obtain connection with {} retries, giving up, {}", nRetries, e) + throw e } + log.warn("Error trying to obtain a database connection, retrying", nRetries + 1) + connectionExceptionCounter.inc() + getConnection(nRetries + 1) + } + } fun createTable() { log.debug("Attempting to create DB table if it does not yet exist: $createCommittedStateTable") @@ -120,12 +126,14 @@ class MySQLUniquenessProvider( override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { val s = Stopwatch.createStarted() try { - retryTransaction(CommitAll(states, txId, callerIdentity, requestSignature)) + runWithRetry(CommitAll(states, txId, callerIdentity, requestSignature)) nrInputStates.update(states.size) } catch (e: BatchUpdateException) { log.info("Unable to commit input states, finding conflicts, txId: $txId", e) + // TODO: do not increment the conflict counter if the conflict was caused by the service retrying + // db transaction. E.g. when failing over to a different MySQL server. conflictCounter.inc() - retryTransaction(FindConflicts(txId, states)) + runWithRetry(FindConflicts(txId, states)) } finally { val dt = s.stop().elapsed(TimeUnit.MILLISECONDS) commitTimer.update(dt, TimeUnit.MILLISECONDS) @@ -133,44 +141,54 @@ class MySQLUniquenessProvider( } } - private fun retryTransaction(tx: RetryableTransaction) { + private fun runWithRetry(action: DBTransaction) { connection.use { - while (true) { + loop@ while (true) { try { - tx.run(it) + action.run(it) + break } catch (e: Exception) { - if (e is MySQLTransactionRollbackException) { - log.warn("Rollback exception occurred, retrying", e) - rollbackCounter.inc() - continue - } else { - log.warn("Attempting to rollback", e) - it.rollback() - throw e + when (e) { + is MySQLTransactionRollbackException -> { + log.warn("Rollback exception occurred, retrying", e) + rollbackCounter.inc() + continue@loop // Retrying using the same connection + } + is SQLRecoverableException, is CommunicationsException, // Occurs when an issue is encountered during execute() (e.g. connection lost) + is SQLNonTransientConnectionException -> { // Occurs when an issue is encountered during commit() (e.g. connection lost) + log.warn("Lost connection to the database, retrying", e) + runWithRetry(action) // Retrying using a new connection + // TODO: don't reinsert notarisation request on retry + break@loop + } + else -> { + log.warn("Unexpected error occurred, attempting to rollback", e) + it.rollback() + throw e + } } } - break } - it.commit() } } - interface RetryableTransaction { + interface DBTransaction { fun run(conn: Connection) } - private class CommitAll(val states: List, val txId: SecureHash, val callerIdentity: Party, val requestSignature: NotarisationRequestSignature) : RetryableTransaction { + private class CommitAll(val states: List, val txId: SecureHash, val callerIdentity: Party, val requestSignature: NotarisationRequestSignature) : DBTransaction { override fun run(conn: Connection) { conn.prepareStatement(insertRequestStatement).apply { setBytes(1, txId.bytes) setString(2, callerIdentity.name.toString()) - setBytes(3, requestSignature.serialize().bytes) + setBytes(3, requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes) execute() close() } - // We commit here, since we want to make sure it doesn't get rolled back in case of a conflict - // when committing inputs + // We commit here, since we want to make sure the notarisation request insertion + // doesn't get rolled back in case of a conflict when committing inputs conn.commit() + conn.setSavepoint() conn.prepareStatement(insertStateStatement).apply { states.forEach { stateRef -> // StateRef @@ -184,10 +202,11 @@ class MySQLUniquenessProvider( executeBatch() close() } + conn.commit() } } - private class FindConflicts(val txId: SecureHash, val states: List) : RetryableTransaction { + private class FindConflicts(val txId: SecureHash, val states: List) : DBTransaction { override fun run(conn: Connection) { val conflicts = mutableMapOf() states.forEach {