From e486c8b3923fcba582a353fa9c3f5c3422583335 Mon Sep 17 00:00:00 2001 From: Thomas Schroeter Date: Wed, 21 Nov 2018 17:58:57 +0000 Subject: [PATCH] ENT-2358 Handle reference states in the MySQLUniquenessProvider (#1516) * Handle reference states in the MySQLUniquenessProvider * fix test --- .../net/corda/core/flows/NotaryError.kt | 2 +- .../notary/mysql/MySQLUniquenessProvider.kt | 87 +++++++++++--- .../notary/mysql/MySQLNotaryServiceTests.kt | 109 +++++++++++++++++- .../mysql/MySQLUniquenessProviderTest.kt | 72 ++++++++++++ 4 files changed, 247 insertions(+), 23 deletions(-) create mode 100644 notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLUniquenessProviderTest.kt diff --git a/core/src/main/kotlin/net/corda/core/flows/NotaryError.kt b/core/src/main/kotlin/net/corda/core/flows/NotaryError.kt index 6a41d8f80e..807d8a6908 100644 --- a/core/src/main/kotlin/net/corda/core/flows/NotaryError.kt +++ b/core/src/main/kotlin/net/corda/core/flows/NotaryError.kt @@ -27,7 +27,7 @@ sealed class NotaryError { /** Specifies which states have already been consumed in another transaction. */ val consumedStates: Map ) : NotaryError() { - override fun toString() = "One or more input states have already been used in other transactions. Conflicting state count: ${consumedStates.size}, consumption details:\n" + + override fun toString() = "One or more input states or referenced states have already been used as input states in other transactions. Conflicting state count: ${consumedStates.size}, consumption details:\n" + "${consumedStates.asSequence().joinToString(",\n", limit = 5) { it.key.toString() + " -> " + it.value }}.\n" + "To find out if any of the conflicting transactions have been generated by this node you can use the hashLookup Corda shell command." } diff --git a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt index 445540ec5d..cfbfca7ad9 100644 --- a/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt +++ b/notary/mysql/src/main/kotlin/net/corda/notary/mysql/MySQLUniquenessProvider.kt @@ -2,6 +2,7 @@ package net.corda.notary.mysql import com.codahale.metrics.Gauge import com.codahale.metrics.MetricRegistry +import com.google.common.annotations.VisibleForTesting import com.google.common.base.Stopwatch import com.google.common.collect.Queues import com.mysql.cj.jdbc.exceptions.CommunicationsException @@ -53,7 +54,7 @@ class MySQLUniquenessProvider( companion object { private val log = loggerFor() // TODO: optimize table schema for InnoDB - private const val createCommittedStateTable = + const val createCommittedStateTable = "CREATE TABLE IF NOT EXISTS notary_committed_states (" + "issue_transaction_id BINARY(32) NOT NULL," + "issue_transaction_output_id INT UNSIGNED NOT NULL," + @@ -75,13 +76,25 @@ class MySQLUniquenessProvider( "request_id INT UNSIGNED NOT NULL AUTO_INCREMENT," + "CONSTRAINT rid PRIMARY KEY (request_id)" + ")" + + private const val createCommittedTransactionsTable = + "CREATE TABLE IF NOT EXISTS notary_committed_transactions (" + + "transaction_id BINARY(32) NOT NULL," + + "CONSTRAINT tid PRIMARY KEY (transaction_id)" + + ")" + private const val insertRequestStatement = "INSERT INTO notary_request_log (consuming_transaction_id, requesting_party_name, request_signature) VALUES (?, ?, ?)" + private const val insertCommittedTransactionStatement = "INSERT INTO notary_committed_transactions (transaction_id) VALUES (?)" + + private const val selectCommittedTransactionStatement = "SELECT COUNT(transaction_id) FROM notary_committed_transactions WHERE transaction_id = ?" + /** The maximum number of attempts to retry a database operation. */ private const val maxRetries = 1000 } - private data class CommitRequest( + @VisibleForTesting + data class CommitRequest( val states: List, val txId: SecureHash, val callerIdentity: Party, @@ -113,7 +126,8 @@ class MySQLUniquenessProvider( private val connectionRetries = config.connectionRetries /** Attempts to obtain a database connection with number of retries specified in [connectionRetries]. */ - private val connection: Connection + @VisibleForTesting + val connection: Connection get() { var retries = 0 while (true) { @@ -296,26 +310,40 @@ class MySQLUniquenessProvider( * Stores all input states that don't yet exist in the database. * A [Result.Conflict] is created for each transaction with one or more inputs already present in the database. */ - private class CommitStates(val requests: List, val clock: Clock) : DBOperation> { + @VisibleForTesting + class CommitStates(val requests: List, val clock: Clock) : DBOperation> { override fun execute(connection: Connection): Map { val results = mutableMapOf() - val allStates = requests.flatMap { it.states } - val allConflicts = findAlreadyCommitted(connection, allStates).toMutableMap() + val allStates = requests.flatMap { it.states }.toSet() + val allReferences = requests.flatMap { it.references }.toSet() + val allConflicts = findAlreadyCommitted(connection, allStates, allReferences).toMutableMap() val toCommit = mutableListOf() + requests.forEach { request -> - val conflicts = allConflicts.filter { it.key in request.states } + val referenceStateConflicts = allConflicts.keys.intersect(request.references.toSet()).map { it to allConflicts[it]!! }.toMap() + val inputStateConflicts = allConflicts.keys.intersect(request.states.toSet()).map { it to allConflicts[it]!! }.toMap() + val conflicts = referenceStateConflicts + inputStateConflicts results[request.id] = if (conflicts.isNotEmpty()) { - if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) { + if (request.states.isEmpty() && referenceStateConflicts.isNotEmpty()) { + if (isPreviouslySigned(connection, request.txId)) { + Result.Success + } else { + Result.Failure(NotaryError.Conflict(request.txId, conflicts)) + } + } else if (inputStateConflicts.isNotEmpty() && isConsumedByTheSameTx(request.txId.sha256(), inputStateConflicts)) { Result.Success } else { Result.Failure(NotaryError.Conflict(request.txId, conflicts)) } } else { val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow) - if (outsideTimeWindowError == null) { - toCommit.add(request) + val preSigned = outsideTimeWindowError != null && request.states.isEmpty() && isPreviouslySigned(connection, request.txId) + if (outsideTimeWindowError == null || preSigned) { + if (!preSigned) { + toCommit.add(request) + } // Mark states as consumed to capture conflicting transactions in the same batch request.states.forEach { allConflicts[it] = StateConsumptionDetails(request.txId.sha256()) @@ -342,29 +370,55 @@ class MySQLUniquenessProvider( executeBatch() close() } + + connection.prepareStatement(insertCommittedTransactionStatement).apply { + toCommit.forEach { (_, txId, _, _) -> + setBytes(1, txId.bytes) + addBatch() + clearParameters() + } + executeBatch() + close() + } connection.commit() return results } - private fun findAlreadyCommitted(connection: Connection, states: List): Map { - if (states.isEmpty()) { + private fun isPreviouslySigned(connection: Connection, txId: SecureHash): Boolean { + val preparedStatement = connection.prepareStatement(selectCommittedTransactionStatement) + preparedStatement.setBytes(1, txId.bytes) + val resultSet = preparedStatement.executeQuery() + val nrRecords = if (resultSet.next()) { + resultSet.getInt(1) + } else { + 0 + } + preparedStatement.close() + return nrRecords == 1 + } + + private fun findAlreadyCommitted(connection: Connection, states: Set, references: Set): Map { + if (states.isEmpty() && references.isEmpty()) { return emptyMap() } - val queryString = buildQueryString(states.size) + val queryString = buildQueryString((states + references).size) val preparedStatement = connection.prepareStatement(queryString).apply { var parameterIndex = 0 - states.forEach { (txId, index) -> + (states + references).forEach { (txId, index) -> setBytes(++parameterIndex, txId.bytes) setInt(++parameterIndex, index) } - } val resultSet = preparedStatement.executeQuery() val committedStates = mutableMapOf() while (resultSet.next()) { val consumingTxId = SecureHash.SHA256(resultSet.getBytes(1)) val stateRef = StateRef(SecureHash.SHA256(resultSet.getBytes(2)), resultSet.getInt(3)) - committedStates[stateRef] = StateConsumptionDetails(consumingTxId.sha256()) + committedStates[stateRef] = if (stateRef in references) { + StateConsumptionDetails(consumingTxId.sha256(), StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) + } else { + StateConsumptionDetails(consumingTxId.sha256()) + } } preparedStatement.close() return committedStates @@ -424,6 +478,7 @@ class MySQLUniquenessProvider( connection.use { it.createStatement().execute(createCommittedStateTable) it.createStatement().execute(createRequestLogTable) + it.createStatement().execute(createCommittedTransactionsTable) it.commit() } } diff --git a/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt b/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt index f93acafd36..5c7de929bd 100644 --- a/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt +++ b/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLNotaryServiceTests.kt @@ -8,6 +8,7 @@ import net.corda.client.mock.Generator import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.generateKeyPair @@ -67,7 +68,7 @@ class MySQLNotaryServiceTests : IntegrationTest() { fun before() { mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts", "net.corda.notary.mysql"), threadPerNode = true) notaryParty = DevIdentityGenerator.generateDistributedNotarySingularIdentity(listOf(mockNet.baseDirectory(mockNet.nextNodeId)), notaryName) - val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryParty, true)))) + val networkParameters = NetworkParametersCopier(testNetworkParameters(notaries = listOf(NotaryInfo(notaryParty, true)), minimumPlatformVersion = 4)) val notaryNodeUnstarted = createNotaryNode() val nodeUnstarted = mockNet.createUnstartedNode() val startedNodes = listOf(notaryNodeUnstarted, nodeUnstarted).map { n -> @@ -107,12 +108,88 @@ class MySQLNotaryServiceTests : IntegrationTest() { assertEquals(error.txId, secondSpendTx.id) } + @Test + fun `handles reference states`() { + val inputState1 = issueState(node, notaryParty) + val inputState2 = issueState(node, notaryParty) + val outputState = TransactionState(DummyContract.SingleOwnerState(Random().nextInt(), node.services.myInfo.singleIdentity()), DummyContract.PROGRAM_ID, notaryParty) + + val firstTxBuilder = TransactionBuilder(notaryParty) + .addInputState(inputState1) + .addReferenceState(inputState2.referenced()) + .addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey)) + val firstSpendTx = node.services.signInitialTransaction(firstTxBuilder) + val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture + firstSpend.getOrThrow() + + val secondTxBuilder = TransactionBuilder(notaryParty) + .addInputState(inputState2) + .addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey)) + val secondSpendTx = node.services.signInitialTransaction(secondTxBuilder) + val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx)).resultFuture + secondSpend.getOrThrow() + + val firstSpendReferenceInvalidButIdempotent = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture + firstSpendReferenceInvalidButIdempotent.getOrThrow() + + val thirdSpendTxBuilder = TransactionBuilder(notaryParty) + .addOutputState(outputState) + .addReferenceState(inputState2.referenced()) + .addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey)) + val thirdSpendTx = node.services.signInitialTransaction(thirdSpendTxBuilder) + val thirdSpend = node.services.startFlow(NotaryFlow.Client(thirdSpendTx)).resultFuture + + val ex = assertFailsWith(NotaryException::class) { thirdSpend.getOrThrow() } + val error = ex.error as NotaryError.Conflict + assertEquals(error.txId, thirdSpendTx.id) + assertEquals(error.consumedStates[inputState2.ref]!!.type, StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) + } + + @Test + fun `handles transactions with only reference states`() { + val outputState = TransactionState(DummyContract.SingleOwnerState(Random().nextInt(), node.services.myInfo.singleIdentity()), DummyContract.PROGRAM_ID, notaryParty) + val inputState1 = issueState(node, notaryParty) + val inputState2 = issueState(node, notaryParty) + + val firstTxBuilder = TransactionBuilder(notaryParty) + .addOutputState(outputState) + .addReferenceState(inputState1.referenced()) + .addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey)) + val firstSpendTx = node.services.signInitialTransaction(firstTxBuilder) + val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture + firstSpend.getOrThrow() + + val secondTxBuilder = TransactionBuilder(notaryParty) + .addInputState(inputState1) + .addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey)) + val secondSpendTx = node.services.signInitialTransaction(secondTxBuilder) + val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx)).resultFuture + secondSpend.getOrThrow() + + val firstSpendSecondNotarisation = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture + firstSpendSecondNotarisation.getOrThrow() + + val thirdTxBuilder = TransactionBuilder(notaryParty) + .addInputState(inputState2) + .addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey)) + .addReferenceState(inputState1.referenced()) + val thirdSpendTx = node.services.signInitialTransaction(thirdTxBuilder) + val thirdSpendNotarisation = node.services.startFlow(NotaryFlow.Client(thirdSpendTx)).resultFuture + val ex = assertFailsWith(NotaryException::class) { thirdSpendNotarisation.getOrThrow() } + val error = ex.error as NotaryError.Conflict + assertEquals(error.txId, thirdSpendTx.id) + assertEquals(error.consumedStates[inputState1.ref]!!.type, StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE) + } + + @Test fun `notarisations are idempotent`() { val inputState = issueState(node, notaryParty) + val referenceState = issueState(node, notaryParty) val txBuilder = TransactionBuilder(notaryParty) .addInputState(inputState) + .addReferenceState(referenceState.referenced()) .addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey)) val spendTx = node.services.signInitialTransaction(txBuilder) @@ -162,6 +239,30 @@ class MySQLNotaryServiceTests : IntegrationTest() { assertEquals(sig2.by, notaryParty.owningKey) } + @Test + fun `should re-sign a transaction with an expired time-window without input states`() { + val stx = run { + val outputState = issueState(node, notaryParty) + val tx = TransactionBuilder(notaryParty) + .addOutputState(outputState.state) + .addCommand(dummyCommand(nodeParty.owningKey)) + .setTimeWindow(node.services.clock.instant(), 30.seconds) + node.services.signInitialTransaction(tx) + } + + val sig1 = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture.get().first() + assertEquals(sig1.by, notaryParty.owningKey) + assertTrue(sig1.isValid(stx.id)) + + mockNet.nodes.forEach { + val nodeClock = (it.started!!.services.clock as TestClock) + nodeClock.advanceBy(Duration.ofDays(1)) + } + + val sig2 = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture.get().first() + assertEquals(sig2.by, notaryParty.owningKey) + } + @Test fun `should report error for transaction with an invalid time-window`() { val stx = run { @@ -212,7 +313,6 @@ class MySQLNotaryServiceTests : IntegrationTest() { @Suspendable override fun call(): List { val futures = mutableListOf>() - var requestSignature: NotarisationRequestSignature? = null for (i in 1..transactionCount) { val txId: SecureHash = txIdGenerator.generateOrFail(random) val callerParty = partyGenerator.generateOrFail(random) @@ -222,15 +322,12 @@ class MySQLNotaryServiceTests : IntegrationTest() { Generator.replicate(inputStateCount, stateRefGenerator) } val inputs = inputGenerator.generateOrFail(random) - if (requestSignature == null || random.nextInt(10) < 2) { - requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub) - } futures += SinglePartyNotaryService.CommitOperation( service, inputs, txId, callerParty, - requestSignature, + NotarisationRequest(inputs, txId).generateSignature(serviceHub), null, emptyList()).execute("") } diff --git a/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLUniquenessProviderTest.kt b/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLUniquenessProviderTest.kt new file mode 100644 index 0000000000..888ad4420a --- /dev/null +++ b/notary/mysql/src/test/kotlin/net/corda/notary/mysql/MySQLUniquenessProviderTest.kt @@ -0,0 +1,72 @@ +package net.corda.notary.mysql + +import com.codahale.metrics.MetricRegistry +import com.typesafe.config.ConfigFactory +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.DigitalSignature +import net.corda.core.crypto.NullKeys +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.generateKeyPair +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.internal.notary.UniquenessProvider +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.getTestPartyAndCertificate +import net.corda.testing.internal.IntegrationTest +import net.corda.testing.node.internal.makeInternalTestDataSourceProperties +import org.hamcrest.CoreMatchers.instanceOf +import org.junit.After +import org.junit.Assert.assertThat +import org.junit.Before +import org.junit.Test +import java.time.Clock +import java.util.* + +class MySQLUniquenessProviderTest : IntegrationTest() { + + val dataStoreProperties = makeInternalTestDataSourceProperties(configSupplier = { ConfigFactory.empty() }).apply { + setProperty("autoCommit", "false") + } + + val config = MySQLNotaryConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100) + val clock = Clock.systemUTC() + + val party = getTestPartyAndCertificate(ALICE_NAME, generateKeyPair().public).party + + val uniquenessProvider = MySQLUniquenessProvider(MetricRegistry(), clock, config) + + @Before + fun before() { + uniquenessProvider.createTable() + } + + @After + fun after() { + uniquenessProvider.stop() + } + + @Test + fun `intra-batch conflict between reference state and input state is detected`() { + val inputs = listOf(StateRef(SecureHash.randomSHA256(), 0)) + val request = MySQLUniquenessProvider.CommitRequest( + inputs, + SecureHash.randomSHA256(), + party, + NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0), + null, + emptyList(), + UUID.randomUUID() + ) + val conflictingRequest = MySQLUniquenessProvider.CommitRequest( + listOf(StateRef(SecureHash.randomSHA256(), 0)), + SecureHash.randomSHA256(), + party, + NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0), + null, + request.states, + UUID.randomUUID() + ) + val results = MySQLUniquenessProvider.CommitStates(listOf(request, conflictingRequest), clock).execute(uniquenessProvider.connection) + assertThat(results[request.id], instanceOf(UniquenessProvider.Result.Success::class.java)) + assertThat(results[conflictingRequest.id], instanceOf(UniquenessProvider.Result.Failure::class.java)) + } +}