diff --git a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt index 1913e6bf84..a6b42b9541 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt @@ -37,6 +37,7 @@ import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.Sort import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.DeprecatedConstructorForDeserialization import net.corda.core.toFuture import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.NonEmptySet @@ -68,7 +69,7 @@ class Vault(val states: Iterable>) { * other transactions observed, then the changes are observed "net" of those. */ @CordaSerializable - data class Update @JvmOverloads constructor( + data class Update constructor( val consumed: Set>, val produced: Set>, val flowId: UUID? = null, @@ -78,8 +79,21 @@ class Vault(val states: Iterable>) { * differently. */ val type: UpdateType = UpdateType.GENERAL, - val references: Set> = emptySet() + val references: Set> = emptySet(), + val consumingTxIds: Map = emptyMap() ) { + @DeprecatedConstructorForDeserialization(1) + @JvmOverloads constructor( consumed: Set>, + produced: Set>, + flowId: UUID? = null, + /** + * Specifies the type of update, currently supported types are general and, contract upgrade and notary change. + * Notary change transactions only modify the notary field on states, and potentially need to be handled + * differently. + */ + type: UpdateType = UpdateType.GENERAL, + references: Set> = emptySet()) : this(consumed, produced, flowId, type, references, consumingTxIds = emptyMap()) + /** Checks whether the update contains a state of the specified type. */ inline fun containsType() = consumed.any { it.state.data is T } || produced.any { it.state.data is T } || references.any { it.state.data is T } @@ -105,9 +119,9 @@ class Vault(val states: Iterable>) { val combinedConsumed = consumed + (rhs.consumed - produced) // The ordering below matters to preserve ordering of consumed/produced Sets when they are insertion order dependent implementations. val combinedProduced = produced.filter { it !in rhs.consumed }.toSet() + rhs.produced - return copy(consumed = combinedConsumed, produced = combinedProduced, references = references + rhs.references) + return copy(consumed = combinedConsumed, produced = combinedProduced, references = references + rhs.references, consumingTxIds = consumingTxIds + rhs.consumingTxIds) } - + override fun toString(): String { val sb = StringBuilder() sb.appendln("${consumed.size} consumed, ${produced.size} produced") @@ -125,6 +139,10 @@ class Vault(val states: Iterable>) { references.forEach { sb.appendln("${it.ref}: ${it.state}") } + sb.appendln("Consuming TxIds:") + consumingTxIds.forEach { + sb.appendln("${it.key}: ${it.value}") + } return sb.toString() } @@ -135,9 +153,19 @@ class Vault(val states: Iterable>) { flowId: UUID? = null, type: UpdateType = UpdateType.GENERAL ): Update { - return Update(consumed, produced, flowId, type, references) + return Update(consumed, produced, flowId, type, references, consumingTxIds) } + /** Additional copy method to maintain backwards compatibility. */ + fun copy( + consumed: Set>, + produced: Set>, + flowId: UUID? = null, + type: UpdateType = UpdateType.GENERAL, + references: Set> = emptySet() + ): Update { + return Update(consumed, produced, flowId, type, references, consumingTxIds) + } } @CordaSerializable diff --git a/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt b/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt index 2da38e1509..93b08b7842 100644 --- a/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/CashIssueAndPaymentTest.kt @@ -1,10 +1,16 @@ package net.corda.node import net.corda.core.messaging.startFlow +import net.corda.core.messaging.vaultTrackBy +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.loggerFor import net.corda.finance.DOLLARS +import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.CashIssueAndPaymentFlow import net.corda.node.services.config.NodeConfiguration import net.corda.testing.core.ALICE_NAME @@ -17,6 +23,8 @@ import net.corda.testing.node.NotarySpec import net.corda.testing.node.internal.findCordapp import org.junit.Test import org.junit.jupiter.api.assertDoesNotThrow +import java.util.concurrent.CountDownLatch +import kotlin.test.assertEquals /** * Execute a flow with sub-flows, including the finality flow. @@ -65,4 +73,36 @@ class CashIssueAndPaymentTest { logger.info("TXN={}, recipient={}", result.stx, result.recipient) } } + + @Test(timeout = 300_000) + fun `test can issue cash and see consumming transaction id in rpc client`() { + driver(parametersFor()) { + val alice = startNode(providedName = ALICE_NAME, customOverrides = configOverrides).getOrThrow() + val aliceParty = alice.nodeInfo.singleIdentity() + val notaryParty = notaryHandles.single().identity + val result = assertDoesNotThrow { + + val criteria = QueryCriteria.VaultQueryCriteria(Vault.StateStatus.CONSUMED) + val (_, vaultUpdates) = alice.rpc.vaultTrackBy(criteria = criteria, paging = PageSpecification(DEFAULT_PAGE_NUM)) + val updateLatch = CountDownLatch(1) + vaultUpdates.subscribe { update -> + val consumedRef = update.consumed.single().ref + assertEquals( update.produced.single().ref.txhash, update.consumingTxIds[consumedRef] ) + updateLatch.countDown() + } + val flowRet = alice.rpc.startFlow(::CashIssueAndPaymentFlow, + CASH_AMOUNT, + OpaqueBytes.of(0x01), + aliceParty, + false, + notaryParty + ).use { flowHandle -> + flowHandle.returnValue.getOrThrow() + } + updateLatch.await() + flowRet + } + logger.info("TXN={}, recipient={}", result.stx, result.recipient) + } + } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index ec4984ea68..26b20544f6 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -231,6 +231,7 @@ class NodeVaultService( if (stateStatus != Vault.StateStatus.CONSUMED) { stateStatus = Vault.StateStatus.CONSUMED consumedTime = clock.instant() + consumingTxId = update.consumingTxIds[stateRef]?.toString() // remove lock (if held) if (lockId != null) { lockId = null @@ -370,8 +371,8 @@ class NodeVaultService( } } } - - return Vault.Update(consumedStates.toSet(), ourNewStates.toSet(), references = newReferenceStateAndRefs.toSet()) + val consumedTxIds = consumedStates.associate { Pair(it.ref, tx.id) } + return Vault.Update(consumedStates.toSet(), ourNewStates.toSet(), references = newReferenceStateAndRefs.toSet(), consumingTxIds = consumedTxIds) } fun resolveAndMakeUpdate(tx: CoreTransaction): Vault.Update? { diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt index 09c71fe1f7..ee59ee170f 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSchema.kt @@ -91,7 +91,11 @@ object VaultSchemaV1 : MappedSchema( /** associated constraint type data (if any) */ @Column(name = "constraint_data", length = MAX_CONSTRAINT_DATA_SIZE, nullable = true) @Type(type = "corda-wrapper-binary") - var constraintData: ByteArray? = null + var constraintData: ByteArray? = null, + + /** consuming transaction */ + @Column(name = "consuming_tx_id", length = 144, nullable = true) + var consumingTxId: String? = null ) : PersistentState() @Entity diff --git a/node/src/main/resources/migration/vault-schema.changelog-master.xml b/node/src/main/resources/migration/vault-schema.changelog-master.xml index 44684647fa..8fea107366 100644 --- a/node/src/main/resources/migration/vault-schema.changelog-master.xml +++ b/node/src/main/resources/migration/vault-schema.changelog-master.xml @@ -13,4 +13,5 @@ + diff --git a/node/src/main/resources/migration/vault-schema.changelog-v14.xml b/node/src/main/resources/migration/vault-schema.changelog-v14.xml new file mode 100644 index 0000000000..75f576f214 --- /dev/null +++ b/node/src/main/resources/migration/vault-schema.changelog-v14.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt index 682af85bd8..3b01205ba7 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt @@ -664,14 +664,15 @@ class NodeVaultServiceTest { database.transaction { vaultService.notify(StatesToRecord.ONLY_RELEVANT, issueTx) } val expectedIssueUpdate = Vault.Update(emptySet(), setOf(cashState), null) - database.transaction { + val moveTx = database.transaction { val moveBuilder = TransactionBuilder(notary).apply { CashUtils.generateSpend(services, this, Amount(1000, GBP), identity, thirdPartyIdentity) } val moveTx = moveBuilder.toWireTransaction(services) vaultService.notify(StatesToRecord.ONLY_RELEVANT, moveTx) + moveTx } - val expectedMoveUpdate = Vault.Update(setOf(cashState), emptySet(), null) + val expectedMoveUpdate = Vault.Update(setOf(cashState), emptySet(), null, consumingTxIds = mapOf(cashState.ref to moveTx.id)) // ensure transaction contract state is persisted in DBStorage val signedMoveTx = services.signInitialTransaction(issueBuilder) @@ -740,7 +741,7 @@ class NodeVaultServiceTest { val expectedIssueUpdate = Vault.Update(emptySet(), setOf(initialCashState), null) val expectedNotaryChangeUpdate = Vault.Update(setOf(initialCashState), setOf(cashStateWithNewNotary), null, Vault.UpdateType.NOTARY_CHANGE) - val expectedMoveUpdate = Vault.Update(setOf(cashStateWithNewNotary), emptySet(), null) + val expectedMoveUpdate = Vault.Update(setOf(cashStateWithNewNotary), emptySet(), null, consumingTxIds = mapOf(cashStateWithNewNotary.ref to moveTx.id)) val observedUpdates = vaultSubscriber.onNextEvents assertEquals(observedUpdates, listOf(expectedIssueUpdate, expectedNotaryChangeUpdate, expectedMoveUpdate))