CORDA-3948 Make KillFlowTest less flakey (#6606)

`KillFlowTest` is failing quite often. This is probably due to issues in ordering when taking and releasing locks. By using `CountDownLatch` in places instead of `Semaphore`s should reduce the likelihood of tests failing.
This commit is contained in:
Dan Newton 2020-08-10 16:59:28 +01:00 committed by GitHub
parent 8aafb1db4a
commit c191960cb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -36,9 +36,10 @@ import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import org.assertj.core.api.Assertions
import org.junit.Ignore
import org.junit.Test
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import kotlin.system.measureTimeMillis
@ -61,8 +62,7 @@ class KillFlowTest {
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
}
}
@ -71,30 +71,45 @@ class KillFlowTest {
fun `a killed flow will propagate the killed error to counter parties when it reaches the next suspension point`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME)
.map { startNode(providedName = it) }
.transpose()
.getOrThrow()
.map { startNode(providedName = it) }
.transpose()
.getOrThrow()
alice.rpc.let { rpc ->
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks[BOB_NAME] = CountDownLatch(1)
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks[CHARLIE_NAME] = CountDownLatch(1)
val handle = rpc.startFlow(
::AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends,
listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity())
)
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends.lockA.acquire()
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { it.value.acquire() }
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { (_, lock) ->
lock.await(30, TimeUnit.SECONDS)
}
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks[BOB_NAME] = CountDownLatch(1)
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks[CHARLIE_NAME] = CountDownLatch(1)
rpc.killFlow(handle.id)
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends.lockB.release()
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { it.value.acquire() }
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { (_, lock) ->
lock.await(30, TimeUnit.SECONDS)
}
assertTrue(AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
val aliceCheckpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, charlieCheckpoints)
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
}
}
@ -114,8 +129,7 @@ class KillFlowTest {
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
}
}
@ -124,9 +138,9 @@ class KillFlowTest {
fun `killing a flow suspended in send + receive + sendAndReceive ends the flow immediately`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = false)) {
val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
.map { startNode(providedName = it) }
.transpose()
.getOrThrow()
.map { startNode(providedName = it) }
.transpose()
.getOrThrow()
val bobParty = bob.nodeInfo.singleIdentity()
bob.stop()
val terminated = (bob as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS)
@ -153,8 +167,7 @@ class KillFlowTest {
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
val checkpoints = startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
assertEquals(1, startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
@Test(timeout = 300_000)
@ -172,8 +185,7 @@ class KillFlowTest {
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
}
}
@ -193,43 +205,45 @@ class KillFlowTest {
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
}
}
@Ignore("CORDA-3948: Disabled pending availability of engineers to diagnose")
@Test(timeout = 300_000)
fun `a killed flow will propagate the killed error to counter parties if it was suspended`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME)
.map { startNode(providedName = it) }
.transpose()
.getOrThrow()
.map { startNode(providedName = it) }
.transpose()
.getOrThrow()
alice.rpc.let { rpc ->
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks[BOB_NAME] = CountDownLatch(1)
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks[CHARLIE_NAME] = CountDownLatch(1)
val handle = rpc.startFlow(
::AFlowThatGetsMurderedAndSomehowKillsItsFriends,
listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity())
)
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach {
it.value.acquire()
}
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach { (_, lock) -> lock.await(30, TimeUnit.SECONDS) }
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks[BOB_NAME] = CountDownLatch(1)
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks[CHARLIE_NAME] = CountDownLatch(1)
rpc.killFlow(handle.id)
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(20.seconds)
}
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach {
it.value.acquire()
}
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach { (_, lock) -> lock.await(30, TimeUnit.SECONDS) }
assertTrue(AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
val aliceCheckpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, charlieCheckpoints)
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
}
}
@ -238,15 +252,21 @@ class KillFlowTest {
fun `a killed initiated flow will propagate the killed error to the initiator and its counter parties`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME)
.map { startNode(providedName = it) }
.transpose()
.getOrThrow()
.map { startNode(providedName = it) }
.transpose()
.getOrThrow()
AFlowThatGetsMurderedByItsFriendResponder.locks[BOB_NAME] = CountDownLatch(1)
AFlowThatGetsMurderedByItsFriendResponder.locks[CHARLIE_NAME] = CountDownLatch(1)
val handle = alice.rpc.startFlow(
::AFlowThatGetsMurderedByItsFriend,
listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity())
)
AFlowThatGetsMurderedByItsFriendResponder.locks.forEach { it.value.acquire() }
AFlowThatGetsMurderedByItsFriendResponder.locks.forEach { (_, lock) -> lock.await(30, TimeUnit.SECONDS) }
AFlowThatGetsMurderedByItsFriendResponder.locks[CHARLIE_NAME] = CountDownLatch(1)
val initiatedFlowId = AFlowThatGetsMurderedByItsFriendResponder.flowIds[BOB_NAME]!!
@ -255,16 +275,15 @@ class KillFlowTest {
assertFailsWith<UnexpectedFlowEndException> {
handle.returnValue.getOrThrow(1.minutes)
}
AFlowThatGetsMurderedByItsFriendResponder.locks[CHARLIE_NAME]!!.acquire()
AFlowThatGetsMurderedByItsFriendResponder.locks[CHARLIE_NAME]!!.await(30, TimeUnit.SECONDS)
assertTrue(AFlowThatGetsMurderedByItsFriend.receivedKilledException)
assertFalse(AFlowThatGetsMurderedByItsFriendResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatGetsMurderedByItsFriendResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
val aliceCheckpoints = alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, charlieCheckpoints)
assertEquals(1, alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
}
@ -352,14 +371,11 @@ class KillFlowTest {
FlowLogic<Unit>() {
companion object {
val locks = mapOf(
BOB_NAME to Semaphore(0),
CHARLIE_NAME to Semaphore(0)
)
var receivedKilledExceptions = mutableMapOf(
BOB_NAME to false,
CHARLIE_NAME to false
)
val locks = ConcurrentHashMap<CordaX500Name, CountDownLatch>()
var receivedKilledExceptions = ConcurrentHashMap<CordaX500Name, Boolean>().apply {
this[BOB_NAME] = false
this[CHARLIE_NAME] = false
}
}
@Suspendable
@ -367,12 +383,12 @@ class KillFlowTest {
session.receive<String>()
session.send("hi")
session.receive<String>()
locks[ourIdentity.name]!!.release()
locks[ourIdentity.name]!!.countDown()
try {
session.receive<String>()
} catch (e: UnexpectedFlowEndException) {
receivedKilledExceptions[ourIdentity.name] = true
locks[ourIdentity.name]!!.release()
locks[ourIdentity.name]!!.countDown()
throw e
}
}
@ -489,14 +505,11 @@ class KillFlowTest {
class AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
val locks = mapOf(
BOB_NAME to Semaphore(0),
CHARLIE_NAME to Semaphore(0)
)
var receivedKilledExceptions = mutableMapOf(
BOB_NAME to false,
CHARLIE_NAME to false
)
val locks = ConcurrentHashMap<CordaX500Name, CountDownLatch>()
var receivedKilledExceptions = ConcurrentHashMap<CordaX500Name, Boolean>().apply {
this[BOB_NAME] = false
this[CHARLIE_NAME] = false
}
}
@Suspendable
@ -504,12 +517,12 @@ class KillFlowTest {
session.receive<String>()
session.send("hi")
session.receive<String>()
locks[ourIdentity.name]!!.release()
locks[ourIdentity.name]!!.countDown()
try {
session.receive<String>()
} catch (e: UnexpectedFlowEndException) {
receivedKilledExceptions[ourIdentity.name] = true
locks[ourIdentity.name]!!.release()
locks[ourIdentity.name]!!.countDown()
throw e
}
}
@ -548,15 +561,12 @@ class KillFlowTest {
class AFlowThatGetsMurderedByItsFriendResponder(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
val locks = mapOf(
BOB_NAME to Semaphore(0),
CHARLIE_NAME to Semaphore(0)
)
var receivedKilledExceptions = mutableMapOf(
BOB_NAME to false,
CHARLIE_NAME to false
)
var flowIds = mutableMapOf<CordaX500Name, StateMachineRunId>()
val locks = ConcurrentHashMap<CordaX500Name, CountDownLatch>()
var receivedKilledExceptions = ConcurrentHashMap<CordaX500Name, Boolean>().apply {
this[BOB_NAME] = false
this[CHARLIE_NAME] = false
}
var flowIds = ConcurrentHashMap<CordaX500Name, StateMachineRunId>()
}
@Suspendable
@ -565,12 +575,12 @@ class KillFlowTest {
session.receive<String>()
session.send("hi")
session.receive<String>()
locks[ourIdentity.name]!!.release()
locks[ourIdentity.name]!!.countDown()
try {
session.receive<String>()
} catch (e: UnexpectedFlowEndException) {
receivedKilledExceptions[ourIdentity.name] = true
locks[ourIdentity.name]!!.release()
locks[ourIdentity.name]!!.countDown()
throw e
}
}