CORDA-3194 Replay start flow events when responding flow fails initial checkpoint commit (#2601)

* CORDA-3194 Replay start flow events when responding flow fails initial checkpoint commit

Logic has already been added to recover from initial checkpoint commit
failures on the initiating flow side but this did not suffice for
the same failure occurring on the responding flow's side.

The same idea has been added to resolve the responding flow's issue.

`ExternalMessageEvent` now has a `flowId` that is maintained on the
event. Messages can then be replayed to start/restart the flow, while
the event provides the flow id to each flow start.

Each `ExternalMessageEvent` implementation generates a random `flowId`
when constructed.

Events are stored in Artemis. This allows the solution to recover across
node restarts, as the events will be pulled from Artemis again when
restarting.

In the future, `flowId`s will probably be moved off the events and
generated purely on the responding flow's node.

* CORDA-3194 Add test to verify that errors removing a responding flow are recoverable
This commit is contained in:
Dan Newton 2019-10-02 11:07:06 +01:00 committed by LankyDan
parent 1f71b071aa
commit 268d129838
4 changed files with 349 additions and 0 deletions

View File

@ -138,6 +138,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
@ -232,6 +233,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
@ -326,6 +328,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
@ -426,6 +429,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
@ -530,6 +534,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
@ -647,6 +652,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
@ -752,6 +758,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
@ -870,10 +877,347 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
assertEquals(4, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - terminal") }.size)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
 * Makes the responding flow fail while executing an [Action.CommitTransaction], so that the responding node cannot persist the flow's
 * original checkpoint.
 *
 * The injected Byteman rule throws for as long as its counter is below 5, i.e. the exception is thrown 5 times.
 *
 * The hospital discharges the transition 3 times (3 retries). The last retry goes through and the flow runs to completion.
 *
 * Because the flow is still unstarted when it fails, every retry restarts it from the very beginning.
 *
 * 2 of the 5 exceptions never reach the hospital: they are swallowed by the if statement in [TransitionExecutorImpl.executeTransition]
 * that aborts a transition when an error transition moves into another error transition. The flow recovers from that state as well.
 * Throwing 5 times is what proves that 3 retries are attempted before the flow recovers.
 */
@Test
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully - responding flow`() {
    val driverParameters = DriverParameters(
        startNodesInProcess = false,
        inMemoryDB = false,
        systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
    )
    driver(driverParameters) {
        // Charlie runs the responding flow and is the node the Byteman agent listens on.
        val charlieParameters = NodeParameters(providedName = CHARLIE_NAME, rpcUsers = listOf(rpcUser))
        val charlie = (this as InternalDriverDSL).startNode(charlieParameters, bytemanPort = 12000).getOrThrow()
        val aliceParameters = NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser))
        val alice = startNode(aliceParameters).getOrThrow()

        val bytemanClient = Submit("localhost", 12000)
        val bytemanRules = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF readCounter("counter") < 5
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
            RULE Entering internal error staff member
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT ENTRY
            IF true
            DO traceln("Reached internal transition error staff member")
            ENDRULE
            RULE Increment discharge counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ DISCHARGE
            IF true
            DO traceln("Byteman test - discharging")
            ENDRULE
            RULE Increment observation counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ OVERNIGHT_OBSERVATION
            IF true
            DO traceln("Byteman test - overnight observation")
            ENDRULE
        """.trimIndent()
        bytemanClient.addScripts(listOf(ScriptText("Test script", bytemanRules)))

        val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
        val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy

        // The flow must finish within the timeout even though Charlie fails its first commits.
        val flowTimeout = Duration.of(30, ChronoUnit.SECONDS)
        aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
            .returnValue
            .getOrThrow(flowTimeout)

        // The traceln output of the Byteman rules ends up in Charlie's stdout log.
        val charlieStdout = charlie.baseDirectory
            .list()
            .first { path ->
                val pathText = path.toString()
                pathText.contains("net.corda.node.Corda") && pathText.contains("stdout.log")
            }
            .readAllLines()

        // Count the marker lines written by the Byteman rules
        assertEquals(3, charlieStdout.count { line -> line.contains("Byteman test - discharging") })
        assertEquals(0, charlieStdout.count { line -> line.contains("Byteman test - overnight observation") })
        // No flow is left running on either node
        assertEquals(0, aliceClient.stateMachinesSnapshot().size)
        assertEquals(0, charlieClient.stateMachinesSnapshot().size)
        // The only checkpoint on Alice belongs to GetNumberOfCheckpointsFlow itself
        assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
        // The only checkpoint on Charlie belongs to GetNumberOfCheckpointsFlow itself
        assertEquals(1, charlieClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
    }
}
/**
 * Makes the responding flow fail while executing an [Action.CommitTransaction], so that the responding node cannot persist the flow's
 * original checkpoint.
 *
 * The injected Byteman rule throws for as long as its counter is below 7.
 *
 * The hospital discharges the transition 3 times (3 retries) and then keeps the flow in for overnight observation.
 *
 * Because the flow is still unstarted when it fails, every retry restarts it from the very beginning.
 *
 * Not every thrown exception reaches the hospital: some are swallowed by the if statement in
 * [TransitionExecutorImpl.executeTransition] that aborts a transition when an error transition moves into another error transition.
 * NOTE(review): an earlier version of this comment spoke of 5 thrown exceptions, 2 of them absorbed - confirm the exact split for the
 * threshold of 7 used by the rule below.
 *
 * The checkpoint counts asserted at the end are correct because the responding node can replay the flow starting events from Artemis.
 * The responding flow's checkpoint is missing due to the repeated failures to save the original checkpoint, but the node will still be
 * able to recover when it is restarted (by using the events). The initiating flow keeps its checkpoint since it is waiting for the
 * responding flow to recover and finish.
 */
@Test
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists - responding flow`() {
    val driverParameters = DriverParameters(
        startNodesInProcess = false,
        inMemoryDB = false,
        systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
    )
    driver(driverParameters) {
        // Charlie runs the responding flow and is the node the Byteman agent listens on.
        val charlieParameters = NodeParameters(providedName = CHARLIE_NAME, rpcUsers = listOf(rpcUser))
        val charlie = (this as InternalDriverDSL).startNode(charlieParameters, bytemanPort = 12000).getOrThrow()
        val aliceParameters = NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser))
        val alice = startNode(aliceParameters).getOrThrow()

        val bytemanClient = Submit("localhost", 12000)
        val bytemanRules = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF readCounter("counter") < 7
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
            RULE Entering internal error staff member
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT ENTRY
            IF true
            DO traceln("Reached internal transition error staff member")
            ENDRULE
            RULE Increment discharge counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ DISCHARGE
            IF true
            DO traceln("Byteman test - discharging")
            ENDRULE
            RULE Increment observation counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ OVERNIGHT_OBSERVATION
            IF true
            DO traceln("Byteman test - overnight observation")
            ENDRULE
        """.trimIndent()
        bytemanClient.addScripts(listOf(ScriptText("Test script", bytemanRules)))

        val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
        val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy

        // The responding flow never gets past its first commit, so the initiating flow is expected to time out.
        val flowTimeout = Duration.of(30, ChronoUnit.SECONDS)
        assertFailsWith<TimeoutException> {
            aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
                .returnValue
                .getOrThrow(flowTimeout)
        }

        // The traceln output of the Byteman rules ends up in Charlie's stdout log.
        val charlieStdout = charlie.baseDirectory
            .list()
            .first { path ->
                val pathText = path.toString()
                pathText.contains("net.corda.node.Corda") && pathText.contains("stdout.log")
            }
            .readAllLines()

        // Count the marker lines written by the Byteman rules
        assertEquals(3, charlieStdout.count { line -> line.contains("Byteman test - discharging") })
        assertEquals(1, charlieStdout.count { line -> line.contains("Byteman test - overnight observation") })
        // One flow remains on each node
        assertEquals(1, aliceClient.stateMachinesSnapshot().size)
        assertEquals(1, charlieClient.stateMachinesSnapshot().size)
        // On Alice: 1 checkpoint for the flow waiting on the errored counterparty flow plus 1 for GetNumberOfCheckpointsFlow
        assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
        // On Charlie: only the checkpoint of GetNumberOfCheckpointsFlow.
        // The responding flow has no checkpoint because every attempt to commit the original checkpoint failed;
        // it can still recover because Artemis keeps the events and replays them on node restart.
        assertEquals(1, charlieClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
    }
}
/**
 * Makes an [Action.CommitTransaction] fail on the responding node at the point where the flow is finishing.
 *
 * The injected Byteman rule only throws after `flowFinishTransition` has raised a flag, clears that flag again when it throws, and
 * stops once its counter reaches 3 - so the exception is thrown 3 times.
 *
 * The hospital discharges the transition 3 times (3 retries). The last retry goes through and the flow runs to completion.
 */
@Test
fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully - responding flow`() {
    val driverParameters = DriverParameters(
        startNodesInProcess = false,
        inMemoryDB = false,
        systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
    )
    driver(driverParameters) {
        // Charlie runs the responding flow and is the node the Byteman agent listens on.
        val charlieParameters = NodeParameters(providedName = CHARLIE_NAME, rpcUsers = listOf(rpcUser))
        val charlie = (this as InternalDriverDSL).startNode(charlieParameters, bytemanPort = 12000).getOrThrow()
        val aliceParameters = NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser))
        val alice = startNode(aliceParameters).getOrThrow()

        val bytemanClient = Submit("localhost", 12000)
        val bytemanRules = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Set flag when adding action to remove checkpoint
            CLASS ${TopLevelTransition::class.java.name}
            METHOD flowFinishTransition
            AT ENTRY
            IF !flagged("remove_checkpoint_flag")
            DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
            ENDRULE
            RULE Throw exception on executeCommitTransaction when removing checkpoint
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
            DO incrementCounter("counter");
            clear("remove_checkpoint_flag");
            traceln("Throwing exception");
            throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
            RULE Entering internal error staff member
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT ENTRY
            IF true
            DO traceln("Reached internal transition error staff member")
            ENDRULE
            RULE Increment discharge counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ DISCHARGE
            IF true
            DO traceln("Byteman test - discharging")
            ENDRULE
            RULE Increment observation counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ OVERNIGHT_OBSERVATION
            IF true
            DO traceln("Byteman test - overnight observation")
            ENDRULE
        """.trimIndent()
        bytemanClient.addScripts(listOf(ScriptText("Test script", bytemanRules)))

        val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
        val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy

        // The flow must finish within the timeout even though Charlie fails while removing its checkpoint.
        val flowTimeout = Duration.of(30, ChronoUnit.SECONDS)
        aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
            .returnValue
            .getOrThrow(flowTimeout)

        // The traceln output of the Byteman rules ends up in Charlie's stdout log.
        val charlieStdout = charlie.baseDirectory
            .list()
            .first { path ->
                val pathText = path.toString()
                pathText.contains("net.corda.node.Corda") && pathText.contains("stdout.log")
            }
            .readAllLines()

        // Count the marker lines written by the Byteman rules
        assertEquals(3, charlieStdout.count { line -> line.contains("Byteman test - discharging") })
        assertEquals(0, charlieStdout.count { line -> line.contains("Byteman test - overnight observation") })
        // No flow is left running on either node
        assertEquals(0, aliceClient.stateMachinesSnapshot().size)
        assertEquals(0, charlieClient.stateMachinesSnapshot().size)
        // The only checkpoint on Alice belongs to GetNumberOfCheckpointsFlow itself
        assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
        // The only checkpoint on Charlie belongs to GetNumberOfCheckpointsFlow itself
        assertEquals(1, charlieClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
    }
}
}
@StartableByRPC

View File

@ -3,6 +3,7 @@ package net.corda.node.services.messaging
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.toStringShort
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.ThreadBox
@ -424,6 +425,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
private inner class MessageDeduplicationHandler(val artemisMessage: ClientMessage, override val receivedMessage: ReceivedMessage) : DeduplicationHandler, ExternalEvent.ExternalMessageEvent {
override val externalCause: ExternalEvent
get() = this
override val flowId: StateMachineRunId = StateMachineRunId.createRandom()
override val deduplicationHandler: MessageDeduplicationHandler
get() = this

View File

@ -114,6 +114,7 @@ interface ExternalEvent {
* An external P2P message event.
*/
interface ExternalMessageEvent : ExternalEvent {
/**
 * Id given to the flow that this event starts. It is held on the event so that the same event can be replayed to
 * start/restart the flow with the same id. The implementations visible alongside this interface assign it once, from
 * [StateMachineRunId.createRandom], when the event object is constructed.
 */
val flowId: StateMachineRunId
/** The received P2P message that this event wraps. */
val receivedMessage: ReceivedMessage
}

View File

@ -1,5 +1,6 @@
package net.corda.testing.node.internal
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.PLATFORM_VERSION
@ -268,6 +269,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration,
private inner class InMemoryDeduplicationHandler(override val receivedMessage: ReceivedMessage, val transfer: InMemoryMessagingNetwork.MessageTransfer) : DeduplicationHandler, ExternalEvent.ExternalMessageEvent {
override val externalCause: ExternalEvent
get() = this
override val flowId: StateMachineRunId = StateMachineRunId.createRandom()
override val deduplicationHandler: DeduplicationHandler
get() = this