Merge pull request #7430 from corda/adel/ENT-10122

ENT-10122: Add consuming transaction id to vault states table.
This commit is contained in:
Adel El-Beik 2023-08-07 13:35:04 +01:00 committed by GitHub
commit eccb9b4af6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 101 additions and 11 deletions

View File

@ -37,6 +37,7 @@ import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.DeprecatedConstructorForDeserialization
import net.corda.core.toFuture import net.corda.core.toFuture
import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.NonEmptySet
@ -68,7 +69,7 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
* other transactions observed, then the changes are observed "net" of those. * other transactions observed, then the changes are observed "net" of those.
*/ */
@CordaSerializable @CordaSerializable
data class Update<U : ContractState> @JvmOverloads constructor( data class Update<U : ContractState> constructor(
val consumed: Set<StateAndRef<U>>, val consumed: Set<StateAndRef<U>>,
val produced: Set<StateAndRef<U>>, val produced: Set<StateAndRef<U>>,
val flowId: UUID? = null, val flowId: UUID? = null,
@ -78,8 +79,21 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
* differently. * differently.
*/ */
val type: UpdateType = UpdateType.GENERAL, val type: UpdateType = UpdateType.GENERAL,
val references: Set<StateAndRef<U>> = emptySet() val references: Set<StateAndRef<U>> = emptySet(),
val consumingTxIds: Map<StateRef, SecureHash> = emptyMap()
) { ) {
@DeprecatedConstructorForDeserialization(1)
@JvmOverloads constructor( consumed: Set<StateAndRef<U>>,
produced: Set<StateAndRef<U>>,
flowId: UUID? = null,
/**
* Specifies the type of update, currently supported types are general and, contract upgrade and notary change.
* Notary change transactions only modify the notary field on states, and potentially need to be handled
* differently.
*/
type: UpdateType = UpdateType.GENERAL,
references: Set<StateAndRef<U>> = emptySet()) : this(consumed, produced, flowId, type, references, consumingTxIds = emptyMap())
/** Checks whether the update contains a state of the specified type. */ /** Checks whether the update contains a state of the specified type. */
inline fun <reified T : ContractState> containsType() = consumed.any { it.state.data is T } || produced.any { it.state.data is T } || references.any { it.state.data is T } inline fun <reified T : ContractState> containsType() = consumed.any { it.state.data is T } || produced.any { it.state.data is T } || references.any { it.state.data is T }
@ -105,9 +119,9 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
val combinedConsumed = consumed + (rhs.consumed - produced) val combinedConsumed = consumed + (rhs.consumed - produced)
// The ordering below matters to preserve ordering of consumed/produced Sets when they are insertion order dependent implementations. // The ordering below matters to preserve ordering of consumed/produced Sets when they are insertion order dependent implementations.
val combinedProduced = produced.filter { it !in rhs.consumed }.toSet() + rhs.produced val combinedProduced = produced.filter { it !in rhs.consumed }.toSet() + rhs.produced
return copy(consumed = combinedConsumed, produced = combinedProduced, references = references + rhs.references) return copy(consumed = combinedConsumed, produced = combinedProduced, references = references + rhs.references, consumingTxIds = consumingTxIds + rhs.consumingTxIds)
} }
override fun toString(): String { override fun toString(): String {
val sb = StringBuilder() val sb = StringBuilder()
sb.appendln("${consumed.size} consumed, ${produced.size} produced") sb.appendln("${consumed.size} consumed, ${produced.size} produced")
@ -125,6 +139,10 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
references.forEach { references.forEach {
sb.appendln("${it.ref}: ${it.state}") sb.appendln("${it.ref}: ${it.state}")
} }
sb.appendln("Consuming TxIds:")
consumingTxIds.forEach {
sb.appendln("${it.key}: ${it.value}")
}
return sb.toString() return sb.toString()
} }
@ -135,9 +153,19 @@ class Vault<out T : ContractState>(val states: Iterable<StateAndRef<T>>) {
flowId: UUID? = null, flowId: UUID? = null,
type: UpdateType = UpdateType.GENERAL type: UpdateType = UpdateType.GENERAL
): Update<U> { ): Update<U> {
return Update(consumed, produced, flowId, type, references) return Update(consumed, produced, flowId, type, references, consumingTxIds)
} }
/** Additional copy method to maintain backwards compatibility. */
fun copy(
consumed: Set<StateAndRef<U>>,
produced: Set<StateAndRef<U>>,
flowId: UUID? = null,
type: UpdateType = UpdateType.GENERAL,
references: Set<StateAndRef<U>> = emptySet()
): Update<U> {
return Update(consumed, produced, flowId, type, references, consumingTxIds)
}
} }
@CordaSerializable @CordaSerializable

View File

@ -1,10 +1,16 @@
package net.corda.node package net.corda.node
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.messaging.vaultTrackBy
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.finance.DOLLARS import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueAndPaymentFlow import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
@ -17,6 +23,8 @@ import net.corda.testing.node.NotarySpec
import net.corda.testing.node.internal.findCordapp import net.corda.testing.node.internal.findCordapp
import org.junit.Test import org.junit.Test
import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertDoesNotThrow
import java.util.concurrent.CountDownLatch
import kotlin.test.assertEquals
/** /**
* Execute a flow with sub-flows, including the finality flow. * Execute a flow with sub-flows, including the finality flow.
@ -65,4 +73,36 @@ class CashIssueAndPaymentTest {
logger.info("TXN={}, recipient={}", result.stx, result.recipient) logger.info("TXN={}, recipient={}", result.stx, result.recipient)
} }
} }
@Test(timeout = 300_000)
fun `test can issue cash and see consumming transaction id in rpc client`() {
driver(parametersFor()) {
val alice = startNode(providedName = ALICE_NAME, customOverrides = configOverrides).getOrThrow()
val aliceParty = alice.nodeInfo.singleIdentity()
val notaryParty = notaryHandles.single().identity
val result = assertDoesNotThrow {
val criteria = QueryCriteria.VaultQueryCriteria(Vault.StateStatus.CONSUMED)
val (_, vaultUpdates) = alice.rpc.vaultTrackBy<Cash.State>(criteria = criteria, paging = PageSpecification(DEFAULT_PAGE_NUM))
val updateLatch = CountDownLatch(1)
vaultUpdates.subscribe { update ->
val consumedRef = update.consumed.single().ref
assertEquals( update.produced.single().ref.txhash, update.consumingTxIds[consumedRef] )
updateLatch.countDown()
}
val flowRet = alice.rpc.startFlow(::CashIssueAndPaymentFlow,
CASH_AMOUNT,
OpaqueBytes.of(0x01),
aliceParty,
false,
notaryParty
).use { flowHandle ->
flowHandle.returnValue.getOrThrow()
}
updateLatch.await()
flowRet
}
logger.info("TXN={}, recipient={}", result.stx, result.recipient)
}
}
} }

View File

@ -231,6 +231,7 @@ class NodeVaultService(
if (stateStatus != Vault.StateStatus.CONSUMED) { if (stateStatus != Vault.StateStatus.CONSUMED) {
stateStatus = Vault.StateStatus.CONSUMED stateStatus = Vault.StateStatus.CONSUMED
consumedTime = clock.instant() consumedTime = clock.instant()
consumingTxId = update.consumingTxIds[stateRef]?.toString()
// remove lock (if held) // remove lock (if held)
if (lockId != null) { if (lockId != null) {
lockId = null lockId = null
@ -370,8 +371,8 @@ class NodeVaultService(
} }
} }
} }
val consumedTxIds = consumedStates.associate { Pair(it.ref, tx.id) }
return Vault.Update(consumedStates.toSet(), ourNewStates.toSet(), references = newReferenceStateAndRefs.toSet()) return Vault.Update(consumedStates.toSet(), ourNewStates.toSet(), references = newReferenceStateAndRefs.toSet(), consumingTxIds = consumedTxIds)
} }
fun resolveAndMakeUpdate(tx: CoreTransaction): Vault.Update<ContractState>? { fun resolveAndMakeUpdate(tx: CoreTransaction): Vault.Update<ContractState>? {

View File

@ -91,7 +91,11 @@ object VaultSchemaV1 : MappedSchema(
/** associated constraint type data (if any) */ /** associated constraint type data (if any) */
@Column(name = "constraint_data", length = MAX_CONSTRAINT_DATA_SIZE, nullable = true) @Column(name = "constraint_data", length = MAX_CONSTRAINT_DATA_SIZE, nullable = true)
@Type(type = "corda-wrapper-binary") @Type(type = "corda-wrapper-binary")
var constraintData: ByteArray? = null var constraintData: ByteArray? = null,
/** consuming transaction */
@Column(name = "consuming_tx_id", length = 144, nullable = true)
var consumingTxId: String? = null
) : PersistentState() ) : PersistentState()
@Entity @Entity

View File

@ -13,4 +13,5 @@
<include file="migration/vault-schema.changelog-v8.xml"/> <include file="migration/vault-schema.changelog-v8.xml"/>
<include file="migration/vault-schema.changelog-v11.xml"/> <include file="migration/vault-schema.changelog-v11.xml"/>
<include file="migration/vault-schema.changelog-v13.xml"/> <include file="migration/vault-schema.changelog-v13.xml"/>
<include file="migration/vault-schema.changelog-v14.xml"/>
</databaseChangeLog> </databaseChangeLog>

View File

@ -0,0 +1,15 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<changeSet author="R3.Corda" id="add_consuming_tx_id_column">
<preConditions onFail="MARK_RAN">
<not>
<columnExists tableName="vault_states" columnName="consuming_tx_id"/>
</not>
</preConditions>
<addColumn tableName="vault_states">
<column name="consuming_tx_id" type="NVARCHAR(144)"/>
</addColumn>
</changeSet>
</databaseChangeLog>

View File

@ -664,14 +664,15 @@ class NodeVaultServiceTest {
database.transaction { vaultService.notify(StatesToRecord.ONLY_RELEVANT, issueTx) } database.transaction { vaultService.notify(StatesToRecord.ONLY_RELEVANT, issueTx) }
val expectedIssueUpdate = Vault.Update(emptySet(), setOf(cashState), null) val expectedIssueUpdate = Vault.Update(emptySet(), setOf(cashState), null)
database.transaction { val moveTx = database.transaction {
val moveBuilder = TransactionBuilder(notary).apply { val moveBuilder = TransactionBuilder(notary).apply {
CashUtils.generateSpend(services, this, Amount(1000, GBP), identity, thirdPartyIdentity) CashUtils.generateSpend(services, this, Amount(1000, GBP), identity, thirdPartyIdentity)
} }
val moveTx = moveBuilder.toWireTransaction(services) val moveTx = moveBuilder.toWireTransaction(services)
vaultService.notify(StatesToRecord.ONLY_RELEVANT, moveTx) vaultService.notify(StatesToRecord.ONLY_RELEVANT, moveTx)
moveTx
} }
val expectedMoveUpdate = Vault.Update(setOf(cashState), emptySet(), null) val expectedMoveUpdate = Vault.Update(setOf(cashState), emptySet(), null, consumingTxIds = mapOf(cashState.ref to moveTx.id))
// ensure transaction contract state is persisted in DBStorage // ensure transaction contract state is persisted in DBStorage
val signedMoveTx = services.signInitialTransaction(issueBuilder) val signedMoveTx = services.signInitialTransaction(issueBuilder)
@ -740,7 +741,7 @@ class NodeVaultServiceTest {
val expectedIssueUpdate = Vault.Update(emptySet(), setOf(initialCashState), null) val expectedIssueUpdate = Vault.Update(emptySet(), setOf(initialCashState), null)
val expectedNotaryChangeUpdate = Vault.Update(setOf(initialCashState), setOf(cashStateWithNewNotary), null, Vault.UpdateType.NOTARY_CHANGE) val expectedNotaryChangeUpdate = Vault.Update(setOf(initialCashState), setOf(cashStateWithNewNotary), null, Vault.UpdateType.NOTARY_CHANGE)
val expectedMoveUpdate = Vault.Update(setOf(cashStateWithNewNotary), emptySet(), null) val expectedMoveUpdate = Vault.Update(setOf(cashStateWithNewNotary), emptySet(), null, consumingTxIds = mapOf(cashStateWithNewNotary.ref to moveTx.id))
val observedUpdates = vaultSubscriber.onNextEvents val observedUpdates = vaultSubscriber.onNextEvents
assertEquals(observedUpdates, listOf(expectedIssueUpdate, expectedNotaryChangeUpdate, expectedMoveUpdate)) assertEquals(observedUpdates, listOf(expectedIssueUpdate, expectedNotaryChangeUpdate, expectedMoveUpdate))