From 50d4ab6951bb4bcb6c781b4f0ec9c4388108b5c7 Mon Sep 17 00:00:00 2001 From: James Higgs <45565019+JamesHR3@users.noreply.github.com> Date: Wed, 17 Jul 2019 17:41:22 +0100 Subject: [PATCH] [CORDA-2923] Ensure the RPC connection is closed in Reconnection test (#5303) --- .../rpc/CordaRPCClientReconnectionTest.kt | 75 ++++++++++--------- 1 file changed, 41 insertions(+), 34 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientReconnectionTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientReconnectionTest.kt index 08fe413fa5..8536f0a8f1 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientReconnectionTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientReconnectionTest.kt @@ -1,5 +1,6 @@ package net.corda.client.rpc +import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.core.messaging.startTrackedFlow import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.OpaqueBytes @@ -49,21 +50,23 @@ class CordaRPCClientReconnectionTest { maxReconnectAttempts = 5 )) - val rpcOps = client.start(rpcUser.username, rpcUser.password, gracefulReconnect = true).proxy - val networkParameters = rpcOps.networkParameters - val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) - cashStatesFeed.updates.subscribe { latch.countDown() } - rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = true).proxy as ReconnectingCordaRPCOps).use { + val rpcOps = it + val networkParameters = rpcOps.networkParameters + val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) + cashStatesFeed.updates.subscribe { latch.countDown() } + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() - node.stop() - startNode() + node.stop() + startNode() - rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() - val networkParametersAfterCrash = rpcOps.networkParameters - assertThat(networkParameters).isEqualTo(networkParametersAfterCrash) - assertTrue { - latch.await(2, TimeUnit.SECONDS) + val networkParametersAfterCrash = rpcOps.networkParameters + assertThat(networkParameters).isEqualTo(networkParametersAfterCrash) + assertTrue { + latch.await(2, TimeUnit.SECONDS) + } } } } @@ -87,20 +90,22 @@ class CordaRPCClientReconnectionTest { maxReconnectAttempts = 5 )) - val rpcOps = client.start(rpcUser.username, rpcUser.password, true).proxy - val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) - val subscription = cashStatesFeed.updates.subscribe { latch.countDown() } - rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = true).proxy as ReconnectingCordaRPCOps).use { + val rpcOps = it + val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) + val subscription = cashStatesFeed.updates.subscribe { latch.countDown() } + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() - node.stop() - startNode() + node.stop() + startNode() - subscription.unsubscribe() + subscription.unsubscribe() - rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() - assertFalse { - latch.await(4, TimeUnit.SECONDS) + assertFalse { + latch.await(4, TimeUnit.SECONDS) + } } } } @@ -125,21 +130,23 @@ class CordaRPCClientReconnectionTest { maxReconnectAttempts = 5 )) - val rpcOps = client.start(rpcUser.username, rpcUser.password, true).proxy - val networkParameters = rpcOps.networkParameters - val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) - cashStatesFeed.updates.subscribe { latch.countDown() } - rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = true).proxy as ReconnectingCordaRPCOps).use { + val rpcOps = it + val networkParameters = rpcOps.networkParameters + val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) + cashStatesFeed.updates.subscribe { latch.countDown() } + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() - node.stop() - startNode(addresses[1]) + node.stop() + startNode(addresses[1]) - rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() - val networkParametersAfterCrash = rpcOps.networkParameters - assertThat(networkParameters).isEqualTo(networkParametersAfterCrash) - assertTrue { - latch.await(2, TimeUnit.SECONDS) + val networkParametersAfterCrash = rpcOps.networkParameters + assertThat(networkParameters).isEqualTo(networkParametersAfterCrash) + assertTrue { + latch.await(2, TimeUnit.SECONDS) + } } } }