From 1df29ab06f981f8e5acf6b9ecbb1616e92214248 Mon Sep 17 00:00:00 2001 From: Andrzej Cichocki Date: Fri, 12 May 2017 10:27:27 +0100 Subject: [PATCH] Make webserver reconnect to rebooted node. (#662) --- .gitignore | 3 ++- .../net/corda/client/rpc/RPCStabilityTests.kt | 27 +++++++++++++++++++ .../corda/client/rpc/internal/RPCClient.kt | 4 +++ .../kotlin/net/corda/node/driver/Driver.kt | 22 +++++++++++++++ .../kotlin/net/corda/testing/RPCDriver.kt | 4 ++- 5 files changed, 58 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 0cbd0c082a..620d52100e 100644 --- a/.gitignore +++ b/.gitignore @@ -65,6 +65,7 @@ lib/dokka.jar # IntelliJ /out/ +/classes/ # mpeltonen/sbt-idea plugin .idea_modules/ @@ -87,4 +88,4 @@ node/config/currentView # Files you may find useful to have in your working directory. PLAN NOTES -TODO \ No newline at end of file +TODO diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 5287a0de4f..5b061dc27e 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -6,6 +6,7 @@ import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.util.concurrent.Futures +import net.corda.core.getOrThrow import net.corda.core.messaging.RPCOps import net.corda.core.millis import net.corda.core.random63BitValue @@ -21,6 +22,7 @@ import rx.subjects.UnicastSubject import java.time.Duration import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.assertEquals class RPCStabilityTests { @@ -57,6 +59,31 @@ class RPCStabilityTests { } } + interface ReconnectOps : RPCOps { + fun ping(): String + } + + @Test + fun `client reconnects to rebooted server`() { + rpcDriver { + val ops = object : ReconnectOps { + override val protocolVersion = 0 + override fun ping() = "pong" + } + val serverFollower = shutdownManager.follower() + val serverPort = startRpcServer(ops = ops).getOrThrow().hostAndPort + serverFollower.unfollow() + val clientFollower = shutdownManager.follower() + val client = startRpcClient(serverPort).getOrThrow() + clientFollower.unfollow() + assertEquals("pong", client.ping()) + serverFollower.shutdown() + startRpcServer(ops = ops, customPort = serverPort).getOrThrow() + assertEquals("pong", client.ping()) + clientFollower.shutdown() // Driver would do this after the new server, causing hang. + } + } + interface TrackSubscriberOps : RPCOps { fun subscribe(): Observable } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 60d50928bd..aca902ecab 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -53,10 +53,12 @@ data class RPCClientConfiguration( val connectionRetryIntervalMultiplier: Double, /** Maximum retry interval */ val connectionMaxRetryInterval: Duration, + val maxReconnectAttempts: Int, /** Maximum file size */ val maxFileSize: Int ) { companion object { + val unlimitedReconnectAttempts = -1 @JvmStatic val default = RPCClientConfiguration( minimumServerProtocolVersion = 0, @@ -68,6 +70,7 @@ data class RPCClientConfiguration( connectionRetryInterval = 5.seconds, connectionRetryIntervalMultiplier = 1.5, connectionMaxRetryInterval = 3.minutes, + maxReconnectAttempts = unlimitedReconnectAttempts, /** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */ maxFileSize = 10485760 ) @@ -139,6 +142,7 @@ class RPCClient( retryInterval = rpcConfiguration.connectionRetryInterval.toMillis() retryIntervalMultiplier = rpcConfiguration.connectionRetryIntervalMultiplier maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval.toMillis() + reconnectAttempts = rpcConfiguration.maxReconnectAttempts minLargeMessageSize = rpcConfiguration.maxFileSize } diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index 22fc5499e7..e5cfc40980 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -381,6 +381,28 @@ class ShutdownManager(private val executorService: ExecutorService) { } registerShutdown(processShutdown) } + + interface Follower { + fun unfollow() + fun shutdown() + } + + fun follower() = object : Follower { + private val start = state.locked { registeredShutdowns.size } + private val end = AtomicInteger(start - 1) + override fun unfollow() = end.set(state.locked { registeredShutdowns.size }) + override fun shutdown() = end.get().let { end -> + start > end && throw IllegalStateException("You haven't called unfollow.") + state.locked { + registeredShutdowns.subList(start, end).listIterator(end - start).run { + while (hasPrevious()) { + previous().getOrThrow().invoke() + set(Futures.immediateFuture {}) // Don't break other followers by doing a remove. + } + } + } + } + } } class DriverDSL( diff --git a/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt b/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt index 6822c1fa76..2b7c4932ab 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt @@ -108,6 +108,7 @@ interface RPCDriverExposedDSLInterface : DriverDSLExposedInterface { maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE, configuration: RPCServerConfiguration = RPCServerConfiguration.default, + customPort: HostAndPort? = null, ops : I ) : ListenableFuture @@ -340,9 +341,10 @@ data class RPCDriverDSL( maxFileSize: Int, maxBufferedBytesPerClient: Long, configuration: RPCServerConfiguration, + customPort: HostAndPort?, ops: I ): ListenableFuture { - val hostAndPort = driverDSL.portAllocation.nextHostAndPort() + val hostAndPort = customPort ?: driverDSL.portAllocation.nextHostAndPort() addressMustNotBeBound(driverDSL.executorService, hostAndPort) return driverDSL.executorService.submit { val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort)