diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/KillFlowTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/KillFlowTest.kt index f29248d161..0404412fe4 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/KillFlowTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/KillFlowTest.kt @@ -36,9 +36,10 @@ import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver import net.corda.testing.node.internal.FINANCE_CORDAPPS import org.assertj.core.api.Assertions -import org.junit.Ignore import org.junit.Test import java.time.Duration +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CountDownLatch import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit import kotlin.system.measureTimeMillis @@ -61,8 +62,7 @@ class KillFlowTest { assertFailsWith { handle.returnValue.getOrThrow(1.minutes) } - val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, checkpoints) + assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) } } } @@ -71,30 +71,45 @@ class KillFlowTest { fun `a killed flow will propagate the killed error to counter parties when it reaches the next suspension point`() { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME) - .map { startNode(providedName = it) } - .transpose() - .getOrThrow() + .map { startNode(providedName = it) } + .transpose() + .getOrThrow() alice.rpc.let { rpc -> + + AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks[BOB_NAME] = CountDownLatch(1) + AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks[CHARLIE_NAME] = CountDownLatch(1) + val handle = rpc.startFlow( ::AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends, listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity()) ) + AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends.lockA.acquire() - AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { it.value.acquire() } + + AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { (_, lock) -> + lock.await(30, TimeUnit.SECONDS) + } + + AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks[BOB_NAME] = CountDownLatch(1) + AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks[CHARLIE_NAME] = CountDownLatch(1) + rpc.killFlow(handle.id) + AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends.lockB.release() + assertFailsWith { handle.returnValue.getOrThrow(1.minutes) } - AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { it.value.acquire() } + + AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { (_, lock) -> + lock.await(30, TimeUnit.SECONDS) + } + assertTrue(AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!) assertTrue(AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!) - val aliceCheckpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, aliceCheckpoints) - val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, bobCheckpoints) - val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, charlieCheckpoints) + assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) + assertEquals(1, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) + assertEquals(1, charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) } } } @@ -114,8 +129,7 @@ class KillFlowTest { } assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow") assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow") - val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, checkpoints) + assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) } } } @@ -124,9 +138,9 @@ class KillFlowTest { fun `killing a flow suspended in send + receive + sendAndReceive ends the flow immediately`() { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = false)) { val (alice, bob) = listOf(ALICE_NAME, BOB_NAME) - .map { startNode(providedName = it) } - .transpose() - .getOrThrow() + .map { startNode(providedName = it) } + .transpose() + .getOrThrow() val bobParty = bob.nodeInfo.singleIdentity() bob.stop() val terminated = (bob as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS) @@ -153,8 +167,7 @@ class KillFlowTest { } assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow") assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow") - val checkpoints = startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, checkpoints) + assertEquals(1, startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) } @Test(timeout = 300_000) @@ -172,8 +185,7 @@ class KillFlowTest { } assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow") assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow") - val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, checkpoints) + assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) } } } @@ -193,43 +205,45 @@ class KillFlowTest { } assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow") assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow") - val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, checkpoints) + assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) } } } - @Ignore("CORDA-3948: Disabled pending availability of engineers to diagnose") @Test(timeout = 300_000) fun `a killed flow will propagate the killed error to counter parties if it was suspended`() { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME) - .map { startNode(providedName = it) } - .transpose() - .getOrThrow() + .map { startNode(providedName = it) } + .transpose() + .getOrThrow() alice.rpc.let { rpc -> + + AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks[BOB_NAME] = CountDownLatch(1) + AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks[CHARLIE_NAME] = CountDownLatch(1) + val handle = rpc.startFlow( ::AFlowThatGetsMurderedAndSomehowKillsItsFriends, listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity()) ) - AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach { - it.value.acquire() - } + + AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach { (_, lock) -> lock.await(30, TimeUnit.SECONDS) } + + AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks[BOB_NAME] = CountDownLatch(1) + AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks[CHARLIE_NAME] = CountDownLatch(1) + rpc.killFlow(handle.id) assertFailsWith { handle.returnValue.getOrThrow(20.seconds) } - AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach { - it.value.acquire() - } + + AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach { (_, lock) -> lock.await(30, TimeUnit.SECONDS) } + assertTrue(AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!) assertTrue(AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!) - val aliceCheckpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, aliceCheckpoints) - val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, bobCheckpoints) - val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, charlieCheckpoints) + assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) + assertEquals(1, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) + assertEquals(1, charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) } } } @@ -238,15 +252,21 @@ class KillFlowTest { fun `a killed initiated flow will propagate the killed error to the initiator and its counter parties`() { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME) - .map { startNode(providedName = it) } - .transpose() - .getOrThrow() + .map { startNode(providedName = it) } + .transpose() + .getOrThrow() + + AFlowThatGetsMurderedByItsFriendResponder.locks[BOB_NAME] = CountDownLatch(1) + AFlowThatGetsMurderedByItsFriendResponder.locks[CHARLIE_NAME] = CountDownLatch(1) + val handle = alice.rpc.startFlow( ::AFlowThatGetsMurderedByItsFriend, listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity()) ) - AFlowThatGetsMurderedByItsFriendResponder.locks.forEach { it.value.acquire() } + AFlowThatGetsMurderedByItsFriendResponder.locks.forEach { (_, lock) -> lock.await(30, TimeUnit.SECONDS) } + + AFlowThatGetsMurderedByItsFriendResponder.locks[CHARLIE_NAME] = CountDownLatch(1) val initiatedFlowId = AFlowThatGetsMurderedByItsFriendResponder.flowIds[BOB_NAME]!! @@ -255,16 +275,15 @@ class KillFlowTest { assertFailsWith { handle.returnValue.getOrThrow(1.minutes) } - AFlowThatGetsMurderedByItsFriendResponder.locks[CHARLIE_NAME]!!.acquire() + + AFlowThatGetsMurderedByItsFriendResponder.locks[CHARLIE_NAME]!!.await(30, TimeUnit.SECONDS) + assertTrue(AFlowThatGetsMurderedByItsFriend.receivedKilledException) assertFalse(AFlowThatGetsMurderedByItsFriendResponder.receivedKilledExceptions[BOB_NAME]!!) assertTrue(AFlowThatGetsMurderedByItsFriendResponder.receivedKilledExceptions[CHARLIE_NAME]!!) - val aliceCheckpoints = alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, aliceCheckpoints) - val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, bobCheckpoints) - val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds) - assertEquals(1, charlieCheckpoints) + assertEquals(1, alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) + assertEquals(1, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) + assertEquals(1, charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) } } @@ -352,14 +371,11 @@ class KillFlowTest { FlowLogic() { companion object { - val locks = mapOf( - BOB_NAME to Semaphore(0), - CHARLIE_NAME to Semaphore(0) - ) - var receivedKilledExceptions = mutableMapOf( - BOB_NAME to false, - CHARLIE_NAME to false - ) + val locks = ConcurrentHashMap() + var receivedKilledExceptions = ConcurrentHashMap().apply { + this[BOB_NAME] = false + this[CHARLIE_NAME] = false + } } @Suspendable @@ -367,12 +383,12 @@ class KillFlowTest { session.receive() session.send("hi") session.receive() - locks[ourIdentity.name]!!.release() + locks[ourIdentity.name]!!.countDown() try { session.receive() } catch (e: UnexpectedFlowEndException) { receivedKilledExceptions[ourIdentity.name] = true - locks[ourIdentity.name]!!.release() + locks[ourIdentity.name]!!.countDown() throw e } } @@ -489,14 +505,11 @@ class KillFlowTest { class AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder(private val session: FlowSession) : FlowLogic() { companion object { - val locks = mapOf( - BOB_NAME to Semaphore(0), - CHARLIE_NAME to Semaphore(0) - ) - var receivedKilledExceptions = mutableMapOf( - BOB_NAME to false, - CHARLIE_NAME to false - ) + val locks = ConcurrentHashMap() + var receivedKilledExceptions = ConcurrentHashMap().apply { + this[BOB_NAME] = false + this[CHARLIE_NAME] = false + } } @Suspendable @@ -504,12 +517,12 @@ class KillFlowTest { session.receive() session.send("hi") session.receive() - locks[ourIdentity.name]!!.release() + locks[ourIdentity.name]!!.countDown() try { session.receive() } catch (e: UnexpectedFlowEndException) { receivedKilledExceptions[ourIdentity.name] = true - locks[ourIdentity.name]!!.release() + locks[ourIdentity.name]!!.countDown() throw e } } @@ -548,15 +561,12 @@ class KillFlowTest { class AFlowThatGetsMurderedByItsFriendResponder(private val session: FlowSession) : FlowLogic() { companion object { - val locks = mapOf( - BOB_NAME to Semaphore(0), - CHARLIE_NAME to Semaphore(0) - ) - var receivedKilledExceptions = mutableMapOf( - BOB_NAME to false, - CHARLIE_NAME to false - ) - var flowIds = mutableMapOf() + val locks = ConcurrentHashMap() + var receivedKilledExceptions = ConcurrentHashMap().apply { + this[BOB_NAME] = false + this[CHARLIE_NAME] = false + } + var flowIds = ConcurrentHashMap() } @Suspendable @@ -565,12 +575,12 @@ class KillFlowTest { session.receive() session.send("hi") session.receive() - locks[ourIdentity.name]!!.release() + locks[ourIdentity.name]!!.countDown() try { session.receive() } catch (e: UnexpectedFlowEndException) { receivedKilledExceptions[ourIdentity.name] = true - locks[ourIdentity.name]!!.release() + locks[ourIdentity.name]!!.countDown() throw e } }