CORDA-2005: FinalityFlow has been made into an inlined flow to resolve issue with FinalityHandler (#4050)

FinalityHandler is insecure in that it is open to receive any transaction from any party.

Any CorDapp targeting platform version 4 or above is required use the new c'tors which take in FlowSession objects to the counterpart flow. This flow must subcall ReceiveFinalityFlow to receive and record the finalised transaction.

Old CorDapps (with target platform version < 4) will continue to work as previously. However if there are no old CorDapps loaded then the node will disable FinalityHandler.
This commit is contained in:
Shams Asari
2018-11-14 14:16:22 +00:00
committed by GitHub
parent 8e6d4b4b38
commit e8b6f5f2f2
76 changed files with 1251 additions and 469 deletions

View File

@ -33,7 +33,6 @@ import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
class FlowsDrainingModeContentionTest {
private val portAllocation = PortAllocation.Incremental(10000)
private val user = User("mark", "dadada", setOf(all()))
private val users = listOf(user)
@ -90,7 +89,7 @@ class ProposeTransactionAndWaitForCommit(private val data: String,
subFlow(SendTransactionFlow(session, signedTx))
session.send(myRpcInfo)
return waitForLedgerCommit(signedTx.id)
return subFlow(ReceiveFinalityFlow(session, expectedTxId = signedTx.id))
}
}
@ -104,7 +103,7 @@ class SignTransactionTriggerDrainingModeAndFinality(private val session: FlowSes
triggerDrainingModeForInitiatingNode(initiatingRpcInfo)
subFlow(FinalityFlow(signedTx, setOf(session.counterparty)))
subFlow(FinalityFlow(signedTx, session))
}
private fun triggerDrainingModeForInitiatingNode(initiatingRpcInfo: RpcInfo) {

View File

@ -136,6 +136,6 @@ class SendMessageFlow(private val message: Message, private val notary: Party) :
val signedTx = serviceHub.signInitialTransaction(txBuilder)
progressTracker.currentStep = FINALISING_TRANSACTION
return subFlow(FinalityFlow(signedTx, FINALISING_TRANSACTION.childProgressTracker()))
return subFlow(FinalityFlow(signedTx, emptyList(), FINALISING_TRANSACTION.childProgressTracker()))
}
}

View File

@ -4,9 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import com.google.common.collect.ImmutableList
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow
@ -32,6 +30,7 @@ import kotlin.test.assertEquals
class ScheduledFlowIntegrationTests {
@StartableByRPC
@InitiatingFlow
class InsertInitialStateFlow(private val destination: Party,
private val notary: Party,
private val identity: Int = 1,
@ -44,11 +43,20 @@ class ScheduledFlowIntegrationTests {
.addOutputState(scheduledState, DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx))
subFlow(FinalityFlow(tx, initiateFlow(destination)))
}
}
@InitiatedBy(InsertInitialStateFlow::class)
class InsertInitialStateResponderFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveFinalityFlow(otherSide))
}
}
@StartableByRPC
@InitiatingFlow
class AnotherFlow(private val identity: String) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
@ -64,7 +72,15 @@ class ScheduledFlowIntegrationTests {
.addOutputState(outputState, DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, outputState.participants.toSet()))
subFlow(FinalityFlow(tx, initiateFlow(state.state.data.destination)))
}
}
@InitiatedBy(AnotherFlow::class)
class AnotherResponderFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveFinalityFlow(otherSide))
}
}

View File

@ -2,10 +2,7 @@ package net.corda.testMessage
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.*
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.SchedulableFlow
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.NonEmptySet
@ -16,6 +13,7 @@ import java.util.*
import kotlin.reflect.jvm.jvmName
@SchedulableFlow
@InitiatingFlow
class ScheduledFlow(private val stateRef: StateRef) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
@ -35,7 +33,15 @@ class ScheduledFlow(private val stateRef: StateRef) : FlowLogic<Unit>() {
.addOutputState(newStateOutput, DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, setOf(scheduledState.destination)))
subFlow(FinalityFlow(tx, initiateFlow(scheduledState.destination)))
}
}
@InitiatedBy(ScheduledFlow::class)
class ScheduledResponderFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveFinalityFlow(otherSide))
}
}

View File

@ -49,9 +49,9 @@ class SendMessageFlow(private val message: Message, private val notary: Party, p
return if (reciepent != null) {
val session = initiateFlow(reciepent)
subFlow(SendTransactionFlow(session, signedTx))
subFlow(FinalityFlow(signedTx, setOf(reciepent), FINALISING_TRANSACTION.childProgressTracker()))
subFlow(FinalityFlow(signedTx, listOf(session), FINALISING_TRANSACTION.childProgressTracker()))
} else {
subFlow(FinalityFlow(signedTx, FINALISING_TRANSACTION.childProgressTracker()))
subFlow(FinalityFlow(signedTx, emptyList(), FINALISING_TRANSACTION.childProgressTracker()))
}
}
}

View File

@ -649,12 +649,39 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
private fun installCoreFlows() {
flowManager.registerInitiatedCoreFlowFactory(FinalityFlow::class, FinalityHandler::class, ::FinalityHandler)
installFinalityHandler()
flowManager.registerInitiatedCoreFlowFactory(NotaryChangeFlow::class, NotaryChangeHandler::class, ::NotaryChangeHandler)
flowManager.registerInitiatedCoreFlowFactory(ContractUpgradeFlow.Initiate::class, NotaryChangeHandler::class, ::ContractUpgradeHandler)
// TODO Make this an inlined flow (and remove this flow mapping!), which should be possible now that FinalityFlow is also inlined
flowManager.registerInitiatedCoreFlowFactory(SwapIdentitiesFlow::class, SwapIdentitiesHandler::class, ::SwapIdentitiesHandler)
}
// The FinalityHandler is insecure as it blindly accepts any and all transactions into the node's local vault without doing any checks.
// To plug this hole, the sending-side FinalityFlow has been made inlined with an inlined ReceiveFinalityFlow counterpart. The old
// FinalityFlow API is gated to only work with old CorDapps (those whose target platform version < 4), and the FinalityHandler will only
// work if there is at least one old CorDapp loaded (to preserve backwards compatibility).
//
// If an attempt is made to send us a transaction via FinalityHandler, and it's disabled, we will reject the request at the session-init
// level by throwing a FinalityHandlerDisabled exception. This is picked up by the flow hospital which will not send the error back
// (immediately) and instead pause the request by keeping it un-acknowledged in the message broker. This means the request isn't lost
// across node restarts and allows the node operator time to accept or reject the request.
// TODO Add public API to allow the node operator to accept or reject
private fun installFinalityHandler() {
// Disable the insecure FinalityHandler if none of the loaded CorDapps are old enough to require it.
val cordappsNeedingFinalityHandler = cordappLoader.cordapps.filter { it.info.targetPlatformVersion < 4 }
if (cordappsNeedingFinalityHandler.isEmpty()) {
log.info("FinalityHandler is disabled as there are no CorDapps loaded which require it")
} else {
log.warn("FinalityHandler is enabled as there are CorDapps that require it: ${cordappsNeedingFinalityHandler.map { it.info }}. " +
"This is insecure and it is strongly recommended that newer versions of these CorDapps be used instead.")
}
val disabled = cordappsNeedingFinalityHandler.isEmpty()
flowManager.registerInitiatedCoreFlowFactory(FinalityFlow::class, FinalityHandler::class) {
if (disabled) throw SessionRejectException.FinalityHandlerDisabled()
FinalityHandler(it)
}
}
protected open fun makeTransactionStorage(transactionCacheSizeBytes: Long): WritableTransactionStorage {
return DBTransactionStorage(database, cacheFactory)
}

View File

@ -7,21 +7,15 @@ import net.corda.core.contracts.requireThat
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.ContractUpgradeUtils
import net.corda.core.transactions.ContractUpgradeWireTransaction
import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.ContractUpgradeWireTransaction
import net.corda.core.transactions.SignedTransaction
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
class FinalityHandler(private val sender: FlowSession) : FlowLogic<Unit>() {
class FinalityHandler(val sender: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveTransactionFlow(sender, true, StatesToRecord.ONLY_RELEVANT))
}
internal fun sender(): Party = sender.counterparty
}
class NotaryChangeHandler(otherSideSession: FlowSession) : AbstractStateReplacementFlow.Acceptor<Party>(otherSideSession) {
@ -54,10 +48,13 @@ class ContractUpgradeHandler(otherSide: FlowSession) : AbstractStateReplacementF
override fun verifyProposal(stx: SignedTransaction, proposal: AbstractStateReplacementFlow.Proposal<Class<out UpgradedContract<ContractState, *>>>) {
// Retrieve signed transaction from our side, we will apply the upgrade logic to the transaction on our side, and
// verify outputs matches the proposed upgrade.
val ourSTX = serviceHub.validatedTransactions.getTransaction(proposal.stateRef.txhash)
requireNotNull(ourSTX) { "We don't have a copy of the referenced state" }
val oldStateAndRef = ourSTX!!.resolveBaseTransaction(serviceHub).outRef<ContractState>(proposal.stateRef.index)
val authorisedUpgrade = serviceHub.contractUpgradeService.getAuthorisedContractUpgrade(oldStateAndRef.ref) ?: throw IllegalStateException("Contract state upgrade is unauthorised. State hash : ${oldStateAndRef.ref}")
val ourSTX = requireNotNull(serviceHub.validatedTransactions.getTransaction(proposal.stateRef.txhash)) {
"We don't have a copy of the referenced state"
}
val oldStateAndRef = ourSTX.resolveBaseTransaction(serviceHub).outRef<ContractState>(proposal.stateRef.index)
val authorisedUpgrade = checkNotNull(serviceHub.contractUpgradeService.getAuthorisedContractUpgrade(oldStateAndRef.ref)) {
"Contract state upgrade is unauthorised. State hash : ${oldStateAndRef.ref}"
}
val proposedTx = stx.coreTransaction as ContractUpgradeWireTransaction
val expectedTx = ContractUpgradeUtils.assembleUpgradeTx(oldStateAndRef, proposal.modification, proposedTx.privacySalt, serviceHub)
requireThat {

View File

@ -12,5 +12,8 @@ open class SessionRejectException(message: String) : CordaException(message) {
class NotAFlow(val initiatorClass: Class<*>) : SessionRejectException("${initiatorClass.name} is not a flow")
class NotRegistered(val initiatorFlowClass: Class<out FlowLogic<*>>) : SessionRejectException("${initiatorFlowClass.name} is not registered")
}
class FinalityHandlerDisabled : SessionRejectException("Counterparty attempting to use the old insecure API of FinalityFlow. However this " +
"API is disabled on this node since there no CorDapps installed that require it. It may be that the counterparty is running an " +
"older verison of a CorDapp.")
}

View File

@ -42,6 +42,9 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
// installed on restart, at which point the message will be able proceed as normal. If not then it will need
// to be dropped manually.
Outcome.OVERNIGHT_OBSERVATION
} else if (error is SessionRejectException.FinalityHandlerDisabled) {
// TODO We need a way to be able to give the green light to such a session-init message
Outcome.OVERNIGHT_OBSERVATION
} else {
Outcome.UNTREATABLE
}
@ -284,7 +287,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
private fun warn(flowLogic: FinalityHandler, flowFiber: FlowFiber, currentState: StateMachineState) {
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
"the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}")
"the flow by re-starting the node. State machine state: $currentState, initiating party was: " +
"${flowLogic.sender.counterparty}")
}
}
}

View File

@ -2,11 +2,9 @@ package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.*
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.SchedulableFlow
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
@ -43,7 +41,10 @@ class ScheduledFlowsDrainingModeTest {
@Before
fun setup() {
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"), threadPerNode = true)
mockNet = InternalMockNetwork(
cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts", javaClass.packageName),
threadPerNode = true
)
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
notary = mockNet.defaultNotaryIdentity
@ -112,6 +113,7 @@ class ScheduledFlowsDrainingModeTest {
override val participants: List<Party> get() = listOf(source, destination)
}
@InitiatingFlow
class InsertInitialStateFlow(private val destination: Party, private val notary: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
@ -120,11 +122,20 @@ class ScheduledFlowsDrainingModeTest {
.addOutputState(scheduledState, DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx))
subFlow(FinalityFlow(tx, initiateFlow(destination)))
}
}
@InitiatedBy(InsertInitialStateFlow::class)
class InsertInitialStateResponderFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveFinalityFlow(otherSide))
}
}
@SchedulableFlow
@InitiatingFlow
class ScheduledFlow(private val stateRef: StateRef) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
@ -142,7 +153,15 @@ class ScheduledFlowsDrainingModeTest {
.addOutputState(newStateOutput, DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, setOf(scheduledState.destination)))
subFlow(FinalityFlow(tx, initiateFlow(scheduledState.destination)))
}
}
@InitiatedBy(ScheduledFlow::class)
class ScheduledResponderFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveFinalityFlow(otherSide))
}
}
}

View File

@ -1,8 +1,12 @@
package net.corda.node.services
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.cordapp.CordappInfoResolver
import net.corda.core.internal.packageName
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
@ -16,8 +20,10 @@ import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Test
import rx.Observable
class FinalityHandlerTest {
private val mockNet = InternalMockNetwork()
@ -31,51 +37,115 @@ class FinalityHandlerTest {
fun `sent to flow hospital on error and attempted retry on node restart`() {
// Setup a network where only Alice has the finance CorDapp and it sends a cash tx to Bob who doesn't have the
// CorDapp. Bob's FinalityHandler will error when validating the tx.
val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, additionalCordapps = setOf(FINANCE_CORDAPP)))
val alice = mockNet.createNode(InternalMockNodeParameters(
legalName = ALICE_NAME,
additionalCordapps = setOf(FINANCE_CORDAPP)
))
var bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
var bob = mockNet.createNode(InternalMockNodeParameters(
legalName = BOB_NAME,
// The node disables the FinalityHandler completely if there are no old CorDapps loaded, so we need to add
// a token old CorDapp to keep the handler running.
additionalCordapps = setOf(cordappForPackages(javaClass.packageName).withTargetVersion(3))
))
val stx = TransactionBuilder(mockNet.defaultNotaryIdentity).let {
Cash().generateIssue(
it,
1000.POUNDS.issuedBy(alice.info.singleIdentity().ref(0)),
bob.info.singleIdentity(),
mockNet.defaultNotaryIdentity
)
alice.services.signInitialTransaction(it)
val stx = alice.issueCashTo(bob)
val finalityHandlerId = bob.trackFinalityHandlerId().run {
alice.finaliseWithOldApi(stx)
getOrThrow()
}
val finalityHandlerIdFuture = bob.smm.track()
.updates
.filter { it.logic is FinalityHandler }
.map { it.logic.runId }
.toFuture()
val finalisedTx = alice.services.startFlow(FinalityFlow(stx)).run {
mockNet.runNetwork()
resultFuture.getOrThrow()
}
val finalityHandlerId = finalityHandlerIdFuture.getOrThrow()
bob.assertFlowSentForObservation(finalityHandlerId)
assertThat(bob.getTransaction(finalisedTx.id)).isNull()
bob.assertFlowSentForObservationDueToConstraintError(finalityHandlerId)
assertThat(bob.getTransaction(stx.id)).isNull()
bob = mockNet.restartNode(bob)
// Since we've not done anything to fix the orignal error, we expect the finality handler to be sent to the hospital
// again on restart
bob.assertFlowSentForObservation(finalityHandlerId)
assertThat(bob.getTransaction(finalisedTx.id)).isNull()
bob.assertFlowSentForObservationDueToConstraintError(finalityHandlerId)
assertThat(bob.getTransaction(stx.id)).isNull()
}
private fun TestStartedNode.assertFlowSentForObservation(runId: StateMachineRunId) {
val keptInForObservation = smm.flowHospital
.track()
.let { it.updates.startWith(it.snapshot) }
.ofType(MedicalRecord.Flow::class.java)
.filter { it.flowId == runId && it.outcome == Outcome.OVERNIGHT_OBSERVATION }
@Test
fun `disabled if there are no old CorDapps loaded`() {
val alice = mockNet.createNode(InternalMockNodeParameters(
legalName = ALICE_NAME,
additionalCordapps = setOf(FINANCE_CORDAPP)
))
val bob = mockNet.createNode(InternalMockNodeParameters(
legalName = BOB_NAME,
// Make sure the target version is 4, and not the current platform version which may be greater
additionalCordapps = setOf(FINANCE_CORDAPP.withTargetVersion(4))
))
val stx = alice.issueCashTo(bob)
val finalityFuture = alice.finaliseWithOldApi(stx)
val record = bob.medicalRecordsOfType<MedicalRecord.SessionInit>()
.toBlocking()
.first()
assertThat(keptInForObservation.by).contains(FinalityDoctor)
assertThat(record.outcome).isEqualTo(Outcome.OVERNIGHT_OBSERVATION)
assertThat(record.sender).isEqualTo(alice.info.singleIdentity())
assertThat(record.initiatorFlowClassName).isEqualTo(FinalityFlow::class.java.name)
assertThat(bob.getTransaction(stx.id)).isNull()
// Drop the session-init so that Alice gets the error message
assertThat(finalityFuture).isNotDone()
bob.smm.flowHospital.dropSessionInit(record.id)
mockNet.runNetwork()
assertThatThrownBy {
finalityFuture.getOrThrow()
}.hasMessageContaining("Counterparty attempting to use the old insecure API of FinalityFlow")
}
private fun TestStartedNode.issueCashTo(recipient: TestStartedNode): SignedTransaction {
return TransactionBuilder(mockNet.defaultNotaryIdentity).let {
Cash().generateIssue(
it,
1000.POUNDS.issuedBy(info.singleIdentity().ref(0)),
recipient.info.singleIdentity(),
mockNet.defaultNotaryIdentity
)
services.signInitialTransaction(it)
}
}
private fun TestStartedNode.trackFinalityHandlerId(): CordaFuture<StateMachineRunId> {
return smm
.track()
.updates
.filter { it.logic is FinalityHandler }
.map { it.logic.runId }
.toFuture()
}
private fun TestStartedNode.finaliseWithOldApi(stx: SignedTransaction): CordaFuture<SignedTransaction> {
return CordappInfoResolver.withCordappInfo(targetPlatformVersion = 3) {
@Suppress("DEPRECATION")
services.startFlow(FinalityFlow(stx)).resultFuture.apply {
mockNet.runNetwork()
}
}
}
private inline fun <reified R : MedicalRecord> TestStartedNode.medicalRecordsOfType(): Observable<R> {
return smm
.flowHospital
.track()
.let { it.updates.startWith(it.snapshot) }
.ofType(R::class.java)
}
private fun TestStartedNode.assertFlowSentForObservationDueToConstraintError(runId: StateMachineRunId) {
val observation = medicalRecordsOfType<MedicalRecord.Flow>()
.filter { it.flowId == runId }
.toBlocking()
.first()
assertThat(observation.outcome).isEqualTo(Outcome.OVERNIGHT_OBSERVATION)
assertThat(observation.by).contains(FinalityDoctor)
val error = observation.errors.single()
assertThat(error).isInstanceOf(TransactionVerificationException.ContractConstraintRejection::class.java)
}
private fun TestStartedNode.getTransaction(id: SecureHash): SignedTransaction? {

View File

@ -65,7 +65,7 @@ class ServiceHubConcurrentUsageTest {
val issuer = ourIdentity.ref(OpaqueBytes.of(0))
Cash().generateIssue(builder, 10.DOLLARS.issuedBy(issuer), ourIdentity, notary)
val stx = serviceHub.signInitialTransaction(builder)
return subFlow(FinalityFlow(stx))
return subFlow(FinalityFlow(stx, emptyList()))
}
}
}

View File

@ -143,7 +143,7 @@ class TimedFlowTests {
setTimeWindow(services.clock.instant(), 30.seconds)
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
val flow = FinalityFlow(issueTx)
val flow = FinalityFlow(issueTx, emptyList())
val progressTracker = flow.progressTracker
assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep)
val progressTrackerDone = getDoneFuture(flow.progressTracker)

View File

@ -4,11 +4,9 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationOrigin
import net.corda.core.contracts.*
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.SchedulableFlow
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria
@ -60,6 +58,7 @@ class ScheduledFlowTests {
override val participants: List<Party> get() = listOf(source, destination)
}
@InitiatingFlow
class InsertInitialStateFlow(private val destination: Party, private val notary: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
@ -68,10 +67,19 @@ class ScheduledFlowTests {
.addOutputState(scheduledState, DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx))
subFlow(FinalityFlow(tx, initiateFlow(destination)))
}
}
@InitiatedBy(InsertInitialStateFlow::class)
class InsertInitialStateResponderFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveFinalityFlow(otherSide))
}
}
@InitiatingFlow
@SchedulableFlow
class ScheduledFlow(private val stateRef: StateRef) : FlowLogic<Unit>() {
@Suspendable
@ -90,13 +98,21 @@ class ScheduledFlowTests {
.addOutputState(newStateOutput, DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, setOf(scheduledState.destination)))
subFlow(FinalityFlow(tx, initiateFlow(scheduledState.destination)))
}
}
@InitiatedBy(ScheduledFlow::class)
class ScheduledResponderFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveFinalityFlow(otherSide))
}
}
@Before
fun setup() {
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"), threadPerNode = true)
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts", javaClass.packageName), threadPerNode = true)
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
notary = mockNet.defaultNotaryIdentity

View File

@ -7,14 +7,11 @@ import co.paralleluniverse.strands.concurrent.Semaphore
import net.corda.client.rpc.notUsed
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.queryBy
@ -30,7 +27,10 @@ import net.corda.core.utilities.unwrap
import net.corda.node.services.persistence.checkpoints
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyState
import net.corda.testing.core.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
@ -66,7 +66,7 @@ class FlowFrameworkTests {
@Before
fun setUpMockNet() {
mockNet = InternalMockNetwork(
cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"),
cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts") + FINANCE_CORDAPP,
servicePeerAllocationStrategy = RoundRobin()
)
@ -266,18 +266,20 @@ class FlowFrameworkTests {
}
@Test
fun `wait for transaction`() {
fun waitForLedgerCommit() {
val ptx = TransactionBuilder(notary = notaryIdentity)
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(alice.owningKey))
val stx = aliceNode.services.signInitialTransaction(ptx)
val committerFiber = aliceNode.registerCordappFlowFactory(WaitingFlows.Waiter::class) {
WaitingFlows.Committer(it)
}.map { it.stateMachine }.map { uncheckedCast<FlowStateMachine<*>, FlowStateMachine<Any>>(it) }
val waiterStx = bobNode.services.startFlow(WaitingFlows.Waiter(stx, alice)).resultFuture
val committerStx = aliceNode.registerCordappFlowFactory(CommitReceiverFlow::class) {
CommitterFlow(it)
}.flatMap { it.stateMachine.resultFuture }
// The waitForLedgerCommit call has to occur on separate flow
val waiterStx = bobNode.services.startFlow(WaiterFlow(stx.id)).resultFuture
val commitReceiverStx = bobNode.services.startFlow(CommitReceiverFlow(stx, alice)).resultFuture
mockNet.runNetwork()
assertThat(waiterStx.getOrThrow()).isEqualTo(committerFiber.getOrThrow().resultFuture.getOrThrow())
assertThat(committerStx.getOrThrow()).isEqualTo(waiterStx.getOrThrow()).isEqualTo(commitReceiverStx.getOrThrow())
}
@Test
@ -287,10 +289,8 @@ class FlowFrameworkTests {
.addCommand(dummyCommand())
val stx = aliceNode.services.signInitialTransaction(ptx)
aliceNode.registerCordappFlowFactory(WaitingFlows.Waiter::class) {
WaitingFlows.Committer(it) { throw Exception("Error") }
}
val waiter = bobNode.services.startFlow(WaitingFlows.Waiter(stx, alice)).resultFuture
aliceNode.registerCordappFlowFactory(CommitReceiverFlow::class) { CommitterFlow(it) { throw Exception("Error") } }
val waiter = bobNode.services.startFlow(CommitReceiverFlow(stx, alice)).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
waiter.getOrThrow()
@ -299,18 +299,10 @@ class FlowFrameworkTests {
@Test
fun `verify vault query service is tokenizable by force checkpointing within a flow`() {
val ptx = TransactionBuilder(notary = notaryIdentity)
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(alice.owningKey))
val stx = aliceNode.services.signInitialTransaction(ptx)
aliceNode.registerCordappFlowFactory(VaultQueryFlow::class) {
WaitingFlows.Committer(it)
}
val result = bobNode.services.startFlow(VaultQueryFlow(stx, alice)).resultFuture
aliceNode.registerCordappFlowFactory(VaultQueryFlow::class) { InitiatedSendFlow("Hello", it) }
val result = bobNode.services.startFlow(VaultQueryFlow(alice)).resultFuture
mockNet.runNetwork()
assertThat(result.getOrThrow()).isEmpty()
result.getOrThrow()
}
@Test
@ -492,24 +484,27 @@ class FlowFrameworkTests {
}
}
private object WaitingFlows {
@InitiatingFlow
class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val otherPartySession = initiateFlow(otherParty)
otherPartySession.send(stx)
return waitForLedgerCommit(stx.id)
}
}
class WaiterFlow(private val txId: SecureHash) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction = waitForLedgerCommit(txId)
}
class Committer(val otherPartySession: FlowSession, val throwException: (() -> Exception)? = null) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val stx = otherPartySession.receive<SignedTransaction>().unwrap { it }
if (throwException != null) throw throwException.invoke()
return subFlow(FinalityFlow(stx, setOf(otherPartySession.counterparty)))
}
@InitiatingFlow
class CommitReceiverFlow(val stx: SignedTransaction, private val otherParty: Party) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val otherPartySession = initiateFlow(otherParty)
otherPartySession.send(stx)
return subFlow(ReceiveFinalityFlow(otherPartySession, expectedTxId = stx.id))
}
}
class CommitterFlow(private val otherPartySession: FlowSession, private val throwException: (() -> Exception)? = null) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val stx = otherPartySession.receive<SignedTransaction>().unwrap { it }
if (throwException != null) throw throwException.invoke()
return subFlow(FinalityFlow(stx, otherPartySession))
}
}
@ -527,16 +522,15 @@ class FlowFrameworkTests {
private class IncorrectCustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty)
@InitiatingFlow
private class VaultQueryFlow(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<List<StateAndRef<ContractState>>>() {
private class VaultQueryFlow(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call(): List<StateAndRef<ContractState>> {
override fun call() {
val otherPartySession = initiateFlow(otherParty)
otherPartySession.send(stx)
// hold onto reference here to force checkpoint of vaultService and thus
// Hold onto reference here to force checkpoint of vaultService and thus
// prove it is registered as a tokenizableService in the node
val vaultQuerySvc = serviceHub.vaultService
waitForLedgerCommit(stx.id)
return vaultQuerySvc.queryBy<ContractState>().states
otherPartySession.receive<Any>()
vaultQuerySvc.queryBy<ContractState>().states
}
}

View File

@ -1,9 +1,7 @@
package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.services.queryBy
@ -26,7 +24,6 @@ import java.util.concurrent.ExecutionException
import kotlin.test.assertEquals
class VaultFlowTest {
private lateinit var mockNetwork: MockNetwork
private lateinit var partyA: StartedMockNode
private lateinit var partyB: StartedMockNode
@ -72,17 +69,26 @@ class VaultFlowTest {
partyB.services.vaultService.queryBy<DummyDealContract.State>().states.size
})
}
}
@InitiatingFlow
class Initiator(private val participants: List<Party>) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val stx = serviceHub.signInitialTransaction(TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(UniqueDummyLinearContract.State(participants, "Dummy linear id"), UNIQUE_DUMMY_LINEAR_CONTRACT_PROGRAM_ID)
addOutputState(DummyDealContract.State(participants, "linear id"), DUMMY_DEAL_PROGRAM_ID)
addCommand(DummyCommandData, listOf(ourIdentity.owningKey))
})
subFlow(FinalityFlow(stx))
@InitiatingFlow
class Initiator(private val participants: List<Party>) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val stx = serviceHub.signInitialTransaction(TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(UniqueDummyLinearContract.State(participants, "Dummy linear id"), UNIQUE_DUMMY_LINEAR_CONTRACT_PROGRAM_ID)
addOutputState(DummyDealContract.State(participants, "linear id"), DUMMY_DEAL_PROGRAM_ID)
addCommand(DummyCommandData, listOf(ourIdentity.owningKey))
})
val sessions = participants.mapNotNull { if (it != ourIdentity) initiateFlow(it) else null }
subFlow(FinalityFlow(stx, sessions))
}
}
@InitiatedBy(Initiator::class)
class Responder(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveFinalityFlow(otherSide))
}
}
}

View File

@ -101,12 +101,13 @@ class VaultSoftLockManagerTest {
object CommandDataImpl : CommandData
class ClientLogic(nodePair: NodePair, val state: ContractState) : NodePair.AbstractClientLogic<List<ContractState>>(nodePair) {
override fun callImpl() = run {
subFlow(FinalityFlow(serviceHub.signInitialTransaction(TransactionBuilder(notary = ourIdentity).apply {
override fun callImpl(): List<ContractState> {
val stx = serviceHub.signInitialTransaction(TransactionBuilder(notary = ourIdentity).apply {
addOutputState(state, ContractImpl::class.jvmName)
addCommand(CommandDataImpl, ourIdentity.owningKey)
})))
serviceHub.vaultService.queryBy<ContractState>(VaultQueryCriteria(softLockingCondition = SoftLockingCondition(LOCKED_ONLY))).states.map {
})
subFlow(FinalityFlow(stx, emptyList()))
return serviceHub.vaultService.queryBy<ContractState>(VaultQueryCriteria(softLockingCondition = SoftLockingCondition(LOCKED_ONLY))).states.map {
it.state.data
}
}