diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt new file mode 100644 index 0000000000..1fd0ca76a7 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt @@ -0,0 +1,511 @@ +package net.corda.node.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.HospitalizeFlowException +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.flows.StateMachineRunId +import net.corda.core.identity.Party +import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.IdempotentFlow +import net.corda.core.internal.TimedFlow +import net.corda.core.internal.concurrent.transpose +import net.corda.core.messaging.StateMachineTransactionMapping +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.core.utilities.unwrap +import net.corda.finance.DOLLARS +import net.corda.finance.flows.CashIssueAndPaymentFlow +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.statemachine.FlowStateMachineImpl +import net.corda.node.services.statemachine.FlowTimeoutException +import net.corda.node.services.statemachine.StaffedFlowHospital +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.node.internal.FINANCE_CORDAPPS +import net.corda.testing.node.internal.enclosedCordapp +import org.junit.Test +import java.sql.SQLTransientConnectionException +import java.util.concurrent.Semaphore +import kotlin.test.assertEquals +import kotlin.test.assertNull + +class FlowReloadAfterCheckpointTest { + + private companion object { + val cordapps = listOf(enclosedCordapp()) + } + + @Test(timeout = 300_000) + fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() { + val reloadCounts = mutableMapOf() + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> + reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 } + } + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + + val (alice, bob) = listOf(ALICE_NAME, BOB_NAME) + .map { + startNode( + providedName = it, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ) + } + .transpose() + .getOrThrow() + + val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, false) + val flowStartedByAlice = handle.id + handle.returnValue.getOrThrow() + assertEquals(5, reloadCounts[flowStartedByAlice]) + assertEquals(6, reloadCounts[ReloadFromCheckpointResponder.flowId]) + } + } + + @Test(timeout = 300_000) + fun `flow will not reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is false`() { + val reloadCounts = mutableMapOf() + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> + reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 } + } + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + + val (alice, bob) = listOf(ALICE_NAME, BOB_NAME) + .map { + startNode( + providedName = it, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to false) + ) + } + .transpose() + .getOrThrow() + + val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, false) + val flowStartedByAlice = handle.id + handle.returnValue.getOrThrow() + assertNull(reloadCounts[flowStartedByAlice]) + assertNull(reloadCounts[ReloadFromCheckpointResponder.flowId]) + } + } + + @Test(timeout = 300_000) + fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true and be kept for observation due to failed deserialization`() { + val reloadCounts = mutableMapOf() + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> + reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 } + } + lateinit var flowKeptForObservation: StateMachineRunId + val lock = Semaphore(0) + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { id, _ -> + flowKeptForObservation = id + lock.release() + } + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + + val (alice, bob) = listOf(ALICE_NAME, BOB_NAME) + .map { + startNode( + providedName = it, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ) + } + .transpose() + .getOrThrow() + + val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), true, false, false) + val flowStartedByAlice = handle.id + lock.acquire() + assertEquals(flowStartedByAlice, flowKeptForObservation) + assertEquals(4, reloadCounts[flowStartedByAlice]) + assertEquals(4, reloadCounts[ReloadFromCheckpointResponder.flowId]) + } + } + + @Test(timeout = 300_000) + fun `flow will reload from a previous checkpoint after calling suspending function and skipping the persisting the current checkpoint when reloadCheckpointAfterSuspend is true`() { + val reloadCounts = mutableMapOf() + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> + reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 } + } + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + + val (alice, bob) = listOf(ALICE_NAME, BOB_NAME) + .map { + startNode( + providedName = it, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ) + } + .transpose() + .getOrThrow() + + val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, true) + val flowStartedByAlice = handle.id + handle.returnValue.getOrThrow() + assertEquals(5, reloadCounts[flowStartedByAlice]) + assertEquals(6, reloadCounts[ReloadFromCheckpointResponder.flowId]) + } + } + + @Test(timeout = 300_000) + fun `idempotent flow will reload from initial checkpoint after calling a suspending function when reloadCheckpointAfterSuspend is true`() { + var reloadCount = 0 + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 } + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + + val alice = startNode( + providedName = ALICE_NAME, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ).getOrThrow() + + alice.rpc.startFlow(::MyIdempotentFlow, false).returnValue.getOrThrow() + assertEquals(5, reloadCount) + } + } + + @Test(timeout = 300_000) + fun `idempotent flow will reload from initial checkpoint after calling a suspending function when reloadCheckpointAfterSuspend is true but can't throw deserialization error from objects in the call function`() { + var reloadCount = 0 + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 } + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + + val alice = startNode( + providedName = ALICE_NAME, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ).getOrThrow() + + alice.rpc.startFlow(::MyIdempotentFlow, true).returnValue.getOrThrow() + assertEquals(5, reloadCount) + } + } + + @Test(timeout = 300_000) + fun `timed flow will reload from initial checkpoint after calling a suspending function when reloadCheckpointAfterSuspend is true`() { + var reloadCount = 0 + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 } + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + + val alice = startNode( + providedName = ALICE_NAME, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ).getOrThrow() + + alice.rpc.startFlow(::MyTimedFlow).returnValue.getOrThrow() + assertEquals(5, reloadCount) + } + } + + @Test(timeout = 300_000) + fun `flow will correctly retry after an error when reloadCheckpointAfterSuspend is true`() { + var reloadCount = 0 + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 } + var timesDischarged = 0 + StaffedFlowHospital.onFlowDischarged.add { _, _ -> timesDischarged += 1 } + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + + val alice = startNode( + providedName = ALICE_NAME, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ).getOrThrow() + + alice.rpc.startFlow(::TransientConnectionFailureFlow).returnValue.getOrThrow() + assertEquals(5, reloadCount) + assertEquals(3, timesDischarged) + } + } + + @Test(timeout = 300_000) + fun `flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() { + var reloadCount = 0 + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 } + driver( + DriverParameters( + inMemoryDB = false, + startNodesInProcess = true, + notarySpecs = emptyList(), + cordappsForAllNodes = cordapps + ) + ) { + + val alice = startNode( + providedName = ALICE_NAME, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ).getOrThrow() + + alice.rpc.startFlow(::MyHospitalizingFlow) + Thread.sleep(10.seconds.toMillis()) + + alice.stop() + + startNode( + providedName = ALICE_NAME, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ).getOrThrow() + + Thread.sleep(20.seconds.toMillis()) + + assertEquals(5, reloadCount) + } + } + + @Test(timeout = 300_000) + fun `idempotent flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() { + var reloadCount = 0 + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 } + driver( + DriverParameters( + inMemoryDB = false, + startNodesInProcess = true, + notarySpecs = emptyList(), + cordappsForAllNodes = cordapps + ) + ) { + + val alice = startNode( + providedName = ALICE_NAME, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ).getOrThrow() + + alice.rpc.startFlow(::IdempotentHospitalizingFlow) + Thread.sleep(10.seconds.toMillis()) + + alice.stop() + + startNode( + providedName = ALICE_NAME, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ).getOrThrow() + + Thread.sleep(20.seconds.toMillis()) + + // restarts completely from the beginning and forgets the in-memory reload count therefore + // it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown + assertEquals(7, reloadCount) + } + } + + @Test(timeout = 300_000) + fun `more complicated flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() { + val reloadCounts = mutableMapOf() + FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> + reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 } + } + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = FINANCE_CORDAPPS)) { + + val (alice, bob) = listOf(ALICE_NAME, BOB_NAME) + .map { + startNode( + providedName = it, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) + ) + } + .transpose() + .getOrThrow() + + val handle = alice.rpc.startFlow( + ::CashIssueAndPaymentFlow, + 500.DOLLARS, + OpaqueBytes.of(0x01), + bob.nodeInfo.singleIdentity(), + false, + defaultNotaryIdentity + ) + val flowStartedByAlice = handle.id + handle.returnValue.getOrThrow(30.seconds) + val flowStartedByBob = bob.rpc.stateMachineRecordedTransactionMappingSnapshot() + .map(StateMachineTransactionMapping::stateMachineRunId) + .toSet() + .single() + Thread.sleep(10.seconds.toMillis()) + assertEquals(7, reloadCounts[flowStartedByAlice]) + assertEquals(6, reloadCounts[flowStartedByBob]) + } + } + + /** + * Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5. + * Therefore this flow should reload 5 times when completed without errors or restarts. + */ + @StartableByRPC + @InitiatingFlow + class ReloadFromCheckpointFlow( + private val party: Party, + private val shouldHaveDeserializationError: Boolean, + private val counterPartyHasDeserializationError: Boolean, + private val skipCheckpoints: Boolean + ) : FlowLogic() { + + @Suspendable + override fun call() { + val session = initiateFlow(party) + session.send(counterPartyHasDeserializationError, skipCheckpoints) + session.receive(String::class.java, skipCheckpoints).unwrap { it } + stateMachine.suspend(FlowIORequest.ForceCheckpoint, skipCheckpoints) + val map = if (shouldHaveDeserializationError) { + BrokenMap(mutableMapOf("i dont want" to "this to work")) + } else { + mapOf("i dont want" to "this to work") + } + logger.info("I need to use my variable to pass the build!: $map") + session.sendAndReceive("hey I made it this far") + } + } + + /** + * Has 5 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 6. + * Therefore this flow should reload 6 times when completed without errors or restarts. + */ + @InitiatedBy(ReloadFromCheckpointFlow::class) + class ReloadFromCheckpointResponder(private val session: FlowSession) : FlowLogic() { + + companion object { + var flowId: StateMachineRunId? = null + } + + @Suspendable + override fun call() { + flowId = runId + val counterPartyHasDeserializationError = session.receive().unwrap { it } + session.send("hello there 12312311") + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + val map = if (counterPartyHasDeserializationError) { + BrokenMap(mutableMapOf("i dont want" to "this to work")) + } else { + mapOf("i dont want" to "this to work") + } + logger.info("I need to use my variable to pass the build!: $map") + session.receive().unwrap { it } + session.send("sending back a message") + } + } + + /** + * Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5. + * Therefore this flow should reload 5 times when completed without errors or restarts. + */ + @StartableByRPC + @InitiatingFlow + class MyIdempotentFlow(private val shouldHaveDeserializationError: Boolean) : FlowLogic(), IdempotentFlow { + + @Suspendable + override fun call() { + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + val map = if (shouldHaveDeserializationError) { + BrokenMap(mutableMapOf("i dont want" to "this to work")) + } else { + mapOf("i dont want" to "this to work") + } + logger.info("I need to use my variable to pass the build!: $map") + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + } + } + + /** + * Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5. + * Therefore this flow should reload 5 times when completed without errors or restarts. + */ + @StartableByRPC + @InitiatingFlow + class MyTimedFlow : FlowLogic(), TimedFlow { + + companion object { + var thrown = false + } + + override val isTimeoutEnabled: Boolean = true + + @Suspendable + override fun call() { + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + if (!thrown) { + thrown = true + throw FlowTimeoutException() + } + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + } + } + + @StartableByRPC + @InitiatingFlow + class TransientConnectionFailureFlow : FlowLogic() { + + companion object { + var retryCount = 0 + } + + @Suspendable + override fun call() { + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + if (retryCount < 3) { + retryCount += 1 + throw SQLTransientConnectionException("Connection is not available") + + } + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + } + } + + /** + * Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5. + * Therefore this flow should reload 5 times when completed without errors or restarts. + */ + @StartableByRPC + @InitiatingFlow + class MyHospitalizingFlow : FlowLogic() { + + companion object { + var thrown = false + } + + @Suspendable + override fun call() { + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + if (!thrown) { + thrown = true + throw HospitalizeFlowException("i want to try again") + } + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + } + } + + /** + * Has 4 suspension points inside the flow and 1 in [FlowStateMachineImpl.run] totaling 5. + * Therefore this flow should reload 5 times when completed without errors or restarts. + */ + @StartableByRPC + @InitiatingFlow + class IdempotentHospitalizingFlow : FlowLogic(), IdempotentFlow { + + companion object { + var thrown = false + } + + @Suspendable + override fun call() { + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + if (!thrown) { + thrown = true + throw HospitalizeFlowException("i want to try again") + } + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt index 8d82b1a07d..1dda43c691 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt @@ -1,10 +1,13 @@ package net.corda.node.flows import co.paralleluniverse.fibers.Suspendable -import net.corda.client.rpc.CordaRPCClient -import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.core.CordaRuntimeException -import net.corda.core.flows.* +import net.corda.core.flows.FlowExternalAsyncOperation +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC import net.corda.core.identity.Party import net.corda.core.internal.IdempotentFlow import net.corda.core.internal.concurrent.transpose @@ -23,6 +26,7 @@ import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver import net.corda.testing.node.User +import net.corda.testing.node.internal.enclosedCordapp import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.hibernate.exception.ConstraintViolationException import org.junit.After @@ -33,7 +37,8 @@ import java.sql.SQLException import java.sql.SQLTransientConnectionException import java.time.Duration import java.time.temporal.ChronoUnit -import java.util.* +import java.util.Collections +import java.util.HashSet import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeoutException import kotlin.test.assertEquals @@ -41,7 +46,11 @@ import kotlin.test.assertFailsWith import kotlin.test.assertNotNull class FlowRetryTest { - val config = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryIntervalMultiplier = 1.1) + + private companion object { + val user = User("mark", "dadada", setOf(Permissions.all())) + val cordapps = listOf(enclosedCordapp()) + } @Before fun resetCounters() { @@ -58,154 +67,134 @@ class FlowRetryTest { StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear() } - @Test(timeout=300_000) - fun `flows continue despite errors`() { + @Test(timeout = 300_000) + fun `flows continue despite errors`() { val numSessions = 2 val numIterations = 10 - val user = User("mark", "dadada", setOf(Permissions.startFlow())) - val result: Any? = driver(DriverParameters( - startNodesInProcess = isQuasarAgentSpecified(), - notarySpecs = emptyList() - )) { - val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME) - .map { startNode(providedName = it, rpcUsers = listOf(user)) } - .transpose() - .getOrThrow() + val result: Any? = driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { - val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { - it.proxy.startFlow(::InitiatorFlow, numSessions, numIterations, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow() - } + val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME) + .map { startNode(providedName = it, rpcUsers = listOf(user)) } + .transpose() + .getOrThrow() + + val result = nodeAHandle.rpc.startFlow( + ::InitiatorFlow, + numSessions, + numIterations, + nodeBHandle.nodeInfo.singleIdentity() + ).returnValue.getOrThrow() result } assertNotNull(result) assertEquals("$numSessions:$numIterations", result) } - @Test(timeout=300_000) - fun `async operation deduplication id is stable accross retries`() { - val user = User("mark", "dadada", setOf(Permissions.startFlow())) - driver(DriverParameters( - startNodesInProcess = isQuasarAgentSpecified(), - notarySpecs = emptyList() - )) { + @Test(timeout = 300_000) + fun `async operation deduplication id is stable accross retries`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + nodeAHandle.rpc.startFlow(::AsyncRetryFlow).returnValue.getOrThrow() + } + } - CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { - it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow() + @Test(timeout = 300_000) + fun `flow gives up after number of exceptions, even if this is the first line of the flow`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + assertFailsWith { + nodeAHandle.rpc.startFlow(::RetryFlow).returnValue.getOrThrow() } } } - @Test(timeout=300_000) - fun `flow gives up after number of exceptions, even if this is the first line of the flow`() { - val user = User("mark", "dadada", setOf(Permissions.startFlow())) - assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy { - driver(DriverParameters( - startNodesInProcess = isQuasarAgentSpecified(), - notarySpecs = emptyList() - )) { - val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() - - val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { - it.proxy.startFlow(::RetryFlow).returnValue.getOrThrow() - } - result + @Test(timeout = 300_000) + fun `flow that throws in constructor throw for the RPC client that attempted to start them`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + assertFailsWith { + nodeAHandle.rpc.startFlow(::ThrowingFlow).returnValue.getOrThrow() } } } - @Test(timeout=300_000) - fun `flow that throws in constructor throw for the RPC client that attempted to start them`() { - val user = User("mark", "dadada", setOf(Permissions.startFlow())) - assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy { - driver(DriverParameters( - startNodesInProcess = isQuasarAgentSpecified(), - notarySpecs = emptyList() - )) { - val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() - - val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { - it.proxy.startFlow(::ThrowingFlow).returnValue.getOrThrow() - } - result - } - } - } - - @Test(timeout=300_000) - fun `SQLTransientConnectionExceptions thrown by hikari are retried 3 times and then kept in the checkpoints table`() { - val user = User("mark", "dadada", setOf(Permissions.all())) - driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { + @Test(timeout = 300_000) + fun `SQLTransientConnectionExceptions thrown by hikari are retried 3 times and then kept in the checkpoints table`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME) - .map { startNode(providedName = it, rpcUsers = listOf(user)) } - .transpose() - .getOrThrow() - CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { - assertFailsWith { - it.proxy.startFlow(::TransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity()) - .returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS)) - } - assertEquals(3, TransientConnectionFailureFlow.retryCount) - assertEquals(1, it.proxy.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get()) + .map { startNode(providedName = it, rpcUsers = listOf(user)) } + .transpose() + .getOrThrow() + + assertFailsWith { + nodeAHandle.rpc.startFlow(::TransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity()) + .returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS)) } + assertEquals(3, TransientConnectionFailureFlow.retryCount) + assertEquals( + 1, + nodeAHandle.rpc.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get() + ) } } - @Test(timeout=300_000) - fun `Specific exception still detected even if it is nested inside another exception`() { - val user = User("mark", "dadada", setOf(Permissions.all())) - driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { + @Test(timeout = 300_000) + fun `Specific exception still detected even if it is nested inside another exception`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME) - .map { startNode(providedName = it, rpcUsers = listOf(user)) } - .transpose() - .getOrThrow() - CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { - assertFailsWith { - it.proxy.startFlow(::WrappedTransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity()) - .returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS)) - } - assertEquals(3, WrappedTransientConnectionFailureFlow.retryCount) - assertEquals(1, it.proxy.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get()) + .map { startNode(providedName = it, rpcUsers = listOf(user)) } + .transpose() + .getOrThrow() + + assertFailsWith { + nodeAHandle.rpc.startFlow(::WrappedTransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity()) + .returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS)) } + assertEquals(3, WrappedTransientConnectionFailureFlow.retryCount) + assertEquals( + 1, + nodeAHandle.rpc.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get() + ) } } - @Test(timeout=300_000) - fun `General external exceptions are not retried and propagate`() { - val user = User("mark", "dadada", setOf(Permissions.all())) - driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { + @Test(timeout = 300_000) + fun `General external exceptions are not retried and propagate`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME) - .map { startNode(providedName = it, rpcUsers = listOf(user)) } - .transpose() - .getOrThrow() + .map { startNode(providedName = it, rpcUsers = listOf(user)) } + .transpose() + .getOrThrow() - CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { - assertFailsWith { - it.proxy.startFlow(::GeneralExternalFailureFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow() - } - assertEquals(0, GeneralExternalFailureFlow.retryCount) - assertEquals(1, it.proxy.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.FAILED).returnValue.get()) + assertFailsWith { + nodeAHandle.rpc.startFlow( + ::GeneralExternalFailureFlow, + nodeBHandle.nodeInfo.singleIdentity() + ).returnValue.getOrThrow() } + assertEquals(0, GeneralExternalFailureFlow.retryCount) + assertEquals( + 1, + nodeAHandle.rpc.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.FAILED).returnValue.get() + ) } } - @Test(timeout=300_000) - fun `Permission exceptions are not retried and propagate`() { + @Test(timeout = 300_000) + fun `Permission exceptions are not retried and propagate`() { val user = User("mark", "dadada", setOf()) - driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() - CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { - assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy { - it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow() - }.withMessageStartingWith("User not authorized to perform RPC call") - // This stays at -1 since the flow never even got called - assertEquals(-1, GeneralExternalFailureFlow.retryCount) - } + assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy { + nodeAHandle.rpc.startFlow(::AsyncRetryFlow).returnValue.getOrThrow() + }.withMessageStartingWith("User not authorized to perform RPC call") + // This stays at -1 since the flow never even got called + assertEquals(-1, GeneralExternalFailureFlow.retryCount) } } } @@ -315,6 +304,10 @@ enum class Step { First, BeforeInitiate, AfterInitiate, AfterInitiateSendReceive data class Visited(val sessionNum: Int, val iterationNum: Int, val step: Step) +class BrokenMap(delegate: MutableMap = mutableMapOf()) : MutableMap by delegate { + override fun put(key: K, value: V): V? = throw IllegalStateException("Broken on purpose") +} + @StartableByRPC class RetryFlow() : FlowLogic(), IdempotentFlow { companion object { @@ -342,7 +335,7 @@ class AsyncRetryFlow() : FlowLogic(), IdempotentFlow { val deduplicationIds = mutableSetOf() } - class RecordDeduplicationId: FlowExternalAsyncOperation { + class RecordDeduplicationId : FlowExternalAsyncOperation { override fun execute(deduplicationId: String): CompletableFuture { val dedupeIdIsNew = deduplicationIds.add(deduplicationId) if (dedupeIdIsNew) { @@ -423,8 +416,9 @@ class WrappedTransientConnectionFailureFlow(private val party: Party) : FlowLogi // checkpoint will restart the flow after the send retryCount += 1 throw IllegalStateException( - "wrapped error message", - IllegalStateException("another layer deep", SQLTransientConnectionException("Connection is not available"))) + "wrapped error message", + IllegalStateException("another layer deep", SQLTransientConnectionException("Connection is not available")) + ) } } @@ -465,12 +459,14 @@ class GeneralExternalFailureResponder(private val session: FlowSession) : FlowLo @StartableByRPC class GetCheckpointNumberOfStatusFlow(private val flowStatus: Checkpoint.FlowStatus) : FlowLogic() { + + @Suspendable override fun call(): Long { val sqlStatement = - "select count(*) " + - "from node_checkpoints " + - "where status = ${flowStatus.ordinal} " + - "and flow_id != '${runId.uuid}' " // don't count in the checkpoint of the current flow + "select count(*) " + + "from node_checkpoints " + + "where status = ${flowStatus.ordinal} " + + "and flow_id != '${runId.uuid}' " // don't count in the checkpoint of the current flow return serviceHub.jdbcSession().prepareStatement(sqlStatement).use { ps -> ps.executeQuery().use { rs -> diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index a12989e169..9c3d5f9741 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -93,6 +93,8 @@ interface NodeConfiguration : ConfigurationWithOptionsContainer { val quasarExcludePackages: List + val reloadCheckpointAfterSuspend: Boolean + companion object { // default to at least 8MB and a bit extra for larger heap sizes val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory() @@ -125,9 +127,13 @@ enum class JmxReporterType { } data class DevModeOptions( - val disableCheckpointChecker: Boolean = Defaults.disableCheckpointChecker, - val allowCompatibilityZone: Boolean = Defaults.allowCompatibilityZone, - val djvm: DJVMOptions? = null + @Deprecated( + "The checkpoint checker has been replaced by the ability to reload a checkpoint from the database after every suspend" + + "Use [NodeConfiguration.disableReloadCheckpointAfterSuspend] instead." + ) + val disableCheckpointChecker: Boolean = Defaults.disableCheckpointChecker, + val allowCompatibilityZone: Boolean = Defaults.allowCompatibilityZone, + val djvm: DJVMOptions? = null ) { internal object Defaults { val disableCheckpointChecker = false @@ -140,10 +146,6 @@ data class DJVMOptions( val cordaSource: List ) -fun NodeConfiguration.shouldCheckCheckpoints(): Boolean { - return this.devMode && this.devModeOptions?.disableCheckpointChecker != true -} - fun NodeConfiguration.shouldStartSSHDaemon() = this.sshd != null fun NodeConfiguration.shouldStartLocalShell() = !this.noLocalShell && System.console() != null && this.devMode fun NodeConfiguration.shouldInitCrashShell() = shouldStartLocalShell() || shouldStartSSHDaemon() diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt index e1dcc86903..6106441279 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt @@ -84,7 +84,9 @@ data class NodeConfigurationImpl( override val blacklistedAttachmentSigningKeys: List = Defaults.blacklistedAttachmentSigningKeys, override val configurationWithOptions: ConfigurationWithOptions, override val flowExternalOperationThreadPoolSize: Int = Defaults.flowExternalOperationThreadPoolSize, - override val quasarExcludePackages: List = Defaults.quasarExcludePackages + override val quasarExcludePackages: List = Defaults.quasarExcludePackages, + override val reloadCheckpointAfterSuspend: Boolean = Defaults.reloadCheckpointAfterSuspend + ) : NodeConfiguration { internal object Defaults { val jmxMonitoringHttpPort: Int? = null @@ -123,6 +125,7 @@ data class NodeConfigurationImpl( val blacklistedAttachmentSigningKeys: List = emptyList() const val flowExternalOperationThreadPoolSize: Int = 1 val quasarExcludePackages: List = emptyList() + val reloadCheckpointAfterSuspend: Boolean = System.getProperty("reloadCheckpointAfterSuspend", "false")!!.toBoolean() fun cordappsDirectories(baseDirectory: Path) = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT) diff --git a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt index d396c466c0..a10c15870b 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt @@ -8,6 +8,7 @@ import net.corda.common.validation.internal.Validated.Companion.invalid import net.corda.common.validation.internal.Validated.Companion.valid import net.corda.node.services.config.* import net.corda.node.services.config.NodeConfigurationImpl.Defaults +import net.corda.node.services.config.NodeConfigurationImpl.Defaults.reloadCheckpointAfterSuspend import net.corda.node.services.config.schema.parsers.* internal object V1NodeConfigurationSpec : Configuration.Specification("NodeConfiguration") { @@ -66,6 +67,7 @@ internal object V1NodeConfigurationSpec : Configuration.Specification, - val maySkipCheckpoint: Boolean, - val fiber: SerializedBytes>, - var progressStep: ProgressTracker.Step? + val ioRequest: FlowIORequest<*>, + val maySkipCheckpoint: Boolean, + val fiber: SerializedBytes>, + var progressStep: ProgressTracker.Step? ) : Event() { override fun toString() = - "Suspend(" + - "ioRequest=$ioRequest, " + - "maySkipCheckpoint=$maySkipCheckpoint, " + - "fiber=${fiber.hash}, " + - "currentStep=${progressStep?.label}" + - ")" + "Suspend(" + + "ioRequest=$ioRequest, " + + "maySkipCheckpoint=$maySkipCheckpoint, " + + "fiber=${fiber.hash}, " + + "currentStep=${progressStep?.label}" + + ")" } /** @@ -148,12 +148,21 @@ sealed class Event { data class AsyncOperationThrows(val throwable: Throwable) : Event() /** - * Retry a flow from the last checkpoint, or if there is no checkpoint, restart the flow with the same invocation details. + * Retry a flow from its last checkpoint, or if there is no checkpoint, restart the flow with the same invocation details. */ object RetryFlowFromSafePoint : Event() { override fun toString() = "RetryFlowFromSafePoint" } + /** + * Reload a flow from its last checkpoint, or if there is no checkpoint, restart the flow with the same invocation details. + * This is separate from [RetryFlowFromSafePoint] which is used for error handling within the state machine. + * [ReloadFlowFromCheckpointAfterSuspend] is only used when [NodeConfiguration.reloadCheckpointAfterSuspend] is true. + */ + object ReloadFlowFromCheckpointAfterSuspend : Event() { + override fun toString() = "ReloadFlowFromCheckpointAfterSuspend" + } + /** * Keeps a flow for overnight observation. Overnight observation practically sends the fiber to get suspended, * in [FlowStateMachineImpl.processEventsUntilFlowIsResumed]. Since the fiber's channel will have no more events to process, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt index 08a006c345..8e388b8d35 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt @@ -19,6 +19,7 @@ import net.corda.core.utilities.contextLogger import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.DeduplicationHandler +import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.currentStateMachine import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.isEnabledTimedFlow import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -36,21 +37,23 @@ class NonResidentFlow(val runId: StateMachineRunId, val checkpoint: Checkpoint) } class FlowCreator( - val checkpointSerializationContext: CheckpointSerializationContext, + private val checkpointSerializationContext: CheckpointSerializationContext, private val checkpointStorage: CheckpointStorage, - val scheduler: FiberScheduler, - val database: CordaPersistence, - val transitionExecutor: TransitionExecutor, - val actionExecutor: ActionExecutor, - val secureRandom: SecureRandom, - val serviceHub: ServiceHubInternal, - val unfinishedFibers: ReusableLatch, - val resetCustomTimeout: (StateMachineRunId, Long) -> Unit) { + private val scheduler: FiberScheduler, + private val database: CordaPersistence, + private val transitionExecutor: TransitionExecutor, + private val actionExecutor: ActionExecutor, + private val secureRandom: SecureRandom, + private val serviceHub: ServiceHubInternal, + private val unfinishedFibers: ReusableLatch, + private val resetCustomTimeout: (StateMachineRunId, Long) -> Unit) { companion object { private val logger = contextLogger() } + private val reloadCheckpointAfterSuspend = serviceHub.configuration.reloadCheckpointAfterSuspend + fun createFlowFromNonResidentFlow(nonResidentFlow: NonResidentFlow): Flow<*>? { // As for paused flows we don't extract the serialized flow state we need to re-extract the checkpoint from the database. val checkpoint = when (nonResidentFlow.checkpoint.status) { @@ -65,13 +68,23 @@ class FlowCreator( return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint) } - fun createFlowFromCheckpoint(runId: StateMachineRunId, oldCheckpoint: Checkpoint): Flow<*>? { + fun createFlowFromCheckpoint( + runId: StateMachineRunId, + oldCheckpoint: Checkpoint, + reloadCheckpointAfterSuspendCount: Int? = null + ): Flow<*>? { val checkpoint = oldCheckpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE) val fiber = checkpoint.getFiberFromCheckpoint(runId) ?: return null val resultFuture = openFuture() fiber.logic.stateMachine = fiber verifyFlowLogicIsSuspendable(fiber.logic) - val state = createStateMachineState(checkpoint, fiber, true) + val state = createStateMachineState( + checkpoint = checkpoint, + fiber = fiber, + anyCheckpointPersisted = true, + reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount + ?: if (reloadCheckpointAfterSuspend) checkpoint.checkpointState.numberOfSuspends else null + ) fiber.transientValues = createTransientValues(runId, resultFuture) fiber.transientState = state return Flow(fiber, resultFuture) @@ -108,11 +121,13 @@ class FlowCreator( ).getOrThrow() val state = createStateMachineState( - checkpoint, - flowStateMachineImpl, - existingCheckpoint != null, - deduplicationHandler, - senderUUID) + checkpoint = checkpoint, + fiber = flowStateMachineImpl, + anyCheckpointPersisted = existingCheckpoint != null, + reloadCheckpointAfterSuspendCount = if (reloadCheckpointAfterSuspend) 0 else null, + deduplicationHandler = deduplicationHandler, + senderUUID = senderUUID + ) flowStateMachineImpl.transientState = state return Flow(flowStateMachineImpl, resultFuture) } @@ -125,9 +140,7 @@ class FlowCreator( } is FlowState.Started -> tryCheckpointDeserialize(this.flowState.frozenFiber, runId) ?: return null // Places calling this function is rely on it to return null if the flow cannot be created from the checkpoint. - else -> { - return null - } + else -> null } } @@ -136,8 +149,16 @@ class FlowCreator( return try { bytes.checkpointDeserialize(context = checkpointSerializationContext) } catch (e: Exception) { - logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e) - null + if (reloadCheckpointAfterSuspend && currentStateMachine() != null) { + logger.error( + "Unable to deserialize checkpoint for flow $flowId. [reloadCheckpointAfterSuspend] is turned on, throwing exception", + e + ) + throw ReloadFlowFromCheckpointException(e) + } else { + logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e) + null + } } } @@ -169,12 +190,15 @@ class FlowCreator( ) } + @Suppress("LongParameterList") private fun createStateMachineState( - checkpoint: Checkpoint, - fiber: FlowStateMachineImpl<*>, - anyCheckpointPersisted: Boolean, - deduplicationHandler: DeduplicationHandler? = null, - senderUUID: String? = null): StateMachineState { + checkpoint: Checkpoint, + fiber: FlowStateMachineImpl<*>, + anyCheckpointPersisted: Boolean, + reloadCheckpointAfterSuspendCount: Int?, + deduplicationHandler: DeduplicationHandler? = null, + senderUUID: String? = null + ): StateMachineState { return StateMachineState( checkpoint = checkpoint, pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(), @@ -186,6 +210,8 @@ class FlowCreator( isRemoved = false, isKilled = false, flowLogic = fiber.logic, - senderUUID = senderUUID) + senderUUID = senderUUID, + reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount + ) } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index ce4fdea2bd..6b0ad10698 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -29,6 +29,7 @@ import net.corda.core.internal.DeclaredField import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.IdempotentFlow +import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.isIdempotentFlow import net.corda.core.internal.isRegularFile @@ -87,6 +88,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, private val log: Logger = LoggerFactory.getLogger("net.corda.flow") private val SERIALIZER_BLOCKER = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER").apply { isAccessible = true }.get(null) + + @VisibleForTesting + var onReloadFlowFromCheckpoint: ((id: StateMachineRunId) -> Unit)? = null } data class TransientValues( @@ -504,10 +508,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, contextTransactionOrNull = transaction.value val event = try { Event.Suspend( - ioRequest = ioRequest, - maySkipCheckpoint = skipPersistingCheckpoint, - fiber = this.checkpointSerialize(context = serializationContext.value), - progressStep = logic.progressTracker?.currentStep + ioRequest = ioRequest, + maySkipCheckpoint = skipPersistingCheckpoint, + fiber = this.checkpointSerialize(context = serializationContext.value), + progressStep = logic.progressTracker?.currentStep ) } catch (exception: Exception) { Event.Error(exception) @@ -529,6 +533,18 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, unpark(SERIALIZER_BLOCKER) } } + + transientState.reloadCheckpointAfterSuspendCount?.let { count -> + if (count < transientState.checkpoint.checkpointState.numberOfSuspends) { + onReloadFlowFromCheckpoint?.invoke(id) + processEventImmediately( + Event.ReloadFlowFromCheckpointAfterSuspend, + isDbTransactionOpenOnEntry = false, + isDbTransactionOpenOnExit = false + ) + } + } + return uncheckedCast(processEventsUntilFlowIsResumed( isDbTransactionOpenOnEntry = false, isDbTransactionOpenOnExit = true diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index fba5833661..d1aa9ee6aa 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -30,11 +30,9 @@ import net.corda.core.utilities.debug import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal -import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler +import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.currentStateMachine import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor -import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker -import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor import net.corda.node.services.statemachine.interceptors.PrintingInterceptor import net.corda.node.utilities.AffinityExecutor @@ -89,7 +87,6 @@ internal class SingleThreadedStateMachineManager( private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub) private val actionFutureExecutor = ActionFutureExecutor(innerState, serviceHub, scheduledFutureExecutor) private val flowTimeoutScheduler = FlowTimeoutScheduler(innerState, scheduledFutureExecutor, serviceHub) - private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null private val ourSenderUUID = serviceHub.networkService.ourSenderUUID private var checkpointSerializationContext: CheckpointSerializationContext? = null @@ -97,6 +94,7 @@ internal class SingleThreadedStateMachineManager( override val flowHospital: StaffedFlowHospital = makeFlowHospital() private val transitionExecutor = makeTransitionExecutor() + private val reloadCheckpointAfterSuspend = serviceHub.configuration.reloadCheckpointAfterSuspend override val allStateMachines: List> get() = innerState.withLock { flows.values.map { it.fiber.logic } } @@ -124,7 +122,6 @@ internal class SingleThreadedStateMachineManager( ) this.checkpointSerializationContext = checkpointSerializationContext val actionExecutor = makeActionExecutor(checkpointSerializationContext) - fiberDeserializationChecker?.start(checkpointSerializationContext) when (startMode) { StateMachineManager.StartMode.ExcludingPaused -> {} StateMachineManager.StartMode.Safe -> markAllFlowsAsPaused() @@ -207,10 +204,6 @@ internal class SingleThreadedStateMachineManager( // Account for any expected Fibers in a test scenario. liveFibers.countDown(allowedUnsuspendedFiberCount) liveFibers.await() - fiberDeserializationChecker?.let { - val foundUnrestorableFibers = it.stop() - check(!foundUnrestorableFibers) { "Unrestorable checkpoints were created, please check the logs for details." } - } flowHospital.close() scheduledFutureExecutor.shutdown() scheduler.shutdown() @@ -397,7 +390,7 @@ internal class SingleThreadedStateMachineManager( val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: return // Resurrect flow - flowCreator.createFlowFromCheckpoint(flowId, checkpoint) ?: return + flowCreator.createFlowFromCheckpoint(flowId, checkpoint, currentState.reloadCheckpointAfterSuspendCount) ?: return } else { // Just flow initiation message null @@ -632,8 +625,16 @@ internal class SingleThreadedStateMachineManager( return try { serializedCheckpoint.deserialize(checkpointSerializationContext!!) } catch (e: Exception) { - logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e) - null + if (reloadCheckpointAfterSuspend && currentStateMachine() != null) { + logger.error( + "Unable to deserialize checkpoint for flow $flowId. [reloadCheckpointAfterSuspend] is turned on, throwing exception", + e + ) + throw ReloadFlowFromCheckpointException(e) + } else { + logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e) + null + } } } @@ -700,9 +701,6 @@ internal class SingleThreadedStateMachineManager( if (serviceHub.configuration.devMode) { interceptors.add { DumpHistoryOnErrorInterceptor(it) } } - if (serviceHub.configuration.shouldCheckCheckpoints()) { - interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) } - } if (logger.isDebugEnabled) { interceptors.add { PrintingInterceptor(it) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 4d6e73bfbe..f74497d48a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -589,6 +589,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, return if (newError.mentionsThrowable(StateTransitionException::class.java)) { when { newError.mentionsThrowable(InterruptedException::class.java) -> Diagnosis.TERMINAL + newError.mentionsThrowable(ReloadFlowFromCheckpointException::class.java) -> Diagnosis.OVERNIGHT_OBSERVATION newError.mentionsThrowable(AsyncOperationTransitionException::class.java) -> Diagnosis.NOT_MY_SPECIALTY history.notDischargedForTheSameThingMoreThan(2, this, currentState) -> Diagnosis.DISCHARGE else -> Diagnosis.OVERNIGHT_OBSERVATION diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 6c124d41e6..5d8326b668 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -59,7 +59,8 @@ data class StateMachineState( val isRemoved: Boolean, @Volatile var isKilled: Boolean, - val senderUUID: String? + val senderUUID: String?, + val reloadCheckpointAfterSuspendCount: Int? ) : KryoSerializable { override fun write(kryo: Kryo?, output: Output?) { throw IllegalStateException("${StateMachineState::class.qualifiedName} should never be serialized") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateTransitionExceptions.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateTransitionExceptions.kt index 2e37261c04..2c3fc5b641 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateTransitionExceptions.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateTransitionExceptions.kt @@ -1,6 +1,6 @@ package net.corda.node.services.statemachine -import net.corda.core.CordaException +import net.corda.core.CordaRuntimeException import net.corda.core.serialization.ConstructorForDeserialization // CORDA-3353 - These exceptions should not be propagated up to rpc as they suppress the real exceptions @@ -9,12 +9,17 @@ class StateTransitionException( val transitionAction: Action?, val transitionEvent: Event?, val exception: Exception -) : CordaException(exception.message, exception) { +) : CordaRuntimeException(exception.message, exception) { @ConstructorForDeserialization constructor(exception: Exception): this(null, null, exception) } -class AsyncOperationTransitionException(exception: Exception) : CordaException(exception.message, exception) +class AsyncOperationTransitionException(exception: Exception) : CordaRuntimeException(exception.message, exception) -class ErrorStateTransitionException(val exception: Exception) : CordaException(exception.message, exception) \ No newline at end of file +class ErrorStateTransitionException(val exception: Exception) : CordaRuntimeException(exception.message, exception) + +class ReloadFlowFromCheckpointException(cause: Exception) : CordaRuntimeException( + "Could not reload flow from checkpoint. This is likely due to a discrepancy " + + "between the serialization and deserialization of an object in the flow's checkpoint", cause +) \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt deleted file mode 100644 index 57c742b9ff..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt +++ /dev/null @@ -1,101 +0,0 @@ -package net.corda.node.services.statemachine.interceptors - -import co.paralleluniverse.fibers.Suspendable -import net.corda.core.serialization.SerializedBytes -import net.corda.core.serialization.internal.CheckpointSerializationContext -import net.corda.core.serialization.internal.checkpointDeserialize -import net.corda.core.utilities.contextLogger -import net.corda.node.services.statemachine.ActionExecutor -import net.corda.node.services.statemachine.Event -import net.corda.node.services.statemachine.FlowFiber -import net.corda.node.services.statemachine.FlowState -import net.corda.node.services.statemachine.FlowStateMachineImpl -import net.corda.node.services.statemachine.StateMachineState -import net.corda.node.services.statemachine.TransitionExecutor -import net.corda.node.services.statemachine.transitions.FlowContinuation -import net.corda.node.services.statemachine.transitions.TransitionResult -import java.util.concurrent.LinkedBlockingQueue -import kotlin.concurrent.thread - -/** - * This interceptor checks whether a checkpointed fiber state can be deserialised in a separate thread. - */ -class FiberDeserializationCheckingInterceptor( - val fiberDeserializationChecker: FiberDeserializationChecker, - val delegate: TransitionExecutor -) : TransitionExecutor { - - @Suspendable - override fun executeTransition( - fiber: FlowFiber, - previousState: StateMachineState, - event: Event, - transition: TransitionResult, - actionExecutor: ActionExecutor - ): Pair { - val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor) - val previousFlowState = previousState.checkpoint.flowState - val nextFlowState = nextState.checkpoint.flowState - if (nextFlowState is FlowState.Started) { - if (previousFlowState !is FlowState.Started || previousFlowState.frozenFiber != nextFlowState.frozenFiber) { - fiberDeserializationChecker.submitCheck(nextFlowState.frozenFiber) - } - } - return Pair(continuation, nextState) - } -} - -/** - * A fiber deserialisation checker thread. It checks the queued up serialised checkpoints to see if they can be - * deserialised. This is only run in development mode to allow detecting of corrupt serialised checkpoints before they - * are actually used. - */ -class FiberDeserializationChecker { - companion object { - val log = contextLogger() - } - - private sealed class Job { - class Check(val serializedFiber: SerializedBytes>) : Job() - object Finish : Job() - } - - private var checkerThread: Thread? = null - private val jobQueue = LinkedBlockingQueue() - private var foundUnrestorableFibers: Boolean = false - - fun start(checkpointSerializationContext: CheckpointSerializationContext) { - require(checkerThread == null){"Checking thread must not already be started"} - checkerThread = thread(name = "FiberDeserializationChecker") { - while (true) { - val job = jobQueue.take() - when (job) { - is Job.Check -> { - try { - job.serializedFiber.checkpointDeserialize(context = checkpointSerializationContext) - } catch (exception: Exception) { - log.error("Encountered unrestorable checkpoint!", exception) - foundUnrestorableFibers = true - } - } - Job.Finish -> { - return@thread - } - } - } - } - } - - fun submitCheck(serializedFiber: SerializedBytes>) { - jobQueue.add(Job.Check(serializedFiber)) - } - - /** - * Returns true if some unrestorable checkpoints were encountered, false otherwise - */ - fun stop(): Boolean { - jobQueue.add(Job.Finish) - checkerThread?.join() - return foundUnrestorableFibers - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 4846ee101d..169148108e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -58,7 +58,8 @@ class TopLevelTransition( is Event.InitiateFlow -> initiateFlowTransition(event) is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event) is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event) - is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition(startingState) + is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition() + is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition() is Event.OvernightObservation -> overnightObservationTransition() is Event.WakeUpFromSleep -> wakeUpFromSleepTransition() } @@ -198,8 +199,8 @@ class TopLevelTransition( Action.ScheduleEvent(Event.DoRemainingWork) )) currentState = currentState.copy( - checkpoint = newCheckpoint, - isFlowResumed = false + checkpoint = newCheckpoint, + isFlowResumed = false ) } else { actions.addAll(arrayOf( @@ -210,10 +211,10 @@ class TopLevelTransition( Action.ScheduleEvent(Event.DoRemainingWork) )) currentState = currentState.copy( - checkpoint = newCheckpoint, - pendingDeduplicationHandlers = emptyList(), - isFlowResumed = false, - isAnyCheckpointPersisted = true + checkpoint = newCheckpoint, + pendingDeduplicationHandlers = emptyList(), + isFlowResumed = false, + isAnyCheckpointPersisted = true ) } FlowContinuation.ProcessEvents @@ -315,10 +316,18 @@ class TopLevelTransition( } } - private fun retryFlowFromSafePointTransition(startingState: StateMachineState): TransitionResult { + private fun retryFlowFromSafePointTransition(): TransitionResult { return builder { // Need to create a flow from the prior checkpoint or flow initiation. - actions.add(Action.RetryFlowFromSafePoint(startingState)) + actions.add(Action.RetryFlowFromSafePoint(currentState)) + FlowContinuation.Abort + } + } + + private fun reloadFlowFromCheckpointAfterSuspendTransition(): TransitionResult { + return builder { + currentState = currentState.copy(reloadCheckpointAfterSuspendCount = currentState.reloadCheckpointAfterSuspendCount!! + 1) + actions.add(Action.RetryFlowFromSafePoint(currentState)) FlowContinuation.Abort } } diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index 8cea3cebbb..069901080f 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -60,14 +60,6 @@ class NodeConfigurationImplTest { assertThat(configValidationResult.first()).contains("crlCheckSoftFail") } - @Test(timeout=3_000) - fun `check devModeOptions flag helper`() { - assertTrue { configDebugOptions(true, null).shouldCheckCheckpoints() } - assertTrue { configDebugOptions(true, DevModeOptions()).shouldCheckCheckpoints() } - assertTrue { configDebugOptions(true, DevModeOptions(false)).shouldCheckCheckpoints() } - assertFalse { configDebugOptions(true, DevModeOptions(true)).shouldCheckCheckpoints() } - } - @Test(timeout=3_000) fun `check crashShell flags helper`() { assertFalse { testConfiguration.copy(sshd = null).shouldStartSSHDaemon() } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 9ae6f1f9d2..681325d382 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -639,6 +639,7 @@ private fun mockNodeConfiguration(certificatesDirectory: Path): NodeConfiguratio doReturn(NetworkParameterAcceptanceSettings()).whenever(it).networkParameterAcceptanceSettings doReturn(rigorousMock()).whenever(it).configurationWithOptions doReturn(2).whenever(it).flowExternalOperationThreadPoolSize + doReturn(false).whenever(it).reloadCheckpointAfterSuspend } }