From e2ae04b11c42042b612a74acb633320c2c16166e Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Fri, 4 May 2018 18:36:07 +0100 Subject: [PATCH] ENT-1410: MySQL notary service - process requests in batches (#758) * ENT-1410: MySQL notary service - process requests in batches --- .../internal/notary/AsyncCFTNotaryService.kt | 62 +++ .../notary/AsyncUniquenessProvider.kt | 45 ++ .../core/internal/notary/NotaryServiceFlow.kt | 9 +- .../notary/TrustedAuthorityNotaryService.kt | 13 +- .../node/services/MySQLNotaryServiceTests.kt | 177 +++++-- .../node/services/config/NodeConfiguration.kt | 28 +- .../transactions/MySQLNotaryService.kt | 17 +- .../transactions/MySQLUniquenessProvider.kt | 477 +++++++++++++----- 8 files changed, 637 insertions(+), 191 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt create mode 100644 core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt new file mode 100644 index 0000000000..27ce1be073 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncCFTNotaryService.kt @@ -0,0 +1,62 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.core.internal.notary + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.identity.Party +import net.corda.core.internal.FlowAsyncOperation +import net.corda.core.internal.notary.AsyncUniquenessProvider.Result +import net.corda.core.serialization.CordaSerializable + +/** Base notary implementation for a notary that supports asynchronous calls from a flow. */ +abstract class AsyncCFTNotaryService : TrustedAuthorityNotaryService() { + override val uniquenessProvider: UniquenessProvider get() = asyncUniquenessProvider + /** A uniqueness provider that supports asynchronous commits. */ + protected abstract val asyncUniquenessProvider: AsyncUniquenessProvider + + /** + * Commits the provided input states asynchronously. + * + * If a consumed state conflict is reported by the [asyncUniquenessProvider], but it is caused by the same + * transaction – the transaction is getting notarised twice – a success response will be returned. + */ + private fun commitAsync( + states: List, + txId: SecureHash, + callerIdentity: Party, + requestSignature: NotarisationRequestSignature, + timeWindow: TimeWindow? + ): CordaFuture { + return asyncUniquenessProvider.commitAsync(states, txId, callerIdentity, requestSignature, timeWindow) + } + + /** + * Required for the flow to be able to suspend until the commit is complete. + * This object will be included in the flow checkpoint. + */ + @CordaSerializable + class CommitOperation( + val service: AsyncCFTNotaryService, + val inputs: List, + val txId: SecureHash, + val caller: Party, + val requestSignature: NotarisationRequestSignature, + val timeWindow: TimeWindow?) : FlowAsyncOperation { + override fun execute(): CordaFuture { + return service.commitAsync(inputs, txId, caller, requestSignature, timeWindow) + } + } +} + diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt new file mode 100644 index 0000000000..b611f6fdbc --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt @@ -0,0 +1,45 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.core.internal.notary + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.flows.NotaryError +import net.corda.core.identity.Party + +/** + * A service that records input states of the given transaction and provides conflict information + * if any of the inputs have already been used in another transaction. + */ +interface AsyncUniquenessProvider : UniquenessProvider { + /** Commits all input states of the given transaction. */ + fun commitAsync(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?): CordaFuture + + /** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */ + override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) { + val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow).get() + if (result is Result.Failure) { + throw NotaryInternalException(result.error) + } + } + + /** The outcome of committing a transaction. */ + sealed class Result { + /** Indicates that all input states have been committed successfully. */ + object Success : Result() + /** Indicates that the transaction has not been committed. */ + data class Failure(val error: NotaryError) : Result() + } +} + diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt index ad1ad7293e..bfe059f189 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt @@ -6,6 +6,8 @@ import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash import net.corda.core.flows.* import net.corda.core.identity.Party +import net.corda.core.internal.executeAsync +import net.corda.core.internal.notary.AsyncUniquenessProvider.Result import net.corda.core.utilities.unwrap /** @@ -34,7 +36,12 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: val parts = validateRequest(requestPayload) txId = parts.id checkNotary(parts.notary) - service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp) + if (service is AsyncCFTNotaryService) { + val result = executeAsync(AsyncCFTNotaryService.CommitOperation(service, parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp)) + if (result is Result.Failure) throw NotaryInternalException(result.error) + } else { + service.commitInputStates(parts.inputs, txId, otherSideSession.counterparty, requestPayload.requestSignature, parts.timestamp) + } signTransactionAndSendResponse(txId) } catch (e: NotaryInternalException) { throw NotaryException(e.error, txId) diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt b/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt index 3d6bda0a7b..a947288a97 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/TrustedAuthorityNotaryService.kt @@ -28,19 +28,8 @@ abstract class TrustedAuthorityNotaryService : NotaryService() { fun commitInputStates(inputs: List, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) { try { uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow) - } catch (e: NotaryInternalException) { - if (e.error is NotaryError.Conflict) { - val conflicts = inputs.filterIndexed { _, stateRef -> - val cause = e.error.consumedStates[stateRef] - cause != null && cause.hashOfTransactionId != txId.sha256() - } - if (conflicts.isNotEmpty()) { - // TODO: Create a new UniquenessException that only contains the conflicts filtered above. - log.info("Notary conflicts for $txId: $conflicts") - throw e - } - } else throw e } catch (e: Exception) { + if (e is NotaryInternalException) throw e log.error("Internal error", e) throw NotaryInternalException(NotaryError.General(Exception("Service unavailable, please try again later"))) } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/MySQLNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/MySQLNotaryServiceTests.kt index b14c0bfdca..9782860d4d 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/MySQLNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/MySQLNotaryServiceTests.kt @@ -10,23 +10,32 @@ package net.corda.node.services +import co.paralleluniverse.fibers.Suspendable import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever import com.typesafe.config.ConfigFactory +import net.corda.client.mock.Generator +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature -import net.corda.core.flows.NotaryError -import net.corda.core.flows.NotaryException -import net.corda.core.flows.NotaryFlow +import net.corda.core.crypto.generateKeyPair +import net.corda.core.flows.* import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.transpose +import net.corda.core.internal.notary.AsyncCFTNotaryService +import net.corda.core.internal.notary.AsyncUniquenessProvider.Result +import net.corda.core.internal.notary.generateSignature import net.corda.core.node.NotaryInfo import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds import net.corda.node.internal.StartedNode import net.corda.node.services.config.MySQLConfiguration import net.corda.node.services.config.NotaryConfig +import net.corda.node.services.transactions.MySQLNotaryService import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.testing.common.internal.testNetworkParameters @@ -35,15 +44,19 @@ import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTestSchemas +import net.corda.testing.node.TestClock import net.corda.testing.node.internal.* +import org.assertj.core.api.Assertions import org.junit.After import org.junit.Before import org.junit.ClassRule import org.junit.Test import java.math.BigInteger +import java.time.Duration import java.util.* import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertTrue class MySQLNotaryServiceTests : IntegrationTest() { companion object { @@ -56,12 +69,13 @@ class MySQLNotaryServiceTests : IntegrationTest() { private lateinit var mockNet: InternalMockNetwork private lateinit var node: StartedNode + private val nodeParty: Party get() = node.info.singleIdentity() private lateinit var notaryParty: Party private lateinit var notaryNode: StartedNode @Before fun before() { - mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts")) + mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.testing.contracts"), threadPerNode = true) notaryParty = DevIdentityGenerator.generateDistributedNotarySingularIdentity(listOf(mockNet.baseDirectory(mockNet.nextNodeId)), notaryName) val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryParty, false)))) val notaryNodeUnstarted = createNotaryNode() @@ -82,17 +96,10 @@ class MySQLNotaryServiceTests : IntegrationTest() { @Test fun `detect double spend`() { val inputState = issueState(node, notaryParty) - val firstTxBuilder = TransactionBuilder(notaryParty) .addInputState(inputState) .addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey)) val firstSpendTx = node.services.signInitialTransaction(firstTxBuilder) - - val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture - mockNet.runNetwork() - - firstSpend.getOrThrow() - val secondSpendBuilder = TransactionBuilder(notaryParty).withItems(inputState).run { val dummyState = DummyContract.SingleOwnerState(0, node.info.singleIdentity()) addOutputState(dummyState, DummyContract.PROGRAM_ID) @@ -100,9 +107,10 @@ class MySQLNotaryServiceTests : IntegrationTest() { this } val secondSpendTx = node.services.signInitialTransaction(secondSpendBuilder) - val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx)).resultFuture - mockNet.runNetwork() + val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture + val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx)).resultFuture + firstSpend.getOrThrow() val ex = assertFailsWith(NotaryException::class) { secondSpend.getOrThrow() } val error = ex.error as NotaryError.Conflict @@ -118,37 +126,115 @@ class MySQLNotaryServiceTests : IntegrationTest() { .addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey)) val spendTx = node.services.signInitialTransaction(txBuilder) - val notarise = node.services.startFlow(NotaryFlow.Client(spendTx)).resultFuture - mockNet.runNetwork() - val signature = notarise.get().single() - - val notariseRetry = node.services.startFlow(NotaryFlow.Client(spendTx)).resultFuture - mockNet.runNetwork() - val signatureRetry = notariseRetry.get().single() + val futures = (1..10).map { + node.services.startFlow(NotaryFlow.Client(spendTx)).resultFuture + } + val signatures = futures.transpose().get().flatten() fun checkSignature(signature: TransactionSignature) { signature.verify(spendTx.id) assertEquals(notaryParty.owningKey, signature.by) } - - checkSignature(signature) - checkSignature(signatureRetry) + signatures.forEach { checkSignature(it) } } - private fun createNotaryNode(): InternalMockNetwork.MockNode { - val dataStoreProperties = makeTestDataSourceProperties(configSupplier = { _, _ ->ConfigFactory.empty() }, fallBackConfigSupplier = ::inMemoryH2DataSourceConfig).apply { - setProperty("autoCommit", "false") + @Test + fun `should re-sign a transaction with an expired time-window`() { + val stx = run { + val inputState = issueState(node, notaryParty) + val tx = TransactionBuilder(notaryParty) + .addInputState(inputState) + .addCommand(dummyCommand(nodeParty.owningKey)) + .setTimeWindow(node.services.clock.instant(), 30.seconds) + node.services.signInitialTransaction(tx) + } + + val sig1 = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture.get().first() + assertEquals(sig1.by, notaryParty.owningKey) + assertTrue(sig1.isValid(stx.id)) + + mockNet.nodes.forEach { + val nodeClock = (it.started!!.services.clock as TestClock) + nodeClock.advanceBy(Duration.ofDays(1)) + } + + val sig2 = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture.get().first() + assertEquals(sig2.by, notaryParty.owningKey) + } + + @Test + fun `should report error for transaction with an invalid time-window`() { + val stx = run { + val inputState = issueState(node, notaryParty) + val tx = TransactionBuilder(notaryParty) + .addInputState(inputState) + .addCommand(dummyCommand(nodeParty.owningKey)) + .setTimeWindow(node.services.clock.instant().plusSeconds(3600), 30.seconds) + node.services.signInitialTransaction(tx) + } + val future = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture + + val ex = assertFailsWith(NotaryException::class) { future.getOrThrow() } + Assertions.assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java) + } + + @Test + fun `requests are processed in batches`() { + val notaryService = notaryNode.notaryService as MySQLNotaryService + val transactionCount = 100 + val results = notaryNode.services.startFlow(RequestGenerationFlow(notaryService, transactionCount)).resultFuture.get() + assertEquals(transactionCount, results.size) + assert(results.all { it === Result.Success }) + } + + @Test + fun `batches with too many input states are processed in chunks`() { + val notaryService = notaryNode.notaryService as MySQLNotaryService + val transactionCount = 10 + val results = notaryNode.services.startFlow(RequestGenerationFlow(notaryService, transactionCount, 50)).resultFuture.get() + assertEquals(transactionCount, results.size) + assert(results.all { it === Result.Success }) + } + + private class RequestGenerationFlow( + private val service: MySQLNotaryService, + private val transactionCount: Int, + private val inputStateCount: Int? = null + ) : FlowLogic>() { + private val publicKeyGeneratorSingle = Generator.pure(generateKeyPair().public) + private val partyGenerator: Generator = Generator.int().combine(publicKeyGeneratorSingle) { n, key -> + Party(CordaX500Name(organisation = "Party$n", locality = "London", country = "GB"), key) + } + private val txIdGenerator = Generator.bytes(32).map { SecureHash.sha256(it) } + private val stateRefGenerator = txIdGenerator.combine(Generator.intRange(0, 10)) { id, pos -> StateRef(id, pos) } + private val random = SplittableRandom() + + @Suspendable + override fun call(): List { + val futures = mutableListOf>() + var requestSignature: NotarisationRequestSignature? = null + for (i in 1..transactionCount) { + val txId: SecureHash = txIdGenerator.generateOrFail(random) + val callerParty = partyGenerator.generateOrFail(random) + val inputGenerator = if (inputStateCount == null) { + Generator.replicatePoisson(4.0, stateRefGenerator, true) + } else { + Generator.replicate(inputStateCount, stateRefGenerator) + } + val inputs = inputGenerator.generateOrFail(random) + if (requestSignature == null || random.nextInt(10) < 2) { + requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub) + } + futures += AsyncCFTNotaryService.CommitOperation( + service, + inputs, + txId, + callerParty, + requestSignature, + null).execute() + } + return futures.transpose().get() } - return mockNet.createUnstartedNode( - InternalMockNodeParameters( - legalName = notaryNodeName, - entropyRoot = BigInteger.valueOf(60L), - configOverrides = { - val notaryConfig = NotaryConfig(validating = false, mysql = MySQLConfiguration(dataStoreProperties)) - doReturn(notaryConfig).whenever(it).notary - } - ) - ) } private fun issueState(node: StartedNode, notary: Party): StateAndRef<*> { @@ -159,4 +245,23 @@ class MySQLNotaryServiceTests : IntegrationTest() { StateAndRef(builder.outputStates().first(), StateRef(stx.id, 0)) } } + + private fun createNotaryNode(): InternalMockNetwork.MockNode { + val dataStoreProperties = makeTestDataSourceProperties(configSupplier = { _, _ -> ConfigFactory.empty() }, fallBackConfigSupplier = ::inMemoryH2DataSourceConfig).apply { + setProperty("autoCommit", "false") + } + return mockNet.createUnstartedNode( + InternalMockNodeParameters( + legalName = notaryNodeName, + entropyRoot = BigInteger.valueOf(60L), + configOverrides = { + val notaryConfig = NotaryConfig( + validating = false, + mysql = MySQLConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100) + ) + doReturn(notaryConfig).whenever(it).notary + } + ) + ) + } } diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index b87dfbc211..13e3417b34 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -115,13 +115,36 @@ data class NotaryConfig(val validating: Boolean, "raft, bftSMaRt, custom, and mysql configs cannot be specified together" } } + val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null || mysql != null } data class MySQLConfiguration( val dataSource: Properties, - val connectionRetries: Int = 0 -) { + /** + * Number of times to attempt to reconnect to the database. + */ + val connectionRetries: Int = 2, // Default value for a 3 server cluster. + /** + * Time increment between re-connection attempts. + * + * The total back-off duration is calculated as: backOffIncrement * backOffBase ^ currentRetryCount + */ + val backOffIncrement: Int = 500, + /** Exponential back-off multiplier base. */ + val backOffBase: Double = 1.5, + /** The maximum number of transactions processed in a single batch. */ + val maxBatchSize: Int = 500, + /** The maximum combined number of input states processed in a single batch. */ + val maxBatchInputStates: Int = 10_000, + /** A batch will be processed after a specified timeout even if it has not yet reached full capacity. */ + val batchTimeoutMs: Long = 200, + /** + * The maximum number of commit requests in flight. Once the capacity is reached the service will block on + * further commit requests. + */ + val maxQueueSize: Int = 100_000 + ) { init { require(connectionRetries >= 0) { "connectionRetries cannot be negative" } } @@ -410,6 +433,7 @@ data class SecurityConfiguration(val authService: SecurityConfiguration.AuthServ } } } + data class RelayConfiguration(val relayHost: String, val remoteInboundPort: Int, val username: String, diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLNotaryService.kt index 067d020184..1c43dd4bb1 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLNotaryService.kt @@ -12,7 +12,7 @@ package net.corda.node.services.transactions import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession -import net.corda.core.internal.notary.TrustedAuthorityNotaryService +import net.corda.core.internal.notary.AsyncCFTNotaryService import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.MySQLConfiguration import java.security.PublicKey @@ -23,19 +23,20 @@ abstract class MySQLNotaryService( override val notaryIdentityKey: PublicKey, configuration: MySQLConfiguration, /** Database table will be automatically created in dev mode */ - val devMode: Boolean) : TrustedAuthorityNotaryService() { + val devMode: Boolean) : AsyncCFTNotaryService() { - override val uniquenessProvider = MySQLUniquenessProvider( + override val asyncUniquenessProvider = MySQLUniquenessProvider( services.monitoringService.metrics, + services.clock, configuration ) override fun start() { - if (devMode) uniquenessProvider.createTable() + if (devMode) asyncUniquenessProvider.createTable() } override fun stop() { - uniquenessProvider.stop() + asyncUniquenessProvider.stop() } } @@ -47,8 +48,8 @@ class MySQLNonValidatingNotaryService(services: ServiceHubInternal, } class MySQLValidatingNotaryService(services: ServiceHubInternal, - notaryIdentityKey: PublicKey, - configuration: MySQLConfiguration, - devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, configuration, devMode) { + notaryIdentityKey: PublicKey, + configuration: MySQLConfiguration, + devMode: Boolean = false) : MySQLNotaryService(services, notaryIdentityKey, configuration, devMode) { override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = ValidatingNotaryFlow(otherPartySession, this) } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt index b0a9754511..a89a1c3cc7 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/MySQLUniquenessProvider.kt @@ -10,12 +10,15 @@ package net.corda.node.services.transactions +import com.codahale.metrics.Gauge import com.codahale.metrics.MetricRegistry import com.google.common.base.Stopwatch +import com.google.common.collect.Queues import com.mysql.cj.jdbc.exceptions.CommunicationsException import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariDataSource +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash @@ -24,16 +27,29 @@ import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.notary.AsyncUniquenessProvider +import net.corda.core.internal.notary.AsyncUniquenessProvider.Result import net.corda.core.internal.notary.NotaryInternalException -import net.corda.core.internal.notary.UniquenessProvider +import net.corda.core.internal.notary.isConsumedByTheSameTx +import net.corda.core.internal.notary.validateTimeWindow import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize +import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.trace import net.corda.node.services.config.MySQLConfiguration import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding.SNAPPY import java.sql.* +import java.time.Clock +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit +import kotlin.concurrent.thread + /** * Uniqueness provider backed by a MySQL database. It is intended to be used with a multi-master synchronously replicated @@ -43,24 +59,26 @@ import java.util.concurrent.TimeUnit */ class MySQLUniquenessProvider( metrics: MetricRegistry, - configuration: MySQLConfiguration -) : UniquenessProvider, SingletonSerializeAsToken() { - + val clock: Clock, + val config: MySQLConfiguration +) : AsyncUniquenessProvider, SingletonSerializeAsToken() { companion object { private val log = loggerFor() - // TODO: optimize table schema for InnoDB - private val createCommittedStateTable = + private const val createCommittedStateTable = "CREATE TABLE IF NOT EXISTS notary_committed_states (" + "issue_transaction_id BINARY(32) NOT NULL," + "issue_transaction_output_id INT UNSIGNED NOT NULL," + "consuming_transaction_id BINARY(32) NOT NULL," + "CONSTRAINT id PRIMARY KEY (issue_transaction_id, issue_transaction_output_id)" + ")" - private val insertStateStatement = "INSERT INTO notary_committed_states (issue_transaction_id, issue_transaction_output_id, consuming_transaction_id) VALUES (?, ?, ?)" - private val findStatement = "SELECT consuming_transaction_id FROM notary_committed_states WHERE issue_transaction_id = ? AND issue_transaction_output_id = ?" + private const val insertStateStatement = "INSERT INTO notary_committed_states (issue_transaction_id, issue_transaction_output_id, consuming_transaction_id) VALUES (?, ?, ?)" + private const val findStateStatement = "SELECT consuming_transaction_id, issue_transaction_id, issue_transaction_output_id " + + "FROM notary_committed_states " + + "WHERE (issue_transaction_id = ? AND issue_transaction_output_id = ?)" + private const val findClause = "OR (issue_transaction_id = ? AND issue_transaction_output_id = ?)" - private val createRequestLogTable = + private const val createRequestLogTable = "CREATE TABLE IF NOT EXISTS notary_request_log (" + "consuming_transaction_id BINARY(32) NOT NULL," + "requesting_party_name TEXT NOT NULL," + @@ -69,12 +87,27 @@ class MySQLUniquenessProvider( "request_id INT UNSIGNED NOT NULL AUTO_INCREMENT," + "CONSTRAINT rid PRIMARY KEY (request_id)" + ")" - private val insertRequestStatement = "INSERT INTO notary_request_log (consuming_transaction_id, requesting_party_name, request_signature) VALUES (?, ?, ?)" + private const val insertRequestStatement = "INSERT INTO notary_request_log (consuming_transaction_id, requesting_party_name, request_signature) VALUES (?, ?, ?)" + + /** The maximum number of attempts to retry a database operation. */ + private const val maxRetries = 1000 } + private data class CommitRequest( + val states: List, + val txId: SecureHash, + val callerIdentity: Party, + val requestSignature: NotarisationRequestSignature, + val timeWindow: TimeWindow?, + val id: UUID = UUID.randomUUID()) + private val metricPrefix = MySQLUniquenessProvider::class.simpleName - /** Transaction commit duration and rate metric timer */ + /** Transaction commit duration timer and TPS meter. */ private val commitTimer = metrics.timer("$metricPrefix.Commit") + /** IPS (input states per second) meter. */ + private val inputStatesMeter = metrics.meter("$metricPrefix.IPS") + /** Transaction batch commit duration and rate meter. */ + private val batchTimer = metrics.timer("$metricPrefix.BatchCommit") /** * When writing to multiple masters with Galera, transaction rollbacks may happen due to high write contention. * This is a useful heath metric. @@ -84,32 +117,313 @@ class MySQLUniquenessProvider( private val connectionExceptionCounter = metrics.counter("$metricPrefix.ConnectionException") /** Track double spend attempts. Note that this will also include notarisation retries. */ private val conflictCounter = metrics.counter("$metricPrefix.Conflicts") - /** Track the distribution of the number of input states **/ - private val nrInputStates = metrics.histogram("$metricPrefix.NumberOfInputStates") + /** Track the distribution of the number of input states. **/ + private val inputStateCount = metrics.histogram("$metricPrefix.NumberOfInputStates") - val dataSource = HikariDataSource(HikariConfig(configuration.dataSource)) - private val connectionRetries = configuration.connectionRetries + private val dataSource = HikariDataSource(HikariConfig(config.dataSource)) + private val connectionRetries = config.connectionRetries + /** Attempts to obtain a database connection with number of retries specified in [connectionRetries]. */ private val connection: Connection - get() = getConnection() + get() { + var retries = 0 + while (true) { + try { + return dataSource.connection + } catch (e: SQLTransientConnectionException) { + if (retries == connectionRetries) { + log.warn("Couldn't obtain connection with $retries retries, giving up", e) + throw e + } + retries++ + connectionExceptionCounter.inc() + log.warn("Error trying to obtain a database connection, retrying. Attempts: $retries") + val backOffDurationMs = Math.round( + config.backOffIncrement * Math.pow(config.backOffBase, retries.toDouble()) + ) + Thread.sleep(backOffDurationMs) + } + } + } + + private val requestQueue = LinkedBlockingQueue(config.maxQueueSize) + private val requestFutures = ConcurrentHashMap>() + + /** Track the request queue size. */ + private val queueSizeGauge = metrics.register( + "$metricPrefix.RequestsQueueSize", + Gauge { requestQueue.size } + ) + /** Track the batch size. **/ + private val processedBatchSize = metrics.histogram("$metricPrefix.ProcessedBatchSize") + + /** A request processor thread. */ + private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) { + try { + processRequests() + } catch (e: InterruptedException) { + } + log.debug { "Shutting down with ${requestQueue.size} in-flight requests unprocessed." } + } /** - * Attempts to obtain a database connection with number of retries specified in [connectionRetries]. - * No backoff strategy is used since it's expected that the service can immediately fail over to a different - * database server in the replicated MySQL cluster. + * Generates and adds a [CommitRequest] to the request queue. If the request queue is full, this method will block + * until space is available. + * + * Returns a future that will complete once the request is processed, containing the commit [Result]. */ - private fun getConnection(nRetries: Int = 0): Connection { - return try { - dataSource.connection - } catch (e: SQLTransientConnectionException) { - if (nRetries == connectionRetries) { - log.warn("Couldn't obtain connection with {} retries, giving up, {}", nRetries, e) - throw e - } - log.warn("Error trying to obtain a database connection, retrying", nRetries + 1) - connectionExceptionCounter.inc() - getConnection(nRetries + 1) + override fun commitAsync( + states: List, + txId: SecureHash, + callerIdentity: Party, + requestSignature: NotarisationRequestSignature, + timeWindow: TimeWindow? + ): CordaFuture { + inputStateCount.update(states.size) + val timer = Stopwatch.createStarted() + val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow) + val future = openFuture() + requestFutures[request.id] = future + future.then { + recordDuration(timer) } + requestQueue.put(request) + return future + } + + private fun recordDuration(totalTime: Stopwatch) { + totalTime.stop() + val elapsed = totalTime.elapsed(TimeUnit.MILLISECONDS) + commitTimer.update(elapsed, TimeUnit.MILLISECONDS) + } + + /** + * Processes notarisation requests in batches. It attempts to fill the batch with up to [maxBatchSize] requests, + * with a total combined number of input states no greater than [maxBatchInputStates]. + * + * If there are not enough requests to fill the batch, it will get processed after a timeout of [batchTimeoutMs]. + */ + private fun processRequests() { + val buffer = LinkedList() + while (!Thread.interrupted()) { + val drainedSize = Queues.drain(requestQueue, buffer, config.maxBatchSize, config.batchTimeoutMs, TimeUnit.MILLISECONDS) + if (drainedSize == 0) continue + processBuffer(buffer) + } + } + + /** + * Processes the request [buffer], potentially splitting it into more than one if the total number of + * inputs is over [maxBatchInputStates]. + */ + private fun processBuffer(buffer: LinkedList) { + var inputStateCount = 0 + val batch = ArrayList() + while (buffer.isNotEmpty()) { + while (buffer.isNotEmpty() && inputStateCount + buffer.peek().states.size <= config.maxBatchInputStates) { + val request = buffer.poll() + batch.add(request) + inputStateCount += request.states.size + } + log.debug { "Processing a batch of size: ${batch.size}, input states: $inputStateCount" } + processBatch(batch) + processedBatchSize.update(batch.size) + inputStatesMeter.mark(inputStateCount.toLong()) + batch.clear() + inputStateCount = 0 + } + } + + private fun processBatch(requests: List) { + val batchTime = Stopwatch.createStarted() + try { + runWithRetry(LogRequests(requests)) + val results = runWithRetry(CommitStates(requests, clock)) + respondWithSuccess(results) + } catch (e: Exception) { + // Unhandled exception, we assume that signals a problem with the database that can't be fixed with + // a retry, such as misconfiguration. + log.error("Error notarising transactions", e) + respondWithError(requests) + } finally { + batchTime.stop() + val elapsed = batchTime.elapsed(TimeUnit.MILLISECONDS) + batchTimer.update(elapsed, TimeUnit.MILLISECONDS) + log.trace { "Processed a batch of ${requests.size} requests in $elapsed ms" } + } + } + + /** + * Completes request futures with a successful response. This will resume service flows that will generate and + * send signatures back to the request originators. + */ + private fun respondWithSuccess(results: Map) { + for ((requestId, result) in results) { + requestFutures[requestId]?.let { + it.set(result) + requestFutures.remove(requestId) + if (result is Result.Failure && result.error is NotaryError.Conflict){ + conflictCounter.inc() + } + } + } + } + + /** + * If a database exception occurred when processing the batch, propagate the error to each request. This will + * resume the service flows that will forward the error message to the request originators. + */ + private fun respondWithError(requests: List) { + for (request in requests) { + requestFutures[request.id]?.let { + it.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) + requestFutures.remove(request.id) + } + } + } + + /** Stores the notarisation requests including the request signature. */ + private class LogRequests(val requests: List) : DBOperation { + override fun execute(connection: Connection) { + // Write request signature to log + connection.prepareStatement(insertRequestStatement).apply { + requests.forEach { (_, txId, callerIdentity, requestSignature) -> + setBytes(1, txId.bytes) + setString(2, callerIdentity.name.toString()) + setBytes(3, requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes) + + addBatch() + clearParameters() + } + executeBatch() + close() + } + connection.commit() + } + } + + /** + * Stores all input states that don't yet exist in the database. + * A [Result.Conflict] is created for each transaction with one or more inputs already present in the database. + */ + private class CommitStates(val requests: List, val clock: Clock) : DBOperation> { + override fun execute(connection: Connection): Map { + val results = mutableMapOf() + + val allStates = requests.flatMap { it.states } + val allConflicts = findAlreadyCommitted(connection, allStates).toMutableMap() + val toCommit = mutableListOf() + requests.forEach { request -> + val conflicts = allConflicts.filter { it.key in request.states } + + results[request.id] = if (conflicts.isNotEmpty()) { + if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) { + Result.Success + } else { + Result.Failure(NotaryError.Conflict(request.txId, conflicts)) + } + } else { + val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow) + if (outsideTimeWindowError == null) { + toCommit.add(request) + // Mark states as consumed to capture conflicting transactions in the same batch + request.states.forEach { + allConflicts[it] = StateConsumptionDetails(request.txId.sha256()) + } + Result.Success + } else { + Result.Failure(outsideTimeWindowError) + } + } + } + + connection.prepareStatement(insertStateStatement).apply { + toCommit.forEach { (states, txId, _, _) -> + states.forEach { stateRef -> + // StateRef + setBytes(1, stateRef.txhash.bytes) + setInt(2, stateRef.index) + // Consuming transaction + setBytes(3, txId.bytes) + addBatch() + clearParameters() + } + } + executeBatch() + close() + } + connection.commit() + return results + } + + private fun findAlreadyCommitted(connection: Connection, states: List): Map { + val queryString = buildQueryString(states.size) + val preparedStatement = connection.prepareStatement(queryString).apply { + var parameterIndex = 0 + states.forEach { (txId, index) -> + setBytes(++parameterIndex, txId.bytes) + setInt(++parameterIndex, index) + } + + } + val resultSet = preparedStatement.executeQuery() + val committedStates = mutableMapOf() + while (resultSet.next()) { + val consumingTxId = SecureHash.SHA256(resultSet.getBytes(1)) + val stateRef = StateRef(SecureHash.SHA256(resultSet.getBytes(2)), resultSet.getInt(3)) + committedStates[stateRef] = StateConsumptionDetails(consumingTxId.sha256()) + } + preparedStatement.close() + return committedStates + } + + private fun buildQueryString(stateCount: Int): String { + val queryStringBuilder = StringBuilder(findStateStatement) + (1 until stateCount).forEach { queryStringBuilder.append(findClause) } + return queryStringBuilder.toString() + } + } + + /** An interface for custom database operations. */ + private interface DBOperation { + fun execute(connection: Connection): T + } + + /** Runs the provided [operation], retrying on transient database errors. */ + private fun runWithRetry(operation: DBOperation): T { + var retryCount = 0 + while (retryCount < maxRetries) { + connection.use { + sameConnection@ while (retryCount < maxRetries) { + retryCount++ + try { + return operation.execute(it) + } catch (e: Exception) { + when (e) { + is BatchUpdateException, // Occurs when a competing transaction commits conflicting input states + is MySQLTransactionRollbackException -> { + log.warn("Database transaction conflict, retrying", e) + it.rollback() + rollbackCounter.inc() + continue@sameConnection // Retrying using the same connection + } + is SQLRecoverableException, is CommunicationsException, // Occurs when an issue is encountered during execute() (e.g. connection lost) + is SQLNonTransientConnectionException -> { // Occurs when an issue is encountered during commit() (e.g. connection lost) + log.warn("Lost connection to the database, retrying", e) + break@sameConnection // Retrying using a new connection + // TODO: don't reinsert notarisation request on retry + } + else -> { + log.warn("Unexpected error occurred, attempting to rollback", e) + it.rollback() + throw e + } + } + } + } + } + } + throw IllegalStateException("Database operation reached the maximum number of retries: $retryCount, something went wrong.") } fun createTable() { @@ -123,107 +437,6 @@ class MySQLUniquenessProvider( fun stop() { dataSource.close() - } - - override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) { - val s = Stopwatch.createStarted() - try { - runWithRetry(CommitAll(states, txId, callerIdentity, requestSignature)) - nrInputStates.update(states.size) - } catch (e: BatchUpdateException) { - log.info("Unable to commit input states, finding conflicts, txId: $txId", e) - // TODO: do not increment the conflict counter if the conflict was caused by the service retrying - // db transaction. E.g. when failing over to a different MySQL server. - conflictCounter.inc() - runWithRetry(FindConflicts(txId, states)) - } finally { - val dt = s.stop().elapsed(TimeUnit.MILLISECONDS) - commitTimer.update(dt, TimeUnit.MILLISECONDS) - log.info("Processed notarisation request, txId: $txId, nrInputStates: ${states.size}, dt: $dt") - } - } - - private fun runWithRetry(action: DBTransaction) { - connection.use { - loop@ while (true) { - try { - action.run(it) - break - } catch (e: Exception) { - when (e) { - is MySQLTransactionRollbackException -> { - log.warn("Rollback exception occurred, retrying", e) - rollbackCounter.inc() - continue@loop // Retrying using the same connection - } - is SQLRecoverableException, is CommunicationsException, // Occurs when an issue is encountered during execute() (e.g. connection lost) - is SQLNonTransientConnectionException -> { // Occurs when an issue is encountered during commit() (e.g. connection lost) - log.warn("Lost connection to the database, retrying", e) - runWithRetry(action) // Retrying using a new connection - // TODO: don't reinsert notarisation request on retry - break@loop - } - else -> { - log.warn("Unexpected error occurred, attempting to rollback", e) - it.rollback() - throw e - } - } - } - } - } - } - - interface DBTransaction { - fun run(conn: Connection) - } - - private class CommitAll(val states: List, val txId: SecureHash, val callerIdentity: Party, val requestSignature: NotarisationRequestSignature) : DBTransaction { - override fun run(conn: Connection) { - conn.prepareStatement(insertRequestStatement).apply { - setBytes(1, txId.bytes) - setString(2, callerIdentity.name.toString()) - setBytes(3, requestSignature.serialize(context = SerializationDefaults.STORAGE_CONTEXT.withEncoding(SNAPPY)).bytes) - execute() - close() - } - // We commit here, since we want to make sure the notarisation request insertion - // doesn't get rolled back in case of a conflict when committing inputs - conn.commit() - conn.setSavepoint() - conn.prepareStatement(insertStateStatement).apply { - states.forEach { stateRef -> - // StateRef - setBytes(1, stateRef.txhash.bytes) - setInt(2, stateRef.index) - // Consuming transaction - setBytes(3, txId.bytes) - addBatch() - clearParameters() - } - executeBatch() - close() - } - conn.commit() - } - } - - private class FindConflicts(val txId: SecureHash, val states: List) : DBTransaction { - override fun run(conn: Connection) { - val conflicts = mutableMapOf() - states.forEach { - val st = conn.prepareStatement(findStatement).apply { - setBytes(1, it.txhash.bytes) - setInt(2, it.index) - } - val result = st.executeQuery() - if (result.next()) { - val consumingTxId = SecureHash.SHA256(result.getBytes(1)) - conflicts[it] = StateConsumptionDetails(consumingTxId.sha256()) - } - } - conn.commit() - if (conflicts.isNotEmpty()) throw NotaryInternalException(NotaryError.Conflict(txId, conflicts)) - } + processorThread.interrupt() } } \ No newline at end of file