Terminate observables on RPC connection failures (#2770)

This commit is contained in:
Thomas Schroeter 2018-03-13 10:01:44 +00:00 committed by GitHub
parent 568f7d7c17
commit 721728c8ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 3 deletions

View File

@ -316,6 +316,46 @@ class RPCStabilityTests {
}
}
interface NoOps : RPCOps {
fun subscribe(): Observable<Nothing>
}
@Test
fun `observables error when connection breaks`() {
rpcDriver {
val ops = object : NoOps {
override val protocolVersion = 0
override fun subscribe(): Observable<Nothing> {
return PublishSubject.create<Nothing>()
}
}
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
var terminateHandlerCalled = false
var errorHandlerCalled = false
val subscription = client.subscribe()
.doOnTerminate{ terminateHandlerCalled = true }
.doOnError { errorHandlerCalled = true }
.subscribe()
serverFollower.shutdown()
Thread.sleep(100)
assertTrue(terminateHandlerCalled)
assertTrue(errorHandlerCalled)
assertTrue(subscription.isUnsubscribed)
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}
interface ThreadOps : RPCOps {
fun sendMessage(id: Int, msgNo: Int): String
}

View File

@ -408,6 +408,14 @@ class RPCClientProxyHandler(
when (event) {
FailoverEventType.FAILURE_DETECTED -> {
log.warn("RPC server unavailable. RPC calls are being buffered.")
log.warn("Terminating observables.")
val m = observableContext.observableMap.asMap()
m.keys.forEach { k ->
observationExecutorPool.run(k) {
m[k]?.onError(RPCException("Connection failure detected."))
}
}
observableContext.observableMap.invalidateAll()
}
FailoverEventType.FAILOVER_COMPLETED -> {
@ -422,8 +430,6 @@ class RPCClientProxyHandler(
"will throw an RPCException.")
rpcReplyMap.forEach { id, replyFuture ->
replyFuture.setException(RPCException("Could not re-connect to RPC server. Failover failed."))
val observable = observableContext.observableMap.getIfPresent(id)
observable?.onError(RPCException("Could not re-connect to RPC server. Failover failed."))
}
outgoingRequestBuffer.clear()
rpcReplyMap.clear()
@ -507,4 +513,4 @@ object RpcClientObservableSerializer : Serializer<Observable<*>>() {
val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as InvocationId
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
}
}
}