diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index f3ae4556ae..e3af49da13 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -26,7 +26,6 @@ import org.junit.Test import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger class P2PMessagingTest : NodeBasedTest() { @@ -105,17 +104,23 @@ class P2PMessagingTest : NodeBasedTest() { val dummyTopic = "dummy.topic" val responseMessage = "response" - simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage) + val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage) // Send a single request with retry - val response = with(alice.network) { + val responseFuture = with(alice.network) { val request = TestRequest(replyTo = myAddress) val responseFuture = onNext(dummyTopic, request.sessionID) val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes) send(msg, serviceAddress, retryId = request.sessionID) responseFuture - }.getOrThrow(10.seconds) + } + crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) + // The request wasn't successful. + assertThat(responseFuture.isDone).isFalse() + crashingNodes.ignoreRequests = false + // The retry should be successful. + val response = responseFuture.getOrThrow(10.seconds) assertThat(response).isEqualTo(responseMessage) } @@ -130,7 +135,7 @@ class P2PMessagingTest : NodeBasedTest() { val dummyTopic = "dummy.topic" val responseMessage = "response" - val (firstRequestReceived, requestsReceived) = simulateCrashingNode(distributedServiceNodes, dummyTopic, responseMessage) + val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage) val sessionId = random63BitValue() @@ -142,37 +147,48 @@ class P2PMessagingTest : NodeBasedTest() { } // Wait until the first request is received - firstRequestReceived.await(5, TimeUnit.SECONDS) - // Stop alice's node before the request is redelivered – the first request is ignored + crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) + // Stop alice's node after we ensured that the first request was delivered and ignored. alice.stop() - assertThat(requestsReceived.get()).isEqualTo(1) + val numberOfRequestsReceived = crashingNodes.requestsReceived.get() + assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1) + + crashingNodes.ignoreRequests = false // Restart the node and expect a response val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() val response = aliceRestarted.network.onNext(dummyTopic, sessionId).getOrThrow(5.seconds) - assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2) + + assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived) assertThat(response).isEqualTo(responseMessage) } + data class CrashingNodes( + val firstRequestReceived: CountDownLatch, + val requestsReceived: AtomicInteger, + var ignoreRequests: Boolean + ) + /** - * Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. The first node in the service to - * receive a request will ignore it and all subsequent requests. This simulates the scenario where a node receives - * a request message, but crashes before sending back a response. The other nodes will respond to _all_ requests. + * Sets up the [distributedServiceNodes] to respond to [dummyTopic] requests. All nodes will receive requests and + * either ignore them or respond, depending on the value of [CrashingNodes.ignoreRequests], initially set to true. + * This may be used to simulate scenarios where nodes receive request messages but crash before sending back a response. */ - private fun simulateCrashingNode(distributedServiceNodes: List, dummyTopic: String, responseMessage: String): Pair { - val firstToReceive = AtomicBoolean(true) - val requestsReceived = AtomicInteger(0) - val firstRequestReceived = CountDownLatch(1) + private fun simulateCrashingNodes(distributedServiceNodes: List, dummyTopic: String, responseMessage: String): CrashingNodes { + val crashingNodes = CrashingNodes( + requestsReceived = AtomicInteger(0), + firstRequestReceived = CountDownLatch(1), + ignoreRequests = true + ) + distributedServiceNodes.forEach { val nodeName = it.info.legalIdentity.name - var ignoreRequests = false it.network.addMessageHandler(dummyTopic) { netMessage, _ -> - requestsReceived.incrementAndGet() - firstRequestReceived.countDown() + crashingNodes.requestsReceived.incrementAndGet() + crashingNodes.firstRequestReceived.countDown() // The node which receives the first request will ignore all requests - if (firstToReceive.getAndSet(false)) ignoreRequests = true print("$nodeName: Received request - ") - if (ignoreRequests) { + if (crashingNodes.ignoreRequests) { println("ignoring") // Requests are ignored to simulate a service node crashing before sending back a response. // A retry by the client will result in the message being redelivered to another node in the service cluster. @@ -184,7 +200,7 @@ class P2PMessagingTest : NodeBasedTest() { } } } - return Pair(firstRequestReceived, requestsReceived) + return crashingNodes } private fun assertAllNodesAreUsed(participatingServiceNodes: List, serviceName: X500Name, originatingNode: Node) {