ENT-1410: MySQL notary service - process requests in batches (#758)

* ENT-1410: MySQL notary service - process requests in batches
This commit is contained in:
Andrius Dagys 2018-05-04 18:36:07 +01:00 committed by GitHub
parent ea2f9c1ef8
commit e2ae04b11c
No known key found for this signature in database
8 changed files with 637 additions and 191 deletions

View File

@ -0,0 +1,62 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.core.internal.notary
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.serialization.CordaSerializable
/** Base notary implementation for a notary that supports asynchronous calls from a flow. */
abstract class AsyncCFTNotaryService : TrustedAuthorityNotaryService() {
override val uniquenessProvider: UniquenessProvider get() = asyncUniquenessProvider
/** A uniqueness provider that supports asynchronous commits. */
protected abstract val asyncUniquenessProvider: AsyncUniquenessProvider
* Commits the provided input states asynchronously.
* If a consumed state conflict is reported by the [asyncUniquenessProvider], but it is caused by the same
* transaction the transaction is getting notarised twice a success response will be returned.
private fun commitAsync(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?
): CordaFuture<Result> {
return asyncUniquenessProvider.commitAsync(states, txId, callerIdentity, requestSignature, timeWindow)
* Required for the flow to be able to suspend until the commit is complete.
* This object will be included in the flow checkpoint.
class CommitOperation(
val service: AsyncCFTNotaryService,
val inputs: List<StateRef>,
val txId: SecureHash,
val caller: Party,
val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?) : FlowAsyncOperation<Result> {
override fun execute(): CordaFuture<Result> {
return service.commitAsync(inputs, txId, caller, requestSignature, timeWindow)

View File

@ -0,0 +1,45 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.core.internal.notary
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.identity.Party
* A service that records input states of the given transaction and provides conflict information
* if any of the inputs have already been used in another transaction.
interface AsyncUniquenessProvider : UniquenessProvider {
/** Commits all input states of the given transaction. */
fun commitAsync(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?): CordaFuture<Result>
/** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) {
val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow).get()
if (result is Result.Failure) {
throw NotaryInternalException(result.error)
/** The outcome of committing a transaction. */
sealed class Result {
/** Indicates that all input states have been committed successfully. */
object Success : Result()
/** Indicates that the transaction has not been committed. */
data class Failure(val error: NotaryError) : Result()

View File

@ -6,6 +6,8 @@ import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.executeAsync
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.utilities.unwrap
@ -34,7 +36,12 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
val parts = validateRequest(requestPayload)
txId = parts.id
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp)
if (service is AsyncCFTNotaryService) {
val result = executeAsync(AsyncCFTNotaryService.CommitOperation(service, parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp))
if (result is Result.Failure) throw NotaryInternalException(result.error)
} else {
service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp)
} catch (e: NotaryInternalException) {
throw NotaryException(e.error, txId)

View File

@ -28,19 +28,8 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) {
try {
uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow)
} catch (e: NotaryInternalException) {
if (e.error is NotaryError.Conflict) {
val conflicts = inputs.filterIndexed { _, stateRef ->
val cause = e.error.consumedStates[stateRef]
cause != null && cause.hashOfTransactionId != txId.sha256()
if (conflicts.isNotEmpty()) {
// TODO: Create a new UniquenessException that only contains the conflicts filtered above.
log.info("Notary conflicts for $txId: $conflicts")
throw e
} else throw e
} catch (e: Exception) {
if (e is NotaryInternalException) throw e
log.error("Internal error", e)
throw NotaryInternalException(NotaryError.General(Exception("Service unavailable, please try again later")))

View File

@ -10,23 +10,32 @@
package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import com.typesafe.config.ConfigFactory
import net.corda.client.mock.Generator
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.internal.notary.generateSignature
import net.corda.core.node.NotaryInfo
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.StartedNode
import net.corda.node.services.config.MySQLConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.transactions.MySQLNotaryService
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters
@ -35,15 +44,19 @@ import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.node.TestClock
import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Before
import org.junit.ClassRule
import org.junit.Test
import java.math.BigInteger
import java.time.Duration
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class MySQLNotaryServiceTests : IntegrationTest() {
companion object {
@ -56,12 +69,13 @@ class MySQLNotaryServiceTests : IntegrationTest() {
private lateinit var mockNet: InternalMockNetwork
private lateinit var node: StartedNode<InternalMockNetwork.MockNode>
private val nodeParty: Party get() = node.info.singleIdentity()
private lateinit var notaryParty: Party
private lateinit var notaryNode: StartedNode<InternalMockNetwork.MockNode>
fun before() {
mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts"))
mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts"), threadPerNode = true)
notaryParty = DevIdentityGenerator.generateDistributedNotarySingularIdentity(listOf(mockNet.baseDirectory(mockNet.nextNodeId)), notaryName)
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryParty, false))))
val notaryNodeUnstarted = createNotaryNode()
@ -82,17 +96,10 @@ class MySQLNotaryServiceTests : IntegrationTest() {
fun `detect double spend`() {
val inputState = issueState(node, notaryParty)
val firstTxBuilder = TransactionBuilder(notaryParty)
val firstSpendTx = node.services.signInitialTransaction(firstTxBuilder)
val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture
val secondSpendBuilder = TransactionBuilder(notaryParty).withItems(inputState).run {
val dummyState = DummyContract.SingleOwnerState(0, node.info.singleIdentity())
addOutputState(dummyState, DummyContract.PROGRAM_ID)
@ -100,9 +107,10 @@ class MySQLNotaryServiceTests : IntegrationTest() {
val secondSpendTx = node.services.signInitialTransaction(secondSpendBuilder)
val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx)).resultFuture
val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture
val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx)).resultFuture
val ex = assertFailsWith(NotaryException::class) { secondSpend.getOrThrow() }
val error = ex.error as NotaryError.Conflict
@ -118,37 +126,115 @@ class MySQLNotaryServiceTests : IntegrationTest() {
val spendTx = node.services.signInitialTransaction(txBuilder)
val notarise = node.services.startFlow(NotaryFlow.Client(spendTx)).resultFuture
val signature = notarise.get().single()
val notariseRetry = node.services.startFlow(NotaryFlow.Client(spendTx)).resultFuture
val signatureRetry = notariseRetry.get().single()
val futures = (1..10).map {
val signatures = futures.transpose().get().flatten()
fun checkSignature(signature: TransactionSignature) {
assertEquals(notaryParty.owningKey, signature.by)
signatures.forEach { checkSignature(it) }
private fun createNotaryNode(): InternalMockNetwork.MockNode {
val dataStoreProperties = makeTestDataSourceProperties(configSupplier = { _, _ ->ConfigFactory.empty() }, fallBackConfigSupplier = ::inMemoryH2DataSourceConfig).apply {
setProperty("autoCommit", "false")
fun `should re-sign a transaction with an expired time-window`() {
val stx = run {
val inputState = issueState(node, notaryParty)
val tx = TransactionBuilder(notaryParty)
.setTimeWindow(node.services.clock.instant(), 30.seconds)
val sig1 = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture.get().first()
assertEquals(sig1.by, notaryParty.owningKey)
mockNet.nodes.forEach {
val nodeClock = (it.started!!.services.clock as TestClock)
val sig2 = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture.get().first()
assertEquals(sig2.by, notaryParty.owningKey)
fun `should report error for transaction with an invalid time-window`() {
val stx = run {
val inputState = issueState(node, notaryParty)
val tx = TransactionBuilder(notaryParty)
.setTimeWindow(node.services.clock.instant().plusSeconds(3600), 30.seconds)
val future = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture
val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() }
fun `requests are processed in batches`() {
val notaryService = notaryNode.notaryService as MySQLNotaryService
val transactionCount = 100
val results = notaryNode.services.startFlow(RequestGenerationFlow(notaryService, transactionCount)).resultFuture.get()
assertEquals(transactionCount, results.size)
assert(results.all { it === Result.Success })
fun `batches with too many input states are processed in chunks`() {
val notaryService = notaryNode.notaryService as MySQLNotaryService
val transactionCount = 10
val results = notaryNode.services.startFlow(RequestGenerationFlow(notaryService, transactionCount, 50)).resultFuture.get()
assertEquals(transactionCount, results.size)
assert(results.all { it === Result.Success })
private class RequestGenerationFlow(
private val service: MySQLNotaryService,
private val transactionCount: Int,
private val inputStateCount: Int? = null
) : FlowLogic<List<Result>>() {
private val publicKeyGeneratorSingle = Generator.pure(generateKeyPair().public)
private val partyGenerator: Generator<Party> = Generator.int().combine(publicKeyGeneratorSingle) { n, key ->
Party(CordaX500Name(organisation = "Party$n", locality = "London", country = "GB"), key)
private val txIdGenerator = Generator.bytes(32).map { SecureHash.sha256(it) }
private val stateRefGenerator = txIdGenerator.combine(Generator.intRange(0, 10)) { id, pos -> StateRef(id, pos) }
private val random = SplittableRandom()
override fun call(): List<Result> {
val futures = mutableListOf<CordaFuture<Result>>()
var requestSignature: NotarisationRequestSignature? = null
for (i in 1..transactionCount) {
val txId: SecureHash = txIdGenerator.generateOrFail(random)
val callerParty = partyGenerator.generateOrFail(random)
val inputGenerator = if (inputStateCount == null) {
Generator.replicatePoisson(4.0, stateRefGenerator, true)
} else {
Generator.replicate(inputStateCount, stateRefGenerator)
val inputs = inputGenerator.generateOrFail(random)
if (requestSignature == null || random.nextInt(10) < 2) {
requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
futures += AsyncCFTNotaryService.CommitOperation(
return futures.transpose().get()
return mockNet.createUnstartedNode(
legalName = notaryNodeName,
entropyRoot = BigInteger.valueOf(60L),
configOverrides = {
val notaryConfig = NotaryConfig(validating = false, mysql = MySQLConfiguration(dataStoreProperties))
private fun issueState(node: StartedNode<InternalMockNetwork.MockNode>, notary: Party): StateAndRef<*> {
@ -159,4 +245,23 @@ class MySQLNotaryServiceTests : IntegrationTest() {
StateAndRef(builder.outputStates().first(), StateRef(stx.id, 0))
private fun createNotaryNode(): InternalMockNetwork.MockNode {
val dataStoreProperties = makeTestDataSourceProperties(configSupplier = { _, _ -> ConfigFactory.empty() }, fallBackConfigSupplier = ::inMemoryH2DataSourceConfig).apply {
setProperty("autoCommit", "false")
return mockNet.createUnstartedNode(
legalName = notaryNodeName,
entropyRoot = BigInteger.valueOf(60L),
configOverrides = {
val notaryConfig = NotaryConfig(
validating = false,
mysql = MySQLConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100)

View File

@ -115,13 +115,36 @@ data class NotaryConfig(val validating: Boolean,
"raft, bftSMaRt, custom, and mysql configs cannot be specified together"
val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null || mysql != null
data class MySQLConfiguration(
val dataSource: Properties,
val connectionRetries: Int = 0
) {
* Number of times to attempt to reconnect to the database.
val connectionRetries: Int = 2, // Default value for a 3 server cluster.
* Time increment between re-connection attempts.
* The total back-off duration is calculated as: backOffIncrement * backOffBase ^ currentRetryCount
val backOffIncrement: Int = 500,
/** Exponential back-off multiplier base. */
val backOffBase: Double = 1.5,
/** The maximum number of transactions processed in a single batch. */
val maxBatchSize: Int = 500,
/** The maximum combined number of input states processed in a single batch. */
val maxBatchInputStates: Int = 10_000,
/** A batch will be processed after a specified timeout even if it has not yet reached full capacity. */
val batchTimeoutMs: Long = 200,
* The maximum number of commit requests in flight. Once the capacity is reached the service will block on
* further commit requests.
val maxQueueSize: Int = 100_000
) {
init {
require(connectionRetries >= 0) { "connectionRetries cannot be negative" }
@ -410,6 +433,7 @@ data class SecurityConfiguration(val authService: SecurityConfiguration.AuthServ
data class RelayConfiguration(val relayHost: String,
val remoteInboundPort: Int,
val username: String,

View File

@ -12,7 +12,7 @@ package net.corda.node.services.transactions
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.MySQLConfiguration
import java.security.PublicKey
@ -23,19 +23,20 @@ abstract class MySQLNotaryService(
override val notaryIdentityKey: PublicKey,
configuration: MySQLConfiguration,
/** Database table will be automatically created in dev mode */
val devMode: Boolean) : TrustedAuthorityNotaryService() {
val devMode: Boolean) : AsyncCFTNotaryService() {
override val uniquenessProvider = MySQLUniquenessProvider(
override val asyncUniquenessProvider = MySQLUniquenessProvider(
override fun start() {
if (devMode) uniquenessProvider.createTable()
if (devMode) asyncUniquenessProvider.createTable()
override fun stop() {
@ -47,8 +48,8 @@ class MySQLNonValidatingNotaryService(services: ServiceHubInternal,
class MySQLValidatingNotaryService(services: ServiceHubInternal,
notaryIdentityKey: PublicKey,
configuration: MySQLConfiguration,
devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, configuration, devMode) {
notaryIdentityKey: PublicKey,
configuration: MySQLConfiguration,
devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, configuration, devMode) {
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = ValidatingNotaryFlow(otherPartySession, this)

View File

@ -10,12 +10,15 @@
package net.corda.node.services.transactions
import com.codahale.metrics.Gauge
import com.codahale.metrics.MetricRegistry
import com.google.common.base.Stopwatch
import com.google.common.collect.Queues
import com.mysql.cj.jdbc.exceptions.CommunicationsException
import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
@ -24,16 +27,29 @@ import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.AsyncUniquenessProvider
import net.corda.core.internal.notary.AsyncUniquenessProvider.Result
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.services.config.MySQLConfiguration
import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding.SNAPPY
import java.sql.*
import java.time.Clock
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread
* Uniqueness provider backed by a MySQL database. It is intended to be used with a multi-master synchronously replicated
@ -43,24 +59,26 @@ import java.util.concurrent.TimeUnit
class MySQLUniquenessProvider(
metrics: MetricRegistry,
configuration: MySQLConfiguration
) : UniquenessProvider, SingletonSerializeAsToken() {
val clock: Clock,
val config: MySQLConfiguration
) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<MySQLUniquenessProvider>()
// TODO: optimize table schema for InnoDB
private val createCommittedStateTable =
private const val createCommittedStateTable =
"CREATE TABLE IF NOT EXISTS notary_committed_states (" +
"issue_transaction_id BINARY(32) NOT NULL," +
"issue_transaction_output_id INT UNSIGNED NOT NULL," +
"consuming_transaction_id BINARY(32) NOT NULL," +
"CONSTRAINT id PRIMARY KEY (issue_transaction_id, issue_transaction_output_id)" +
private val insertStateStatement = "INSERT INTO notary_committed_states (issue_transaction_id, issue_transaction_output_id, consuming_transaction_id) VALUES (?, ?, ?)"
private val findStatement = "SELECT consuming_transaction_id FROM notary_committed_states WHERE issue_transaction_id = ? AND issue_transaction_output_id = ?"
private const val insertStateStatement = "INSERT INTO notary_committed_states (issue_transaction_id, issue_transaction_output_id, consuming_transaction_id) VALUES (?, ?, ?)"
private const val findStateStatement = "SELECT consuming_transaction_id, issue_transaction_id, issue_transaction_output_id " +
"FROM notary_committed_states " +
"WHERE (issue_transaction_id = ? AND issue_transaction_output_id = ?)"
private const val findClause = "OR (issue_transaction_id = ? AND issue_transaction_output_id = ?)"
private val createRequestLogTable =
private const val createRequestLogTable =
"CREATE TABLE IF NOT EXISTS notary_request_log (" +
"consuming_transaction_id BINARY(32) NOT NULL," +
"requesting_party_name TEXT NOT NULL," +
@ -69,12 +87,27 @@ class MySQLUniquenessProvider(
"CONSTRAINT rid PRIMARY KEY (request_id)" +
private val insertRequestStatement = "INSERT INTO notary_request_log (consuming_transaction_id, requesting_party_name, request_signature) VALUES (?, ?, ?)"
private const val insertRequestStatement = "INSERT INTO notary_request_log (consuming_transaction_id, requesting_party_name, request_signature) VALUES (?, ?, ?)"
/** The maximum number of attempts to retry a database operation. */
private const val maxRetries = 1000
private data class CommitRequest(
val states: List<StateRef>,
val txId: SecureHash,
val callerIdentity: Party,
val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?,
val id: UUID = UUID.randomUUID())
private val metricPrefix = MySQLUniquenessProvider::class.simpleName
/** Transaction commit duration and rate metric timer */
/** Transaction commit duration timer and TPS meter. */
private val commitTimer = metrics.timer("$metricPrefix.Commit")
/** IPS (input states per second) meter. */
private val inputStatesMeter = metrics.meter("$metricPrefix.IPS")
/** Transaction batch commit duration and rate meter. */
private val batchTimer = metrics.timer("$metricPrefix.BatchCommit")
* When writing to multiple masters with Galera, transaction rollbacks may happen due to high write contention.
* This is a useful heath metric.
@ -84,32 +117,313 @@ class MySQLUniquenessProvider(
private val connectionExceptionCounter = metrics.counter("$metricPrefix.ConnectionException")
/** Track double spend attempts. Note that this will also include notarisation retries. */
private val conflictCounter = metrics.counter("$metricPrefix.Conflicts")
/** Track the distribution of the number of input states **/
private val nrInputStates = metrics.histogram("$metricPrefix.NumberOfInputStates")
/** Track the distribution of the number of input states. **/
private val inputStateCount = metrics.histogram("$metricPrefix.NumberOfInputStates")
val dataSource = HikariDataSource(HikariConfig(configuration.dataSource))
private val connectionRetries = configuration.connectionRetries
private val dataSource = HikariDataSource(HikariConfig(config.dataSource))
private val connectionRetries = config.connectionRetries
/** Attempts to obtain a database connection with number of retries specified in [connectionRetries]. */
private val connection: Connection
get() = getConnection()
get() {
var retries = 0
while (true) {
try {
return dataSource.connection
} catch (e: SQLTransientConnectionException) {
if (retries == connectionRetries) {
log.warn("Couldn't obtain connection with $retries retries, giving up", e)
throw e
log.warn("Error trying to obtain a database connection, retrying. Attempts: $retries")
val backOffDurationMs = Math.round(
config.backOffIncrement * Math.pow(config.backOffBase, retries.toDouble())
private val requestQueue = LinkedBlockingQueue<CommitRequest>(config.maxQueueSize)
private val requestFutures = ConcurrentHashMap<UUID, OpenFuture<Result>>()
/** Track the request queue size. */
private val queueSizeGauge = metrics.register(
Gauge<Int> { requestQueue.size }
/** Track the batch size. **/
private val processedBatchSize = metrics.histogram("$metricPrefix.ProcessedBatchSize")
/** A request processor thread. */
private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) {
try {
} catch (e: InterruptedException) {
log.debug { "Shutting down with ${requestQueue.size} in-flight requests unprocessed." }
* Attempts to obtain a database connection with number of retries specified in [connectionRetries].
* No backoff strategy is used since it's expected that the service can immediately fail over to a different
* database server in the replicated MySQL cluster.
* Generates and adds a [CommitRequest] to the request queue. If the request queue is full, this method will block
* until space is available.
* Returns a future that will complete once the request is processed, containing the commit [Result].
private fun getConnection(nRetries: Int = 0): Connection {
return try {
} catch (e: SQLTransientConnectionException) {
if (nRetries == connectionRetries) {
log.warn("Couldn't obtain connection with {} retries, giving up, {}", nRetries, e)
throw e
log.warn("Error trying to obtain a database connection, retrying", nRetries + 1)
getConnection(nRetries + 1)
override fun commitAsync(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?
): CordaFuture<Result> {
val timer = Stopwatch.createStarted()
val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow)
val future = openFuture<Result>()
requestFutures[request.id] = future
future.then {
return future
private fun recordDuration(totalTime: Stopwatch) {
val elapsed = totalTime.elapsed(TimeUnit.MILLISECONDS)
commitTimer.update(elapsed, TimeUnit.MILLISECONDS)
* Processes notarisation requests in batches. It attempts to fill the batch with up to [maxBatchSize] requests,
* with a total combined number of input states no greater than [maxBatchInputStates].
* If there are not enough requests to fill the batch, it will get processed after a timeout of [batchTimeoutMs].
private fun processRequests() {
val buffer = LinkedList<CommitRequest>()
while (!Thread.interrupted()) {
val drainedSize = Queues.drain(requestQueue, buffer, config.maxBatchSize, config.batchTimeoutMs, TimeUnit.MILLISECONDS)
if (drainedSize == 0) continue
* Processes the request [buffer], potentially splitting it into more than one if the total number of
* inputs is over [maxBatchInputStates].
private fun processBuffer(buffer: LinkedList<CommitRequest>) {
var inputStateCount = 0
val batch = ArrayList<CommitRequest>()
while (buffer.isNotEmpty()) {
while (buffer.isNotEmpty() && inputStateCount + buffer.peek().states.size <= config.maxBatchInputStates) {
val request = buffer.poll()
inputStateCount += request.states.size
log.debug { "Processing a batch of size: ${batch.size}, input states: $inputStateCount" }
inputStateCount = 0
private fun processBatch(requests: List<CommitRequest>) {
val batchTime = Stopwatch.createStarted()
try {
val results = runWithRetry(CommitStates(requests, clock))
} catch (e: Exception) {
// Unhandled exception, we assume that signals a problem with the database that can't be fixed with
// a retry, such as misconfiguration.
log.error("Error notarising transactions", e)
} finally {
val elapsed = batchTime.elapsed(TimeUnit.MILLISECONDS)
batchTimer.update(elapsed, TimeUnit.MILLISECONDS)
log.trace { "Processed a batch of ${requests.size} requests in $elapsed ms" }
* Completes request futures with a successful response. This will resume service flows that will generate and
* send signatures back to the request originators.
private fun respondWithSuccess(results: Map<UUID, Result>) {
for ((requestId, result) in results) {
requestFutures[requestId]?.let {
if (result is Result.Failure && result.error is NotaryError.Conflict){
* If a database exception occurred when processing the batch, propagate the error to each request. This will
* resume the service flows that will forward the error message to the request originators.
private fun respondWithError(requests: List<CommitRequest>) {
for (request in requests) {
requestFutures[request.id]?.let {
it.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
/** Stores the notarisation requests including the request signature. */
private class LogRequests(val requests: List<CommitRequest>) : DBOperation<Unit> {
override fun execute(connection: Connection) {
// Write request signature to log
connection.prepareStatement(insertRequestStatement).apply {
requests.forEach { (_, txId, callerIdentity, requestSignature) ->
setBytes(1, txId.bytes)
setString(2, callerIdentity.name.toString())
setBytes(3, requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes)
* Stores all input states that don't yet exist in the database.
* A [Result.Conflict] is created for each transaction with one or more inputs already present in the database.
private class CommitStates(val requests: List<CommitRequest>, val clock: Clock) : DBOperation<Map<UUID, Result>> {
override fun execute(connection: Connection): Map<UUID, Result> {
val results = mutableMapOf<UUID, Result>()
val allStates = requests.flatMap { it.states }
val allConflicts = findAlreadyCommitted(connection, allStates).toMutableMap()
val toCommit = mutableListOf<CommitRequest>()
requests.forEach { request ->
val conflicts = allConflicts.filter { it.key in request.states }
results[request.id] = if (conflicts.isNotEmpty()) {
if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) {
} else {
Result.Failure(NotaryError.Conflict(request.txId, conflicts))
} else {
val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow)
if (outsideTimeWindowError == null) {
// Mark states as consumed to capture conflicting transactions in the same batch
request.states.forEach {
allConflicts[it] = StateConsumptionDetails(request.txId.sha256())
} else {
connection.prepareStatement(insertStateStatement).apply {
toCommit.forEach { (states, txId, _, _) ->
states.forEach { stateRef ->
// StateRef
setBytes(1, stateRef.txhash.bytes)
setInt(2, stateRef.index)
// Consuming transaction
setBytes(3, txId.bytes)
return results
private fun findAlreadyCommitted(connection: Connection, states: List<StateRef>): Map<StateRef, StateConsumptionDetails> {
val queryString = buildQueryString(states.size)
val preparedStatement = connection.prepareStatement(queryString).apply {
var parameterIndex = 0
states.forEach { (txId, index) ->
setBytes(++parameterIndex, txId.bytes)
setInt(++parameterIndex, index)
val resultSet = preparedStatement.executeQuery()
val committedStates = mutableMapOf<StateRef, StateConsumptionDetails>()
while (resultSet.next()) {
val consumingTxId = SecureHash.SHA256(resultSet.getBytes(1))
val stateRef = StateRef(SecureHash.SHA256(resultSet.getBytes(2)), resultSet.getInt(3))
committedStates[stateRef] = StateConsumptionDetails(consumingTxId.sha256())
return committedStates
private fun buildQueryString(stateCount: Int): String {
val queryStringBuilder = StringBuilder(findStateStatement)
(1 until stateCount).forEach { queryStringBuilder.append(findClause) }
return queryStringBuilder.toString()
/** An interface for custom database operations. */
private interface DBOperation<out T> {
fun execute(connection: Connection): T
/** Runs the provided [operation], retrying on transient database errors. */
private fun <T> runWithRetry(operation: DBOperation<T>): T {
var retryCount = 0
while (retryCount < maxRetries) {
connection.use {
sameConnection@ while (retryCount < maxRetries) {
try {
return operation.execute(it)
} catch (e: Exception) {
when (e) {
is BatchUpdateException, // Occurs when a competing transaction commits conflicting input states
is MySQLTransactionRollbackException -> {
log.warn("Database transaction conflict, retrying", e)
continue@sameConnection // Retrying using the same connection
is SQLRecoverableException, is CommunicationsException, // Occurs when an issue is encountered during execute() (e.g. connection lost)
is SQLNonTransientConnectionException -> { // Occurs when an issue is encountered during commit() (e.g. connection lost)
log.warn("Lost connection to the database, retrying", e)
break@sameConnection // Retrying using a new connection
// TODO: don't reinsert notarisation request on retry
else -> {
log.warn("Unexpected error occurred, attempting to rollback", e)
throw e
throw IllegalStateException("Database operation reached the maximum number of retries: $retryCount, something went wrong.")
fun createTable() {
@ -123,107 +437,6 @@ class MySQLUniquenessProvider(
fun stop() {
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) {
val s = Stopwatch.createStarted()
try {
runWithRetry(CommitAll(states, txId, callerIdentity, requestSignature))
} catch (e: BatchUpdateException) {
log.info("Unable to commit input states, finding conflicts, txId: $txId", e)
// TODO: do not increment the conflict counter if the conflict was caused by the service retrying
// db transaction. E.g. when failing over to a different MySQL server.
runWithRetry(FindConflicts(txId, states))
} finally {
val dt = s.stop().elapsed(TimeUnit.MILLISECONDS)
commitTimer.update(dt, TimeUnit.MILLISECONDS)
log.info("Processed notarisation request, txId: $txId, nrInputStates: ${states.size}, dt: $dt")
private fun runWithRetry(action: DBTransaction) {
connection.use {
loop@ while (true) {
try {
} catch (e: Exception) {
when (e) {
is MySQLTransactionRollbackException -> {
log.warn("Rollback exception occurred, retrying", e)
continue@loop // Retrying using the same connection
is SQLRecoverableException, is CommunicationsException, // Occurs when an issue is encountered during execute() (e.g. connection lost)
is SQLNonTransientConnectionException -> { // Occurs when an issue is encountered during commit() (e.g. connection lost)
log.warn("Lost connection to the database, retrying", e)
runWithRetry(action) // Retrying using a new connection
// TODO: don't reinsert notarisation request on retry
else -> {
log.warn("Unexpected error occurred, attempting to rollback", e)
throw e
interface DBTransaction {
fun run(conn: Connection)
private class CommitAll(val states: List<StateRef>, val txId: SecureHash, val callerIdentity: Party, val requestSignature: NotarisationRequestSignature) : DBTransaction {
override fun run(conn: Connection) {
conn.prepareStatement(insertRequestStatement).apply {
setBytes(1, txId.bytes)
setString(2, callerIdentity.name.toString())
setBytes(3, requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes)
// We commit here, since we want to make sure the notarisation request insertion
// doesn't get rolled back in case of a conflict when committing inputs
conn.prepareStatement(insertStateStatement).apply {
states.forEach { stateRef ->
// StateRef
setBytes(1, stateRef.txhash.bytes)
setInt(2, stateRef.index)
// Consuming transaction
setBytes(3, txId.bytes)
private class FindConflicts(val txId: SecureHash, val states: List<StateRef>) : DBTransaction {
override fun run(conn: Connection) {
val conflicts = mutableMapOf<StateRef, StateConsumptionDetails>()
states.forEach {
val st = conn.prepareStatement(findStatement).apply {
setBytes(1, it.txhash.bytes)
setInt(2, it.index)
val result = st.executeQuery()
if (result.next()) {
val consumingTxId = SecureHash.SHA256(result.getBytes(1))
conflicts[it] = StateConsumptionDetails(consumingTxId.sha256())
if (conflicts.isNotEmpty()) throw NotaryInternalException(NotaryError.Conflict(txId, conflicts))