Merge pull request #1355 from corda/aslemmer-fix-flaky-retry-test

Fix flaky distributed retry test
This commit is contained in:
Andras Slemmer
2017-09-01 10:55:42 +01:00
committed by GitHub

View File

@ -26,7 +26,6 @@ import org.junit.Test
import java.util.* import java.util.*
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
class P2PMessagingTest : NodeBasedTest() { class P2PMessagingTest : NodeBasedTest() {
@ -105,17 +104,23 @@ class P2PMessagingTest : NodeBasedTest() {
val dummyTopic = "dummy.topic" val dummyTopic = "dummy.topic"
val responseMessage = "response" val responseMessage = "response"
simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage) val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
// Send a single request with retry // Send a single request with retry
val response = with(alice.network) { val responseFuture = with(alice.network) {
val request = TestRequest(replyTo = myAddress) val request = TestRequest(replyTo = myAddress)
val responseFuture = onNext<Any>(dummyTopic, request.sessionID) val responseFuture = onNext<Any>(dummyTopic, request.sessionID)
val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes) val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes)
send(msg, serviceAddress, retryId = request.sessionID) send(msg, serviceAddress, retryId = request.sessionID)
responseFuture responseFuture
}.getOrThrow(10.seconds) }
crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
// The request wasn't successful.
assertThat(responseFuture.isDone).isFalse()
crashingNodes.ignoreRequests = false
// The retry should be successful.
val response = responseFuture.getOrThrow(10.seconds)
assertThat(response).isEqualTo(responseMessage) assertThat(response).isEqualTo(responseMessage)
} }
@ -130,7 +135,7 @@ class P2PMessagingTest : NodeBasedTest() {
val dummyTopic = "dummy.topic" val dummyTopic = "dummy.topic"
val responseMessage = "response" val responseMessage = "response"
val (firstRequestReceived, requestsReceived) = simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage) val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage)
val sessionId = random63BitValue() val sessionId = random63BitValue()
@ -142,37 +147,48 @@ class P2PMessagingTest : NodeBasedTest() {
} }
// Wait until the first request is received // Wait until the first request is received
firstRequestReceived.await(5, TimeUnit.SECONDS) crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
// Stop alice's node before the request is redelivered the first request is ignored // Stop alice's node after we ensured that the first request was delivered and ignored.
alice.stop() alice.stop()
assertThat(requestsReceived.get()).isEqualTo(1) val numberOfRequestsReceived = crashingNodes.requestsReceived.get()
assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1)
crashingNodes.ignoreRequests = false
// Restart the node and expect a response // Restart the node and expect a response
val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds) val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2)
assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived)
assertThat(response).isEqualTo(responseMessage) assertThat(response).isEqualTo(responseMessage)
} }
/**
 * Shared state between a test and the message handlers installed by [simulateCrashingNodes].
 *
 * @property firstRequestReceived Latch counted down by a handler each time a request arrives, so a test can
 * wait until at least one request has been delivered.
 * @property requestsReceived Total number of requests seen by the handlers so far.
 * @property ignoreRequests While `true` the handlers drop incoming requests instead of responding. Tests flip
 * it to `false` to let a later redelivery succeed. The flag is written by the test thread and read inside the
 * message handler callbacks, so it is marked [Volatile] to guarantee the update becomes visible to them.
 */
data class CrashingNodes(
    val firstRequestReceived: CountDownLatch,
    val requestsReceived: AtomicInteger,
    @Volatile var ignoreRequests: Boolean
)
/** /**
* Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. The first node in the service to * Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. All nodes will receive requests and
* receive a request will ignore it and all subsequent requests. This simulates the scenario where a node receives * either ignore them or respond, depending on the value of [CrashingNodes.ignoreRequests], initially set to true.
* a request message, but crashes before sending back a response. The other nodes will respond to _all_ requests. * This may be used to simulate scenarios where nodes receive request messages but crash before sending back a response.
*/ */
private fun simulateCrashingNode(distributedServiceNodes: List<Node>, dummyTopic: String, responseMessage: String): Pair<CountDownLatch, AtomicInteger> { private fun simulateCrashingNodes(distributedServiceNodes: List<Node>, dummyTopic: String, responseMessage: String): CrashingNodes {
val firstToReceive = AtomicBoolean(true) val crashingNodes = CrashingNodes(
val requestsReceived = AtomicInteger(0) requestsReceived = AtomicInteger(0),
val firstRequestReceived = CountDownLatch(1) firstRequestReceived = CountDownLatch(1),
ignoreRequests = true
)
distributedServiceNodes.forEach { distributedServiceNodes.forEach {
val nodeName = it.info.legalIdentity.name val nodeName = it.info.legalIdentity.name
var ignoreRequests = false
it.network.addMessageHandler(dummyTopic) { netMessage, _ -> it.network.addMessageHandler(dummyTopic) { netMessage, _ ->
requestsReceived.incrementAndGet() crashingNodes.requestsReceived.incrementAndGet()
firstRequestReceived.countDown() crashingNodes.firstRequestReceived.countDown()
// The node which receives the first request will ignore all requests // The node which receives the first request will ignore all requests
if (firstToReceive.getAndSet(false)) ignoreRequests = true
print("$nodeName: Received request - ") print("$nodeName: Received request - ")
if (ignoreRequests) { if (crashingNodes.ignoreRequests) {
println("ignoring") println("ignoring")
// Requests are ignored to simulate a service node crashing before sending back a response. // Requests are ignored to simulate a service node crashing before sending back a response.
// A retry by the client will result in the message being redelivered to another node in the service cluster. // A retry by the client will result in the message being redelivered to another node in the service cluster.
@ -184,7 +200,7 @@ class P2PMessagingTest : NodeBasedTest() {
} }
} }
} }
return Pair(firstRequestReceived, requestsReceived) return crashingNodes
} }
private fun assertAllNodesAreUsed(participatingServiceNodes: List<Node>, serviceName: X500Name, originatingNode: Node) { private fun assertAllNodesAreUsed(participatingServiceNodes: List<Node>, serviceName: X500Name, originatingNode: Node) {