ENT-10306 Swap logic from receive finality to receive transaction flows (#7451)

* Swap logic from receive finality to receive transaction flows

* Remote TPV check

* Make finality check more robust

* Make emulation of finality in tests compliant with changes

* Improve deferring of ack when issue transaction

* Remove API checking of SignedTransactionWithDistributionList as added it 4.11 so cannot be incompatible, yet.

* Regenerated API file from 4.10 to check only compatibility with 4.10

* Move function to private

* Revert "Regenerated API file from 4.10 to check only compatibility with 4.10"

This reverts commit 6428f957e1.

* Reset ReceiveTransactionFlow and ReceiveFinalityFlow APIs
This commit is contained in:
Rick Parker 2023-08-23 10:09:42 +01:00 committed by GitHub
parent 4a7a4eb5bb
commit ec261cb0c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 103 deletions

View File

@ -3222,8 +3222,7 @@ public final class net.corda.core.flows.ReceiveFinalityFlow extends net.corda.co
public <init>(net.corda.core.flows.FlowSession) public <init>(net.corda.core.flows.FlowSession)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash) public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord) public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord, Boolean) public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord, Boolean, int, kotlin.jvm.internal.DefaultConstructorMarker)
@Suspendable @Suspendable
@NotNull @NotNull
public net.corda.core.transactions.SignedTransaction call() public net.corda.core.transactions.SignedTransaction call()
@ -3239,8 +3238,6 @@ public class net.corda.core.flows.ReceiveTransactionFlow extends net.corda.core.
public <init>(net.corda.core.flows.FlowSession, boolean) public <init>(net.corda.core.flows.FlowSession, boolean)
public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord) public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord)
public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord, int, kotlin.jvm.internal.DefaultConstructorMarker) public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord, boolean)
public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord, boolean, int, kotlin.jvm.internal.DefaultConstructorMarker)
@Suspendable @Suspendable
@NotNull @NotNull
public net.corda.core.transactions.SignedTransaction call() public net.corda.core.transactions.SignedTransaction call()
@ -3334,24 +3331,6 @@ public static final class net.corda.core.flows.SignTransactionFlow$Companion$SIG
public static final class net.corda.core.flows.SignTransactionFlow$Companion$VERIFYING extends net.corda.core.utilities.ProgressTracker$Step public static final class net.corda.core.flows.SignTransactionFlow$Companion$VERIFYING extends net.corda.core.utilities.ProgressTracker$Step
public static final net.corda.core.flows.SignTransactionFlow$Companion$VERIFYING INSTANCE public static final net.corda.core.flows.SignTransactionFlow$Companion$VERIFYING INSTANCE
## ##
@CordaSerializable
public final class net.corda.core.flows.SignedTransactionWithDistributionList extends java.lang.Object
public <init>(net.corda.core.transactions.SignedTransaction, byte[])
@NotNull
public final net.corda.core.transactions.SignedTransaction component1()
@NotNull
public final byte[] component2()
@NotNull
public final net.corda.core.flows.SignedTransactionWithDistributionList copy(net.corda.core.transactions.SignedTransaction, byte[])
public boolean equals(Object)
@NotNull
public final byte[] getDistributionList()
@NotNull
public final net.corda.core.transactions.SignedTransaction getStx()
public int hashCode()
@NotNull
public String toString()
##
public final class net.corda.core.flows.StackFrameDataToken extends java.lang.Object public final class net.corda.core.flows.StackFrameDataToken extends java.lang.Object
public <init>(String) public <init>(String)
@NotNull @NotNull

View File

@ -593,7 +593,9 @@ class FinalityFlowTests : WithFinality {
val txBuilder = DummyContract.move(stateAndRef, newOwner) val txBuilder = DummyContract.move(stateAndRef, newOwner)
val stxn = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey) val stxn = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
val sessionWithCounterParty = initiateFlow(newOwner) val sessionWithCounterParty = initiateFlow(newOwner)
subFlow(SendTransactionFlow(stxn, setOf(sessionWithCounterParty), emptySet(), StatesToRecord.ONLY_RELEVANT)) subFlow(object : SendTransactionFlow(stxn, setOf(sessionWithCounterParty), emptySet(), StatesToRecord.ONLY_RELEVANT, true) {
override fun isFinality(): Boolean = true
})
throw UnexpectedFlowEndException("${stxn.id}") throw UnexpectedFlowEndException("${stxn.id}")
} }
} }

View File

@ -5,10 +5,8 @@ import net.corda.core.CordaInternal
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.isFulfilledBy import net.corda.core.crypto.isFulfilledBy
import net.corda.core.flows.NotarySigCheck.needsNotarySignature
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.groupAbstractPartyByWellKnownParty import net.corda.core.identity.groupAbstractPartyByWellKnownParty
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.PlatformVersionSwitches import net.corda.core.internal.PlatformVersionSwitches
import net.corda.core.internal.ServiceHubCoreInternal import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.pushToLoggingContext import net.corda.core.internal.pushToLoggingContext
@ -22,7 +20,6 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.Try import net.corda.core.utilities.Try
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.core.utilities.unwrap
import java.time.Duration import java.time.Duration
/** /**
@ -219,8 +216,6 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
val requiresNotarisation = needsNotarySignature(transaction) val requiresNotarisation = needsNotarySignature(transaction)
val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
&& serviceHub.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
if (useTwoPhaseFinality) { if (useTwoPhaseFinality) {
val stxn = if (requiresNotarisation) { val stxn = if (requiresNotarisation) {
recordLocallyAndBroadcast(newPlatformSessions, transaction) recordLocallyAndBroadcast(newPlatformSessions, transaction)
@ -285,7 +280,9 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
try { try {
logger.debug { "Sending transaction to party sessions: $sessions." } logger.debug { "Sending transaction to party sessions: $sessions." }
val (participantSessions, observerSessions) = deriveSessions(sessions) val (participantSessions, observerSessions) = deriveSessions(sessions)
subFlow(SendTransactionFlow(tx, participantSessions, observerSessions, statesToRecord, true)) subFlow(object : SendTransactionFlow(tx, participantSessions, observerSessions, statesToRecord, true) {
override fun isFinality(): Boolean = true
})
} catch (e: UnexpectedFlowEndException) { } catch (e: UnexpectedFlowEndException) {
throw UnexpectedFlowEndException( throw UnexpectedFlowEndException(
"One of the sessions ${sessions.map { it.counterparty }} has finished prematurely and we're trying to send them a transaction." + "One of the sessions ${sessions.map { it.counterparty }} has finished prematurely and we're trying to send them a transaction." +
@ -499,55 +496,13 @@ class ReceiveFinalityFlow(private val otherSideSession: FlowSession,
@Suppress("ComplexMethod", "NestedBlockDepth") @Suppress("ComplexMethod", "NestedBlockDepth")
@Suspendable @Suspendable
override fun call(): SignedTransaction { override fun call(): SignedTransaction {
val stx = subFlow(ReceiveTransactionFlow(otherSideSession, false, statesToRecord, true)) return subFlow(object : ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = true, statesToRecord = statesToRecord, handlePropagatedNotaryError = handlePropagatedNotaryError) {
override fun checkBeforeRecording(stx: SignedTransaction) {
val requiresNotarisation = needsNotarySignature(stx) require(expectedTxId == null || expectedTxId == stx.id) {
val fromTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY "We expected to receive transaction with ID $expectedTxId but instead got ${stx.id}. Transaction was" +
&& serviceHub.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY "not recorded and nor its states sent to the vault."
if (fromTwoPhaseFinalityNode) {
if (requiresNotarisation) {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
logger.debug { "Peer recording transaction without notary signature." }
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx)
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
try {
val notarySignatures = otherSideSession.receive<Try<List<TransactionSignature>>>().unwrap { it.getOrThrow() }
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
logger.debug { "Peer received notarised signature." }
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
logger.info("Peer finalised transaction with notary signature.")
}
} catch (e: NotaryException) {
logger.info("Peer received notary error.")
val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError ?:
(serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY)
if (overrideHandlePropagatedNotaryError) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id)
sleep(Duration.ZERO) // force checkpoint to persist db update.
throw e
}
else {
otherSideSession.receive<Any>() // simulate unexpected flow end
} }
} }
} else { })
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransaction", flowLogic = this) {
(serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord)
logger.info("Peer recorded transaction with recovery metadata.")
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
}
} else {
logger.warnOnce("The current usage of ReceiveFinalityFlow is not using Two Phase Finality. Please consider upgrading your CorDapp (refer to Corda 4.11 release notes).")
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) {
serviceHub.recordTransactions(statesToRecord, setOf(stx))
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer successfully recorded received transaction.")
}
return stx
} }
} }

View File

@ -6,15 +6,22 @@ import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.TransactionResolutionException import net.corda.core.contracts.TransactionResolutionException
import net.corda.core.contracts.TransactionVerificationException import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.crypto.TransactionSignature
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.PlatformVersionSwitches
import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.ServiceHubCoreInternal import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.checkParameterHash import net.corda.core.internal.checkParameterHash
import net.corda.core.internal.pushToLoggingContext import net.corda.core.internal.pushToLoggingContext
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.node.StatesToRecord import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Try
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
import java.security.SignatureException import java.security.SignatureException
import java.time.Duration
/** /**
* The [ReceiveTransactionFlow] should be called in response to the [SendTransactionFlow]. * The [ReceiveTransactionFlow] should be called in response to the [SendTransactionFlow].
@ -39,12 +46,13 @@ import java.security.SignatureException
open class ReceiveTransactionFlow constructor(private val otherSideSession: FlowSession, open class ReceiveTransactionFlow constructor(private val otherSideSession: FlowSession,
private val checkSufficientSignatures: Boolean = true, private val checkSufficientSignatures: Boolean = true,
private val statesToRecord: StatesToRecord = StatesToRecord.NONE, private val statesToRecord: StatesToRecord = StatesToRecord.NONE,
private val deferredAck: Boolean = false) : FlowLogic<SignedTransaction>() { private val handlePropagatedNotaryError: Boolean? = null) : FlowLogic<SignedTransaction>() {
@JvmOverloads constructor( @JvmOverloads
constructor(
otherSideSession: FlowSession, otherSideSession: FlowSession,
checkSufficientSignatures: Boolean = true, checkSufficientSignatures: Boolean = true,
statesToRecord: StatesToRecord = StatesToRecord.NONE statesToRecord: StatesToRecord = StatesToRecord.NONE
) : this(otherSideSession, checkSufficientSignatures, statesToRecord, false) ) : this(otherSideSession, checkSufficientSignatures, statesToRecord, null)
@Suppress("KDocMissingDocumentation") @Suppress("KDocMissingDocumentation")
@Suspendable @Suspendable
@ -60,6 +68,10 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
} }
val payload = otherSideSession.receive<Any>().unwrap { it } val payload = otherSideSession.receive<Any>().unwrap { it }
return if (isReallyReceiveFinality(payload)) {
doReceiveFinality(payload)
} else {
val deferredAck = isDeferredAck(payload)
val stx = resolvePayload(payload) val stx = resolvePayload(payload)
stx.pushToLoggingContext() stx.pushToLoggingContext()
logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.") logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.")
@ -79,13 +91,60 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
logger.info("Successfully received fully signed tx. Sending it to the vault for processing.") logger.info("Successfully received fully signed tx. Sending it to the vault for processing.")
serviceHub.recordTransactions(statesToRecord, setOf(stx)) serviceHub.recordTransactions(statesToRecord, setOf(stx))
logger.info("Successfully recorded received transaction locally.") logger.info("Successfully recorded received transaction locally.")
if (deferredAck) otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
}
stx
}
}
private fun isDeferredAck(payload: Any): Boolean {
return payload is SignedTransactionWithDistributionList && checkSufficientSignatures && payload.isFinality
}
@Suspendable
private fun doReceiveFinality(payload: Any): SignedTransaction {
val stx = resolvePayload(payload)
stx.pushToLoggingContext()
logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.")
checkParameterHash(stx.networkParametersHash)
subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, true))
logger.info("Transaction dependencies resolution completed.")
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
logger.debug { "Peer recording transaction without notary signature." }
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx)
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
try {
val notarySignatures = otherSideSession.receive<Try<List<TransactionSignature>>>().unwrap { it.getOrThrow() }
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
logger.debug { "Peer received notarised signature." }
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
logger.info("Peer finalised transaction with notary signature.")
}
} catch (e: NotaryException) {
logger.info("Peer received notary error.")
val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError
?: (serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY)
if (overrideHandlePropagatedNotaryError) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id)
sleep(Duration.ZERO) // force checkpoint to persist db update.
throw e
} else {
otherSideSession.receive<Any>() // simulate unexpected flow end
}
} }
return stx return stx
} }
private fun isReallyReceiveFinality(payload: Any): Boolean {
return payload is SignedTransactionWithDistributionList && checkSufficientSignatures && payload.isFinality && NotarySigCheck.needsNotarySignature(payload.stx)
}
open fun resolvePayload(payload: Any): SignedTransaction { open fun resolvePayload(payload: Any): SignedTransaction {
return if (payload is SignedTransactionWithDistributionList) { return if (payload is SignedTransactionWithDistributionList) {
if (checkSufficientSignatures || deferredAck) { if (checkSufficientSignatures) {
(serviceHub as ServiceHubCoreInternal).recordReceiverTransactionRecoveryMetadata(payload.stx.id, otherSideSession.counterparty.name, ourIdentity.name, statesToRecord, payload.distributionList) (serviceHub as ServiceHubCoreInternal).recordReceiverTransactionRecoveryMetadata(payload.stx.id, otherSideSession.counterparty.name, ourIdentity.name, statesToRecord, payload.distributionList)
payload.stx payload.stx
} else payload.stx } else payload.stx

View File

@ -15,13 +15,6 @@ import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
import kotlin.collections.List
import kotlin.collections.MutableSet
import kotlin.collections.Set
import kotlin.collections.flatMap
import kotlin.collections.map
import kotlin.collections.mutableSetOf
import kotlin.collections.plus
import kotlin.collections.toSet import kotlin.collections.toSet
/** /**
@ -136,6 +129,8 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
// User can override this method to perform custom request verification. // User can override this method to perform custom request verification.
} }
protected open fun isFinality(): Boolean = false
@Suppress("ComplexCondition", "ComplexMethod", "LongMethod") @Suppress("ComplexCondition", "ComplexMethod", "LongMethod")
@Suspendable @Suspendable
override fun call(): Void? { override fun call(): Void? {
@ -174,7 +169,7 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
val payloadWithMetadata = val payloadWithMetadata =
if (txnMetadata != null && toTwoPhaseFinalityNode && useTwoPhaseFinality && payload is SignedTransaction) { if (txnMetadata != null && toTwoPhaseFinalityNode && useTwoPhaseFinality && payload is SignedTransaction) {
val encryptedDistributionList = (serviceHub as ServiceHubCoreInternal).recordSenderTransactionRecoveryMetadata(payload.id, txnMetadata.copy(initiator = ourIdentity.name)) val encryptedDistributionList = (serviceHub as ServiceHubCoreInternal).recordSenderTransactionRecoveryMetadata(payload.id, txnMetadata.copy(initiator = ourIdentity.name))
SignedTransactionWithDistributionList(payload, encryptedDistributionList!!) SignedTransactionWithDistributionList(payload, encryptedDistributionList!!, isFinality())
} else null } else null
otherSessions.forEachIndexed { idx, otherSideSession -> otherSessions.forEachIndexed { idx, otherSideSession ->
@ -311,5 +306,6 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
@CordaSerializable @CordaSerializable
data class SignedTransactionWithDistributionList( data class SignedTransactionWithDistributionList(
val stx: SignedTransaction, val stx: SignedTransaction,
val distributionList: ByteArray val distributionList: ByteArray,
val isFinality: Boolean
) )