CORDA-3777 Reload after every checkpoint (#6494)

Enable reloading of a flow after every checkpoint is saved. This
includes reloading the checkpoint from the database and recreating the
fiber.

When a flow and its `StateMachineState` is created it checks the node's
config to see if the `reloadCheckpointAfterSuspend` is set to true. If it is
it initialises `StateMachineState.reloadCheckpointAfterSuspendCount`
with the value 0. Otherwise, it remains `null`.

This count represents how many times the flow has reloaded from its
checkpoint (not the same as retrying). It is incremented every time the
flow is reloaded.

When a flow suspends, it processes the suspend event like usual, but
it will now also check if `reloadCheckpointAfterSuspendCount` is not
`null` (that it is activated) and process a 
`ReloadFlowFromCheckpointAfterSuspend`event, if and only if 
`reloadCheckpointAfterSuspendCount` is greater than
`CheckpointState.numberOfSuspends`.

This means idempotent flows can reload from the start and not reload
again until reaching a new suspension point.

Flows that skip checkpoints can reload from a previously saved
checkpoint (or from the initial checkpoint) and will continue reloading
on reaching the next new suspension point (not the suspension point that
it skipped saving).

If the flow fails to deserialize the checkpoint from the database upon
reloading a `ReloadFlowFromCheckpointException` is throw. This causes
the flow to be kept for observation.
This commit is contained in:
Dan Newton 2020-07-28 16:27:51 +01:00 committed by GitHub
parent 52cbe04b8c
commit c2fd8253ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 779 additions and 307 deletions

View File

@ -0,0 +1,511 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.FlowTimeoutException
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Test
import java.sql.SQLTransientConnectionException
import java.util.concurrent.Semaphore
import kotlin.test.assertEquals
import kotlin.test.assertNull
class FlowReloadAfterCheckpointTest {
private companion object {
val cordapps = listOf(enclosedCordapp())
}
@Test(timeout = 300_000)
fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
}
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
.map {
startNode(
providedName = it,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
)
}
.transpose()
.getOrThrow()
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, false)
val flowStartedByAlice = handle.id
handle.returnValue.getOrThrow()
assertEquals(5, reloadCounts[flowStartedByAlice])
assertEquals(6, reloadCounts[ReloadFromCheckpointResponder.flowId])
}
}
@Test(timeout = 300_000)
fun `flow will not reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is false`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
}
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
.map {
startNode(
providedName = it,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to false)
)
}
.transpose()
.getOrThrow()
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, false)
val flowStartedByAlice = handle.id
handle.returnValue.getOrThrow()
assertNull(reloadCounts[flowStartedByAlice])
assertNull(reloadCounts[ReloadFromCheckpointResponder.flowId])
}
}
@Test(timeout = 300_000)
fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true and be kept for observation due to failed deserialization`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
}
lateinit var flowKeptForObservation: StateMachineRunId
val lock = Semaphore(0)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { id, _ ->
flowKeptForObservation = id
lock.release()
}
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
.map {
startNode(
providedName = it,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
)
}
.transpose()
.getOrThrow()
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), true, false, false)
val flowStartedByAlice = handle.id
lock.acquire()
assertEquals(flowStartedByAlice, flowKeptForObservation)
assertEquals(4, reloadCounts[flowStartedByAlice])
assertEquals(4, reloadCounts[ReloadFromCheckpointResponder.flowId])
}
}
@Test(timeout = 300_000)
fun `flow will reload from a previous checkpoint after calling suspending function and skipping the persisting the current checkpoint when reloadCheckpointAfterSuspend is true`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
}
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
.map {
startNode(
providedName = it,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
)
}
.transpose()
.getOrThrow()
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, true)
val flowStartedByAlice = handle.id
handle.returnValue.getOrThrow()
assertEquals(5, reloadCounts[flowStartedByAlice])
assertEquals(6, reloadCounts[ReloadFromCheckpointResponder.flowId])
}
}
@Test(timeout = 300_000)
fun `idempotent flow will reload from initial checkpoint after calling a suspending function when reloadCheckpointAfterSuspend is true`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val alice = startNode(
providedName = ALICE_NAME,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
alice.rpc.startFlow(::MyIdempotentFlow, false).returnValue.getOrThrow()
assertEquals(5, reloadCount)
}
}
@Test(timeout = 300_000)
fun `idempotent flow will reload from initial checkpoint after calling a suspending function when reloadCheckpointAfterSuspend is true but can't throw deserialization error from objects in the call function`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val alice = startNode(
providedName = ALICE_NAME,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
alice.rpc.startFlow(::MyIdempotentFlow, true).returnValue.getOrThrow()
assertEquals(5, reloadCount)
}
}
@Test(timeout = 300_000)
fun `timed flow will reload from initial checkpoint after calling a suspending function when reloadCheckpointAfterSuspend is true`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val alice = startNode(
providedName = ALICE_NAME,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
alice.rpc.startFlow(::MyTimedFlow).returnValue.getOrThrow()
assertEquals(5, reloadCount)
}
}
@Test(timeout = 300_000)
fun `flow will correctly retry after an error when reloadCheckpointAfterSuspend is true`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
var timesDischarged = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> timesDischarged += 1 }
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val alice = startNode(
providedName = ALICE_NAME,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
alice.rpc.startFlow(::TransientConnectionFailureFlow).returnValue.getOrThrow()
assertEquals(5, reloadCount)
assertEquals(3, timesDischarged)
}
}
@Test(timeout = 300_000)
fun `flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
driver(
DriverParameters(
inMemoryDB = false,
startNodesInProcess = true,
notarySpecs = emptyList(),
cordappsForAllNodes = cordapps
)
) {
val alice = startNode(
providedName = ALICE_NAME,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
alice.rpc.startFlow(::MyHospitalizingFlow)
Thread.sleep(10.seconds.toMillis())
alice.stop()
startNode(
providedName = ALICE_NAME,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
Thread.sleep(20.seconds.toMillis())
assertEquals(5, reloadCount)
}
}
@Test(timeout = 300_000)
fun `idempotent flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
driver(
DriverParameters(
inMemoryDB = false,
startNodesInProcess = true,
notarySpecs = emptyList(),
cordappsForAllNodes = cordapps
)
) {
val alice = startNode(
providedName = ALICE_NAME,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
alice.rpc.startFlow(::IdempotentHospitalizingFlow)
Thread.sleep(10.seconds.toMillis())
alice.stop()
startNode(
providedName = ALICE_NAME,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
Thread.sleep(20.seconds.toMillis())
// restarts completely from the beginning and forgets the in-memory reload count therefore
// it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown
assertEquals(7, reloadCount)
}
}
@Test(timeout = 300_000)
fun `more complicated flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
}
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = FINANCE_CORDAPPS)) {
val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
.map {
startNode(
providedName = it,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
)
}
.transpose()
.getOrThrow()
val handle = alice.rpc.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
bob.nodeInfo.singleIdentity(),
false,
defaultNotaryIdentity
)
val flowStartedByAlice = handle.id
handle.returnValue.getOrThrow(30.seconds)
val flowStartedByBob = bob.rpc.stateMachineRecordedTransactionMappingSnapshot()
.map(StateMachineTransactionMapping::stateMachineRunId)
.toSet()
.single()
Thread.sleep(10.seconds.toMillis())
assertEquals(7, reloadCounts[flowStartedByAlice])
assertEquals(6, reloadCounts[flowStartedByBob])
}
}
/**
* Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5.
* Therefore this flow should reload 5 times when completed without errors or restarts.
*/
@StartableByRPC
@InitiatingFlow
class ReloadFromCheckpointFlow(
private val party: Party,
private val shouldHaveDeserializationError: Boolean,
private val counterPartyHasDeserializationError: Boolean,
private val skipCheckpoints: Boolean
) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(party)
session.send(counterPartyHasDeserializationError, skipCheckpoints)
session.receive(String::class.java, skipCheckpoints).unwrap { it }
stateMachine.suspend(FlowIORequest.ForceCheckpoint, skipCheckpoints)
val map = if (shouldHaveDeserializationError) {
BrokenMap(mutableMapOf("i dont want" to "this to work"))
} else {
mapOf("i dont want" to "this to work")
}
logger.info("I need to use my variable to pass the build!: $map")
session.sendAndReceive<String>("hey I made it this far")
}
}
/**
* Has 5 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 6.
* Therefore this flow should reload 6 times when completed without errors or restarts.
*/
@InitiatedBy(ReloadFromCheckpointFlow::class)
class ReloadFromCheckpointResponder(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
var flowId: StateMachineRunId? = null
}
@Suspendable
override fun call() {
flowId = runId
val counterPartyHasDeserializationError = session.receive<Boolean>().unwrap { it }
session.send("hello there 12312311")
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
val map = if (counterPartyHasDeserializationError) {
BrokenMap(mutableMapOf("i dont want" to "this to work"))
} else {
mapOf("i dont want" to "this to work")
}
logger.info("I need to use my variable to pass the build!: $map")
session.receive<String>().unwrap { it }
session.send("sending back a message")
}
}
/**
* Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5.
* Therefore this flow should reload 5 times when completed without errors or restarts.
*/
@StartableByRPC
@InitiatingFlow
class MyIdempotentFlow(private val shouldHaveDeserializationError: Boolean) : FlowLogic<Unit>(), IdempotentFlow {
@Suspendable
override fun call() {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
val map = if (shouldHaveDeserializationError) {
BrokenMap(mutableMapOf("i dont want" to "this to work"))
} else {
mapOf("i dont want" to "this to work")
}
logger.info("I need to use my variable to pass the build!: $map")
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
}
}
/**
* Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5.
* Therefore this flow should reload 5 times when completed without errors or restarts.
*/
@StartableByRPC
@InitiatingFlow
class MyTimedFlow : FlowLogic<Unit>(), TimedFlow {
companion object {
var thrown = false
}
override val isTimeoutEnabled: Boolean = true
@Suspendable
override fun call() {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
if (!thrown) {
thrown = true
throw FlowTimeoutException()
}
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
}
}
@StartableByRPC
@InitiatingFlow
class TransientConnectionFailureFlow : FlowLogic<Unit>() {
companion object {
var retryCount = 0
}
@Suspendable
override fun call() {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
if (retryCount < 3) {
retryCount += 1
throw SQLTransientConnectionException("Connection is not available")
}
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
}
}
/**
* Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5.
* Therefore this flow should reload 5 times when completed without errors or restarts.
*/
@StartableByRPC
@InitiatingFlow
class MyHospitalizingFlow : FlowLogic<Unit>() {
companion object {
var thrown = false
}
@Suspendable
override fun call() {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
if (!thrown) {
thrown = true
throw HospitalizeFlowException("i want to try again")
}
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
}
}
/**
* Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5.
* Therefore this flow should reload 5 times when completed without errors or restarts.
*/
@StartableByRPC
@InitiatingFlow
class IdempotentHospitalizingFlow : FlowLogic<Unit>(), IdempotentFlow {
companion object {
var thrown = false
}
@Suspendable
override fun call() {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
if (!thrown) {
thrown = true
throw HospitalizeFlowException("i want to try again")
}
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
}
}
}

View File

@ -1,10 +1,13 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.*
import net.corda.core.flows.FlowExternalAsyncOperation
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.concurrent.transpose
@ -23,6 +26,7 @@ import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.enclosedCordapp
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.hibernate.exception.ConstraintViolationException
import org.junit.After
@ -33,7 +37,8 @@ import java.sql.SQLException
import java.sql.SQLTransientConnectionException
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.Collections
import java.util.HashSet
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
@ -41,7 +46,11 @@ import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
class FlowRetryTest {
val config = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryIntervalMultiplier = 1.1)
private companion object {
val user = User("mark", "dadada", setOf(Permissions.all()))
val cordapps = listOf(enclosedCordapp())
}
@Before
fun resetCounters() {
@ -58,154 +67,134 @@ class FlowRetryTest {
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
}
@Test(timeout=300_000)
fun `flows continue despite errors`() {
@Test(timeout = 300_000)
fun `flows continue despite errors`() {
val numSessions = 2
val numIterations = 10
val user = User("mark", "dadada", setOf(Permissions.startFlow<InitiatorFlow>()))
val result: Any? = driver(DriverParameters(
startNodesInProcess = isQuasarAgentSpecified(),
notarySpecs = emptyList()
)) {
val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
.map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
val result: Any? = driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
it.proxy.startFlow(::InitiatorFlow, numSessions, numIterations, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()
}
val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
.map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
val result = nodeAHandle.rpc.startFlow(
::InitiatorFlow,
numSessions,
numIterations,
nodeBHandle.nodeInfo.singleIdentity()
).returnValue.getOrThrow()
result
}
assertNotNull(result)
assertEquals("$numSessions:$numIterations", result)
}
@Test(timeout=300_000)
fun `async operation deduplication id is stable accross retries`() {
val user = User("mark", "dadada", setOf(Permissions.startFlow<AsyncRetryFlow>()))
driver(DriverParameters(
startNodesInProcess = isQuasarAgentSpecified(),
notarySpecs = emptyList()
)) {
@Test(timeout = 300_000)
fun `async operation deduplication id is stable accross retries`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
nodeAHandle.rpc.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
}
}
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
@Test(timeout = 300_000)
fun `flow gives up after number of exceptions, even if this is the first line of the flow`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
assertFailsWith<CordaRuntimeException> {
nodeAHandle.rpc.startFlow(::RetryFlow).returnValue.getOrThrow()
}
}
}
@Test(timeout=300_000)
fun `flow gives up after number of exceptions, even if this is the first line of the flow`() {
val user = User("mark", "dadada", setOf(Permissions.startFlow<RetryFlow>()))
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
driver(DriverParameters(
startNodesInProcess = isQuasarAgentSpecified(),
notarySpecs = emptyList()
)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
it.proxy.startFlow(::RetryFlow).returnValue.getOrThrow()
}
result
@Test(timeout = 300_000)
fun `flow that throws in constructor throw for the RPC client that attempted to start them`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
assertFailsWith<CordaRuntimeException> {
nodeAHandle.rpc.startFlow(::ThrowingFlow).returnValue.getOrThrow()
}
}
}
@Test(timeout=300_000)
fun `flow that throws in constructor throw for the RPC client that attempted to start them`() {
val user = User("mark", "dadada", setOf(Permissions.startFlow<ThrowingFlow>()))
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
driver(DriverParameters(
startNodesInProcess = isQuasarAgentSpecified(),
notarySpecs = emptyList()
)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
it.proxy.startFlow(::ThrowingFlow).returnValue.getOrThrow()
}
result
}
}
}
@Test(timeout=300_000)
fun `SQLTransientConnectionExceptions thrown by hikari are retried 3 times and then kept in the checkpoints table`() {
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
@Test(timeout = 300_000)
fun `SQLTransientConnectionExceptions thrown by hikari are retried 3 times and then kept in the checkpoints table`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
.map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertFailsWith<TimeoutException> {
it.proxy.startFlow(::TransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
assertEquals(3, TransientConnectionFailureFlow.retryCount)
assertEquals(1, it.proxy.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get())
.map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
assertFailsWith<TimeoutException> {
nodeAHandle.rpc.startFlow(::TransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
assertEquals(3, TransientConnectionFailureFlow.retryCount)
assertEquals(
1,
nodeAHandle.rpc.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get()
)
}
}
@Test(timeout=300_000)
fun `Specific exception still detected even if it is nested inside another exception`() {
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
@Test(timeout = 300_000)
fun `Specific exception still detected even if it is nested inside another exception`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
.map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertFailsWith<TimeoutException> {
it.proxy.startFlow(::WrappedTransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
assertEquals(3, WrappedTransientConnectionFailureFlow.retryCount)
assertEquals(1, it.proxy.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get())
.map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
assertFailsWith<TimeoutException> {
nodeAHandle.rpc.startFlow(::WrappedTransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
assertEquals(3, WrappedTransientConnectionFailureFlow.retryCount)
assertEquals(
1,
nodeAHandle.rpc.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get()
)
}
}
@Test(timeout=300_000)
fun `General external exceptions are not retried and propagate`() {
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
@Test(timeout = 300_000)
fun `General external exceptions are not retried and propagate`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
.map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
.map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertFailsWith<CordaRuntimeException> {
it.proxy.startFlow(::GeneralExternalFailureFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()
}
assertEquals(0, GeneralExternalFailureFlow.retryCount)
assertEquals(1, it.proxy.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.FAILED).returnValue.get())
assertFailsWith<CordaRuntimeException> {
nodeAHandle.rpc.startFlow(
::GeneralExternalFailureFlow,
nodeBHandle.nodeInfo.singleIdentity()
).returnValue.getOrThrow()
}
assertEquals(0, GeneralExternalFailureFlow.retryCount)
assertEquals(
1,
nodeAHandle.rpc.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.FAILED).returnValue.get()
)
}
}
@Test(timeout=300_000)
fun `Permission exceptions are not retried and propagate`() {
@Test(timeout = 300_000)
fun `Permission exceptions are not retried and propagate`() {
val user = User("mark", "dadada", setOf())
driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
}.withMessageStartingWith("User not authorized to perform RPC call")
// This stays at -1 since the flow never even got called
assertEquals(-1, GeneralExternalFailureFlow.retryCount)
}
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
nodeAHandle.rpc.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
}.withMessageStartingWith("User not authorized to perform RPC call")
// This stays at -1 since the flow never even got called
assertEquals(-1, GeneralExternalFailureFlow.retryCount)
}
}
}
@ -315,6 +304,10 @@ enum class Step { First, BeforeInitiate, AfterInitiate, AfterInitiateSendReceive
data class Visited(val sessionNum: Int, val iterationNum: Int, val step: Step)
class BrokenMap<K, V>(delegate: MutableMap<K, V> = mutableMapOf()) : MutableMap<K, V> by delegate {
override fun put(key: K, value: V): V? = throw IllegalStateException("Broken on purpose")
}
@StartableByRPC
class RetryFlow() : FlowLogic<String>(), IdempotentFlow {
companion object {
@ -342,7 +335,7 @@ class AsyncRetryFlow() : FlowLogic<String>(), IdempotentFlow {
val deduplicationIds = mutableSetOf<String>()
}
class RecordDeduplicationId: FlowExternalAsyncOperation<String> {
class RecordDeduplicationId : FlowExternalAsyncOperation<String> {
override fun execute(deduplicationId: String): CompletableFuture<String> {
val dedupeIdIsNew = deduplicationIds.add(deduplicationId)
if (dedupeIdIsNew) {
@ -423,8 +416,9 @@ class WrappedTransientConnectionFailureFlow(private val party: Party) : FlowLogi
// checkpoint will restart the flow after the send
retryCount += 1
throw IllegalStateException(
"wrapped error message",
IllegalStateException("another layer deep", SQLTransientConnectionException("Connection is not available")))
"wrapped error message",
IllegalStateException("another layer deep", SQLTransientConnectionException("Connection is not available"))
)
}
}
@ -465,12 +459,14 @@ class GeneralExternalFailureResponder(private val session: FlowSession) : FlowLo
@StartableByRPC
class GetCheckpointNumberOfStatusFlow(private val flowStatus: Checkpoint.FlowStatus) : FlowLogic<Long>() {
@Suspendable
override fun call(): Long {
val sqlStatement =
"select count(*) " +
"from node_checkpoints " +
"where status = ${flowStatus.ordinal} " +
"and flow_id != '${runId.uuid}' " // don't count in the checkpoint of the current flow
"select count(*) " +
"from node_checkpoints " +
"where status = ${flowStatus.ordinal} " +
"and flow_id != '${runId.uuid}' " // don't count in the checkpoint of the current flow
return serviceHub.jdbcSession().prepareStatement(sqlStatement).use { ps ->
ps.executeQuery().use { rs ->

View File

@ -93,6 +93,8 @@ interface NodeConfiguration : ConfigurationWithOptionsContainer {
val quasarExcludePackages: List<String>
val reloadCheckpointAfterSuspend: Boolean
companion object {
// default to at least 8MB and a bit extra for larger heap sizes
val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory()
@ -125,9 +127,13 @@ enum class JmxReporterType {
}
data class DevModeOptions(
val disableCheckpointChecker: Boolean = Defaults.disableCheckpointChecker,
val allowCompatibilityZone: Boolean = Defaults.allowCompatibilityZone,
val djvm: DJVMOptions? = null
@Deprecated(
"The checkpoint checker has been replaced by the ability to reload a checkpoint from the database after every suspend" +
"Use [NodeConfiguration.disableReloadCheckpointAfterSuspend] instead."
)
val disableCheckpointChecker: Boolean = Defaults.disableCheckpointChecker,
val allowCompatibilityZone: Boolean = Defaults.allowCompatibilityZone,
val djvm: DJVMOptions? = null
) {
internal object Defaults {
val disableCheckpointChecker = false
@ -140,10 +146,6 @@ data class DJVMOptions(
val cordaSource: List<String>
)
fun NodeConfiguration.shouldCheckCheckpoints(): Boolean {
return this.devMode && this.devModeOptions?.disableCheckpointChecker != true
}
fun NodeConfiguration.shouldStartSSHDaemon() = this.sshd != null
fun NodeConfiguration.shouldStartLocalShell() = !this.noLocalShell && System.console() != null && this.devMode
fun NodeConfiguration.shouldInitCrashShell() = shouldStartLocalShell() || shouldStartSSHDaemon()

View File

@ -84,7 +84,9 @@ data class NodeConfigurationImpl(
override val blacklistedAttachmentSigningKeys: List<String> = Defaults.blacklistedAttachmentSigningKeys,
override val configurationWithOptions: ConfigurationWithOptions,
override val flowExternalOperationThreadPoolSize: Int = Defaults.flowExternalOperationThreadPoolSize,
override val quasarExcludePackages: List<String> = Defaults.quasarExcludePackages
override val quasarExcludePackages: List<String> = Defaults.quasarExcludePackages,
override val reloadCheckpointAfterSuspend: Boolean = Defaults.reloadCheckpointAfterSuspend
) : NodeConfiguration {
internal object Defaults {
val jmxMonitoringHttpPort: Int? = null
@ -123,6 +125,7 @@ data class NodeConfigurationImpl(
val blacklistedAttachmentSigningKeys: List<String> = emptyList()
const val flowExternalOperationThreadPoolSize: Int = 1
val quasarExcludePackages: List<String> = emptyList()
val reloadCheckpointAfterSuspend: Boolean = System.getProperty("reloadCheckpointAfterSuspend", "false")!!.toBoolean()
fun cordappsDirectories(baseDirectory: Path) = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT)

View File

@ -8,6 +8,7 @@ import net.corda.common.validation.internal.Validated.Companion.invalid
import net.corda.common.validation.internal.Validated.Companion.valid
import net.corda.node.services.config.*
import net.corda.node.services.config.NodeConfigurationImpl.Defaults
import net.corda.node.services.config.NodeConfigurationImpl.Defaults.reloadCheckpointAfterSuspend
import net.corda.node.services.config.schema.parsers.*
internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfiguration>("NodeConfiguration") {
@ -66,6 +67,7 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
.withDefaultValue(Defaults.networkParameterAcceptanceSettings)
private val flowExternalOperationThreadPoolSize by int().optional().withDefaultValue(Defaults.flowExternalOperationThreadPoolSize)
private val quasarExcludePackages by string().list().optional().withDefaultValue(Defaults.quasarExcludePackages)
private val reloadCheckpointAfterSuspend by boolean().optional().withDefaultValue(Defaults.reloadCheckpointAfterSuspend)
@Suppress("unused")
private val custom by nestedObject().optional()
@Suppress("unused")
@ -133,7 +135,8 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
networkParameterAcceptanceSettings = config[networkParameterAcceptanceSettings],
configurationWithOptions = ConfigurationWithOptions(configuration, Configuration.Options.defaults),
flowExternalOperationThreadPoolSize = config[flowExternalOperationThreadPoolSize],
quasarExcludePackages = config[quasarExcludePackages]
quasarExcludePackages = config[quasarExcludePackages],
reloadCheckpointAfterSuspend = config[reloadCheckpointAfterSuspend]
))
} catch (e: Exception) {
return when (e) {

View File

@ -105,18 +105,18 @@ sealed class Event {
* @param progressStep the current progress tracker step.
*/
data class Suspend(
val ioRequest: FlowIORequest<*>,
val maySkipCheckpoint: Boolean,
val fiber: SerializedBytes<FlowStateMachineImpl<*>>,
var progressStep: ProgressTracker.Step?
val ioRequest: FlowIORequest<*>,
val maySkipCheckpoint: Boolean,
val fiber: SerializedBytes<FlowStateMachineImpl<*>>,
var progressStep: ProgressTracker.Step?
) : Event() {
override fun toString() =
"Suspend(" +
"ioRequest=$ioRequest, " +
"maySkipCheckpoint=$maySkipCheckpoint, " +
"fiber=${fiber.hash}, " +
"currentStep=${progressStep?.label}" +
")"
"Suspend(" +
"ioRequest=$ioRequest, " +
"maySkipCheckpoint=$maySkipCheckpoint, " +
"fiber=${fiber.hash}, " +
"currentStep=${progressStep?.label}" +
")"
}
/**
@ -148,12 +148,21 @@ sealed class Event {
data class AsyncOperationThrows(val throwable: Throwable) : Event()
/**
* Retry a flow from the last checkpoint, or if there is no checkpoint, restart the flow with the same invocation details.
* Retry a flow from its last checkpoint, or if there is no checkpoint, restart the flow with the same invocation details.
*/
object RetryFlowFromSafePoint : Event() {
override fun toString() = "RetryFlowFromSafePoint"
}
/**
* Reload a flow from its last checkpoint, or if there is no checkpoint, restart the flow with the same invocation details.
* This is separate from [RetryFlowFromSafePoint] which is used for error handling within the state machine.
* [ReloadFlowFromCheckpointAfterSuspend] is only used when [NodeConfiguration.reloadCheckpointAfterSuspend] is true.
*/
object ReloadFlowFromCheckpointAfterSuspend : Event() {
override fun toString() = "ReloadFlowFromCheckpointAfterSuspend"
}
/**
* Keeps a flow for overnight observation. Overnight observation practically sends the fiber to get suspended,
* in [FlowStateMachineImpl.processEventsUntilFlowIsResumed]. Since the fiber's channel will have no more events to process,

View File

@ -19,6 +19,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.currentStateMachine
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.isEnabledTimedFlow
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -36,21 +37,23 @@ class NonResidentFlow(val runId: StateMachineRunId, val checkpoint: Checkpoint)
}
class FlowCreator(
val checkpointSerializationContext: CheckpointSerializationContext,
private val checkpointSerializationContext: CheckpointSerializationContext,
private val checkpointStorage: CheckpointStorage,
val scheduler: FiberScheduler,
val database: CordaPersistence,
val transitionExecutor: TransitionExecutor,
val actionExecutor: ActionExecutor,
val secureRandom: SecureRandom,
val serviceHub: ServiceHubInternal,
val unfinishedFibers: ReusableLatch,
val resetCustomTimeout: (StateMachineRunId, Long) -> Unit) {
private val scheduler: FiberScheduler,
private val database: CordaPersistence,
private val transitionExecutor: TransitionExecutor,
private val actionExecutor: ActionExecutor,
private val secureRandom: SecureRandom,
private val serviceHub: ServiceHubInternal,
private val unfinishedFibers: ReusableLatch,
private val resetCustomTimeout: (StateMachineRunId, Long) -> Unit) {
companion object {
private val logger = contextLogger()
}
private val reloadCheckpointAfterSuspend = serviceHub.configuration.reloadCheckpointAfterSuspend
fun createFlowFromNonResidentFlow(nonResidentFlow: NonResidentFlow): Flow<*>? {
// As for paused flows we don't extract the serialized flow state we need to re-extract the checkpoint from the database.
val checkpoint = when (nonResidentFlow.checkpoint.status) {
@ -65,13 +68,23 @@ class FlowCreator(
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint)
}
fun createFlowFromCheckpoint(runId: StateMachineRunId, oldCheckpoint: Checkpoint): Flow<*>? {
fun createFlowFromCheckpoint(
runId: StateMachineRunId,
oldCheckpoint: Checkpoint,
reloadCheckpointAfterSuspendCount: Int? = null
): Flow<*>? {
val checkpoint = oldCheckpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
val fiber = checkpoint.getFiberFromCheckpoint(runId) ?: return null
val resultFuture = openFuture<Any?>()
fiber.logic.stateMachine = fiber
verifyFlowLogicIsSuspendable(fiber.logic)
val state = createStateMachineState(checkpoint, fiber, true)
val state = createStateMachineState(
checkpoint = checkpoint,
fiber = fiber,
anyCheckpointPersisted = true,
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount
?: if (reloadCheckpointAfterSuspend) checkpoint.checkpointState.numberOfSuspends else null
)
fiber.transientValues = createTransientValues(runId, resultFuture)
fiber.transientState = state
return Flow(fiber, resultFuture)
@ -108,11 +121,13 @@ class FlowCreator(
).getOrThrow()
val state = createStateMachineState(
checkpoint,
flowStateMachineImpl,
existingCheckpoint != null,
deduplicationHandler,
senderUUID)
checkpoint = checkpoint,
fiber = flowStateMachineImpl,
anyCheckpointPersisted = existingCheckpoint != null,
reloadCheckpointAfterSuspendCount = if (reloadCheckpointAfterSuspend) 0 else null,
deduplicationHandler = deduplicationHandler,
senderUUID = senderUUID
)
flowStateMachineImpl.transientState = state
return Flow(flowStateMachineImpl, resultFuture)
}
@ -125,9 +140,7 @@ class FlowCreator(
}
is FlowState.Started -> tryCheckpointDeserialize(this.flowState.frozenFiber, runId) ?: return null
// Places calling this function is rely on it to return null if the flow cannot be created from the checkpoint.
else -> {
return null
}
else -> null
}
}
@ -136,8 +149,16 @@ class FlowCreator(
return try {
bytes.checkpointDeserialize(context = checkpointSerializationContext)
} catch (e: Exception) {
logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e)
null
if (reloadCheckpointAfterSuspend && currentStateMachine() != null) {
logger.error(
"Unable to deserialize checkpoint for flow $flowId. [reloadCheckpointAfterSuspend] is turned on, throwing exception",
e
)
throw ReloadFlowFromCheckpointException(e)
} else {
logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e)
null
}
}
}
@ -169,12 +190,15 @@ class FlowCreator(
)
}
@Suppress("LongParameterList")
private fun createStateMachineState(
checkpoint: Checkpoint,
fiber: FlowStateMachineImpl<*>,
anyCheckpointPersisted: Boolean,
deduplicationHandler: DeduplicationHandler? = null,
senderUUID: String? = null): StateMachineState {
checkpoint: Checkpoint,
fiber: FlowStateMachineImpl<*>,
anyCheckpointPersisted: Boolean,
reloadCheckpointAfterSuspendCount: Int?,
deduplicationHandler: DeduplicationHandler? = null,
senderUUID: String? = null
): StateMachineState {
return StateMachineState(
checkpoint = checkpoint,
pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(),
@ -186,6 +210,8 @@ class FlowCreator(
isRemoved = false,
isKilled = false,
flowLogic = fiber.logic,
senderUUID = senderUUID)
senderUUID = senderUUID,
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount
)
}
}

View File

@ -29,6 +29,7 @@ import net.corda.core.internal.DeclaredField
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.isIdempotentFlow
import net.corda.core.internal.isRegularFile
@ -87,6 +88,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
private val log: Logger = LoggerFactory.getLogger("net.corda.flow")
private val SERIALIZER_BLOCKER = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER").apply { isAccessible = true }.get(null)
@VisibleForTesting
var onReloadFlowFromCheckpoint: ((id: StateMachineRunId) -> Unit)? = null
}
data class TransientValues(
@ -504,10 +508,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
contextTransactionOrNull = transaction.value
val event = try {
Event.Suspend(
ioRequest = ioRequest,
maySkipCheckpoint = skipPersistingCheckpoint,
fiber = this.checkpointSerialize(context = serializationContext.value),
progressStep = logic.progressTracker?.currentStep
ioRequest = ioRequest,
maySkipCheckpoint = skipPersistingCheckpoint,
fiber = this.checkpointSerialize(context = serializationContext.value),
progressStep = logic.progressTracker?.currentStep
)
} catch (exception: Exception) {
Event.Error(exception)
@ -529,6 +533,18 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
unpark(SERIALIZER_BLOCKER)
}
}
transientState.reloadCheckpointAfterSuspendCount?.let { count ->
if (count < transientState.checkpoint.checkpointState.numberOfSuspends) {
onReloadFlowFromCheckpoint?.invoke(id)
processEventImmediately(
Event.ReloadFlowFromCheckpointAfterSuspend,
isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = false
)
}
}
return uncheckedCast(processEventsUntilFlowIsResumed(
isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = true

View File

@ -30,11 +30,9 @@ import net.corda.core.utilities.debug
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.currentStateMachine
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.utilities.AffinityExecutor
@ -89,7 +87,6 @@ internal class SingleThreadedStateMachineManager(
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
private val actionFutureExecutor = ActionFutureExecutor(innerState, serviceHub, scheduledFutureExecutor)
private val flowTimeoutScheduler = FlowTimeoutScheduler(innerState, scheduledFutureExecutor, serviceHub)
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
private var checkpointSerializationContext: CheckpointSerializationContext? = null
@ -97,6 +94,7 @@ internal class SingleThreadedStateMachineManager(
override val flowHospital: StaffedFlowHospital = makeFlowHospital()
private val transitionExecutor = makeTransitionExecutor()
private val reloadCheckpointAfterSuspend = serviceHub.configuration.reloadCheckpointAfterSuspend
override val allStateMachines: List<FlowLogic<*>>
get() = innerState.withLock { flows.values.map { it.fiber.logic } }
@ -124,7 +122,6 @@ internal class SingleThreadedStateMachineManager(
)
this.checkpointSerializationContext = checkpointSerializationContext
val actionExecutor = makeActionExecutor(checkpointSerializationContext)
fiberDeserializationChecker?.start(checkpointSerializationContext)
when (startMode) {
StateMachineManager.StartMode.ExcludingPaused -> {}
StateMachineManager.StartMode.Safe -> markAllFlowsAsPaused()
@ -207,10 +204,6 @@ internal class SingleThreadedStateMachineManager(
// Account for any expected Fibers in a test scenario.
liveFibers.countDown(allowedUnsuspendedFiberCount)
liveFibers.await()
fiberDeserializationChecker?.let {
val foundUnrestorableFibers = it.stop()
check(!foundUnrestorableFibers) { "Unrestorable checkpoints were created, please check the logs for details." }
}
flowHospital.close()
scheduledFutureExecutor.shutdown()
scheduler.shutdown()
@ -397,7 +390,7 @@ internal class SingleThreadedStateMachineManager(
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: return
// Resurrect flow
flowCreator.createFlowFromCheckpoint(flowId, checkpoint) ?: return
flowCreator.createFlowFromCheckpoint(flowId, checkpoint, currentState.reloadCheckpointAfterSuspendCount) ?: return
} else {
// Just flow initiation message
null
@ -632,8 +625,16 @@ internal class SingleThreadedStateMachineManager(
return try {
serializedCheckpoint.deserialize(checkpointSerializationContext!!)
} catch (e: Exception) {
logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e)
null
if (reloadCheckpointAfterSuspend && currentStateMachine() != null) {
logger.error(
"Unable to deserialize checkpoint for flow $flowId. [reloadCheckpointAfterSuspend] is turned on, throwing exception",
e
)
throw ReloadFlowFromCheckpointException(e)
} else {
logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e)
null
}
}
}
@ -700,9 +701,6 @@ internal class SingleThreadedStateMachineManager(
if (serviceHub.configuration.devMode) {
interceptors.add { DumpHistoryOnErrorInterceptor(it) }
}
if (serviceHub.configuration.shouldCheckCheckpoints()) {
interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) }
}
if (logger.isDebugEnabled) {
interceptors.add { PrintingInterceptor(it) }
}

View File

@ -589,6 +589,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
return if (newError.mentionsThrowable(StateTransitionException::class.java)) {
when {
newError.mentionsThrowable(InterruptedException::class.java) -> Diagnosis.TERMINAL
newError.mentionsThrowable(ReloadFlowFromCheckpointException::class.java) -> Diagnosis.OVERNIGHT_OBSERVATION
newError.mentionsThrowable(AsyncOperationTransitionException::class.java) -> Diagnosis.NOT_MY_SPECIALTY
history.notDischargedForTheSameThingMoreThan(2, this, currentState) -> Diagnosis.DISCHARGE
else -> Diagnosis.OVERNIGHT_OBSERVATION

View File

@ -59,7 +59,8 @@ data class StateMachineState(
val isRemoved: Boolean,
@Volatile
var isKilled: Boolean,
val senderUUID: String?
val senderUUID: String?,
val reloadCheckpointAfterSuspendCount: Int?
) : KryoSerializable {
override fun write(kryo: Kryo?, output: Output?) {
throw IllegalStateException("${StateMachineState::class.qualifiedName} should never be serialized")

View File

@ -1,6 +1,6 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaException
import net.corda.core.CordaRuntimeException
import net.corda.core.serialization.ConstructorForDeserialization
// CORDA-3353 - These exceptions should not be propagated up to rpc as they suppress the real exceptions
@ -9,12 +9,17 @@ class StateTransitionException(
val transitionAction: Action?,
val transitionEvent: Event?,
val exception: Exception
) : CordaException(exception.message, exception) {
) : CordaRuntimeException(exception.message, exception) {
@ConstructorForDeserialization
constructor(exception: Exception): this(null, null, exception)
}
class AsyncOperationTransitionException(exception: Exception) : CordaException(exception.message, exception)
class AsyncOperationTransitionException(exception: Exception) : CordaRuntimeException(exception.message, exception)
class ErrorStateTransitionException(val exception: Exception) : CordaException(exception.message, exception)
class ErrorStateTransitionException(val exception: Exception) : CordaRuntimeException(exception.message, exception)
class ReloadFlowFromCheckpointException(cause: Exception) : CordaRuntimeException(
"Could not reload flow from checkpoint. This is likely due to a discrepancy " +
"between the serialization and deserialization of an object in the flow's checkpoint", cause
)

View File

@ -1,101 +0,0 @@
package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread
/**
* This interceptor checks whether a checkpointed fiber state can be deserialised in a separate thread.
*/
class FiberDeserializationCheckingInterceptor(
val fiberDeserializationChecker: FiberDeserializationChecker,
val delegate: TransitionExecutor
) : TransitionExecutor {
@Suspendable
override fun executeTransition(
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
val previousFlowState = previousState.checkpoint.flowState
val nextFlowState = nextState.checkpoint.flowState
if (nextFlowState is FlowState.Started) {
if (previousFlowState !is FlowState.Started || previousFlowState.frozenFiber != nextFlowState.frozenFiber) {
fiberDeserializationChecker.submitCheck(nextFlowState.frozenFiber)
}
}
return Pair(continuation, nextState)
}
}
/**
* A fiber deserialisation checker thread. It checks the queued up serialised checkpoints to see if they can be
* deserialised. This is only run in development mode to allow detecting of corrupt serialised checkpoints before they
* are actually used.
*/
class FiberDeserializationChecker {
companion object {
val log = contextLogger()
}
private sealed class Job {
class Check(val serializedFiber: SerializedBytes<FlowStateMachineImpl<*>>) : Job()
object Finish : Job()
}
private var checkerThread: Thread? = null
private val jobQueue = LinkedBlockingQueue<Job>()
private var foundUnrestorableFibers: Boolean = false
fun start(checkpointSerializationContext: CheckpointSerializationContext) {
require(checkerThread == null){"Checking thread must not already be started"}
checkerThread = thread(name = "FiberDeserializationChecker") {
while (true) {
val job = jobQueue.take()
when (job) {
is Job.Check -> {
try {
job.serializedFiber.checkpointDeserialize(context = checkpointSerializationContext)
} catch (exception: Exception) {
log.error("Encountered unrestorable checkpoint!", exception)
foundUnrestorableFibers = true
}
}
Job.Finish -> {
return@thread
}
}
}
}
}
fun submitCheck(serializedFiber: SerializedBytes<FlowStateMachineImpl<*>>) {
jobQueue.add(Job.Check(serializedFiber))
}
/**
* Returns true if some unrestorable checkpoints were encountered, false otherwise
*/
fun stop(): Boolean {
jobQueue.add(Job.Finish)
checkerThread?.join()
return foundUnrestorableFibers
}
}

View File

@ -58,7 +58,8 @@ class TopLevelTransition(
is Event.InitiateFlow -> initiateFlowTransition(event)
is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event)
is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event)
is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition(startingState)
is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition()
is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition()
is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
}
@ -198,8 +199,8 @@ class TopLevelTransition(
Action.ScheduleEvent(Event.DoRemainingWork)
))
currentState = currentState.copy(
checkpoint = newCheckpoint,
isFlowResumed = false
checkpoint = newCheckpoint,
isFlowResumed = false
)
} else {
actions.addAll(arrayOf(
@ -210,10 +211,10 @@ class TopLevelTransition(
Action.ScheduleEvent(Event.DoRemainingWork)
))
currentState = currentState.copy(
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isAnyCheckpointPersisted = true
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isAnyCheckpointPersisted = true
)
}
FlowContinuation.ProcessEvents
@ -315,10 +316,18 @@ class TopLevelTransition(
}
}
private fun retryFlowFromSafePointTransition(startingState: StateMachineState): TransitionResult {
private fun retryFlowFromSafePointTransition(): TransitionResult {
return builder {
// Need to create a flow from the prior checkpoint or flow initiation.
actions.add(Action.RetryFlowFromSafePoint(startingState))
actions.add(Action.RetryFlowFromSafePoint(currentState))
FlowContinuation.Abort
}
}
private fun reloadFlowFromCheckpointAfterSuspendTransition(): TransitionResult {
return builder {
currentState = currentState.copy(reloadCheckpointAfterSuspendCount = currentState.reloadCheckpointAfterSuspendCount!! + 1)
actions.add(Action.RetryFlowFromSafePoint(currentState))
FlowContinuation.Abort
}
}

View File

@ -60,14 +60,6 @@ class NodeConfigurationImplTest {
assertThat(configValidationResult.first()).contains("crlCheckSoftFail")
}
@Test(timeout=3_000)
fun `check devModeOptions flag helper`() {
assertTrue { configDebugOptions(true, null).shouldCheckCheckpoints() }
assertTrue { configDebugOptions(true, DevModeOptions()).shouldCheckCheckpoints() }
assertTrue { configDebugOptions(true, DevModeOptions(false)).shouldCheckCheckpoints() }
assertFalse { configDebugOptions(true, DevModeOptions(true)).shouldCheckCheckpoints() }
}
@Test(timeout=3_000)
fun `check crashShell flags helper`() {
assertFalse { testConfiguration.copy(sshd = null).shouldStartSSHDaemon() }

View File

@ -639,6 +639,7 @@ private fun mockNodeConfiguration(certificatesDirectory: Path): NodeConfiguratio
doReturn(NetworkParameterAcceptanceSettings()).whenever(it).networkParameterAcceptanceSettings
doReturn(rigorousMock<ConfigurationWithOptions>()).whenever(it).configurationWithOptions
doReturn(2).whenever(it).flowExternalOperationThreadPoolSize
doReturn(false).whenever(it).reloadCheckpointAfterSuspend
}
}