From 75cd76b09baf98cb72845d15bff3248113d988bb Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Wed, 30 Aug 2017 11:57:03 +0100 Subject: [PATCH 1/2] Fix flaky distributed retry test --- .../services/messaging/P2PMessagingTest.kt | 58 ++++++++++++------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index f3ae4556ae..887419c2cb 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -26,7 +26,6 @@ import org.junit.Test import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger class P2PMessagingTest : NodeBasedTest() { @@ -105,17 +104,23 @@ class P2PMessagingTest : NodeBasedTest() { val dummyTopic = "dummy.topic" val responseMessage = "response" - simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage) + val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage) // Send a single request with retry - val response = with(alice.network) { + val responseFuture = with(alice.network) { val request = TestRequest(replyTo = myAddress) val responseFuture = onNext(dummyTopic, request.sessionID) val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes) send(msg, serviceAddress, retryId = request.sessionID) responseFuture - }.getOrThrow(10.seconds) + } + crashingNodes.firstRequestReceived.await() + // The request wasn't successful. + assertThat(responseFuture.isDone).isFalse() + crashingNodes.ignoreRequests = false + // The retry should be successful. + val response = responseFuture.getOrThrow(10.seconds) assertThat(response).isEqualTo(responseMessage) } @@ -130,7 +135,7 @@ class P2PMessagingTest : NodeBasedTest() { val dummyTopic = "dummy.topic" val responseMessage = "response" - val (firstRequestReceived, requestsReceived) = simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage) + val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage) val sessionId = random63BitValue() @@ -142,37 +147,48 @@ class P2PMessagingTest : NodeBasedTest() { } // Wait until the first request is received - firstRequestReceived.await(5, TimeUnit.SECONDS) + crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) // Stop alice's node before the request is redelivered – the first request is ignored alice.stop() - assertThat(requestsReceived.get()).isEqualTo(1) + val numberOfRequestsReceived = crashingNodes.requestsReceived.get() + assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1) + + crashingNodes.ignoreRequests = false // Restart the node and expect a response val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() val response = aliceRestarted.network.onNext(dummyTopic, sessionId).getOrThrow(5.seconds) - assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2) + + assertThat(crashingNodes.requestsReceived.get()).isGreaterThanOrEqualTo(numberOfRequestsReceived + 1) assertThat(response).isEqualTo(responseMessage) } + data class CrashingNodes( + val firstRequestReceived: CountDownLatch, + val requestsReceived: AtomicInteger, + var ignoreRequests: Boolean + ) + /** - * Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. The first node in the service to - * receive a request will ignore it and all subsequent requests. This simulates the scenario where a node receives - * a request message, but crashes before sending back a response. The other nodes will respond to _all_ requests. + * Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. All nodes will receive requests and + * either ignore them or respond, depending on the value of [CrashingNodes.ignoreRequests], initially set to true. + * This may be used to simulate scenarios where nodes receive request messages but crash before sending back a response. */ - private fun simulateCrashingNode(distributedServiceNodes: List, dummyTopic: String, responseMessage: String): Pair { - val firstToReceive = AtomicBoolean(true) - val requestsReceived = AtomicInteger(0) - val firstRequestReceived = CountDownLatch(1) + private fun simulateCrashingNodes(distributedServiceNodes: List, dummyTopic: String, responseMessage: String): CrashingNodes { + val crashingNodes = CrashingNodes( + requestsReceived = AtomicInteger(0), + firstRequestReceived = CountDownLatch(1), + ignoreRequests = true + ) + distributedServiceNodes.forEach { val nodeName = it.info.legalIdentity.name - var ignoreRequests = false it.network.addMessageHandler(dummyTopic) { netMessage, _ -> - requestsReceived.incrementAndGet() - firstRequestReceived.countDown() + crashingNodes.requestsReceived.incrementAndGet() + crashingNodes.firstRequestReceived.countDown() // The node which receives the first request will ignore all requests - if (firstToReceive.getAndSet(false)) ignoreRequests = true print("$nodeName: Received request - ") - if (ignoreRequests) { + if (crashingNodes.ignoreRequests) { println("ignoring") // Requests are ignored to simulate a service node crashing before sending back a response. // A retry by the client will result in the message being redelivered to another node in the service cluster. @@ -184,7 +200,7 @@ class P2PMessagingTest : NodeBasedTest() { } } } - return Pair(firstRequestReceived, requestsReceived) + return crashingNodes } private fun assertAllNodesAreUsed(participatingServiceNodes: List, serviceName: X500Name, originatingNode: Node) { From 9891fb58b0680eeae95201de3849d172588c8402 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Wed, 30 Aug 2017 17:51:19 +0100 Subject: [PATCH 2/2] Address comments --- .../kotlin/net/corda/services/messaging/P2PMessagingTest.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index 887419c2cb..e3af49da13 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -114,7 +114,7 @@ class P2PMessagingTest : NodeBasedTest() { send(msg, serviceAddress, retryId = request.sessionID) responseFuture } - crashingNodes.firstRequestReceived.await() + crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) // The request wasn't successful. assertThat(responseFuture.isDone).isFalse() crashingNodes.ignoreRequests = false @@ -148,7 +148,7 @@ class P2PMessagingTest : NodeBasedTest() { // Wait until the first request is received crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) - // Stop alice's node before the request is redelivered – the first request is ignored + // Stop alice's node after we ensured that the first request was delivered and ignored. alice.stop() val numberOfRequestsReceived = crashingNodes.requestsReceived.get() assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1) @@ -159,7 +159,7 @@ class P2PMessagingTest : NodeBasedTest() { val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() val response = aliceRestarted.network.onNext(dummyTopic, sessionId).getOrThrow(5.seconds) - assertThat(crashingNodes.requestsReceived.get()).isGreaterThanOrEqualTo(numberOfRequestsReceived + 1) + assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived) assertThat(response).isEqualTo(responseMessage) }