ENT-5669 Improve robustness of FlowReloadAfterCheckpointTest (#6627)

Improve robustness of `FlowReloadAfterCheckpointTest` by adding a countdown latch to observe for when the reloads should have finished.
This commit is contained in:
Ross Nicoll 2020-08-13 15:04:52 +01:00 committed by GitHub
parent a7ea8df9a7
commit bf53e47f0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 20 deletions

View File

@ -34,6 +34,7 @@ import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.enclosedCordapp import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Test import org.junit.Test
import java.sql.SQLTransientConnectionException import java.sql.SQLTransientConnectionException
import java.time.Duration
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -43,6 +44,7 @@ import kotlin.test.assertTrue
class FlowReloadAfterCheckpointTest { class FlowReloadAfterCheckpointTest {
private companion object { private companion object {
private val DEFAULT_TIMEOUT = Duration.ofSeconds(10)
val cordapps = listOf(enclosedCordapp()) val cordapps = listOf(enclosedCordapp())
} }
@ -98,15 +100,13 @@ class FlowReloadAfterCheckpointTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true and be kept for observation due to failed deserialization`() { fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true and be kept for observation due to failed deserialization`() {
val reloads = ConcurrentLinkedQueue<StateMachineRunId>() val observations = QueueWithCountdown<StateMachineRunId>(1)
val reloads = QueueWithCountdown<StateMachineRunId>(8)
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloads.add(id) reloads.add(id)
} }
lateinit var flowKeptForObservation: StateMachineRunId
val lock = CountDownLatch(1)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { id, _ -> StaffedFlowHospital.onFlowKeptForOvernightObservation.add { id, _ ->
flowKeptForObservation = id observations.add(id)
lock.countDown()
} }
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
@ -121,9 +121,12 @@ class FlowReloadAfterCheckpointTest {
.getOrThrow() .getOrThrow()
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), true, false, false) val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), true, false, false)
val flowStartedByAlice = handle.id val flowStartedByAlice: StateMachineRunId = handle.id
lock.await()
assertEquals(flowStartedByAlice, flowKeptForObservation) // We can't wait on the flow ending, because it breaks, so we need to wait on internal status changes instead
observations.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
assertEquals(flowStartedByAlice, observations.singleOrNull())
assertEquals(4, reloads.filter { it == flowStartedByAlice }.count()) assertEquals(4, reloads.filter { it == flowStartedByAlice }.count())
assertEquals(4, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count()) assertEquals(4, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count())
} }
@ -131,7 +134,7 @@ class FlowReloadAfterCheckpointTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `flow will reload from a previous checkpoint after calling suspending function and skipping the persisting the current checkpoint when reloadCheckpointAfterSuspend is true`() { fun `flow will reload from a previous checkpoint after calling suspending function and skipping the persisting the current checkpoint when reloadCheckpointAfterSuspend is true`() {
val reloads = ConcurrentLinkedQueue<StateMachineRunId>() val reloads = QueueWithCountdown<StateMachineRunId>(11)
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloads.add(id) reloads.add(id)
} }
@ -150,6 +153,7 @@ class FlowReloadAfterCheckpointTest {
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, true) val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, true)
val flowStartedByAlice = handle.id val flowStartedByAlice = handle.id
handle.returnValue.getOrThrow() handle.returnValue.getOrThrow()
reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
assertEquals(5, reloads.filter { it == flowStartedByAlice }.count()) assertEquals(5, reloads.filter { it == flowStartedByAlice }.count())
assertEquals(6, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count()) assertEquals(6, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count())
} }
@ -224,13 +228,11 @@ class FlowReloadAfterCheckpointTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() { fun `flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() {
val reloads = ConcurrentLinkedQueue<StateMachineRunId>() val reloads = QueueWithCountdown<StateMachineRunId>(5)
val firstLatch = CountDownLatch(2) val firstLatch = CountDownLatch(2)
val secondLatch = CountDownLatch(5)
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId -> FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId ->
reloads.add(runId) reloads.add(runId)
firstLatch.countDown() firstLatch.countDown()
secondLatch.countDown()
} }
driver( driver(
DriverParameters( DriverParameters(
@ -257,7 +259,7 @@ class FlowReloadAfterCheckpointTest {
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true) customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow() ).getOrThrow()
assertTrue { secondLatch.await(20, TimeUnit.SECONDS) } reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
assertEquals(5, reloads.size) assertEquals(5, reloads.size)
} }
} }
@ -266,11 +268,9 @@ class FlowReloadAfterCheckpointTest {
fun `idempotent flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() { fun `idempotent flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() {
// restarts completely from the beginning and forgets the in-memory reload count therefore // restarts completely from the beginning and forgets the in-memory reload count therefore
// it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown // it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown
val reloadsExpected = CountDownLatch(7) val reloads = QueueWithCountdown<StateMachineRunId>(7)
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId -> FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId ->
reloads.add(runId) reloads.add(runId)
reloadsExpected.countDown()
} }
driver( driver(
DriverParameters( DriverParameters(
@ -298,14 +298,14 @@ class FlowReloadAfterCheckpointTest {
// restarts completely from the beginning and forgets the in-memory reload count therefore // restarts completely from the beginning and forgets the in-memory reload count therefore
// it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown // it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown
assertTrue { reloadsExpected.await(20, TimeUnit.SECONDS) } reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
assertEquals(7, reloads.size) assertEquals(7, reloads.size)
} }
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `more complicated flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() { fun `more complicated flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() {
val reloads = ConcurrentLinkedQueue<StateMachineRunId>() val reloads = QueueWithCountdown<StateMachineRunId>(13)
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id -> FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloads.add(id) reloads.add(id)
} }
@ -335,7 +335,7 @@ class FlowReloadAfterCheckpointTest {
.map(StateMachineTransactionMapping::stateMachineRunId) .map(StateMachineTransactionMapping::stateMachineRunId)
.toSet() .toSet()
.single() .single()
Thread.sleep(10.seconds.toMillis()) reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
assertEquals(7, reloads.filter { it == flowStartedByAlice }.size) assertEquals(7, reloads.filter { it == flowStartedByAlice }.size)
assertEquals(6, reloads.filter { it == flowStartedByBob }.size) assertEquals(6, reloads.filter { it == flowStartedByBob }.size)
} }
@ -524,3 +524,4 @@ class FlowReloadAfterCheckpointTest {
internal class BrokenMap<K, V>(delegate: MutableMap<K, V> = mutableMapOf()) : MutableMap<K, V> by delegate { internal class BrokenMap<K, V>(delegate: MutableMap<K, V> = mutableMapOf()) : MutableMap<K, V> by delegate {
override fun put(key: K, value: V): V? = throw IllegalStateException("Broken on purpose") override fun put(key: K, value: V): V? = throw IllegalStateException("Broken on purpose")
} }

View File

@ -0,0 +1,28 @@
package net.corda.node.flows
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
/**
* Helper class for waiting until another thread has put a set number of objects
* into a queue.
*/
internal class QueueWithCountdown<E> private constructor(
count: Int = 0,
private val queue: ConcurrentLinkedQueue<E>
) : Collection<E> by queue {
constructor(count: Int = 0) : this(count, ConcurrentLinkedQueue<E>())
private val latch: CountDownLatch = CountDownLatch(count)
fun add(element: E) {
queue.add(element)
latch.countDown()
}
fun await() = latch.await()
fun await(timeout: Long, unit: TimeUnit) = latch.await(timeout, unit)
}