mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Fix flaky distributed retry test
This commit is contained in:
@ -26,7 +26,6 @@ import org.junit.Test
|
|||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
class P2PMessagingTest : NodeBasedTest() {
|
class P2PMessagingTest : NodeBasedTest() {
|
||||||
@ -105,17 +104,23 @@ class P2PMessagingTest : NodeBasedTest() {
|
|||||||
val dummyTopic = "dummy.topic"
|
val dummyTopic = "dummy.topic"
|
||||||
val responseMessage = "response"
|
val responseMessage = "response"
|
||||||
|
|
||||||
simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage)
|
val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
|
||||||
|
|
||||||
// Send a single request with retry
|
// Send a single request with retry
|
||||||
val response = with(alice.network) {
|
val responseFuture = with(alice.network) {
|
||||||
val request = TestRequest(replyTo = myAddress)
|
val request = TestRequest(replyTo = myAddress)
|
||||||
val responseFuture = onNext<Any>(dummyTopic, request.sessionID)
|
val responseFuture = onNext<Any>(dummyTopic, request.sessionID)
|
||||||
val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
|
val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
|
||||||
send(msg, serviceAddress, retryId = request.sessionID)
|
send(msg, serviceAddress, retryId = request.sessionID)
|
||||||
responseFuture
|
responseFuture
|
||||||
}.getOrThrow(10.seconds)
|
}
|
||||||
|
crashingNodes.firstRequestReceived.await()
|
||||||
|
// The request wasn't successful.
|
||||||
|
assertThat(responseFuture.isDone).isFalse()
|
||||||
|
crashingNodes.ignoreRequests = false
|
||||||
|
|
||||||
|
// The retry should be successful.
|
||||||
|
val response = responseFuture.getOrThrow(10.seconds)
|
||||||
assertThat(response).isEqualTo(responseMessage)
|
assertThat(response).isEqualTo(responseMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,7 +135,7 @@ class P2PMessagingTest : NodeBasedTest() {
|
|||||||
val dummyTopic = "dummy.topic"
|
val dummyTopic = "dummy.topic"
|
||||||
val responseMessage = "response"
|
val responseMessage = "response"
|
||||||
|
|
||||||
val (firstRequestReceived, requestsReceived) = simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage)
|
val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
|
||||||
|
|
||||||
val sessionId = random63BitValue()
|
val sessionId = random63BitValue()
|
||||||
|
|
||||||
@ -142,37 +147,48 @@ class P2PMessagingTest : NodeBasedTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wait until the first request is received
|
// Wait until the first request is received
|
||||||
firstRequestReceived.await(5, TimeUnit.SECONDS)
|
crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
|
||||||
// Stop alice's node before the request is redelivered – the first request is ignored
|
// Stop alice's node before the request is redelivered – the first request is ignored
|
||||||
alice.stop()
|
alice.stop()
|
||||||
assertThat(requestsReceived.get()).isEqualTo(1)
|
val numberOfRequestsReceived = crashingNodes.requestsReceived.get()
|
||||||
|
assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1)
|
||||||
|
|
||||||
|
crashingNodes.ignoreRequests = false
|
||||||
|
|
||||||
// Restart the node and expect a response
|
// Restart the node and expect a response
|
||||||
val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
|
val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
|
||||||
val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
|
val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
|
||||||
assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2)
|
|
||||||
|
assertThat(crashingNodes.requestsReceived.get()).isGreaterThanOrEqualTo(numberOfRequestsReceived + 1)
|
||||||
assertThat(response).isEqualTo(responseMessage)
|
assertThat(response).isEqualTo(responseMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
data class CrashingNodes(
|
||||||
|
val firstRequestReceived: CountDownLatch,
|
||||||
|
val requestsReceived: AtomicInteger,
|
||||||
|
var ignoreRequests: Boolean
|
||||||
|
)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. The first node in the service to
|
* Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. All nodes will receive requests and
|
||||||
* receive a request will ignore it and all subsequent requests. This simulates the scenario where a node receives
|
* either ignore them or respond, depending on the value of [CrashingNodes.ignoreRequests], initially set to true.
|
||||||
* a request message, but crashes before sending back a response. The other nodes will respond to _all_ requests.
|
* This may be used to simulate scenarios where nodes receive request messages but crash before sending back a response.
|
||||||
*/
|
*/
|
||||||
private fun simulateCrashingNode(distributedServiceNodes: List<Node>, dummyTopic: String, responseMessage: String): Pair<CountDownLatch, AtomicInteger> {
|
private fun simulateCrashingNodes(distributedServiceNodes: List<Node>, dummyTopic: String, responseMessage: String): CrashingNodes {
|
||||||
val firstToReceive = AtomicBoolean(true)
|
val crashingNodes = CrashingNodes(
|
||||||
val requestsReceived = AtomicInteger(0)
|
requestsReceived = AtomicInteger(0),
|
||||||
val firstRequestReceived = CountDownLatch(1)
|
firstRequestReceived = CountDownLatch(1),
|
||||||
|
ignoreRequests = true
|
||||||
|
)
|
||||||
|
|
||||||
distributedServiceNodes.forEach {
|
distributedServiceNodes.forEach {
|
||||||
val nodeName = it.info.legalIdentity.name
|
val nodeName = it.info.legalIdentity.name
|
||||||
var ignoreRequests = false
|
|
||||||
it.network.addMessageHandler(dummyTopic) { netMessage, _ ->
|
it.network.addMessageHandler(dummyTopic) { netMessage, _ ->
|
||||||
requestsReceived.incrementAndGet()
|
crashingNodes.requestsReceived.incrementAndGet()
|
||||||
firstRequestReceived.countDown()
|
crashingNodes.firstRequestReceived.countDown()
|
||||||
// The node which receives the first request will ignore all requests
|
// The node which receives the first request will ignore all requests
|
||||||
if (firstToReceive.getAndSet(false)) ignoreRequests = true
|
|
||||||
print("$nodeName: Received request - ")
|
print("$nodeName: Received request - ")
|
||||||
if (ignoreRequests) {
|
if (crashingNodes.ignoreRequests) {
|
||||||
println("ignoring")
|
println("ignoring")
|
||||||
// Requests are ignored to simulate a service node crashing before sending back a response.
|
// Requests are ignored to simulate a service node crashing before sending back a response.
|
||||||
// A retry by the client will result in the message being redelivered to another node in the service cluster.
|
// A retry by the client will result in the message being redelivered to another node in the service cluster.
|
||||||
@ -184,7 +200,7 @@ class P2PMessagingTest : NodeBasedTest() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Pair(firstRequestReceived, requestsReceived)
|
return crashingNodes
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun assertAllNodesAreUsed(participatingServiceNodes: List<Node>, serviceName: X500Name, originatingNode: Node) {
|
private fun assertAllNodesAreUsed(participatingServiceNodes: List<Node>, serviceName: X500Name, originatingNode: Node) {
|
||||||
|
Reference in New Issue
Block a user