Merge remote-tracking branch 'origin/release/os/4.11' into colljos/encryption_receiver_self_recovery

This commit is contained in:
Jose Coll 2023-08-23 11:20:16 +01:00
commit 3b78b46619
5 changed files with 94 additions and 103 deletions

View File

@ -3244,8 +3244,7 @@ public final class net.corda.core.flows.ReceiveFinalityFlow extends net.corda.co
public <init>(net.corda.core.flows.FlowSession)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord, Boolean)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord, Boolean, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord, int, kotlin.jvm.internal.DefaultConstructorMarker)
@Suspendable
@NotNull
public net.corda.core.transactions.SignedTransaction call()
@ -3261,8 +3260,6 @@ public class net.corda.core.flows.ReceiveTransactionFlow extends net.corda.core.
public <init>(net.corda.core.flows.FlowSession, boolean)
public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord)
public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord, boolean)
public <init>(net.corda.core.flows.FlowSession, boolean, net.corda.core.node.StatesToRecord, boolean, int, kotlin.jvm.internal.DefaultConstructorMarker)
@Suspendable
@NotNull
public net.corda.core.transactions.SignedTransaction call()
@ -3356,24 +3353,6 @@ public static final class net.corda.core.flows.SignTransactionFlow$Companion$SIG
public static final class net.corda.core.flows.SignTransactionFlow$Companion$VERIFYING extends net.corda.core.utilities.ProgressTracker$Step
public static final net.corda.core.flows.SignTransactionFlow$Companion$VERIFYING INSTANCE
##
@CordaSerializable
public final class net.corda.core.flows.SignedTransactionWithDistributionList extends java.lang.Object
public <init>(net.corda.core.transactions.SignedTransaction, byte[])
@NotNull
public final net.corda.core.transactions.SignedTransaction component1()
@NotNull
public final byte[] component2()
@NotNull
public final net.corda.core.flows.SignedTransactionWithDistributionList copy(net.corda.core.transactions.SignedTransaction, byte[])
public boolean equals(Object)
@NotNull
public final byte[] getDistributionList()
@NotNull
public final net.corda.core.transactions.SignedTransaction getStx()
public int hashCode()
@NotNull
public String toString()
##
public final class net.corda.core.flows.StackFrameDataToken extends java.lang.Object
public <init>(String)
@NotNull

View File

@ -603,7 +603,9 @@ class FinalityFlowTests : WithFinality {
val txBuilder = DummyContract.move(stateAndRef, newOwner)
val stxn = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
val sessionWithCounterParty = initiateFlow(newOwner)
subFlow(SendTransactionFlow(stxn, setOf(sessionWithCounterParty), emptySet(), StatesToRecord.ONLY_RELEVANT))
subFlow(object : SendTransactionFlow(stxn, setOf(sessionWithCounterParty), emptySet(), StatesToRecord.ONLY_RELEVANT, true) {
override fun isFinality(): Boolean = true
})
throw UnexpectedFlowEndException("${stxn.id}")
}
}

View File

@ -5,10 +5,8 @@ import net.corda.core.CordaInternal
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.flows.NotarySigCheck.needsNotarySignature
import net.corda.core.identity.Party
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.PlatformVersionSwitches
import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.pushToLoggingContext
@ -22,7 +20,6 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.Try
import net.corda.core.utilities.debug
import net.corda.core.utilities.unwrap
import java.time.Duration
/**
@ -219,8 +216,6 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
val requiresNotarisation = needsNotarySignature(transaction)
val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
&& serviceHub.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
if (useTwoPhaseFinality) {
val stxn = if (requiresNotarisation) {
recordLocallyAndBroadcast(newPlatformSessions, transaction)
@ -285,7 +280,9 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
try {
logger.debug { "Sending transaction to party sessions: $sessions." }
val (participantSessions, observerSessions) = deriveSessions(sessions)
subFlow(SendTransactionFlow(tx, participantSessions, observerSessions, statesToRecord, true))
subFlow(object : SendTransactionFlow(tx, participantSessions, observerSessions, statesToRecord, true) {
override fun isFinality(): Boolean = true
})
} catch (e: UnexpectedFlowEndException) {
throw UnexpectedFlowEndException(
"One of the sessions ${sessions.map { it.counterparty }} has finished prematurely and we're trying to send them a transaction." +
@ -499,55 +496,13 @@ class ReceiveFinalityFlow(private val otherSideSession: FlowSession,
@Suppress("ComplexMethod", "NestedBlockDepth")
@Suspendable
override fun call(): SignedTransaction {
val stx = subFlow(ReceiveTransactionFlow(otherSideSession, false, statesToRecord, true))
val requiresNotarisation = needsNotarySignature(stx)
val fromTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY
&& serviceHub.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
if (fromTwoPhaseFinalityNode) {
if (requiresNotarisation) {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
logger.debug { "Peer recording transaction without notary signature." }
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx)
return subFlow(object : ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = true, statesToRecord = statesToRecord, handlePropagatedNotaryError = handlePropagatedNotaryError) {
override fun checkBeforeRecording(stx: SignedTransaction) {
require(expectedTxId == null || expectedTxId == stx.id) {
"We expected to receive transaction with ID $expectedTxId but instead got ${stx.id}. Transaction was" +
"not recorded and nor its states sent to the vault."
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
try {
val notarySignatures = otherSideSession.receive<Try<List<TransactionSignature>>>().unwrap { it.getOrThrow() }
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
logger.debug { "Peer received notarised signature." }
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
logger.info("Peer finalised transaction with notary signature.")
}
} catch (e: NotaryException) {
logger.info("Peer received notary error.")
val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError ?:
(serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY)
if (overrideHandlePropagatedNotaryError) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id)
sleep(Duration.ZERO) // force checkpoint to persist db update.
throw e
}
else {
otherSideSession.receive<Any>() // simulate unexpected flow end
}
}
} else {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransaction", flowLogic = this) {
(serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord)
logger.info("Peer recorded transaction with recovery metadata.")
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
}
} else {
logger.warnOnce("The current usage of ReceiveFinalityFlow is not using Two Phase Finality. Please consider upgrading your CorDapp (refer to Corda 4.11 release notes).")
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) {
serviceHub.recordTransactions(statesToRecord, setOf(stx))
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer successfully recorded received transaction.")
}
return stx
})
}
}

View File

@ -6,15 +6,22 @@ import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.TransactionResolutionException
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.crypto.TransactionSignature
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.PlatformVersionSwitches
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.checkParameterHash
import net.corda.core.internal.pushToLoggingContext
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Try
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.core.utilities.unwrap
import java.security.SignatureException
import java.time.Duration
/**
* The [ReceiveTransactionFlow] should be called in response to the [SendTransactionFlow].
@ -39,12 +46,13 @@ import java.security.SignatureException
open class ReceiveTransactionFlow constructor(private val otherSideSession: FlowSession,
private val checkSufficientSignatures: Boolean = true,
private val statesToRecord: StatesToRecord = StatesToRecord.NONE,
private val deferredAck: Boolean = false) : FlowLogic<SignedTransaction>() {
@JvmOverloads constructor(
private val handlePropagatedNotaryError: Boolean? = null) : FlowLogic<SignedTransaction>() {
@JvmOverloads
constructor(
otherSideSession: FlowSession,
checkSufficientSignatures: Boolean = true,
statesToRecord: StatesToRecord = StatesToRecord.NONE
) : this(otherSideSession, checkSufficientSignatures, statesToRecord, false)
) : this(otherSideSession, checkSufficientSignatures, statesToRecord, null)
@Suppress("KDocMissingDocumentation")
@Suspendable
@ -60,32 +68,83 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
}
val payload = otherSideSession.receive<Any>().unwrap { it }
return if (isReallyReceiveFinality(payload)) {
doReceiveFinality(payload)
} else {
val deferredAck = isDeferredAck(payload)
val stx = resolvePayload(payload)
stx.pushToLoggingContext()
logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.")
checkParameterHash(stx.networkParametersHash)
subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, deferredAck))
logger.info("Transaction dependencies resolution completed.")
try {
stx.verify(serviceHub, checkSufficientSignatures)
} catch (e: Exception) {
logger.warn("Transaction verification failed.")
throw e
}
if (checkSufficientSignatures) {
// We should only send a transaction to the vault for processing if we did in fact fully verify it, and
// there are no missing signatures. We don't want partly signed stuff in the vault.
checkBeforeRecording(stx)
logger.info("Successfully received fully signed tx. Sending it to the vault for processing.")
serviceHub.recordTransactions(statesToRecord, setOf(stx))
logger.info("Successfully recorded received transaction locally.")
if (deferredAck) otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
}
stx
}
}
private fun isDeferredAck(payload: Any): Boolean {
return payload is SignedTransactionWithDistributionList && checkSufficientSignatures && payload.isFinality
}
@Suspendable
private fun doReceiveFinality(payload: Any): SignedTransaction {
val stx = resolvePayload(payload)
stx.pushToLoggingContext()
logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.")
checkParameterHash(stx.networkParametersHash)
subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, deferredAck))
subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, true))
logger.info("Transaction dependencies resolution completed.")
try {
stx.verify(serviceHub, checkSufficientSignatures)
} catch (e: Exception) {
logger.warn("Transaction verification failed.")
throw e
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
logger.debug { "Peer recording transaction without notary signature." }
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx)
}
if (checkSufficientSignatures) {
// We should only send a transaction to the vault for processing if we did in fact fully verify it, and
// there are no missing signatures. We don't want partly signed stuff in the vault.
checkBeforeRecording(stx)
logger.info("Successfully received fully signed tx. Sending it to the vault for processing.")
serviceHub.recordTransactions(statesToRecord, setOf(stx))
logger.info("Successfully recorded received transaction locally.")
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
try {
val notarySignatures = otherSideSession.receive<Try<List<TransactionSignature>>>().unwrap { it.getOrThrow() }
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
logger.debug { "Peer received notarised signature." }
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
logger.info("Peer finalised transaction with notary signature.")
}
} catch (e: NotaryException) {
logger.info("Peer received notary error.")
val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError
?: (serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY)
if (overrideHandlePropagatedNotaryError) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id)
sleep(Duration.ZERO) // force checkpoint to persist db update.
throw e
} else {
otherSideSession.receive<Any>() // simulate unexpected flow end
}
}
return stx
}
private fun isReallyReceiveFinality(payload: Any): Boolean {
return payload is SignedTransactionWithDistributionList && checkSufficientSignatures && payload.isFinality && NotarySigCheck.needsNotarySignature(payload.stx)
}
open fun resolvePayload(payload: Any): SignedTransaction {
return if (payload is SignedTransactionWithDistributionList) {
if (checkSufficientSignatures || deferredAck) {
if (checkSufficientSignatures) {
(serviceHub as ServiceHubCoreInternal).recordReceiverTransactionRecoveryMetadata(payload.stx.id, otherSideSession.counterparty.name,
TransactionMetadata(otherSideSession.counterparty.name, DistributionList.ReceiverDistributionList(payload.distributionList, statesToRecord)))
payload.stx

View File

@ -15,13 +15,6 @@ import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.trace
import net.corda.core.utilities.unwrap
import kotlin.collections.List
import kotlin.collections.MutableSet
import kotlin.collections.Set
import kotlin.collections.flatMap
import kotlin.collections.map
import kotlin.collections.mutableSetOf
import kotlin.collections.plus
import kotlin.collections.toSet
import net.corda.core.flows.DistributionList.SenderDistributionList
@ -137,6 +130,8 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
// User can override this method to perform custom request verification.
}
protected open fun isFinality(): Boolean = false
@Suppress("ComplexCondition", "ComplexMethod", "LongMethod")
@Suspendable
override fun call(): Void? {
@ -175,7 +170,7 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
val payloadWithMetadata =
if (txnMetadata != null && toTwoPhaseFinalityNode && useTwoPhaseFinality && payload is SignedTransaction) {
val encryptedDistributionList = (serviceHub as ServiceHubCoreInternal).recordSenderTransactionRecoveryMetadata(payload.id, txnMetadata.copy(initiator = ourIdentity.name))
SignedTransactionWithDistributionList(payload, encryptedDistributionList!!)
SignedTransactionWithDistributionList(payload, encryptedDistributionList!!, isFinality())
} else null
otherSessions.forEachIndexed { idx, otherSideSession ->
@ -312,5 +307,6 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
@CordaSerializable
data class SignedTransactionWithDistributionList(
val stx: SignedTransaction,
val distributionList: ByteArray
val distributionList: ByteArray,
val isFinality: Boolean
)