CORDA-3946 Make RetryFlowMockTest less flakey (#6570)

This commit is contained in:
Dan Newton 2020-08-05 13:10:35 +01:00 committed by GitHub
parent d60feb1138
commit f216853c3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -9,6 +9,7 @@ import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.KilledFlowException import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowStateMachine
@ -22,7 +23,6 @@ import net.corda.node.services.FinalityHandler
import net.corda.node.services.messaging.Message import net.corda.node.services.messaging.Message
import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.testing.common.internal.eventually
import net.corda.testing.core.TestIdentity import net.corda.testing.core.TestIdentity
import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.MessagingServiceSpy import net.corda.testing.node.internal.MessagingServiceSpy
@ -30,6 +30,7 @@ import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.enclosedCordapp import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.newContext import net.corda.testing.node.internal.newContext
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.h2.util.Utils import org.h2.util.Utils
import org.junit.After import org.junit.After
import org.junit.Assert.assertTrue import org.junit.Assert.assertTrue
@ -39,7 +40,9 @@ import java.sql.SQLException
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.Collections import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.CountDownLatch
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull import kotlin.test.assertNotNull
@ -59,7 +62,6 @@ class RetryFlowMockTest {
RetryFlow.count = 0 RetryFlow.count = 0
SendAndRetryFlow.count = 0 SendAndRetryFlow.count = 0
RetryInsertFlow.count = 0 RetryInsertFlow.count = 0
KeepSendingFlow.count.set(0)
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is LimitedRetryCausingError } StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is LimitedRetryCausingError }
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is RetryCausingError } StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is RetryCausingError }
} }
@ -97,37 +99,43 @@ class RetryFlowMockTest {
} }
@Test(timeout=300_000) @Test(timeout=300_000)
fun `Restart does not set senderUUID and early end session message does not hang receiving flow`() { fun `Restart does not set senderUUID`() {
val messagesSent = Collections.synchronizedList(mutableListOf<Message>()) val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
val partyB = nodeB.info.legalIdentities.first() val partyB = nodeB.info.legalIdentities.first()
val expectedMessagesSent = CountDownLatch(3)
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() { nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
messagesSent.add(message) messagesSent.add(message)
expectedMessagesSent.countDown()
messagingService.send(message, target) messagingService.send(message, target)
} }
}) })
val count = 10000 // Lots of iterations so the flow keeps going long enough nodeA.startFlow(KeepSendingFlow(partyB))
nodeA.startFlow(KeepSendingFlow(count, partyB)) KeepSendingFlow.lock.acquire()
eventually(duration = Duration.ofSeconds(30), waitBetween = Duration.ofMillis(100)) { assertTrue(messagesSent.isNotEmpty())
assertTrue(messagesSent.isNotEmpty()) assertNotNull(messagesSent.first().senderUUID)
assertNotNull(messagesSent.first().senderUUID)
}
nodeA = mockNet.restartNode(nodeA) nodeA = mockNet.restartNode(nodeA)
// This is a bit racy because restarting the node actually starts it, so we need to make sure there's enough iterations we get here with flow still going.
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() { nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
messagesSent.add(message) messagesSent.add(message)
expectedMessagesSent.countDown()
messagingService.send(message, target) messagingService.send(message, target)
} }
}) })
// Now short circuit the iterations so the flow finishes soon. ReceiveFlow3.lock.release()
KeepSendingFlow.count.set(count - 2) assertTrue(expectedMessagesSent.await(20, TimeUnit.SECONDS))
eventually(duration = Duration.ofSeconds(30), waitBetween = Duration.ofMillis(100)) { assertEquals(3, messagesSent.size)
assertTrue(nodeA.smm.allStateMachines.isEmpty())
}
assertNull(messagesSent.last().senderUUID) assertNull(messagesSent.last().senderUUID)
} }
@Test(timeout=300_000)
fun `Early end session message does not hang receiving flow`() {
val partyB = nodeB.info.legalIdentities.first()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
nodeA.startFlow(UnbalancedSendAndReceiveFlow(partyB)).getOrThrow(20.seconds)
}.withMessage("Received session end message instead of a data session message. Mismatched send and receive?")
}
@Test(timeout=300_000) @Test(timeout=300_000)
fun `Retry duplicate insert`() { fun `Retry duplicate insert`() {
assertEquals(Unit, nodeA.startFlow(RetryInsertFlow(1)).get()) assertEquals(Unit, nodeA.startFlow(RetryInsertFlow(1)).get())
@ -253,32 +261,36 @@ class RetryFlowMockTest {
} }
@InitiatingFlow @InitiatingFlow
class KeepSendingFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() { class KeepSendingFlow(private val other: Party) : FlowLogic<Unit>() {
companion object { companion object {
val count = AtomicInteger(0) val lock = Semaphore(0)
} }
@Suspendable @Suspendable
override fun call() { override fun call() {
val session = initiateFlow(other) val session = initiateFlow(other)
session.send(i.toString()) session.send("boo")
do { lock.release()
logger.info("Sending... $count") session.receive<String>()
session.send("Boo") session.send("boo")
} while (count.getAndIncrement() < i)
} }
} }
@Suppress("unused") @Suppress("unused")
@InitiatedBy(KeepSendingFlow::class) @InitiatedBy(KeepSendingFlow::class)
class ReceiveFlow3(private val other: FlowSession) : FlowLogic<Unit>() { class ReceiveFlow3(private val other: FlowSession) : FlowLogic<Unit>() {
companion object {
val lock = Semaphore(0)
}
@Suspendable @Suspendable
override fun call() { override fun call() {
var count = other.receive<String>().unwrap { it.toInt() } other.receive<String>()
while (count-- > 0) { lock.acquire()
val received = other.receive<String>().unwrap { it } other.send("hoo")
logger.info("Received... $received $count") other.receive<String>()
}
} }
} }
@ -305,4 +317,27 @@ class RetryFlowMockTest {
contextTransaction.session.save(tx) contextTransaction.session.save(tx)
} }
} }
@InitiatingFlow
class UnbalancedSendAndReceiveFlow(private val other: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(other)
session.send("boo")
session.receive<String>()
session.receive<String>()
}
}
@Suppress("unused")
@InitiatedBy(UnbalancedSendAndReceiveFlow::class)
class UnbalancedSendAndReceiveResponder(private val other: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
other.receive<String>()
other.send("hoo")
}
}
} }