CORDA-3946 Make RetryFlowMockTest less flakey (#6786)

Co-authored-by: Dan Newton <danknewton@hotmail.com>
This commit is contained in:
Ross Nicoll 2020-10-25 17:56:36 +00:00 committed by GitHub
parent fde094721e
commit d17f536bc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,7 +2,11 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.* import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowStateMachine
@ -15,9 +19,12 @@ import net.corda.node.services.FinalityHandler
import net.corda.node.services.messaging.Message import net.corda.node.services.messaging.Message
import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.testing.common.internal.eventually
import net.corda.testing.core.TestIdentity import net.corda.testing.core.TestIdentity
import net.corda.testing.node.internal.* import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.MessagingServiceSpy
import net.corda.testing.node.internal.newContext
import net.corda.testing.node.internal.TestStartedNode
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.hibernate.exception.ConstraintViolationException import org.hibernate.exception.ConstraintViolationException
@ -28,7 +35,9 @@ import org.junit.Test
import java.sql.SQLException import java.sql.SQLException
import java.time.Duration import java.time.Duration
import java.util.* import java.util.*
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.CountDownLatch
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertNotNull import kotlin.test.assertNotNull
import kotlin.test.assertNull import kotlin.test.assertNull
@ -47,7 +56,6 @@ class RetryFlowMockTest {
RetryFlow.count = 0 RetryFlow.count = 0
SendAndRetryFlow.count = 0 SendAndRetryFlow.count = 0
RetryInsertFlow.count = 0 RetryInsertFlow.count = 0
KeepSendingFlow.count.set(0)
} }
private fun <T> TestStartedNode.startFlow(logic: FlowLogic<T>): CordaFuture<T> { private fun <T> TestStartedNode.startFlow(logic: FlowLogic<T>): CordaFuture<T> {
@ -89,35 +97,33 @@ class RetryFlowMockTest {
assertEquals(2, SendAndRetryFlow.count) assertEquals(2, SendAndRetryFlow.count)
} }
@Test @Test(timeout=300_000)
fun `Restart does not set senderUUID`() { fun `Restart does not set senderUUID`() {
val messagesSent = Collections.synchronizedList(mutableListOf<Message>()) val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
val partyB = nodeB.info.legalIdentities.first() val partyB = nodeB.info.legalIdentities.first()
val expectedMessagesSent = CountDownLatch(3)
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() { nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
messagesSent.add(message) messagesSent.add(message)
expectedMessagesSent.countDown()
messagingService.send(message, target) messagingService.send(message, target)
} }
}) })
val count = 10000 // Lots of iterations so the flow keeps going long enough nodeA.startFlow(KeepSendingFlow(partyB))
nodeA.startFlow(KeepSendingFlow(count, partyB)) KeepSendingFlow.lock.acquire()
eventually(waitBetween = Duration.ofMillis(10)) { assertTrue(messagesSent.isNotEmpty())
assertTrue(messagesSent.isNotEmpty()) assertNotNull(messagesSent.first().senderUUID)
assertNotNull(messagesSent.first().senderUUID)
}
nodeA = mockNet.restartNode(nodeA) nodeA = mockNet.restartNode(nodeA)
// This is a bit racy because restarting the node actually starts it, so we need to make sure there's enough iterations we get here with flow still going.
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() { nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
messagesSent.add(message) messagesSent.add(message)
expectedMessagesSent.countDown()
messagingService.send(message, target) messagingService.send(message, target)
} }
}) })
// Now short circuit the iterations so the flow finishes soon. ReceiveFlow3.lock.release()
KeepSendingFlow.count.set(count - 2) assertTrue(expectedMessagesSent.await(20, TimeUnit.SECONDS))
eventually(waitBetween = Duration.ofMillis(10)) { assertEquals(3, messagesSent.size)
assertTrue(nodeA.smm.allStateMachines.isEmpty())
}
assertNull(messagesSent.last().senderUUID) assertNull(messagesSent.last().senderUUID)
} }
@ -235,32 +241,36 @@ class RetryFlowMockTest {
} }
@InitiatingFlow @InitiatingFlow
class KeepSendingFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() { class KeepSendingFlow(private val other: Party) : FlowLogic<Unit>() {
companion object { companion object {
val count = AtomicInteger(0) val lock = Semaphore(0)
} }
@Suspendable @Suspendable
override fun call() { override fun call() {
val session = initiateFlow(other) val session = initiateFlow(other)
session.send(i.toString()) session.send("boo")
do { lock.release()
logger.info("Sending... $count") session.receive<String>()
session.send("Boo") session.send("boo")
} while (count.getAndIncrement() < i)
} }
} }
@Suppress("unused") @Suppress("unused")
@InitiatedBy(KeepSendingFlow::class) @InitiatedBy(KeepSendingFlow::class)
class ReceiveFlow3(private val other: FlowSession) : FlowLogic<Unit>() { class ReceiveFlow3(private val other: FlowSession) : FlowLogic<Unit>() {
companion object {
val lock = Semaphore(0)
}
@Suspendable @Suspendable
override fun call() { override fun call() {
var count = other.receive<String>().unwrap { it.toInt() } other.receive<String>()
while (count-- > 0) { lock.acquire()
val received = other.receive<String>().unwrap { it } other.send("hoo")
logger.info("Received... $received $count") other.receive<String>()
}
} }
} }
@ -286,4 +296,27 @@ class RetryFlowMockTest {
contextTransaction.session.save(tx) contextTransaction.session.save(tx)
} }
} }
@InitiatingFlow
class UnbalancedSendAndReceiveFlow(private val other: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(other)
session.send("boo")
session.receive<String>()
session.receive<String>()
}
}
@Suppress("unused")
@InitiatedBy(UnbalancedSendAndReceiveFlow::class)
class UnbalancedSendAndReceiveResponder(private val other: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
other.receive<String>()
other.send("hoo")
}
}
} }