CORDA-2406: FinalityFlow can support mix of participants using the new and old APIs (#4532)

Otherwise it's impossible to finalise a transaction and the participants are a mix of those using the new API and those using the old.
This commit is contained in:
Shams Asari 2019-01-12 12:01:23 +00:00 committed by GitHub
parent 0832587096
commit 6f14a9f0b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 117 additions and 46 deletions

View File

@ -12,6 +12,7 @@ import net.corda.core.node.StatesToRecord.ONLY_RELEVANT
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.debug
/**
* Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction
@ -29,44 +30,74 @@ import net.corda.core.utilities.ProgressTracker
* The flow returns the same transaction but with the additional signatures from the notary.
*
* NOTE: This is an inlined flow but for backwards compatibility is annotated with [InitiatingFlow].
*
* @param transaction What to commit.
* @param sessions A collection of [FlowSession]s who will be given the notarised transaction. This list **must** include
* all participants in the transaction (excluding the local identity).
*/
// To maintain backwards compatibility with the old API, FinalityFlow can act both as an initiating flow and as an inlined flow.
// This is only possible because a flow is only truly initiating when the first call to initiateFlow is made (where the
// presence of @InitiatingFlow is checked). So the new API is inlined simply because that code path doesn't call initiateFlow.
@InitiatingFlow
class FinalityFlow private constructor(val transaction: SignedTransaction,
private val extraRecipients: Set<Party>,
private val oldParticipants: Collection<Party>,
override val progressTracker: ProgressTracker,
private val sessions: Collection<FlowSession>?) : FlowLogic<SignedTransaction>() {
private val sessions: Collection<FlowSession>,
private val newApi: Boolean) : FlowLogic<SignedTransaction>() {
@Deprecated(DEPRECATION_MSG)
constructor(transaction: SignedTransaction, extraRecipients: Set<Party>, progressTracker: ProgressTracker) : this(
transaction, extraRecipients, progressTracker, null
transaction, extraRecipients, progressTracker, emptyList(), false
)
@Deprecated(DEPRECATION_MSG)
constructor(transaction: SignedTransaction, extraRecipients: Set<Party>) : this(transaction, extraRecipients, tracker(), null)
constructor(transaction: SignedTransaction, extraRecipients: Set<Party>) : this(transaction, extraRecipients, tracker(), emptyList(), false)
@Deprecated(DEPRECATION_MSG)
constructor(transaction: SignedTransaction) : this(transaction, emptySet(), tracker(), null)
constructor(transaction: SignedTransaction) : this(transaction, emptySet(), tracker(), emptyList(), false)
@Deprecated(DEPRECATION_MSG)
constructor(transaction: SignedTransaction, progressTracker: ProgressTracker) : this(transaction, emptySet(), progressTracker, null)
constructor(transaction: SignedTransaction, progressTracker: ProgressTracker) : this(transaction, emptySet(), progressTracker, emptyList(), false)
constructor(transaction: SignedTransaction, sessions: Collection<FlowSession>, progressTracker: ProgressTracker) : this(
transaction, emptySet(), progressTracker, sessions
)
constructor(transaction: SignedTransaction, sessions: Collection<FlowSession>) : this(
transaction, emptySet(), tracker(), sessions
)
/**
* Notarise the given transaction and broadcast it to the given [FlowSession]s. This list **must** at least include
* all the non-local participants of the transaction. Sessions to non-participants can also be provided.
*
* @param transaction What to commit.
*/
constructor(transaction: SignedTransaction, firstSession: FlowSession, vararg restSessions: FlowSession) : this(
transaction, emptySet(), tracker(), listOf(firstSession) + restSessions.asList()
transaction, listOf(firstSession) + restSessions.asList()
)
/**
* Notarise the given transaction and broadcast it to all the participants.
*
* @param transaction What to commit.
* @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can
* also be provided.
*/
@JvmOverloads
constructor(
transaction: SignedTransaction,
sessions: Collection<FlowSession>,
progressTracker: ProgressTracker = tracker()
) : this(transaction, emptyList(), progressTracker, sessions, true)
/**
* Notarise the given transaction and broadcast it to all the participants.
*
* @param transaction What to commit.
* @param sessions A collection of [FlowSession]s for each non-local participant.
* @param oldParticipants An **optional** collection of parties for participants who are still using the old API.
*
* You will only need to use this parameter if you have upgraded your CorDapp from the V3 FinalityFlow API but are required to provide
* backwards compatibility with participants running V3 nodes. If you're writing a new CorDapp then this does not apply and this
* parameter should be ignored.
*/
@Deprecated(DEPRECATION_MSG)
constructor(
transaction: SignedTransaction,
sessions: Collection<FlowSession>,
oldParticipants: Collection<Party>,
progressTracker: ProgressTracker
) : this(transaction, oldParticipants, progressTracker, sessions, true)
companion object {
private const val DEPRECATION_MSG = "It is unsafe to use this constructor as it requires nodes to automatically " +
"accept notarised transactions without first checking their relevancy. Instead, use one of the constructors " +
"that takes in existing FlowSessions."
"that requires only FlowSessions."
object NOTARISING : ProgressTracker.Step("Requesting signature by notary service") {
override fun childProgressTracker() = NotaryFlow.Client.tracker()
@ -81,7 +112,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
@Suspendable
@Throws(NotaryException::class)
override fun call(): SignedTransaction {
if (sessions == null) {
if (!newApi) {
require(CordappResolver.currentTargetVersion < 4) {
"A flow session for each external participant to the transaction must be provided. If you wish to continue " +
"using this insecure API then specify a target platform version of less than 4 for your CorDapp."
@ -103,30 +134,25 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
transaction.pushToLoggingContext()
logCommandData()
val ledgerTransaction = verifyTx()
val externalParticipants = extractExternalParticipants(ledgerTransaction)
val externalTxParticipants = extractExternalParticipants(ledgerTransaction)
if (sessions != null) {
val missingRecipients = externalParticipants - sessions.map { it.counterparty }
if (newApi) {
val sessionParties = sessions.map { it.counterparty }
val missingRecipients = externalTxParticipants - sessionParties - oldParticipants
require(missingRecipients.isEmpty()) {
"Flow sessions were not provided for the following transaction participants: $missingRecipients"
}
sessionParties.intersect(oldParticipants).let {
require(it.isEmpty()) { "The following parties are specified both in flow sessions and in the oldParticipants list: $it" }
}
}
val notarised = notariseAndRecord()
// Each transaction has its own set of recipients, but extra recipients get them all.
progressTracker.currentStep = BROADCASTING
if (sessions == null) {
val recipients = externalParticipants + (extraRecipients - serviceHub.myInfo.legalIdentities)
logger.info("Broadcasting transaction to parties ${recipients.joinToString(", ", "[", "]")}.")
for (recipient in recipients) {
logger.info("Sending transaction to party ${recipient.name}.")
val session = initiateFlow(recipient)
subFlow(SendTransactionFlow(session, notarised))
logger.info("Party $recipient received the transaction.")
}
} else {
if (newApi) {
oldV3Broadcast(notarised, oldParticipants.toSet())
for (session in sessions) {
try {
subFlow(SendTransactionFlow(session, notarised))
@ -140,6 +166,8 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
)
}
}
} else {
oldV3Broadcast(notarised, (externalTxParticipants + oldParticipants).toSet())
}
logger.info("All parties received the transaction successfully.")
@ -147,6 +175,18 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
return notarised
}
@Suspendable
private fun oldV3Broadcast(notarised: SignedTransaction, recipients: Set<Party>) {
for (recipient in recipients) {
if (!serviceHub.myInfo.isLegalIdentity(recipient)) {
logger.debug { "Sending transaction to party $recipient." }
val session = initiateFlow(recipient)
subFlow(SendTransactionFlow(session, notarised))
logger.info("Party $recipient received the transaction.")
}
}
}
private fun logCommandData() {
if (logger.isDebugEnabled) {
val commandDataTypes = transaction.tx.commands.asSequence().mapNotNull { it.value::class.qualifiedName }.distinct()

View File

@ -3,6 +3,7 @@ package net.corda.core.flows
import com.natpryce.hamkrest.and
import com.natpryce.hamkrest.assertion.assert
import net.corda.core.flows.mixins.WithFinality
import net.corda.core.flows.mixins.WithFinality.FinalityInvoker
import net.corda.core.identity.Party
import net.corda.core.internal.cordapp.CordappResolver
import net.corda.core.transactions.SignedTransaction
@ -75,15 +76,29 @@ class FinalityFlowTests : WithFinality {
@Test
fun `allow use of the old API if the CorDapp target version is 3`() {
// We need Bob to load at least one old CorDapp so that its FinalityHandler is enabled
val bob = createBob(cordapps = listOf(cordappWithPackages("com.template").copy(targetPlatformVersion = 3)))
val stx = aliceNode.issuesCashTo(bob)
val oldBob = createBob(cordapps = listOf(tokenOldCordapp()))
val stx = aliceNode.issuesCashTo(oldBob)
val resultFuture = CordappResolver.withCordapp(targetPlatformVersion = 3) {
@Suppress("DEPRECATION")
aliceNode.startFlowAndRunNetwork(FinalityFlow(stx)).resultFuture
}
resultFuture.getOrThrow()
assertThat(bob.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
}
@Test
fun `broadcasting to both new and old participants`() {
val newCharlie = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME))
val oldBob = createBob(cordapps = listOf(tokenOldCordapp()))
val stx = aliceNode.issuesCashTo(oldBob)
val resultFuture = aliceNode.startFlowAndRunNetwork(FinalityInvoker(
stx,
newRecipients = setOf(newCharlie.info.singleIdentity()),
oldRecipients = setOf(oldBob.info.singleIdentity())
)).resultFuture
resultFuture.getOrThrow()
assertThat(newCharlie.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
}
private fun createBob(cordapps: List<TestCordappInternal> = emptyList()): TestStartedNode {
@ -100,4 +115,7 @@ class FinalityFlowTests : WithFinality {
Cash().generateIssue(builder, amount, other, notary)
return services.signInitialTransaction(builder)
}
/** "Old" CorDapp which will force its node to keep its FinalityHandler enabled */
private fun tokenOldCordapp() = cordappWithPackages("com.template").copy(targetPlatformVersion = 3)
}

View File

@ -17,7 +17,7 @@ import net.corda.testing.node.internal.TestStartedNode
interface WithFinality : WithMockNet {
//region Operations
fun TestStartedNode.finalise(stx: SignedTransaction, vararg recipients: Party): FlowStateMachine<SignedTransaction> {
return startFlowAndRunNetwork(FinalityInvoker(stx, recipients.toSet()))
return startFlowAndRunNetwork(FinalityInvoker(stx, recipients.toSet(), emptySet()))
}
fun TestStartedNode.getValidatedTransaction(stx: SignedTransaction): SignedTransaction {
@ -25,7 +25,7 @@ interface WithFinality : WithMockNet {
}
fun CordaRPCOps.finalise(stx: SignedTransaction, vararg recipients: Party): FlowHandle<SignedTransaction> {
return startFlow(::FinalityInvoker, stx, recipients.toSet()).andRunNetwork()
return startFlow(::FinalityInvoker, stx, recipients.toSet(), emptySet()).andRunNetwork()
}
//endregion
@ -41,11 +41,13 @@ interface WithFinality : WithMockNet {
@InitiatingFlow
@StartableByRPC
class FinalityInvoker(private val transaction: SignedTransaction,
private val recipients: Set<Party>) : FlowLogic<SignedTransaction>() {
private val newRecipients: Set<Party>,
private val oldRecipients: Set<Party>) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val sessions = recipients.map(::initiateFlow)
return subFlow(FinalityFlow(transaction, sessions))
val sessions = newRecipients.map(::initiateFlow)
@Suppress("DEPRECATION")
return subFlow(FinalityFlow(transaction, sessions, oldRecipients, FinalityFlow.tracker()))
}
}

View File

@ -125,6 +125,9 @@ This is a three step process:
2. Change or create the flow that will receive the finalised transaction.
3. Make sure your application's minimum and target version numbers are both set to 4 (see step 2).
Upgrading a non-initiating flow
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
As an example, let's take a very simple flow that finalises a transaction without the involvement of a counterpart flow:
.. container:: codeset
@ -175,10 +178,17 @@ to record the finalised transaction:
:end-before: DOCEND SimpleNewResponderFlow
:dedent: 4
For flows which are already initiating counterpart flows then it's a simple matter of using the existing flow session.
.. note:: All the nodes in your business network will need the new CorDapp, otherwise they won't know how to receive the transaction. **This
includes nodes which previously didn't have the old CorDapp.** If a node is sent a transaction and it doesn't have the new CorDapp loaded
then simply restart it with the CorDapp and the transaction will be recorded.
Upgrading an initiating flow
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
For flows which are already initiating counterpart flows then it's a matter of using the existing flow session.
Note however, the new ``FinalityFlow`` is inlined and so the sequence of sends and receives between the two flows will
change and will be incompatible with your current flows. You can use the flow version API to write your flows in a
backwards compatible way.
backwards compatible manner.
Here's what an upgraded initiating flow may look like:
@ -212,8 +222,9 @@ finalised transaction. If the initiator is written in a backwards compatible way
:end-before: DOCEND ExistingResponderFlow
:dedent: 12
The responder flow may be waiting for the finalised transaction to appear in the local node's vault using ``waitForLedgerCommit``.
This is no longer necessary with ``ReceiveFinalityFlow`` and the call to ``waitForLedgerCommit`` can be removed.
You may already be using ``waitForLedgerCommit`` in your responder flow for the finalised transaction to appear in the local node's vault.
Now that it's calling ``ReceiveFinalityFlow``, which effectively does the same thing, this is no longer necessary. The call to
``waitForLedgerCommit`` should be removed.
Step 4. Security: Upgrade your use of SwapIdentitiesFlow
--------------------------------------------------------