diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index 5bd5f65b5d..95caf1bb17 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -118,7 +118,7 @@ class NodeMonitorModel : AutoCloseable { statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED }.toSet() val consumedStates = statesSnapshot.states.toSet() - unconsumedStates - val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates) + val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet()) vaultUpdates.startWith(initialVaultUpdate).subscribe({ vaultUpdatesSubject.onNext(it) }, {}) // Transactions diff --git a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt index 1abaf49676..802c64d683 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/VaultService.kt @@ -47,7 +47,7 @@ class Vault(val states: Iterable>) { * other transactions observed, then the changes are observed "net" of those. */ @CordaSerializable - data class Update( + data class Update @JvmOverloads constructor( val consumed: Set>, val produced: Set>, val flowId: UUID? = null, @@ -56,10 +56,11 @@ class Vault(val states: Iterable>) { * Notary change transactions only modify the notary field on states, and potentially need to be handled * differently. */ - val type: UpdateType = UpdateType.GENERAL + val type: UpdateType = UpdateType.GENERAL, + val references: Set> = emptySet() ) { /** Checks whether the update contains a state of the specified type. */ - inline fun containsType() = consumed.any { it.state.data is T } || produced.any { it.state.data is T } + inline fun containsType() = consumed.any { it.state.data is T } || produced.any { it.state.data is T } || references.any { it.state.data is T } /** Checks whether the update contains a state of the specified type and state status */ fun containsType(clazz: Class, status: StateStatus) = @@ -83,7 +84,7 @@ class Vault(val states: Iterable>) { val combinedConsumed = consumed + (rhs.consumed - produced) // The ordering below matters to preserve ordering of consumed/produced Sets when they are insertion order dependent implementations. val combinedProduced = produced.filter { it !in rhs.consumed }.toSet() + rhs.produced - return copy(consumed = combinedConsumed, produced = combinedProduced) + return copy(consumed = combinedConsumed, produced = combinedProduced, references = references + rhs.references) } override fun toString(): String { @@ -99,8 +100,23 @@ class Vault(val states: Iterable>) { produced.forEach { sb.appendln("${it.ref}: ${it.state}") } + sb.appendln("References:") + references.forEach { + sb.appendln("${it.ref}: ${it.state}") + } return sb.toString() } + + /** Additional copy method to maintain backwards compatibility. */ + fun copy( + consumed: Set>, + produced: Set>, + flowId: UUID? = null, + type: UpdateType = UpdateType.GENERAL + ): Update { + return Update(consumed, produced, flowId, type, references) + } + } @CordaSerializable @@ -232,9 +248,9 @@ class Vault(val states: Iterable>) { companion object { @Deprecated("No longer used. The vault does not emit empty updates") - val NoUpdate = Update(emptySet(), emptySet(), type = Vault.UpdateType.GENERAL) + val NoUpdate = Update(emptySet(), emptySet(), type = Vault.UpdateType.GENERAL, references = emptySet()) @Deprecated("No longer used. The vault does not emit empty updates") - val NoNotaryUpdate = Vault.Update(emptySet(), emptySet(), type = Vault.UpdateType.NOTARY_CHANGE) + val NoNotaryUpdate = Vault.Update(emptySet(), emptySet(), type = Vault.UpdateType.NOTARY_CHANGE, references = emptySet()) } } @@ -284,7 +300,7 @@ interface VaultService { val result = trackBy(query) val snapshot = result.snapshot.states return if (snapshot.isNotEmpty()) { - doneFuture(Vault.Update(consumed = setOf(snapshot.single()), produced = emptySet())) + doneFuture(Vault.Update(consumed = setOf(snapshot.single()), produced = emptySet(), references = emptySet())) } else { result.updates.toFuture() } diff --git a/core/src/test/kotlin/net/corda/core/flows/WithReferencedStatesFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/ReferencedStatesFlowTests.kt similarity index 56% rename from core/src/test/kotlin/net/corda/core/flows/WithReferencedStatesFlowTests.kt rename to core/src/test/kotlin/net/corda/core/flows/ReferencedStatesFlowTests.kt index b88fbab7fb..efd33f335c 100644 --- a/core/src/test/kotlin/net/corda/core/flows/WithReferencedStatesFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ReferencedStatesFlowTests.kt @@ -8,31 +8,41 @@ import net.corda.core.node.StatesToRecord import net.corda.core.node.services.Vault import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.toFuture import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow import net.corda.node.VersionInfo import net.corda.testing.common.internal.testNetworkParameters +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.internal.vault.DUMMY_LINEAR_CONTRACT_PROGRAM_ID +import net.corda.testing.internal.vault.DummyLinearContract +import net.corda.testing.node.StartedMockNode import net.corda.testing.node.internal.* +import net.corda.testing.node.transaction import org.junit.After +import org.junit.Before +import org.junit.Rule import org.junit.Test import kotlin.test.assertEquals -class WithReferencedStatesFlowTests { - companion object { - @JvmStatic - private val mockNet = InternalMockNetwork( - cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP, enclosedCordapp()), - threadPerNode = true, - initialNetworkParameters = testNetworkParameters(minimumPlatformVersion = 4) - ) - } +class ReferencedStatesFlowTests { - private val nodes = (0..1).map { - mockNet.createNode( - parameters = InternalMockNodeParameters(version = VersionInfo(4, "Blah", "Blah", "Blah")) - ) + var mockNet: InternalMockNetwork = InternalMockNetwork( + cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP, enclosedCordapp()), + threadPerNode = true, + initialNetworkParameters = testNetworkParameters(minimumPlatformVersion = 4) + ) + lateinit var nodes: List + + @Before + fun setup() { + nodes = (0..1).map { + mockNet.createNode( + parameters = InternalMockNodeParameters(version = VersionInfo(4, "Blah", "Blah", "Blah")) + ) + } } @After @@ -41,7 +51,7 @@ class WithReferencedStatesFlowTests { } @Test - fun test() { + fun `with referenced states flow blocks until the reference state update is received`() { // 1. Create reference state. val newRefTx = nodes[0].services.startFlow(CreateRefState()).resultFuture.getOrThrow() val newRefState = newRefTx.tx.outRefsOfType().single() @@ -54,7 +64,8 @@ class WithReferencedStatesFlowTests { val updatedRefState = updatedRefTx.tx.outRefsOfType().single() // 4. Try to use the old reference state. This will throw a NotaryException. - val useRefTx = nodes[1].services.startFlow(WithReferencedStatesFlow { UseRefState(newRefState.state.data.linearId) }).resultFuture + val nodeOneIdentity = nodes[1].info.legalIdentities.first() + val useRefTx = nodes[1].services.startFlow(WithReferencedStatesFlow { UseRefState(nodeOneIdentity, newRefState.state.data.linearId) }).resultFuture // 5. Share the update reference state. nodes[0].services.startFlow(Initiator(updatedRefState)).resultFuture.getOrThrow() @@ -64,6 +75,40 @@ class WithReferencedStatesFlowTests { assertEquals(updatedRefState.ref, result.tx.references.single()) } + @Test + fun `check ref state is persisted when used in tx with relevant states`() { + // 1. Create a state to be used as a reference state. Don't share it. + val newRefTx = nodes[0].services.startFlow(CreateRefState()).resultFuture.getOrThrow() + val newRefState = newRefTx.tx.outRefsOfType().single() + // 2. Use the "newRefState" a transaction involving another party (nodes[1]) which creates a new state. They should store the new state and the reference state. + val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)).resultFuture.getOrThrow() + // Wait until node 1 stores the new tx. + nodes[1].services.validatedTransactions.updates.filter { it.id == newTx.id }.toFuture().getOrThrow() + // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). + val allRefStates = nodes[1].services.vaultService.queryBy() + // nodes[1] should have two states. The newly created output and the reference state created by nodes[0]. + assertEquals(2, allRefStates.states.size) + // Now let's find the specific reference state on nodes[1]. + val refStateLinearId = newRefState.state.data.linearId + val query = QueryCriteria.LinearStateQueryCriteria(linearId = listOf(refStateLinearId)) + val theReferencedState = nodes[1].services.vaultService.queryBy(query) + // There should be one result - the reference state. + assertEquals(newRefState, theReferencedState.states.single()) + println(theReferencedState.statesMetadata.single()) + // nodes[0] should also have the same state. + val nodeZeroQuery = QueryCriteria.LinearStateQueryCriteria(linearId = listOf(refStateLinearId)) + val theReferencedStateOnNodeZero = nodes[0].services.vaultService.queryBy(nodeZeroQuery) + assertEquals(newRefState, theReferencedStateOnNodeZero.states.single()) + println(theReferencedStateOnNodeZero.statesMetadata.single()) + // nodes[0] sends the tx that created the reference state to nodes[1]. + nodes[0].services.startFlow(Initiator(newRefState)).resultFuture.getOrThrow() + // Query again. + val theReferencedStateAgain = nodes[1].services.vaultService.queryBy(query) + // There should be one result - the reference state. + assertEquals(newRefState, theReferencedStateAgain.states.single()) + println(theReferencedStateAgain.statesMetadata.single()) + } + // A dummy reference state contract. class RefState : Contract { companion object { @@ -135,7 +180,8 @@ class WithReferencedStatesFlowTests { } // A flow to use a reference state in another transaction. - class UseRefState(private val linearId: UniqueIdentifier) : FlowLogic() { + @InitiatingFlow + class UseRefState(private val participant: Party, private val linearId: UniqueIdentifier) : FlowLogic() { @Suspendable override fun call(): SignedTransaction { val notary = serviceHub.networkMapCache.notaryIdentities.first() @@ -147,10 +193,23 @@ class WithReferencedStatesFlowTests { val stx = serviceHub.signInitialTransaction(TransactionBuilder(notary = notary).apply { addReferenceState(referenceState.referenced()) - addOutputState(RefState.State(ourIdentity), RefState.CONTRACT_ID) + addOutputState(RefState.State(participant), RefState.CONTRACT_ID) addCommand(RefState.Create(), listOf(ourIdentity.owningKey)) }) - return subFlow(FinalityFlow(stx, emptyList())) + return if (participant != ourIdentity) { + subFlow(FinalityFlow(stx, listOf(initiateFlow(participant)))) + } else { + subFlow(FinalityFlow(stx, emptyList())) + } + } + } + + @InitiatedBy(UseRefState::class) + class UseRefStateResponder(val otherSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + // This should also store the reference state if one is there. + return subFlow(ReceiveFinalityFlow(otherSession, statesToRecord = StatesToRecord.ONLY_RELEVANT)) } } } diff --git a/core/src/test/kotlin/net/corda/core/node/VaultUpdateTests.kt b/core/src/test/kotlin/net/corda/core/node/VaultUpdateTests.kt index 750a76af04..53c9e1ae6a 100644 --- a/core/src/test/kotlin/net/corda/core/node/VaultUpdateTests.kt +++ b/core/src/test/kotlin/net/corda/core/node/VaultUpdateTests.kt @@ -16,7 +16,7 @@ class VaultUpdateTests { private companion object { const val DUMMY_PROGRAM_ID = "net.corda.core.node.VaultUpdateTests.DummyContract" val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20).party - val emptyUpdate = Vault.Update(emptySet(), emptySet(), type = Vault.UpdateType.GENERAL) + val emptyUpdate = Vault.Update(emptySet(), emptySet(), type = Vault.UpdateType.GENERAL, references = emptySet()) } object DummyContract : Contract { diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 17a256ce29..ea2c0fca1f 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -101,59 +101,73 @@ class NodeVaultService( } } + private fun saveStates(session: Session, states: Map>) { + states.forEach { stateAndRef -> + val stateOnly = stateAndRef.value.state.data + // TODO: Optimise this. + // + // For EVERY state to be committed to the vault, this checks whether it is spendable by the recording + // node. The behaviour is as follows: + // + // 1) All vault updates marked as RELEVANT will, of course, all have relevancy_status = 1 in the + // "vault_states" table. + // 2) For ALL_VISIBLE updates, those which are not relevant according to the relevancy rules will have + // relevancy_status = 0 in the "vault_states" table. + // + // This is useful when it comes to querying for fungible states, when we do not want irrelevant states + // included in the result. + // + // The same functionality could be obtained by passing in a list of participants to the vault query, + // however this: + // + // * requires a join on the participants table which results in slow queries + // * states may flip from being non-relevant to relevant + // * it's more complicated for CorDapp developers + // + // Adding a new column in the "VaultStates" table was considered the best approach. + val keys = stateOnly.participants.map { it.owningKey } + val persistentStateRef = PersistentStateRef(stateAndRef.key) + // This check is done to set the "relevancyStatus". When one performs a vault query, it is possible to return ALL states, ONLY + // RELEVANT states or NOT relevant states. + val isRelevant = isRelevant(stateOnly, keyManagementService.filterMyKeys(keys).toSet()) + val constraintInfo = Vault.ConstraintInfo(stateAndRef.value.state.constraint) + // Save a row for each party in the state_party table. + // TODO: Perhaps these can be stored in a batch? + stateOnly.participants.groupBy { it.owningKey }.forEach { participants -> + val persistentParty = VaultSchemaV1.PersistentParty(persistentStateRef, participants.value.first()) + session.save(persistentParty) + } + val stateToAdd = VaultSchemaV1.VaultStates( + notary = stateAndRef.value.state.notary, + contractStateClassName = stateAndRef.value.state.data.javaClass.name, + stateStatus = Vault.StateStatus.UNCONSUMED, + recordedTime = clock.instant(), + relevancyStatus = if (isRelevant) Vault.RelevancyStatus.RELEVANT else Vault.RelevancyStatus.NOT_RELEVANT, + constraintType = constraintInfo.type(), + constraintData = constraintInfo.data() + ) + stateToAdd.stateRef = persistentStateRef + session.save(stateToAdd) + } + } + private fun recordUpdate(update: Vault.Update): Vault.Update { if (!update.isEmpty()) { val producedStateRefs = update.produced.map { it.ref } val producedStateRefsMap = update.produced.associateBy { it.ref } val consumedStateRefs = update.consumed.map { it.ref } + val referenceStateRefsMap = update.references.associateBy { it.ref } log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." } val session = currentDBSession() - producedStateRefsMap.forEach { stateAndRef -> - val stateOnly = stateAndRef.value.state.data - // TODO: Optimise this. - // - // For EVERY state to be committed to the vault, this checks whether it is spendable by the recording - // node. The behaviour is as follows: - // - // 1) All vault updates marked as RELEVANT will, of course, all have relevancy_status = 1 in the - // "vault_states" table. - // 2) For ALL_VISIBLE updates, those which are not relevant according to the relevancy rules will have - // relevancy_status = 0 in the "vault_states" table. - // - // This is useful when it comes to querying for fungible states, when we do not want irrelevant states - // included in the result. - // - // The same functionality could be obtained by passing in a list of participants to the vault query, - // however this: - // - // * requires a join on the participants table which results in slow queries - // * states may flip from being non-relevant to relevant - // * it's more complicated for CorDapp developers - // - // Adding a new column in the "VaultStates" table was considered the best approach. - val keys = stateOnly.participants.map { it.owningKey } - val persistentStateRef = PersistentStateRef(stateAndRef.key) - val isRelevant = isRelevant(stateOnly, keyManagementService.filterMyKeys(keys).toSet()) - val constraintInfo = Vault.ConstraintInfo(stateAndRef.value.state.constraint) - // Save a row for each party in the state_party table. - // TODO: Perhaps these can be stored in a batch? - stateOnly.participants.groupBy { it.owningKey }.forEach { participants -> - val persistentParty = VaultSchemaV1.PersistentParty(persistentStateRef, participants.value.first()) - session.save(persistentParty) - } - val stateToAdd = VaultSchemaV1.VaultStates( - notary = stateAndRef.value.state.notary, - contractStateClassName = stateAndRef.value.state.data.javaClass.name, - stateStatus = Vault.StateStatus.UNCONSUMED, - recordedTime = clock.instant(), - relevancyStatus = if (isRelevant) Vault.RelevancyStatus.RELEVANT else Vault.RelevancyStatus.NOT_RELEVANT, - constraintType = constraintInfo.type(), - constraintData = constraintInfo.data() - ) - stateToAdd.stateRef = persistentStateRef - session.save(stateToAdd) - } + + // Persist the outputs. + saveStates(session, producedStateRefsMap) + + // Persist the reference states. + saveStates(session, referenceStateRefsMap) + + // Persist the consumed inputs. consumedStateRefs.forEach { stateRef -> val state = session.get(VaultSchemaV1.VaultStates::class.java, PersistentStateRef(stateRef)) state?.run { @@ -168,6 +182,7 @@ class NodeVaultService( session.save(state) } } + } return update } @@ -208,16 +223,34 @@ class NodeVaultService( StatesToRecord.ALL_VISIBLE -> tx.outputs.withIndex() }.map { tx.outRef(it.index) } - // Retrieve all unconsumed states for this transaction's inputs + // Retrieve all unconsumed states for this transaction's inputs. val consumedStates = loadStates(tx.inputs) - // Is transaction irrelevant? + // Is transaction irrelevant? If so, then we don't care about the reference states either. if (consumedStates.isEmpty() && ourNewStates.isEmpty()) { log.trace { "tx ${tx.id} was irrelevant to this vault, ignoring" } return null } - return Vault.Update(consumedStates.toSet(), ourNewStates.toSet()) + // This list should only contain NEW states which we have not seen before as an output in another transaction. If we can't + // obtain the references from the vault then the reference must be a state we have not seen before, therefore we should store it + // in the vault. If StateToRecord is set to ALL_VISIBLE or ONLY_RELEVANT then we should store all of the previously unseen + // states in the reference list. The assumption is that we might need to inspect them at some point if they were referred to + // in the contracts of the input or output states. If states to record is none then we shouldn't record any reference states. + val newReferenceStateAndRefs = if (tx.references.isEmpty()) { + emptyList() + } else { + when (statesToRecord) { + StatesToRecord.NONE -> throw AssertionError("Should not reach here") + StatesToRecord.ALL_VISIBLE, StatesToRecord.ONLY_RELEVANT -> { + val notSeenReferences = tx.references - loadStates(tx.references).map { it.ref } + // TODO: This is expensive - is there another way? + tx.toLedgerTransaction(servicesForResolution).references.filter { it.ref in notSeenReferences } + } + } + } + + return Vault.Update(consumedStates.toSet(), ourNewStates.toSet(), references = newReferenceStateAndRefs.toSet()) } fun resolveAndMakeUpdate(tx: CoreTransaction): Vault.Update? { @@ -244,12 +277,14 @@ class NodeVaultService( return null } + val referenceStateAndRefs = ltx.references + val updateType = if (tx is ContractUpgradeWireTransaction) { Vault.UpdateType.CONTRACT_UPGRADE } else { Vault.UpdateType.NOTARY_CHANGE } - return Vault.Update(consumedStateAndRefs.toSet(), producedStateAndRefs.toSet(), null, updateType) + return Vault.Update(consumedStateAndRefs.toSet(), producedStateAndRefs.toSet(), null, updateType, referenceStateAndRefs.toSet()) } @@ -293,7 +328,7 @@ class NodeVaultService( softLockReserve(uuid, stateRefs) } } - persistentStateService.persist(vaultUpdate.produced) + persistentStateService.persist(vaultUpdate.produced + vaultUpdate.references) updatesPublisher.onNext(vaultUpdate) } }