From 669d6590af92e7006530e02a76e803e1d7b2bd4c Mon Sep 17 00:00:00 2001 From: Adel El-Beik Date: Mon, 17 Jul 2023 17:58:31 +0100 Subject: [PATCH 1/6] ENT-10122: Add consuming transaction id to vault states table. --- .../corda/core/node/services/VaultService.kt | 28 +++++++++++++++++-- .../node/services/vault/NodeVaultService.kt | 5 ++-- .../corda/node/services/vault/VaultSchema.kt | 6 +++- .../vault-schema.changelog-master.xml | 1 + 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt index 1913e6bf84..b73673c3a3 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt @@ -68,7 +68,7 @@ class Vault(val states: Iterable>) { * other transactions observed, then the changes are observed "net" of those. */ @CordaSerializable - data class Update @JvmOverloads constructor( + data class Update constructor( val consumed: Set>, val produced: Set>, val flowId: UUID? = null, @@ -78,8 +78,20 @@ class Vault(val states: Iterable>) { * differently. */ val type: UpdateType = UpdateType.GENERAL, - val references: Set> = emptySet() + val references: Set> = emptySet(), + val consumingTxIds: Map = emptyMap() ) { + @JvmOverloads constructor( consumed: Set>, + produced: Set>, + flowId: UUID? = null, + /** + * Specifies the type of update, currently supported types are general and, contract upgrade and notary change. + * Notary change transactions only modify the notary field on states, and potentially need to be handled + * differently. + */ + type: UpdateType = UpdateType.GENERAL, + references: Set> = emptySet()) : this(consumed, produced, flowId, type, references, consumingTxIds = emptyMap()) + /** Checks whether the update contains a state of the specified type. */ inline fun containsType() = consumed.any { it.state.data is T } || produced.any { it.state.data is T } || references.any { it.state.data is T } @@ -105,7 +117,7 @@ class Vault(val states: Iterable>) { val combinedConsumed = consumed + (rhs.consumed - produced) // The ordering below matters to preserve ordering of consumed/produced Sets when they are insertion order dependent implementations. val combinedProduced = produced.filter { it !in rhs.consumed }.toSet() + rhs.produced - return copy(consumed = combinedConsumed, produced = combinedProduced, references = references + rhs.references) + return copy(consumed = combinedConsumed, produced = combinedProduced, references = references + rhs.references, consumingTxIds = consumingTxIds + rhs.consumingTxIds) } override fun toString(): String { @@ -138,6 +150,16 @@ class Vault(val states: Iterable>) { return Update(consumed, produced, flowId, type, references) } + /** Additional copy method to maintain backwards compatibility. */ + fun copy( + consumed: Set>, + produced: Set>, + flowId: UUID? = null, + type: UpdateType = UpdateType.GENERAL, + references: Set> = emptySet() + ): Update { + return Update(consumed, produced, flowId, type, references, consumingTxIds) + } } @CordaSerializable diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index ec4984ea68..cca094eb34 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -231,6 +231,7 @@ class NodeVaultService( if (stateStatus != Vault.StateStatus.CONSUMED) { stateStatus = Vault.StateStatus.CONSUMED consumedTime = clock.instant() + consumingTxId = update.consumingTxIds[stateRef]?.toString() ?: "" // remove lock (if held) if (lockId != null) { lockId = null @@ -370,8 +371,8 @@ class NodeVaultService( } } } - - return Vault.Update(consumedStates.toSet(), ourNewStates.toSet(), references = newReferenceStateAndRefs.toSet()) + val consumedTxIds = consumedStates.associate { Pair(it.ref, tx.id) } + return Vault.Update(consumedStates.toSet(), ourNewStates.toSet(), references = newReferenceStateAndRefs.toSet(), consumingTxIds = consumedTxIds) } fun resolveAndMakeUpdate(tx: CoreTransaction): Vault.Update? { diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt index 09c71fe1f7..fc7fc9f9db 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt @@ -91,7 +91,11 @@ object VaultSchemaV1 : MappedSchema( /** associated constraint type data (if any) */ @Column(name = "constraint_data", length = MAX_CONSTRAINT_DATA_SIZE, nullable = true) @Type(type = "corda-wrapper-binary") - var constraintData: ByteArray? = null + var constraintData: ByteArray? = null, + + /** consuming transaction */ + @Column(name = "consuming_tx_id", length = 144, nullable = false) + var consumingTxId: String = "" ) : PersistentState() @Entity diff --git a/node/src/main/resources/migration/vault-schema.changelog-master.xml b/node/src/main/resources/migration/vault-schema.changelog-master.xml index 44684647fa..8fea107366 100644 --- a/node/src/main/resources/migration/vault-schema.changelog-master.xml +++ b/node/src/main/resources/migration/vault-schema.changelog-master.xml @@ -13,4 +13,5 @@ + From 60bb4c58f2946986caac3dc23494cf9382ce822a Mon Sep 17 00:00:00 2001 From: Adel El-Beik Date: Tue, 18 Jul 2023 17:45:53 +0100 Subject: [PATCH 2/6] ENT-10122: Made the consuming tx id field nullable, added missing changelog file. --- .../corda/node/services/vault/NodeVaultService.kt | 2 +- .../net/corda/node/services/vault/VaultSchema.kt | 4 ++-- .../migration/vault-schema.changelog-v14.xml | 15 +++++++++++++++ 3 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 node/src/main/resources/migration/vault-schema.changelog-v14.xml diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index cca094eb34..26b20544f6 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -231,7 +231,7 @@ class NodeVaultService( if (stateStatus != Vault.StateStatus.CONSUMED) { stateStatus = Vault.StateStatus.CONSUMED consumedTime = clock.instant() - consumingTxId = update.consumingTxIds[stateRef]?.toString() ?: "" + consumingTxId = update.consumingTxIds[stateRef]?.toString() // remove lock (if held) if (lockId != null) { lockId = null diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt index fc7fc9f9db..ee59ee170f 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt @@ -94,8 +94,8 @@ object VaultSchemaV1 : MappedSchema( var constraintData: ByteArray? = null, /** consuming transaction */ - @Column(name = "consuming_tx_id", length = 144, nullable = false) - var consumingTxId: String = "" + @Column(name = "consuming_tx_id", length = 144, nullable = true) + var consumingTxId: String? = null ) : PersistentState() @Entity diff --git a/node/src/main/resources/migration/vault-schema.changelog-v14.xml b/node/src/main/resources/migration/vault-schema.changelog-v14.xml new file mode 100644 index 0000000000..75f576f214 --- /dev/null +++ b/node/src/main/resources/migration/vault-schema.changelog-v14.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + \ No newline at end of file From aa9e41c7c26b2573f6bd526ecaa6255d4f2b84f9 Mon Sep 17 00:00:00 2001 From: Adel El-Beik Date: Wed, 19 Jul 2023 16:36:39 +0100 Subject: [PATCH 3/6] ENT-10122: Updated tests to include consuming transaction id in the Vault.Update check. --- .../kotlin/net/corda/core/node/services/VaultService.kt | 6 +++++- .../net/corda/node/services/vault/NodeVaultServiceTest.kt | 7 ++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt index b73673c3a3..8af0eb2d31 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt @@ -119,7 +119,7 @@ class Vault(val states: Iterable>) { val combinedProduced = produced.filter { it !in rhs.consumed }.toSet() + rhs.produced return copy(consumed = combinedConsumed, produced = combinedProduced, references = references + rhs.references, consumingTxIds = consumingTxIds + rhs.consumingTxIds) } - + //val consumingTxIds: Map = emptyMap() override fun toString(): String { val sb = StringBuilder() sb.appendln("${consumed.size} consumed, ${produced.size} produced") @@ -137,6 +137,10 @@ class Vault(val states: Iterable>) { references.forEach { sb.appendln("${it.ref}: ${it.state}") } + sb.appendln("Consuming TxIds:") + consumingTxIds.forEach { + sb.appendln("${it.key}: ${it.value}") + } return sb.toString() } diff --git a/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt index 682af85bd8..3b01205ba7 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt @@ -664,14 +664,15 @@ class NodeVaultServiceTest { database.transaction { vaultService.notify(StatesToRecord.ONLY_RELEVANT, issueTx) } val expectedIssueUpdate = Vault.Update(emptySet(), setOf(cashState), null) - database.transaction { + val moveTx = database.transaction { val moveBuilder = TransactionBuilder(notary).apply { CashUtils.generateSpend(services, this, Amount(1000, GBP), identity, thirdPartyIdentity) } val moveTx = moveBuilder.toWireTransaction(services) vaultService.notify(StatesToRecord.ONLY_RELEVANT, moveTx) + moveTx } - val expectedMoveUpdate = Vault.Update(setOf(cashState), emptySet(), null) + val expectedMoveUpdate = Vault.Update(setOf(cashState), emptySet(), null, consumingTxIds = mapOf(cashState.ref to moveTx.id)) // ensure transaction contract state is persisted in DBStorage val signedMoveTx = services.signInitialTransaction(issueBuilder) @@ -740,7 +741,7 @@ class NodeVaultServiceTest { val expectedIssueUpdate = Vault.Update(emptySet(), setOf(initialCashState), null) val expectedNotaryChangeUpdate = Vault.Update(setOf(initialCashState), setOf(cashStateWithNewNotary), null, Vault.UpdateType.NOTARY_CHANGE) - val expectedMoveUpdate = Vault.Update(setOf(cashStateWithNewNotary), emptySet(), null) + val expectedMoveUpdate = Vault.Update(setOf(cashStateWithNewNotary), emptySet(), null, consumingTxIds = mapOf(cashStateWithNewNotary.ref to moveTx.id)) val observedUpdates = vaultSubscriber.onNextEvents assertEquals(observedUpdates, listOf(expectedIssueUpdate, expectedNotaryChangeUpdate, expectedMoveUpdate)) From 117d319317c0fe78db709ac05c3f441f3b4a1671 Mon Sep 17 00:00:00 2001 From: Adel El-Beik Date: Thu, 20 Jul 2023 09:31:46 +0100 Subject: [PATCH 4/6] ENT-10122: Removed commented code left in. --- .../main/kotlin/net/corda/core/node/services/VaultService.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt index 8af0eb2d31..bf8db51be2 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt @@ -119,7 +119,7 @@ class Vault(val states: Iterable>) { val combinedProduced = produced.filter { it !in rhs.consumed }.toSet() + rhs.produced return copy(consumed = combinedConsumed, produced = combinedProduced, references = references + rhs.references, consumingTxIds = consumingTxIds + rhs.consumingTxIds) } - //val consumingTxIds: Map = emptyMap() + override fun toString(): String { val sb = StringBuilder() sb.appendln("${consumed.size} consumed, ${produced.size} produced") From c614b21a2a21a35fb3010e25ba4b9343f8fd8c19 Mon Sep 17 00:00:00 2001 From: Adel El-Beik Date: Tue, 1 Aug 2023 15:11:21 +0100 Subject: [PATCH 5/6] ENT-10122: Added annotation for backwards compatibility and added test. --- .../corda/core/node/services/VaultService.kt | 4 +- .../net/corda/node/CashIssueAndPaymentTest.kt | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt index bf8db51be2..a6b42b9541 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt @@ -37,6 +37,7 @@ import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.Sort import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.DeprecatedConstructorForDeserialization import net.corda.core.toFuture import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.NonEmptySet @@ -81,6 +82,7 @@ class Vault(val states: Iterable>) { val references: Set> = emptySet(), val consumingTxIds: Map = emptyMap() ) { + @DeprecatedConstructorForDeserialization(1) @JvmOverloads constructor( consumed: Set>, produced: Set>, flowId: UUID? = null, @@ -151,7 +153,7 @@ class Vault(val states: Iterable>) { flowId: UUID? = null, type: UpdateType = UpdateType.GENERAL ): Update { - return Update(consumed, produced, flowId, type, references) + return Update(consumed, produced, flowId, type, references, consumingTxIds) } /** Additional copy method to maintain backwards compatibility. */ diff --git a/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt b/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt index 2da38e1509..59d4ddbdb7 100644 --- a/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt @@ -1,10 +1,16 @@ package net.corda.node import net.corda.core.messaging.startFlow +import net.corda.core.messaging.vaultTrackBy +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.loggerFor import net.corda.finance.DOLLARS +import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.CashIssueAndPaymentFlow import net.corda.node.services.config.NodeConfiguration import net.corda.testing.core.ALICE_NAME @@ -17,6 +23,8 @@ import net.corda.testing.node.NotarySpec import net.corda.testing.node.internal.findCordapp import org.junit.Test import org.junit.jupiter.api.assertDoesNotThrow +import java.util.concurrent.CountDownLatch +import kotlin.test.assertEquals /** * Execute a flow with sub-flows, including the finality flow. @@ -65,4 +73,36 @@ class CashIssueAndPaymentTest { logger.info("TXN={}, recipient={}", result.stx, result.recipient) } } + + @Test(timeout = 300_000) + fun `test can issue cash and see consumming transaction id in rpc client`() { + driver(parametersFor()) { + val alice = startNode(providedName = ALICE_NAME, customOverrides = configOverrides).getOrThrow() + val aliceParty = alice.nodeInfo.singleIdentity() + val notaryParty = notaryHandles.single().identity + val result = assertDoesNotThrow { + + val criteria = QueryCriteria.VaultQueryCriteria(Vault.StateStatus.CONSUMED) + val (vault, vaultUpdates) = alice.rpc.vaultTrackBy(criteria = criteria, paging = PageSpecification(DEFAULT_PAGE_NUM)) + val updateLatch = CountDownLatch(1) + vaultUpdates.subscribe { update -> + val consumedRef = update.consumed.single().ref + assertEquals( update.produced.single().ref.txhash, update.consumingTxIds[consumedRef] ) + updateLatch.countDown() + } + val flowRet = alice.rpc.startFlow(::CashIssueAndPaymentFlow, + CASH_AMOUNT, + OpaqueBytes.of(0x01), + aliceParty, + false, + notaryParty + ).use { flowHandle -> + flowHandle.returnValue.getOrThrow() + } + updateLatch.await() + flowRet + } + logger.info("TXN={}, recipient={}", result.stx, result.recipient) + } + } } From 3465917a936e4d2cf3d20d8c42976554cd6eb912 Mon Sep 17 00:00:00 2001 From: Adel El-Beik Date: Tue, 1 Aug 2023 15:48:34 +0100 Subject: [PATCH 6/6] ENT-10122: Fixed detekt issue. --- .../kotlin/net/corda/node/CashIssueAndPaymentTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt b/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt index 59d4ddbdb7..93b08b7842 100644 --- a/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt @@ -83,7 +83,7 @@ class CashIssueAndPaymentTest { val result = assertDoesNotThrow { val criteria = QueryCriteria.VaultQueryCriteria(Vault.StateStatus.CONSUMED) - val (vault, vaultUpdates) = alice.rpc.vaultTrackBy(criteria = criteria, paging = PageSpecification(DEFAULT_PAGE_NUM)) + val (_, vaultUpdates) = alice.rpc.vaultTrackBy(criteria = criteria, paging = PageSpecification(DEFAULT_PAGE_NUM)) val updateLatch = CountDownLatch(1) vaultUpdates.subscribe { update -> val consumedRef = update.consumed.single().ref