INFRA-438 Handle observers not having error handling (#6434)

* INFRA-438 Handle observers not having error handling

When the RPC client connection is closed, it notifies observers using onError(), which may not be the correct approach (TBD) but changing this is a much more invasive change. Where observers do not subscribe to error notifications, this is reflected to the calling client by an exception thrown.

This change catches that exception and lots it as debug rather an error level.
This commit is contained in:
Ross Nicoll 2020-08-05 00:05:33 +01:00 committed by GitHub
parent 25d1d61685
commit de5568854c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -10,7 +10,6 @@ import net.corda.client.rpc.ConnectionFailureException
import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCException
import net.corda.client.rpc.RPCSinceVersion import net.corda.client.rpc.RPCSinceVersion
import net.corda.nodeapi.internal.rpc.client.RpcClientObservableDeSerializer
import net.corda.core.context.Actor import net.corda.core.context.Actor
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId import net.corda.core.context.Trace.InvocationId
@ -35,6 +34,7 @@ import net.corda.nodeapi.internal.DeduplicationChecker
import net.corda.nodeapi.internal.rpc.client.CallSite import net.corda.nodeapi.internal.rpc.client.CallSite
import net.corda.nodeapi.internal.rpc.client.CallSiteMap import net.corda.nodeapi.internal.rpc.client.CallSiteMap
import net.corda.nodeapi.internal.rpc.client.ObservableContext import net.corda.nodeapi.internal.rpc.client.ObservableContext
import net.corda.nodeapi.internal.rpc.client.RpcClientObservableDeSerializer
import net.corda.nodeapi.internal.rpc.client.RpcObservableMap import net.corda.nodeapi.internal.rpc.client.RpcObservableMap
import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
@ -50,6 +50,7 @@ import org.apache.activemq.artemis.api.core.client.FailoverEventType
import org.apache.activemq.artemis.api.core.client.ServerLocator import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Notification import rx.Notification
import rx.Observable import rx.Observable
import rx.exceptions.OnErrorNotImplementedException
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
import java.lang.reflect.InvocationHandler import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method import java.lang.reflect.Method
@ -142,6 +143,19 @@ internal class RPCClientProxyHandler(
} }
} }
} }
@Suppress("TooGenericExceptionCaught")
private fun closeObservable(observable: UnicastSubject<Notification<*>>) {
// Notify listeners of the observables that the connection is being terminated.
try {
observable.onError(ConnectionFailureException())
} catch (ex: OnErrorNotImplementedException) {
// Indicates the observer does not have any error handling.
log.debug { "Closed connection on observable whose observers have no error handling." }
} catch (ex: Exception) {
log.error("Unexpected exception when RPC connection failure handling", ex)
}
}
} }
// Used for reaping // Used for reaping
@ -452,14 +466,9 @@ internal class RPCClientProxyHandler(
} }
reaperScheduledFuture?.cancel(false) reaperScheduledFuture?.cancel(false)
val observablesMap = observableContext.observableMap.asMap() observableContext.observableMap.asMap().forEach { (key, observable) ->
observablesMap.keys.forEach { key ->
observationExecutorPool.run(key) { observationExecutorPool.run(key) {
try { observable?.also(Companion::closeObservable)
observablesMap[key]?.onError(ConnectionFailureException())
} catch (e: Exception) {
log.error("Unexpected exception when RPC connection failure handling", e)
}
} }
} }
observableContext.observableMap.invalidateAll() observableContext.observableMap.invalidateAll()