CORDA-3943 Remove sleeps from flow reload tests (#6567)

* Remove use of Thread.sleep() from FlowReloadAfterCheckpointTest, relying instead on CountDownLatch to wait until the target count has been hit or a timeout occurs, so the thread can continue as soon as the target is hit.
* Replace use of hashmaps with a concurrent queue, to mitigate the risk of complex threading issues.
This commit is contained in:
Ross Nicoll 2020-08-05 21:10:11 +01:00 committed by GitHub
parent 01451d57b9
commit fd374bfc6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -34,9 +34,11 @@ import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Test
import java.sql.SQLTransientConnectionException
import java.util.concurrent.Semaphore
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue
class FlowReloadAfterCheckpointTest {
@ -46,9 +48,9 @@ class FlowReloadAfterCheckpointTest {
@Test(timeout = 300_000)
fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
reloads.add(id)
}
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
@ -65,16 +67,16 @@ class FlowReloadAfterCheckpointTest {
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, false)
val flowStartedByAlice = handle.id
handle.returnValue.getOrThrow()
assertEquals(5, reloadCounts[flowStartedByAlice])
assertEquals(6, reloadCounts[ReloadFromCheckpointResponder.flowId])
assertEquals(5, reloads.filter { it == flowStartedByAlice }.count())
assertEquals(6, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count())
}
}
@Test(timeout = 300_000)
fun `flow will not reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is false`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
reloads.add(id)
}
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
@ -89,24 +91,22 @@ class FlowReloadAfterCheckpointTest {
.getOrThrow()
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, false)
val flowStartedByAlice = handle.id
handle.returnValue.getOrThrow()
assertNull(reloadCounts[flowStartedByAlice])
assertNull(reloadCounts[ReloadFromCheckpointResponder.flowId])
assertEquals(0, reloads.size)
}
}
@Test(timeout = 300_000)
fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true and be kept for observation due to failed deserialization`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
reloads.add(id)
}
lateinit var flowKeptForObservation: StateMachineRunId
val lock = Semaphore(0)
val lock = CountDownLatch(1)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { id, _ ->
flowKeptForObservation = id
lock.release()
lock.countDown()
}
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
@ -122,18 +122,18 @@ class FlowReloadAfterCheckpointTest {
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), true, false, false)
val flowStartedByAlice = handle.id
lock.acquire()
lock.await()
assertEquals(flowStartedByAlice, flowKeptForObservation)
assertEquals(4, reloadCounts[flowStartedByAlice])
assertEquals(4, reloadCounts[ReloadFromCheckpointResponder.flowId])
assertEquals(4, reloads.filter { it == flowStartedByAlice }.count())
assertEquals(4, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count())
}
}
@Test(timeout = 300_000)
fun `flow will reload from a previous checkpoint after calling suspending function and skipping the persisting the current checkpoint when reloadCheckpointAfterSuspend is true`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
reloads.add(id)
}
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
@ -150,8 +150,8 @@ class FlowReloadAfterCheckpointTest {
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, true)
val flowStartedByAlice = handle.id
handle.returnValue.getOrThrow()
assertEquals(5, reloadCounts[flowStartedByAlice])
assertEquals(6, reloadCounts[ReloadFromCheckpointResponder.flowId])
assertEquals(5, reloads.filter { it == flowStartedByAlice }.count())
assertEquals(6, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count())
}
}
@ -189,8 +189,8 @@ class FlowReloadAfterCheckpointTest {
@Test(timeout = 300_000)
fun `timed flow will reload from initial checkpoint after calling a suspending function when reloadCheckpointAfterSuspend is true`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId -> reloads.add(runId) }
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val alice = startNode(
@ -199,14 +199,14 @@ class FlowReloadAfterCheckpointTest {
).getOrThrow()
alice.rpc.startFlow(::MyTimedFlow).returnValue.getOrThrow()
assertEquals(5, reloadCount)
assertEquals(5, reloads.size)
}
}
@Test(timeout = 300_000)
fun `flow will correctly retry after an error when reloadCheckpointAfterSuspend is true`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId -> reloads.add(runId) }
var timesDischarged = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ -> timesDischarged += 1 }
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
@ -217,15 +217,21 @@ class FlowReloadAfterCheckpointTest {
).getOrThrow()
alice.rpc.startFlow(::TransientConnectionFailureFlow).returnValue.getOrThrow()
assertEquals(5, reloadCount)
assertEquals(5, reloads.size)
assertEquals(3, timesDischarged)
}
}
@Test(timeout = 300_000)
fun `flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
val firstLatch = CountDownLatch(2)
val secondLatch = CountDownLatch(5)
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId ->
reloads.add(runId)
firstLatch.countDown()
secondLatch.countDown()
}
driver(
DriverParameters(
inMemoryDB = false,
@ -241,25 +247,31 @@ class FlowReloadAfterCheckpointTest {
).getOrThrow()
alice.rpc.startFlow(::MyHospitalizingFlow)
Thread.sleep(10.seconds.toMillis())
assertTrue { firstLatch.await(10, TimeUnit.SECONDS) }
alice.stop()
assertEquals(2, reloads.size)
// Restart the node; secondLatch was created up front and opens once five reloads in total have been recorded
startNode(
providedName = ALICE_NAME,
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
Thread.sleep(20.seconds.toMillis())
assertEquals(5, reloadCount)
assertTrue { secondLatch.await(20, TimeUnit.SECONDS) }
assertEquals(5, reloads.size)
}
}
@Test(timeout = 300_000)
fun `idempotent flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() {
var reloadCount = 0
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { _ -> reloadCount += 1 }
// restarts completely from the beginning and forgets the in-memory reload count therefore
// it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown
val reloadsExpected = CountDownLatch(7)
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId ->
reloads.add(runId)
reloadsExpected.countDown()
}
driver(
DriverParameters(
inMemoryDB = false,
@ -284,19 +296,18 @@ class FlowReloadAfterCheckpointTest {
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
).getOrThrow()
Thread.sleep(20.seconds.toMillis())
// restarts completely from the beginning and forgets the in-memory reload count therefore
// it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown
assertEquals(7, reloadCount)
assertTrue { reloadsExpected.await(20, TimeUnit.SECONDS) }
assertEquals(7, reloads.size)
}
}
@Test(timeout = 300_000)
fun `more complicated flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() {
val reloadCounts = mutableMapOf<StateMachineRunId, Int>()
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
reloadCounts.compute(id) { _, value -> value?.plus(1) ?: 1 }
reloads.add(id)
}
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = FINANCE_CORDAPPS)) {
@ -325,8 +336,8 @@ class FlowReloadAfterCheckpointTest {
.toSet()
.single()
Thread.sleep(10.seconds.toMillis())
assertEquals(7, reloadCounts[flowStartedByAlice])
assertEquals(6, reloadCounts[flowStartedByBob])
assertEquals(7, reloads.filter { it == flowStartedByAlice }.size)
assertEquals(6, reloads.filter { it == flowStartedByBob }.size)
}
}