mirror of
https://github.com/corda/corda.git
synced 2025-03-15 00:36:49 +00:00
ENT-2358 Handle reference states in the MySQLUniquenessProvider (#1516)
* Handle reference states in the MySQLUniquenessProvider * fix test
This commit is contained in:
parent
1d511269f6
commit
e486c8b392
@ -27,7 +27,7 @@ sealed class NotaryError {
|
||||
/** Specifies which states have already been consumed in another transaction. */
|
||||
val consumedStates: Map<StateRef, StateConsumptionDetails>
|
||||
) : NotaryError() {
|
||||
override fun toString() = "One or more input states have already been used in other transactions. Conflicting state count: ${consumedStates.size}, consumption details:\n" +
|
||||
override fun toString() = "One or more input states or referenced states have already been used as input states in other transactions. Conflicting state count: ${consumedStates.size}, consumption details:\n" +
|
||||
"${consumedStates.asSequence().joinToString(",\n", limit = 5) { it.key.toString() + " -> " + it.value }}.\n" +
|
||||
"To find out if any of the conflicting transactions have been generated by this node you can use the hashLookup Corda shell command."
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package net.corda.notary.mysql
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.base.Stopwatch
|
||||
import com.google.common.collect.Queues
|
||||
import com.mysql.cj.jdbc.exceptions.CommunicationsException
|
||||
@ -53,7 +54,7 @@ class MySQLUniquenessProvider(
|
||||
companion object {
|
||||
private val log = loggerFor<MySQLUniquenessProvider>()
|
||||
// TODO: optimize table schema for InnoDB
|
||||
private const val createCommittedStateTable =
|
||||
const val createCommittedStateTable =
|
||||
"CREATE TABLE IF NOT EXISTS notary_committed_states (" +
|
||||
"issue_transaction_id BINARY(32) NOT NULL," +
|
||||
"issue_transaction_output_id INT UNSIGNED NOT NULL," +
|
||||
@ -75,13 +76,25 @@ class MySQLUniquenessProvider(
|
||||
"request_id INT UNSIGNED NOT NULL AUTO_INCREMENT," +
|
||||
"CONSTRAINT rid PRIMARY KEY (request_id)" +
|
||||
")"
|
||||
|
||||
private const val createCommittedTransactionsTable =
|
||||
"CREATE TABLE IF NOT EXISTS notary_committed_transactions (" +
|
||||
"transaction_id BINARY(32) NOT NULL," +
|
||||
"CONSTRAINT tid PRIMARY KEY (transaction_id)" +
|
||||
")"
|
||||
|
||||
private const val insertRequestStatement = "INSERT INTO notary_request_log (consuming_transaction_id, requesting_party_name, request_signature) VALUES (?, ?, ?)"
|
||||
|
||||
private const val insertCommittedTransactionStatement = "INSERT INTO notary_committed_transactions (transaction_id) VALUES (?)"
|
||||
|
||||
private const val selectCommittedTransactionStatement = "SELECT COUNT(transaction_id) FROM notary_committed_transactions WHERE transaction_id = ?"
|
||||
|
||||
/** The maximum number of attempts to retry a database operation. */
|
||||
private const val maxRetries = 1000
|
||||
}
|
||||
|
||||
private data class CommitRequest(
|
||||
@VisibleForTesting
|
||||
data class CommitRequest(
|
||||
val states: List<StateRef>,
|
||||
val txId: SecureHash,
|
||||
val callerIdentity: Party,
|
||||
@ -113,7 +126,8 @@ class MySQLUniquenessProvider(
|
||||
private val connectionRetries = config.connectionRetries
|
||||
|
||||
/** Attempts to obtain a database connection with number of retries specified in [connectionRetries]. */
|
||||
private val connection: Connection
|
||||
@VisibleForTesting
|
||||
val connection: Connection
|
||||
get() {
|
||||
var retries = 0
|
||||
while (true) {
|
||||
@ -296,26 +310,40 @@ class MySQLUniquenessProvider(
|
||||
* Stores all input states that don't yet exist in the database.
|
||||
* A [Result.Conflict] is created for each transaction with one or more inputs already present in the database.
|
||||
*/
|
||||
private class CommitStates(val requests: List<CommitRequest>, val clock: Clock) : DBOperation<Map<UUID, Result>> {
|
||||
@VisibleForTesting
|
||||
class CommitStates(val requests: List<CommitRequest>, val clock: Clock) : DBOperation<Map<UUID, Result>> {
|
||||
override fun execute(connection: Connection): Map<UUID, Result> {
|
||||
val results = mutableMapOf<UUID, Result>()
|
||||
|
||||
val allStates = requests.flatMap { it.states }
|
||||
val allConflicts = findAlreadyCommitted(connection, allStates).toMutableMap()
|
||||
val allStates = requests.flatMap { it.states }.toSet()
|
||||
val allReferences = requests.flatMap { it.references }.toSet()
|
||||
val allConflicts = findAlreadyCommitted(connection, allStates, allReferences).toMutableMap()
|
||||
val toCommit = mutableListOf<CommitRequest>()
|
||||
|
||||
requests.forEach { request ->
|
||||
val conflicts = allConflicts.filter { it.key in request.states }
|
||||
val referenceStateConflicts = allConflicts.keys.intersect(request.references.toSet()).map { it to allConflicts[it]!! }.toMap()
|
||||
val inputStateConflicts = allConflicts.keys.intersect(request.states.toSet()).map { it to allConflicts[it]!! }.toMap()
|
||||
val conflicts = referenceStateConflicts + inputStateConflicts
|
||||
|
||||
results[request.id] = if (conflicts.isNotEmpty()) {
|
||||
if (isConsumedByTheSameTx(request.txId.sha256(), conflicts)) {
|
||||
if (request.states.isEmpty() && referenceStateConflicts.isNotEmpty()) {
|
||||
if (isPreviouslySigned(connection, request.txId)) {
|
||||
Result.Success
|
||||
} else {
|
||||
Result.Failure(NotaryError.Conflict(request.txId, conflicts))
|
||||
}
|
||||
} else if (inputStateConflicts.isNotEmpty() && isConsumedByTheSameTx(request.txId.sha256(), inputStateConflicts)) {
|
||||
Result.Success
|
||||
} else {
|
||||
Result.Failure(NotaryError.Conflict(request.txId, conflicts))
|
||||
}
|
||||
} else {
|
||||
val outsideTimeWindowError = validateTimeWindow(clock.instant(), request.timeWindow)
|
||||
if (outsideTimeWindowError == null) {
|
||||
toCommit.add(request)
|
||||
val preSigned = outsideTimeWindowError != null && request.states.isEmpty() && isPreviouslySigned(connection, request.txId)
|
||||
if (outsideTimeWindowError == null || preSigned) {
|
||||
if (!preSigned) {
|
||||
toCommit.add(request)
|
||||
}
|
||||
// Mark states as consumed to capture conflicting transactions in the same batch
|
||||
request.states.forEach {
|
||||
allConflicts[it] = StateConsumptionDetails(request.txId.sha256())
|
||||
@ -342,29 +370,55 @@ class MySQLUniquenessProvider(
|
||||
executeBatch()
|
||||
close()
|
||||
}
|
||||
|
||||
connection.prepareStatement(insertCommittedTransactionStatement).apply {
|
||||
toCommit.forEach { (_, txId, _, _) ->
|
||||
setBytes(1, txId.bytes)
|
||||
addBatch()
|
||||
clearParameters()
|
||||
}
|
||||
executeBatch()
|
||||
close()
|
||||
}
|
||||
connection.commit()
|
||||
return results
|
||||
}
|
||||
|
||||
private fun findAlreadyCommitted(connection: Connection, states: List<StateRef>): Map<StateRef, StateConsumptionDetails> {
|
||||
if (states.isEmpty()) {
|
||||
private fun isPreviouslySigned(connection: Connection, txId: SecureHash): Boolean {
|
||||
val preparedStatement = connection.prepareStatement(selectCommittedTransactionStatement)
|
||||
preparedStatement.setBytes(1, txId.bytes)
|
||||
val resultSet = preparedStatement.executeQuery()
|
||||
val nrRecords = if (resultSet.next()) {
|
||||
resultSet.getInt(1)
|
||||
} else {
|
||||
0
|
||||
}
|
||||
preparedStatement.close()
|
||||
return nrRecords == 1
|
||||
}
|
||||
|
||||
private fun findAlreadyCommitted(connection: Connection, states: Set<StateRef>, references: Set<StateRef>): Map<StateRef, StateConsumptionDetails> {
|
||||
if (states.isEmpty() && references.isEmpty()) {
|
||||
return emptyMap()
|
||||
}
|
||||
val queryString = buildQueryString(states.size)
|
||||
val queryString = buildQueryString((states + references).size)
|
||||
val preparedStatement = connection.prepareStatement(queryString).apply {
|
||||
var parameterIndex = 0
|
||||
states.forEach { (txId, index) ->
|
||||
(states + references).forEach { (txId, index) ->
|
||||
setBytes(++parameterIndex, txId.bytes)
|
||||
setInt(++parameterIndex, index)
|
||||
}
|
||||
|
||||
}
|
||||
val resultSet = preparedStatement.executeQuery()
|
||||
val committedStates = mutableMapOf<StateRef, StateConsumptionDetails>()
|
||||
while (resultSet.next()) {
|
||||
val consumingTxId = SecureHash.SHA256(resultSet.getBytes(1))
|
||||
val stateRef = StateRef(SecureHash.SHA256(resultSet.getBytes(2)), resultSet.getInt(3))
|
||||
committedStates[stateRef] = StateConsumptionDetails(consumingTxId.sha256())
|
||||
committedStates[stateRef] = if (stateRef in references) {
|
||||
StateConsumptionDetails(consumingTxId.sha256(), StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
|
||||
} else {
|
||||
StateConsumptionDetails(consumingTxId.sha256())
|
||||
}
|
||||
}
|
||||
preparedStatement.close()
|
||||
return committedStates
|
||||
@ -424,6 +478,7 @@ class MySQLUniquenessProvider(
|
||||
connection.use {
|
||||
it.createStatement().execute(createCommittedStateTable)
|
||||
it.createStatement().execute(createRequestLogTable)
|
||||
it.createStatement().execute(createCommittedTransactionsTable)
|
||||
it.commit()
|
||||
}
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import net.corda.client.mock.Generator
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
@ -67,7 +68,7 @@ class MySQLNotaryServiceTests : IntegrationTest() {
|
||||
fun before() {
|
||||
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts", "net.corda.notary.mysql"), threadPerNode = true)
|
||||
notaryParty = DevIdentityGenerator.generateDistributedNotarySingularIdentity(listOf(mockNet.baseDirectory(mockNet.nextNodeId)), notaryName)
|
||||
val networkParameters = NetworkParametersCopier(testNetworkParameters(listOf(NotaryInfo(notaryParty, true))))
|
||||
val networkParameters = NetworkParametersCopier(testNetworkParameters(notaries = listOf(NotaryInfo(notaryParty, true)), minimumPlatformVersion = 4))
|
||||
val notaryNodeUnstarted = createNotaryNode()
|
||||
val nodeUnstarted = mockNet.createUnstartedNode()
|
||||
val startedNodes = listOf(notaryNodeUnstarted, nodeUnstarted).map { n ->
|
||||
@ -107,12 +108,88 @@ class MySQLNotaryServiceTests : IntegrationTest() {
|
||||
assertEquals(error.txId, secondSpendTx.id)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `handles reference states`() {
|
||||
val inputState1 = issueState(node, notaryParty)
|
||||
val inputState2 = issueState(node, notaryParty)
|
||||
val outputState = TransactionState(DummyContract.SingleOwnerState(Random().nextInt(), node.services.myInfo.singleIdentity()), DummyContract.PROGRAM_ID, notaryParty)
|
||||
|
||||
val firstTxBuilder = TransactionBuilder(notaryParty)
|
||||
.addInputState(inputState1)
|
||||
.addReferenceState(inputState2.referenced())
|
||||
.addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey))
|
||||
val firstSpendTx = node.services.signInitialTransaction(firstTxBuilder)
|
||||
val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture
|
||||
firstSpend.getOrThrow()
|
||||
|
||||
val secondTxBuilder = TransactionBuilder(notaryParty)
|
||||
.addInputState(inputState2)
|
||||
.addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey))
|
||||
val secondSpendTx = node.services.signInitialTransaction(secondTxBuilder)
|
||||
val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx)).resultFuture
|
||||
secondSpend.getOrThrow()
|
||||
|
||||
val firstSpendReferenceInvalidButIdempotent = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture
|
||||
firstSpendReferenceInvalidButIdempotent.getOrThrow()
|
||||
|
||||
val thirdSpendTxBuilder = TransactionBuilder(notaryParty)
|
||||
.addOutputState(outputState)
|
||||
.addReferenceState(inputState2.referenced())
|
||||
.addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey))
|
||||
val thirdSpendTx = node.services.signInitialTransaction(thirdSpendTxBuilder)
|
||||
val thirdSpend = node.services.startFlow(NotaryFlow.Client(thirdSpendTx)).resultFuture
|
||||
|
||||
val ex = assertFailsWith(NotaryException::class) { thirdSpend.getOrThrow() }
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
assertEquals(error.txId, thirdSpendTx.id)
|
||||
assertEquals(error.consumedStates[inputState2.ref]!!.type, StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `handles transactions with only reference states`() {
|
||||
val outputState = TransactionState(DummyContract.SingleOwnerState(Random().nextInt(), node.services.myInfo.singleIdentity()), DummyContract.PROGRAM_ID, notaryParty)
|
||||
val inputState1 = issueState(node, notaryParty)
|
||||
val inputState2 = issueState(node, notaryParty)
|
||||
|
||||
val firstTxBuilder = TransactionBuilder(notaryParty)
|
||||
.addOutputState(outputState)
|
||||
.addReferenceState(inputState1.referenced())
|
||||
.addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey))
|
||||
val firstSpendTx = node.services.signInitialTransaction(firstTxBuilder)
|
||||
val firstSpend = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture
|
||||
firstSpend.getOrThrow()
|
||||
|
||||
val secondTxBuilder = TransactionBuilder(notaryParty)
|
||||
.addInputState(inputState1)
|
||||
.addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey))
|
||||
val secondSpendTx = node.services.signInitialTransaction(secondTxBuilder)
|
||||
val secondSpend = node.services.startFlow(NotaryFlow.Client(secondSpendTx)).resultFuture
|
||||
secondSpend.getOrThrow()
|
||||
|
||||
val firstSpendSecondNotarisation = node.services.startFlow(NotaryFlow.Client(firstSpendTx)).resultFuture
|
||||
firstSpendSecondNotarisation.getOrThrow()
|
||||
|
||||
val thirdTxBuilder = TransactionBuilder(notaryParty)
|
||||
.addInputState(inputState2)
|
||||
.addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey))
|
||||
.addReferenceState(inputState1.referenced())
|
||||
val thirdSpendTx = node.services.signInitialTransaction(thirdTxBuilder)
|
||||
val thirdSpendNotarisation = node.services.startFlow(NotaryFlow.Client(thirdSpendTx)).resultFuture
|
||||
val ex = assertFailsWith(NotaryException::class) { thirdSpendNotarisation.getOrThrow() }
|
||||
val error = ex.error as NotaryError.Conflict
|
||||
assertEquals(error.txId, thirdSpendTx.id)
|
||||
assertEquals(error.consumedStates[inputState1.ref]!!.type, StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE)
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
fun `notarisations are idempotent`() {
|
||||
val inputState = issueState(node, notaryParty)
|
||||
val referenceState = issueState(node, notaryParty)
|
||||
|
||||
val txBuilder = TransactionBuilder(notaryParty)
|
||||
.addInputState(inputState)
|
||||
.addReferenceState(referenceState.referenced())
|
||||
.addCommand(dummyCommand(node.services.myInfo.singleIdentity().owningKey))
|
||||
val spendTx = node.services.signInitialTransaction(txBuilder)
|
||||
|
||||
@ -162,6 +239,30 @@ class MySQLNotaryServiceTests : IntegrationTest() {
|
||||
assertEquals(sig2.by, notaryParty.owningKey)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should re-sign a transaction with an expired time-window without input states`() {
|
||||
val stx = run {
|
||||
val outputState = issueState(node, notaryParty)
|
||||
val tx = TransactionBuilder(notaryParty)
|
||||
.addOutputState(outputState.state)
|
||||
.addCommand(dummyCommand(nodeParty.owningKey))
|
||||
.setTimeWindow(node.services.clock.instant(), 30.seconds)
|
||||
node.services.signInitialTransaction(tx)
|
||||
}
|
||||
|
||||
val sig1 = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture.get().first()
|
||||
assertEquals(sig1.by, notaryParty.owningKey)
|
||||
assertTrue(sig1.isValid(stx.id))
|
||||
|
||||
mockNet.nodes.forEach {
|
||||
val nodeClock = (it.started!!.services.clock as TestClock)
|
||||
nodeClock.advanceBy(Duration.ofDays(1))
|
||||
}
|
||||
|
||||
val sig2 = node.services.startFlow(NotaryFlow.Client(stx)).resultFuture.get().first()
|
||||
assertEquals(sig2.by, notaryParty.owningKey)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should report error for transaction with an invalid time-window`() {
|
||||
val stx = run {
|
||||
@ -212,7 +313,6 @@ class MySQLNotaryServiceTests : IntegrationTest() {
|
||||
@Suspendable
|
||||
override fun call(): List<Result> {
|
||||
val futures = mutableListOf<CordaFuture<Result>>()
|
||||
var requestSignature: NotarisationRequestSignature? = null
|
||||
for (i in 1..transactionCount) {
|
||||
val txId: SecureHash = txIdGenerator.generateOrFail(random)
|
||||
val callerParty = partyGenerator.generateOrFail(random)
|
||||
@ -222,15 +322,12 @@ class MySQLNotaryServiceTests : IntegrationTest() {
|
||||
Generator.replicate(inputStateCount, stateRefGenerator)
|
||||
}
|
||||
val inputs = inputGenerator.generateOrFail(random)
|
||||
if (requestSignature == null || random.nextInt(10) < 2) {
|
||||
requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
|
||||
}
|
||||
futures += SinglePartyNotaryService.CommitOperation(
|
||||
service,
|
||||
inputs,
|
||||
txId,
|
||||
callerParty,
|
||||
requestSignature,
|
||||
NotarisationRequest(inputs, txId).generateSignature(serviceHub),
|
||||
null,
|
||||
emptyList()).execute("")
|
||||
}
|
||||
|
@ -0,0 +1,72 @@
|
||||
package net.corda.notary.mysql
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.NullKeys
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.flows.NotarisationRequestSignature
|
||||
import net.corda.core.internal.notary.UniquenessProvider
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.getTestPartyAndCertificate
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.node.internal.makeInternalTestDataSourceProperties
|
||||
import org.hamcrest.CoreMatchers.instanceOf
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertThat
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.time.Clock
|
||||
import java.util.*
|
||||
|
||||
class MySQLUniquenessProviderTest : IntegrationTest() {
|
||||
|
||||
val dataStoreProperties = makeInternalTestDataSourceProperties(configSupplier = { ConfigFactory.empty() }).apply {
|
||||
setProperty("autoCommit", "false")
|
||||
}
|
||||
|
||||
val config = MySQLNotaryConfiguration(dataStoreProperties, maxBatchSize = 10, maxBatchInputStates = 100)
|
||||
val clock = Clock.systemUTC()
|
||||
|
||||
val party = getTestPartyAndCertificate(ALICE_NAME, generateKeyPair().public).party
|
||||
|
||||
val uniquenessProvider = MySQLUniquenessProvider(MetricRegistry(), clock, config)
|
||||
|
||||
@Before
|
||||
fun before() {
|
||||
uniquenessProvider.createTable()
|
||||
}
|
||||
|
||||
@After
|
||||
fun after() {
|
||||
uniquenessProvider.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `intra-batch conflict between reference state and input state is detected`() {
|
||||
val inputs = listOf(StateRef(SecureHash.randomSHA256(), 0))
|
||||
val request = MySQLUniquenessProvider.CommitRequest(
|
||||
inputs,
|
||||
SecureHash.randomSHA256(),
|
||||
party,
|
||||
NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0),
|
||||
null,
|
||||
emptyList(),
|
||||
UUID.randomUUID()
|
||||
)
|
||||
val conflictingRequest = MySQLUniquenessProvider.CommitRequest(
|
||||
listOf(StateRef(SecureHash.randomSHA256(), 0)),
|
||||
SecureHash.randomSHA256(),
|
||||
party,
|
||||
NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0),
|
||||
null,
|
||||
request.states,
|
||||
UUID.randomUUID()
|
||||
)
|
||||
val results = MySQLUniquenessProvider.CommitStates(listOf(request, conflictingRequest), clock).execute(uniquenessProvider.connection)
|
||||
assertThat(results[request.id], instanceOf(UniquenessProvider.Result.Success::class.java))
|
||||
assertThat(results[conflictingRequest.id], instanceOf(UniquenessProvider.Result.Failure::class.java))
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user