MySQL notary service: improve database failover and exception handling.

Transactions are now retried if a communication failure occurs during execute() or commit().
Note that a retry re-runs the whole transaction, including the notarisation request insert, so it
may result in duplicate entries in the notary request log and in spurious state commit conflicts
being recorded (the notary client would still receive a successful response).
We need to tidy this up in the future.
This commit is contained in:
Andrius Dagys 2018-03-27 17:05:06 +01:00
parent 76cb58f66a
commit 447b41d936

View File

@ -12,6 +12,7 @@ package net.corda.node.services.transactions
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import com.google.common.base.Stopwatch import com.google.common.base.Stopwatch
import com.mysql.cj.jdbc.exceptions.CommunicationsException
import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException
import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource import com.zaxxer.hikari.HikariDataSource
@ -24,13 +25,13 @@ import net.corda.core.flows.NotaryInternalException
import net.corda.core.flows.StateConsumptionDetails import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.node.services.UniquenessProvider import net.corda.core.node.services.UniquenessProvider
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.node.services.config.MySQLConfiguration import net.corda.node.services.config.MySQLConfiguration
import java.sql.BatchUpdateException import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding.SNAPPY
import java.sql.Connection import java.sql.*
import java.sql.SQLTransientConnectionException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
/** /**
@ -43,7 +44,6 @@ class MySQLUniquenessProvider(
metrics: MetricRegistry, metrics: MetricRegistry,
configuration: MySQLConfiguration configuration: MySQLConfiguration
) : UniquenessProvider, SingletonSerializeAsToken() { ) : UniquenessProvider, SingletonSerializeAsToken() {
companion object { companion object {
private val log = loggerFor<MySQLUniquenessProvider>() private val log = loggerFor<MySQLUniquenessProvider>()
@ -91,18 +91,24 @@ class MySQLUniquenessProvider(
private val connection: Connection private val connection: Connection
get() = getConnection() get() = getConnection()
private fun getConnection(nRetries: Int = 0): Connection = /**
try { * Attempts to obtain a database connection with number of retries specified in [connectionRetries].
dataSource.connection * No backoff strategy is used since it's expected that the service can immediately fail over to a different
} catch (e: SQLTransientConnectionException) { * database server in the replicated MySQL cluster.
if (nRetries == connectionRetries) { */
log.warn("Couldn't obtain connection with {} retries, giving up, {}", nRetries, e) private fun getConnection(nRetries: Int = 0): Connection {
throw e return try {
} dataSource.connection
log.warn("Connection exception, retrying", nRetries+1) } catch (e: SQLTransientConnectionException) {
connectionExceptionCounter.inc() if (nRetries == connectionRetries) {
getConnection(nRetries + 1) log.warn("Couldn't obtain connection with {} retries, giving up, {}", nRetries, e)
throw e
} }
log.warn("Error trying to obtain a database connection, retrying", nRetries + 1)
connectionExceptionCounter.inc()
getConnection(nRetries + 1)
}
}
fun createTable() { fun createTable() {
log.debug("Attempting to create DB table if it does not yet exist: $createCommittedStateTable") log.debug("Attempting to create DB table if it does not yet exist: $createCommittedStateTable")
@ -120,12 +126,14 @@ class MySQLUniquenessProvider(
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
val s = Stopwatch.createStarted() val s = Stopwatch.createStarted()
try { try {
retryTransaction(CommitAll(states, txId, callerIdentity, requestSignature)) runWithRetry(CommitAll(states, txId, callerIdentity, requestSignature))
nrInputStates.update(states.size) nrInputStates.update(states.size)
} catch (e: BatchUpdateException) { } catch (e: BatchUpdateException) {
log.info("Unable to commit input states, finding conflicts, txId: $txId", e) log.info("Unable to commit input states, finding conflicts, txId: $txId", e)
// TODO: do not increment the conflict counter if the conflict was caused by the service retrying
// db transaction. E.g. when failing over to a different MySQL server.
conflictCounter.inc() conflictCounter.inc()
retryTransaction(FindConflicts(txId, states)) runWithRetry(FindConflicts(txId, states))
} finally { } finally {
val dt = s.stop().elapsed(TimeUnit.MILLISECONDS) val dt = s.stop().elapsed(TimeUnit.MILLISECONDS)
commitTimer.update(dt, TimeUnit.MILLISECONDS) commitTimer.update(dt, TimeUnit.MILLISECONDS)
@ -133,44 +141,54 @@ class MySQLUniquenessProvider(
} }
} }
private fun retryTransaction(tx: RetryableTransaction) { private fun runWithRetry(action: DBTransaction) {
connection.use { connection.use {
while (true) { loop@ while (true) {
try { try {
tx.run(it) action.run(it)
break
} catch (e: Exception) { } catch (e: Exception) {
if (e is MySQLTransactionRollbackException) { when (e) {
log.warn("Rollback exception occurred, retrying", e) is MySQLTransactionRollbackException -> {
rollbackCounter.inc() log.warn("Rollback exception occurred, retrying", e)
continue rollbackCounter.inc()
} else { continue@loop // Retrying using the same connection
log.warn("Attempting to rollback", e) }
it.rollback() is SQLRecoverableException, is CommunicationsException, // Occurs when an issue is encountered during execute() (e.g. connection lost)
throw e is SQLNonTransientConnectionException -> { // Occurs when an issue is encountered during commit() (e.g. connection lost)
log.warn("Lost connection to the database, retrying", e)
runWithRetry(action) // Retrying using a new connection
// TODO: don't reinsert notarisation request on retry
break@loop
}
else -> {
log.warn("Unexpected error occurred, attempting to rollback", e)
it.rollback()
throw e
}
} }
} }
break
} }
it.commit()
} }
} }
interface RetryableTransaction { interface DBTransaction {
fun run(conn: Connection) fun run(conn: Connection)
} }
private class CommitAll(val states: List<StateRef>, val txId: SecureHash, val callerIdentity: Party, val requestSignature: NotarisationRequestSignature) : RetryableTransaction { private class CommitAll(val states: List<StateRef>, val txId: SecureHash, val callerIdentity: Party, val requestSignature: NotarisationRequestSignature) : DBTransaction {
override fun run(conn: Connection) { override fun run(conn: Connection) {
conn.prepareStatement(insertRequestStatement).apply { conn.prepareStatement(insertRequestStatement).apply {
setBytes(1, txId.bytes) setBytes(1, txId.bytes)
setString(2, callerIdentity.name.toString()) setString(2, callerIdentity.name.toString())
setBytes(3, requestSignature.serialize().bytes) setBytes(3, requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes)
execute() execute()
close() close()
} }
// We commit here, since we want to make sure it doesn't get rolled back in case of a conflict // We commit here, since we want to make sure the notarisation request insertion
// when committing inputs // doesn't get rolled back in case of a conflict when committing inputs
conn.commit() conn.commit()
conn.setSavepoint()
conn.prepareStatement(insertStateStatement).apply { conn.prepareStatement(insertStateStatement).apply {
states.forEach { stateRef -> states.forEach { stateRef ->
// StateRef // StateRef
@ -184,10 +202,11 @@ class MySQLUniquenessProvider(
executeBatch() executeBatch()
close() close()
} }
conn.commit()
} }
} }
private class FindConflicts(val txId: SecureHash, val states: List<StateRef>) : RetryableTransaction { private class FindConflicts(val txId: SecureHash, val states: List<StateRef>) : DBTransaction {
override fun run(conn: Connection) { override fun run(conn: Connection) {
val conflicts = mutableMapOf<StateRef, StateConsumptionDetails>() val conflicts = mutableMapOf<StateRef, StateConsumptionDetails>()
states.forEach { states.forEach {