diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt index dd51ad621d..add9ecb651 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt @@ -34,6 +34,7 @@ import net.corda.testing.node.internal.FINANCE_CORDAPPS import net.corda.testing.node.internal.enclosedCordapp import org.junit.Test import java.sql.SQLTransientConnectionException +import java.time.Duration import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -43,6 +44,7 @@ import kotlin.test.assertTrue class FlowReloadAfterCheckpointTest { private companion object { + private val DEFAULT_TIMEOUT = Duration.ofSeconds(10) val cordapps = listOf(enclosedCordapp()) } @@ -98,15 +100,13 @@ class FlowReloadAfterCheckpointTest { @Test(timeout = 300_000) fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true and be kept for observation due to failed deserialization`() { - val reloads = ConcurrentLinkedQueue() + val observations = QueueWithCountdown(1) + val reloads = QueueWithCountdown(8) FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> reloads.add(id) } - lateinit var flowKeptForObservation: StateMachineRunId - val lock = CountDownLatch(1) StaffedFlowHospital.onFlowKeptForOvernightObservation.add { id, _ -> - flowKeptForObservation = id - lock.countDown() + observations.add(id) } driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { @@ -121,9 +121,12 @@ class FlowReloadAfterCheckpointTest { .getOrThrow() val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), true, false, false) - val flowStartedByAlice = handle.id - lock.await() - assertEquals(flowStartedByAlice, flowKeptForObservation) + val flowStartedByAlice: StateMachineRunId = handle.id + + // We can't wait on the flow ending, because it breaks, so we need to wait on internal status changes instead + observations.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) + reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) + assertEquals(flowStartedByAlice, observations.singleOrNull()) assertEquals(4, reloads.filter { it == flowStartedByAlice }.count()) assertEquals(4, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count()) } @@ -131,7 +134,7 @@ class FlowReloadAfterCheckpointTest { @Test(timeout = 300_000) fun `flow will reload from a previous checkpoint after calling suspending function and skipping the persisting the current checkpoint when reloadCheckpointAfterSuspend is true`() { - val reloads = ConcurrentLinkedQueue() + val reloads = QueueWithCountdown(11) FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> reloads.add(id) } @@ -150,6 +153,7 @@ class FlowReloadAfterCheckpointTest { val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, true) val flowStartedByAlice = handle.id handle.returnValue.getOrThrow() + reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) assertEquals(5, reloads.filter { it == flowStartedByAlice }.count()) assertEquals(6, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count()) } @@ -224,13 +228,11 @@ class FlowReloadAfterCheckpointTest { @Test(timeout = 300_000) fun `flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() { - val reloads = ConcurrentLinkedQueue() + val reloads = QueueWithCountdown(5) val firstLatch = CountDownLatch(2) - val secondLatch = CountDownLatch(5) FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId -> reloads.add(runId) firstLatch.countDown() - secondLatch.countDown() } driver( DriverParameters( @@ -257,7 +259,7 @@ class FlowReloadAfterCheckpointTest { customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) ).getOrThrow() - assertTrue { secondLatch.await(20, TimeUnit.SECONDS) } + reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) assertEquals(5, reloads.size) } } @@ -266,11 +268,9 @@ class FlowReloadAfterCheckpointTest { fun `idempotent flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() { // restarts completely from the beginning and forgets the in-memory reload count therefore // it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown - val reloadsExpected = CountDownLatch(7) - val reloads = ConcurrentLinkedQueue() + val reloads = QueueWithCountdown(7) FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId -> reloads.add(runId) - reloadsExpected.countDown() } driver( DriverParameters( @@ -298,14 +298,14 @@ class FlowReloadAfterCheckpointTest { // restarts completely from the beginning and forgets the in-memory reload count therefore // it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown - assertTrue { reloadsExpected.await(20, TimeUnit.SECONDS) } + reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) assertEquals(7, reloads.size) } } @Test(timeout = 300_000) fun `more complicated flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() { - val reloads = ConcurrentLinkedQueue() + val reloads = QueueWithCountdown(13) FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> reloads.add(id) } @@ -335,7 +335,7 @@ class FlowReloadAfterCheckpointTest { .map(StateMachineTransactionMapping::stateMachineRunId) .toSet() .single() - Thread.sleep(10.seconds.toMillis()) + reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) assertEquals(7, reloads.filter { it == flowStartedByAlice }.size) assertEquals(6, reloads.filter { it == flowStartedByBob }.size) } @@ -523,4 +523,5 @@ class FlowReloadAfterCheckpointTest { internal class BrokenMap(delegate: MutableMap = mutableMapOf()) : MutableMap by delegate { override fun put(key: K, value: V): V? = throw IllegalStateException("Broken on purpose") -} \ No newline at end of file +} + diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/QueueWithCountdown.kt b/node/src/integration-test/kotlin/net/corda/node/flows/QueueWithCountdown.kt new file mode 100644 index 0000000000..068aedd9fd --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/flows/QueueWithCountdown.kt @@ -0,0 +1,28 @@ +package net.corda.node.flows + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +/** + * Helper class for waiting until another thread has put a set number of objects + * into a queue. + */ +internal class QueueWithCountdown private constructor( + count: Int = 0, + private val queue: ConcurrentLinkedQueue +) : Collection by queue { + + constructor(count: Int = 0) : this(count, ConcurrentLinkedQueue()) + + private val latch: CountDownLatch = CountDownLatch(count) + + fun add(element: E) { + queue.add(element) + latch.countDown() + } + + fun await() = latch.await() + + fun await(timeout: Long, unit: TimeUnit) = latch.await(timeout, unit) +} \ No newline at end of file