From f216853c3f6c305e228bf169ed3f3cd772a86dd7 Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Wed, 5 Aug 2020 13:10:35 +0100 Subject: [PATCH] CORDA-3946 Make `RetryFlowMockTest` less flakey (#6570) --- .../statemachine/RetryFlowMockTest.kt | 91 +++++++++++++------ 1 file changed, 63 insertions(+), 28 deletions(-) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 061345efe7..ecaa28f0fe 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -9,6 +9,7 @@ import net.corda.core.flows.FlowSession import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.KilledFlowException +import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.FlowStateMachine @@ -22,7 +23,6 @@ import net.corda.node.services.FinalityHandler import net.corda.node.services.messaging.Message import net.corda.node.services.persistence.DBTransactionStorage import net.corda.nodeapi.internal.persistence.contextTransaction -import net.corda.testing.common.internal.eventually import net.corda.testing.core.TestIdentity import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.MessagingServiceSpy @@ -30,6 +30,7 @@ import net.corda.testing.node.internal.TestStartedNode import net.corda.testing.node.internal.enclosedCordapp import net.corda.testing.node.internal.newContext import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.h2.util.Utils import org.junit.After import org.junit.Assert.assertTrue @@ -39,7 +40,9 @@ import java.sql.SQLException import java.time.Duration import java.time.Instant import java.util.Collections -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertNotNull @@ -59,7 +62,6 @@ class RetryFlowMockTest { RetryFlow.count = 0 SendAndRetryFlow.count = 0 RetryInsertFlow.count = 0 - KeepSendingFlow.count.set(0) StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is LimitedRetryCausingError } StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is RetryCausingError } } @@ -97,37 +99,43 @@ class RetryFlowMockTest { } @Test(timeout=300_000) - fun `Restart does not set senderUUID and early end session message does not hang receiving flow`() { + fun `Restart does not set senderUUID`() { val messagesSent = Collections.synchronizedList(mutableListOf()) val partyB = nodeB.info.legalIdentities.first() + val expectedMessagesSent = CountDownLatch(3) nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() { override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { messagesSent.add(message) + expectedMessagesSent.countDown() messagingService.send(message, target) } }) - val count = 10000 // Lots of iterations so the flow keeps going long enough - nodeA.startFlow(KeepSendingFlow(count, partyB)) - eventually(duration = Duration.ofSeconds(30), waitBetween = Duration.ofMillis(100)) { - assertTrue(messagesSent.isNotEmpty()) - assertNotNull(messagesSent.first().senderUUID) - } + nodeA.startFlow(KeepSendingFlow(partyB)) + KeepSendingFlow.lock.acquire() + assertTrue(messagesSent.isNotEmpty()) + assertNotNull(messagesSent.first().senderUUID) nodeA = mockNet.restartNode(nodeA) - // This is a bit racy because restarting the node actually starts it, so we need to make sure there's enough iterations we get here with flow still going. nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() { override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { messagesSent.add(message) + expectedMessagesSent.countDown() messagingService.send(message, target) } }) - // Now short circuit the iterations so the flow finishes soon. - KeepSendingFlow.count.set(count - 2) - eventually(duration = Duration.ofSeconds(30), waitBetween = Duration.ofMillis(100)) { - assertTrue(nodeA.smm.allStateMachines.isEmpty()) - } + ReceiveFlow3.lock.release() + assertTrue(expectedMessagesSent.await(20, TimeUnit.SECONDS)) + assertEquals(3, messagesSent.size) assertNull(messagesSent.last().senderUUID) } + @Test(timeout=300_000) + fun `Early end session message does not hang receiving flow`() { + val partyB = nodeB.info.legalIdentities.first() + assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy { + nodeA.startFlow(UnbalancedSendAndReceiveFlow(partyB)).getOrThrow(20.seconds) + }.withMessage("Received session end message instead of a data session message. Mismatched send and receive?") + } + @Test(timeout=300_000) fun `Retry duplicate insert`() { assertEquals(Unit, nodeA.startFlow(RetryInsertFlow(1)).get()) @@ -253,32 +261,36 @@ class RetryFlowMockTest { } @InitiatingFlow - class KeepSendingFlow(private val i: Int, private val other: Party) : FlowLogic() { + class KeepSendingFlow(private val other: Party) : FlowLogic() { + companion object { - val count = AtomicInteger(0) + val lock = Semaphore(0) } @Suspendable override fun call() { val session = initiateFlow(other) - session.send(i.toString()) - do { - logger.info("Sending... $count") - session.send("Boo") - } while (count.getAndIncrement() < i) + session.send("boo") + lock.release() + session.receive() + session.send("boo") } } @Suppress("unused") @InitiatedBy(KeepSendingFlow::class) class ReceiveFlow3(private val other: FlowSession) : FlowLogic() { + + companion object { + val lock = Semaphore(0) + } + @Suspendable override fun call() { - var count = other.receive().unwrap { it.toInt() } - while (count-- > 0) { - val received = other.receive().unwrap { it } - logger.info("Received... $received $count") - } + other.receive() + lock.acquire() + other.send("hoo") + other.receive() } } @@ -305,4 +317,27 @@ class RetryFlowMockTest { contextTransaction.session.save(tx) } } + + @InitiatingFlow + class UnbalancedSendAndReceiveFlow(private val other: Party) : FlowLogic() { + + @Suspendable + override fun call() { + val session = initiateFlow(other) + session.send("boo") + session.receive() + session.receive() + } + } + + @Suppress("unused") + @InitiatedBy(UnbalancedSendAndReceiveFlow::class) + class UnbalancedSendAndReceiveResponder(private val other: FlowSession) : FlowLogic() { + + @Suspendable + override fun call() { + other.receive() + other.send("hoo") + } + } }