Merge pull request #361 from corda/thomas-mysql-reconnect

Retry obtaining DB connection
This commit is contained in:
Thomas Schroeter 2018-01-18 16:38:47 +00:00 committed by GitHub
commit e9f0c8eca8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 11 deletions

View File

@ -13,6 +13,7 @@ import net.corda.core.identity.Party
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.StartedNode import net.corda.node.internal.StartedNode
import net.corda.node.services.config.MySQLConfiguration
import net.corda.node.services.config.NotaryConfig import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.nodeapi.internal.network.NetworkParametersCopier
@ -135,7 +136,7 @@ class MySQLNotaryServiceTests : IntegrationTest() {
legalName = notaryName, legalName = notaryName,
entropyRoot = BigInteger.valueOf(60L), entropyRoot = BigInteger.valueOf(60L),
configOverrides = { configOverrides = {
val notaryConfig = NotaryConfig(validating = false, mysql = dataStoreProperties) val notaryConfig = NotaryConfig(validating = false, mysql = MySQLConfiguration(dataStoreProperties))
doReturn(notaryConfig).whenever(it).notary doReturn(notaryConfig).whenever(it).notary
} }
) )

View File

@ -71,7 +71,7 @@ data class NotaryConfig(val validating: Boolean,
val raft: RaftConfig? = null, val raft: RaftConfig? = null,
val bftSMaRt: BFTSMaRtConfiguration? = null, val bftSMaRt: BFTSMaRtConfiguration? = null,
val custom: Boolean = false, val custom: Boolean = false,
val mysql: Properties? = null val mysql: MySQLConfiguration? = null
) { ) {
init { init {
require(raft == null || bftSMaRt == null || !custom || mysql == null) { require(raft == null || bftSMaRt == null || !custom || mysql == null) {
@ -81,6 +81,15 @@ data class NotaryConfig(val validating: Boolean,
val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null
} }
data class MySQLConfiguration(
val dataSource: Properties,
val connectionRetries: Int = 0
) {
init {
require(connectionRetries >= 0) { "connectionRetries cannot be negative" }
}
}
data class RaftConfig(val nodeAddress: NetworkHostAndPort, val clusterAddresses: List<NetworkHostAndPort>) data class RaftConfig(val nodeAddress: NetworkHostAndPort, val clusterAddresses: List<NetworkHostAndPort>)
/** @param exposeRaces for testing only, so its default is not in reference.conf but here. */ /** @param exposeRaces for testing only, so its default is not in reference.conf but here. */

View File

@ -5,6 +5,7 @@ import net.corda.core.flows.FlowSession
import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.MySQLConfiguration
import java.security.PublicKey import java.security.PublicKey
import java.util.* import java.util.*
@ -12,14 +13,14 @@ import java.util.*
abstract class MySQLNotaryService( abstract class MySQLNotaryService(
final override val services: ServiceHubInternal, final override val services: ServiceHubInternal,
override val notaryIdentityKey: PublicKey, override val notaryIdentityKey: PublicKey,
dataSourceProperties: Properties, configuration: MySQLConfiguration,
/** Database table will be automatically created in dev mode */ /** Database table will be automatically created in dev mode */
val devMode: Boolean) : TrustedAuthorityNotaryService() { val devMode: Boolean) : TrustedAuthorityNotaryService() {
override val timeWindowChecker = TimeWindowChecker(services.clock) override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = MySQLUniquenessProvider( override val uniquenessProvider = MySQLUniquenessProvider(
services.monitoringService.metrics, services.monitoringService.metrics,
dataSourceProperties configuration
) )
override fun start() { override fun start() {
@ -33,14 +34,14 @@ abstract class MySQLNotaryService(
class MySQLNonValidatingNotaryService(services: ServiceHubInternal, class MySQLNonValidatingNotaryService(services: ServiceHubInternal,
notaryIdentityKey: PublicKey, notaryIdentityKey: PublicKey,
dataSourceProperties: Properties, configuration: MySQLConfiguration,
devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, dataSourceProperties, devMode) { devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, configuration, devMode) {
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = NonValidatingNotaryFlow(otherPartySession, this) override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = NonValidatingNotaryFlow(otherPartySession, this)
} }
class MySQLValidatingNotaryService(services: ServiceHubInternal, class MySQLValidatingNotaryService(services: ServiceHubInternal,
notaryIdentityKey: PublicKey, notaryIdentityKey: PublicKey,
dataSourceProperties: Properties, configuration: MySQLConfiguration,
devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, dataSourceProperties, devMode) { devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, configuration, devMode) {
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = ValidatingNotaryFlow(otherPartySession, this) override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = ValidatingNotaryFlow(otherPartySession, this)
} }

View File

@ -14,9 +14,11 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.node.services.config.MySQLConfiguration
import java.security.PublicKey import java.security.PublicKey
import java.sql.BatchUpdateException import java.sql.BatchUpdateException
import java.sql.Connection import java.sql.Connection
import java.sql.SQLTransientConnectionException
import java.util.* import java.util.*
/** /**
@ -27,7 +29,7 @@ import java.util.*
*/ */
class MySQLUniquenessProvider( class MySQLUniquenessProvider(
metrics: MetricRegistry, metrics: MetricRegistry,
dataSourceProperties: Properties configuration: MySQLConfiguration
) : UniquenessProvider, SingletonSerializeAsToken() { ) : UniquenessProvider, SingletonSerializeAsToken() {
companion object { companion object {
private val log = loggerFor<MySQLUniquenessProvider>() private val log = loggerFor<MySQLUniquenessProvider>()
@ -57,13 +59,29 @@ class MySQLUniquenessProvider(
* This is a useful heath metric. * This is a useful heath metric.
*/ */
private val rollbackCounter = metrics.counter("$metricPrefix.Rollback") private val rollbackCounter = metrics.counter("$metricPrefix.Rollback")
/** Incremented when we can not obtain a DB connection. */
private val connectionExceptionCounter = metrics.counter("$metricPrefix.ConnectionException")
/** Track double spend attempts. Note that this will also include notarisation retries. */ /** Track double spend attempts. Note that this will also include notarisation retries. */
private val conflictCounter = metrics.counter("$metricPrefix.Conflicts") private val conflictCounter = metrics.counter("$metricPrefix.Conflicts")
val dataSource = HikariDataSource(HikariConfig(dataSourceProperties)) val dataSource = HikariDataSource(HikariConfig(configuration.dataSource))
private val connectionRetries = configuration.connectionRetries
private val connection: Connection private val connection: Connection
get() = dataSource.connection get() = getConnection()
private fun getConnection(nRetries: Int = 0): Connection =
try {
dataSource.connection
} catch (e: SQLTransientConnectionException) {
if (nRetries == connectionRetries) {
log.warn("Couldn't obtain connection with {} retries, giving up, {}", nRetries, e)
throw e
}
log.warn("Connection exception, retrying", nRetries+1)
connectionExceptionCounter.inc()
getConnection(nRetries + 1)
}
fun createTable() { fun createTable() {
log.debug("Attempting to create DB table if it does not yet exist: $createTableStatement") log.debug("Attempting to create DB table if it does not yet exist: $createTableStatement")