Merge pull request #7025 from corda/dan/ENT-6376-port-to-os

ENT-6376 Don't hospitalize session end in `ReceiveFinalityFlow` (#4325)

Cherry pick from enterprise.
This commit is contained in:
Dan Newton 2022-01-13 15:19:28 +00:00 committed by GitHub
commit 524f1c14c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 264 additions and 12 deletions

View File

@ -4,6 +4,8 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.CollectSignaturesFlow
import net.corda.core.flows.FinalityFlow import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
@ -12,20 +14,29 @@ import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryException import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.flows.ReceiveFinalityFlow import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.SignTransactionFlow
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.StateMachineUpdate import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions import net.corda.node.services.Permissions
import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyContract.SingleOwnerState import net.corda.testing.contracts.DummyContract.SingleOwnerState
import net.corda.testing.contracts.DummyState
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver import net.corda.testing.driver.driver
import net.corda.testing.node.User import net.corda.testing.node.User
@ -33,6 +44,7 @@ import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.findCordapp import net.corda.testing.node.internal.findCordapp
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Before
import org.junit.Test import org.junit.Test
import java.sql.SQLException import java.sql.SQLException
import java.util.* import java.util.*
@ -47,6 +59,12 @@ class FlowHospitalTest {
private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all())) private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
@Before
fun before() {
SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow = false
CreateTransactionButDontFinalizeResponderFlow.exceptionSeenInUserFlow = false
}
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `when double spend occurs, the flow is successfully deleted on the counterparty`() { fun `when double spend occurs, the flow is successfully deleted on the counterparty`() {
driver(DriverParameters(cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts")))) { driver(DriverParameters(cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts")))) {
@ -172,7 +190,7 @@ class FlowHospitalTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `HospitalizeFlowException cloaking an important exception thrown`() { fun `HospitalizeFlowException cloaking an important exception thrown`() {
var dischargedCounter = 0 var dischargedCounter = 0
var observationCounter: Int = 0 var observationCounter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> StaffedFlowHospital.onFlowDischarged.add { _, _ ->
++dischargedCounter ++dischargedCounter
} }
@ -197,6 +215,84 @@ class FlowHospitalTest {
} }
} }
@Test(timeout = 300_000)
fun `catching a notary error will cause a peer to fail with unexpected session end during ReceiveFinalityFlow that passes through user code`() {
var dischargedCounter = 0
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
++dischargedCounter
}
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = false, startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
nodeAHandle.rpc.let {
val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
}
}
// 1 is the notary failing to notarise and propagating the error
// 2 is the receiving flow failing due to the unexpected session end error
assertEquals(2, dischargedCounter)
assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow)
}
@Test(timeout = 300_000)
fun `unexpected session end errors outside of ReceiveFinalityFlow are not handled`() {
var dischargedCounter = 0
var observationCounter = 0
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
++dischargedCounter
}
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter
}
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = false, startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeCHandle = startNode(providedName = CHARLIE_NAME, rpcUsers = listOf(user)).getOrThrow()
nodeAHandle.rpc.let {
val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
val ref2 = it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
val ref3 = it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeCHandle.nodeInfo.singleIdentity(), ref2).returnValue.getOrThrow(20.seconds)
it.startFlow(::CreateTransactionButDontFinalizeFlow, nodeBHandle.nodeInfo.singleIdentity(), ref3).returnValue.getOrThrow(20.seconds)
}
}
assertEquals(0, dischargedCounter)
assertEquals(1, observationCounter)
assertTrue(CreateTransactionButDontFinalizeResponderFlow.exceptionSeenInUserFlow)
}
@Test(timeout = 300_000)
fun `unexpected session end errors within ReceiveFinalityFlow can be caught and the flow can end gracefully`() {
var dischargedCounter = 0
var observationCounter = 0
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
++dischargedCounter
}
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter
}
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = false, startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
nodeAHandle.rpc.let {
val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref, true).returnValue.getOrThrow(20.seconds)
}
}
// 1 is the notary failing to notarise and propagating the error
assertEquals(1, dischargedCounter)
assertEquals(0, observationCounter)
assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow)
}
@StartableByRPC @StartableByRPC
class IssueFlow(val notary: Party) : FlowLogic<StateAndRef<SingleOwnerState>>() { class IssueFlow(val notary: Party) : FlowLogic<StateAndRef<SingleOwnerState>>() {
@ -296,4 +392,136 @@ class FlowHospitalTest {
setCause(SQLException("deadlock")) setCause(SQLException("deadlock"))
} }
} }
@InitiatingFlow
@StartableByRPC
class CreateTransactionFlow(private val peer: Party) : FlowLogic<StateAndRef<DummyState>>() {
@Suspendable
override fun call(): StateAndRef<DummyState> {
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(DummyState(participants = listOf(ourIdentity)))
addCommand(DummyContract.Commands.Create(), listOf(ourIdentity.owningKey, peer.owningKey))
}
val stx = serviceHub.signInitialTransaction(tx)
val session = initiateFlow(peer)
val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session)))
subFlow(FinalityFlow(ftx, session))
return ftx.coreTransaction.outRef(0)
}
}
@InitiatedBy(CreateTransactionFlow::class)
class CreateTransactionResponderFlow(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
logger.info("CREATE TX - WAITING TO SIGN TX")
val stx = subFlow(object : SignTransactionFlow(session) {
override fun checkTransaction(stx: SignedTransaction) {
}
})
logger.info("CREATE TX - SIGNED TO SIGN TX")
subFlow(ReceiveFinalityFlow(session, stx.id))
logger.info("CREATE TX - RECEIVED TX")
}
}
@InitiatingFlow
@StartableByRPC
class SpendStateAndCatchDoubleSpendFlow(
private val peer: Party,
private val ref: StateAndRef<DummyState>,
private val consumePeerError: Boolean
) : FlowLogic<StateAndRef<DummyState>>() {
constructor(peer: Party, ref: StateAndRef<DummyState>): this(peer, ref, false)
@Suspendable
override fun call(): StateAndRef<DummyState> {
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addInputState(ref)
addOutputState(DummyState(participants = listOf(ourIdentity)))
addCommand(DummyContract.Commands.Move(), listOf(ourIdentity.owningKey, peer.owningKey))
}
val stx = serviceHub.signInitialTransaction(tx)
val session = initiateFlow(peer)
session.send(consumePeerError)
val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session)))
try {
subFlow(FinalityFlow(ftx, session))
} catch(e: NotaryException) {
logger.info("Caught notary exception")
}
return ftx.coreTransaction.outRef(0)
}
}
@InitiatedBy(SpendStateAndCatchDoubleSpendFlow::class)
class SpendStateAndCatchDoubleSpendResponderFlow(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
var exceptionSeenInUserFlow = false
}
@Suspendable
override fun call() {
val consumeError = session.receive<Boolean>().unwrap { it }
val stx = subFlow(object : SignTransactionFlow(session) {
override fun checkTransaction(stx: SignedTransaction) {
}
})
try {
subFlow(ReceiveFinalityFlow(session, stx.id))
} catch (e: UnexpectedFlowEndException) {
exceptionSeenInUserFlow = true
if (!consumeError) {
throw e
}
}
}
}
@InitiatingFlow
@StartableByRPC
class CreateTransactionButDontFinalizeFlow(private val peer: Party, private val ref: StateAndRef<DummyState>) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addInputState(ref)
addOutputState(DummyState(participants = listOf(ourIdentity)))
addCommand(DummyContract.Commands.Move(), listOf(ourIdentity.owningKey))
}
val stx = serviceHub.signInitialTransaction(tx)
val session = initiateFlow(peer)
// Send the transaction id to the peer instead of the transaction.
// This allows transaction dependency resolution to occur within the peer's [ReceiveTransactionFlow].
session.send(stx.id)
// Mimic notarisation from [FinalityFlow] so that failing inside [ResolveTransactionsFlow] can be achieved.
val notarySignatures = subFlow(NotaryFlow.Client(stx, skipVerification = true))
val notarisedTx = stx + notarySignatures
session.send(notarisedTx)
}
}
@InitiatedBy(CreateTransactionButDontFinalizeFlow::class)
class CreateTransactionButDontFinalizeResponderFlow(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
var exceptionSeenInUserFlow = false
}
@Suspendable
override fun call() {
val id = session.receive<SecureHash>().unwrap { it }
try {
subFlow(ReceiveFinalityFlow(session, id))
} catch (e: UnexpectedFlowEndException) {
exceptionSeenInUserFlow = true
throw e
}
}
}
} }

View File

@ -11,6 +11,7 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField import net.corda.core.internal.DeclaredField
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow import net.corda.core.internal.TimedFlow
import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.VisibleForTesting
@ -21,6 +22,7 @@ import net.corda.core.utilities.debug
import net.corda.core.utilities.minutes import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.FinalityHandler import net.corda.node.services.FinalityHandler
import net.corda.node.services.statemachine.transitions.StartedFlowTransition
import org.hibernate.exception.ConstraintViolationException import org.hibernate.exception.ConstraintViolationException
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.io.Closeable import java.io.Closeable
@ -29,10 +31,9 @@ import java.sql.SQLTransientConnectionException
import java.time.Clock import java.time.Clock
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.* import java.util.Timer
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import javax.persistence.PersistenceException import javax.persistence.PersistenceException
import kotlin.collections.HashMap
import kotlin.concurrent.timerTask import kotlin.concurrent.timerTask
import kotlin.math.pow import kotlin.math.pow
@ -485,13 +486,22 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
"the flow by re-starting the node. State machine state: $currentState", newError) "the flow by re-starting the node. State machine state: $currentState", newError)
Diagnosis.OVERNIGHT_OBSERVATION Diagnosis.OVERNIGHT_OBSERVATION
} else if (isFromReceiveFinalityFlow(newError)) { } else if (isFromReceiveFinalityFlow(newError)) {
if (isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveFinality(newError)) { when {
// no need to keep around the flow, since notarisation has already failed at the counterparty. isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveTransactionFlow(newError) -> {
Diagnosis.NOT_MY_SPECIALTY // no need to keep around the flow, since notarisation has already failed at the counterparty.
} else { Diagnosis.NOT_MY_SPECIALTY
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " + }
"the flow by re-starting the node. State machine state: $currentState", newError) isEndSessionErrorThrownDuringReceiveTransactionFlow(newError) -> {
Diagnosis.OVERNIGHT_OBSERVATION // Typically occurs if the initiating flow catches a notary exception and ends their flow successfully.
Diagnosis.NOT_MY_SPECIALTY
}
else -> {
log.warn(
"Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
"the flow by re-starting the node. State machine state: $currentState", newError
)
Diagnosis.OVERNIGHT_OBSERVATION
}
} }
} else { } else {
Diagnosis.NOT_MY_SPECIALTY Diagnosis.NOT_MY_SPECIALTY
@ -523,13 +533,26 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
* This is because in the latter case, the transaction might have already been finalised and deleting the flow * This is because in the latter case, the transaction might have already been finalised and deleting the flow
* would introduce risk for inconsistency between nodes. * would introduce risk for inconsistency between nodes.
*/ */
private fun isErrorThrownDuringReceiveFinality(error: Throwable): Boolean { private fun isErrorThrownDuringReceiveTransactionFlow(error: Throwable): Boolean {
val strippedStacktrace = error.stackTrace val strippedStacktrace = error.stackTrace
.filterNot { it?.className?.contains("counter-flow exception from peer") ?: false } .filterNot { it?.className?.contains("counter-flow exception from peer") ?: false }
.filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false } .filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false }
return strippedStacktrace.isNotEmpty() return strippedStacktrace.isNotEmpty()
&& strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!!) && strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!!)
} }
/**
* Checks if an end session error exception was thrown and that it did so within [ReceiveTransactionFlow].
*
* The check for [ReceiveTransactionFlow] is important to ensure that the session didn't end within [ResolveTransactionsFlow] which
* implies that it has been receiving the transaction's dependencies and therefore ending before receiving the whole transaction
* is incorrect behaviour.
*/
private fun isEndSessionErrorThrownDuringReceiveTransactionFlow(error: Throwable): Boolean {
return error is UnexpectedFlowEndException
&& error.message?.contains(StartedFlowTransition.UNEXPECTED_SESSION_END_MESSAGE) == true
&& isErrorThrownDuringReceiveTransactionFlow(error)
}
} }
/** /**

View File

@ -28,6 +28,7 @@ class StartedFlowTransition(
companion object { companion object {
private val logger: Logger = contextLogger() private val logger: Logger = contextLogger()
const val UNEXPECTED_SESSION_END_MESSAGE = "Received session end message instead of a data session message. Mismatched send and receive?"
} }
override fun transition(): TransitionResult { override fun transition(): TransitionResult {
@ -253,7 +254,7 @@ class StartedFlowTransition(
newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toArrayList()) newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toArrayList())
// at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message. // at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message.
resultMessages[sessionId] = if (messages[0] is EndSessionMessage) { resultMessages[sessionId] = if (messages[0] is EndSessionMessage) {
throw UnexpectedFlowEndException("Received session end message instead of a data session message. Mismatched send and receive?") throw UnexpectedFlowEndException(UNEXPECTED_SESSION_END_MESSAGE)
} else { } else {
(messages[0] as DataSessionMessage).payload (messages[0] as DataSessionMessage).payload
} }