From 721728c8abc99c5ed7dc3fa1e6856c84c6c7b25a Mon Sep 17 00:00:00 2001 From: Thomas Schroeter Date: Tue, 13 Mar 2018 10:01:44 +0000 Subject: [PATCH] Terminate observables on RPC connection failures (#2770) --- .../net/corda/client/rpc/RPCStabilityTests.kt | 40 +++++++++++++++++++ .../rpc/internal/RPCClientProxyHandler.kt | 12 ++++-- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 5025507f9c..d616f34bb3 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -316,6 +316,46 @@ class RPCStabilityTests { } } + interface NoOps : RPCOps { + fun subscribe(): Observable + } + + @Test + fun `observables error when connection breaks`() { + rpcDriver { + val ops = object : NoOps { + override val protocolVersion = 0 + override fun subscribe(): Observable { + return PublishSubject.create() + } + } + val serverFollower = shutdownManager.follower() + val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! + serverFollower.unfollow() + + val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) + val clientFollower = shutdownManager.follower() + val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() + clientFollower.unfollow() + + var terminateHandlerCalled = false + var errorHandlerCalled = false + val subscription = client.subscribe() + .doOnTerminate{ terminateHandlerCalled = true } + .doOnError { errorHandlerCalled = true } + .subscribe() + + serverFollower.shutdown() + Thread.sleep(100) + + assertTrue(terminateHandlerCalled) + assertTrue(errorHandlerCalled) + assertTrue(subscription.isUnsubscribed) + + clientFollower.shutdown() // Driver would do this after the new server, causing hang. + } + } + interface ThreadOps : RPCOps { fun sendMessage(id: Int, msgNo: Int): String } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index f8cbc370ad..274dd63941 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -408,6 +408,14 @@ class RPCClientProxyHandler( when (event) { FailoverEventType.FAILURE_DETECTED -> { log.warn("RPC server unavailable. RPC calls are being buffered.") + log.warn("Terminating observables.") + val m = observableContext.observableMap.asMap() + m.keys.forEach { k -> + observationExecutorPool.run(k) { + m[k]?.onError(RPCException("Connection failure detected.")) + } + } + observableContext.observableMap.invalidateAll() } FailoverEventType.FAILOVER_COMPLETED -> { @@ -422,8 +430,6 @@ class RPCClientProxyHandler( "will throw an RPCException.") rpcReplyMap.forEach { id, replyFuture -> replyFuture.setException(RPCException("Could not re-connect to RPC server. Failover failed.")) - val observable = observableContext.observableMap.getIfPresent(id) - observable?.onError(RPCException("Could not re-connect to RPC server. Failover failed.")) } outgoingRequestBuffer.clear() rpcReplyMap.clear() @@ -507,4 +513,4 @@ object RpcClientObservableSerializer : Serializer>() { val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as InvocationId return observableContext.callSiteMap?.get(rpcRequestOrObservableId) } -} \ No newline at end of file +}