diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 88c059ff9b..b0965f29b4 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -10,7 +10,6 @@ import net.corda.client.rpc.ConnectionFailureException import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCSinceVersion -import net.corda.nodeapi.internal.rpc.client.RpcClientObservableDeSerializer import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.context.Trace.InvocationId @@ -35,6 +34,7 @@ import net.corda.nodeapi.internal.DeduplicationChecker import net.corda.nodeapi.internal.rpc.client.CallSite import net.corda.nodeapi.internal.rpc.client.CallSiteMap import net.corda.nodeapi.internal.rpc.client.ObservableContext +import net.corda.nodeapi.internal.rpc.client.RpcClientObservableDeSerializer import net.corda.nodeapi.internal.rpc.client.RpcObservableMap import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException @@ -50,6 +50,7 @@ import org.apache.activemq.artemis.api.core.client.FailoverEventType import org.apache.activemq.artemis.api.core.client.ServerLocator import rx.Notification import rx.Observable +import rx.exceptions.OnErrorNotImplementedException import rx.subjects.UnicastSubject import java.lang.reflect.InvocationHandler import java.lang.reflect.Method @@ -142,6 +143,19 @@ internal class RPCClientProxyHandler( } } } + + @Suppress("TooGenericExceptionCaught") + private fun closeObservable(observable: UnicastSubject>) { + // Notify listeners of the observables that the connection is being terminated. + try { + observable.onError(ConnectionFailureException()) + } catch (ex: OnErrorNotImplementedException) { + // Indicates the observer does not have any error handling. + log.debug { "Closed connection on observable whose observers have no error handling." } + } catch (ex: Exception) { + log.error("Unexpected exception when RPC connection failure handling", ex) + } + } } // Used for reaping @@ -452,14 +466,9 @@ internal class RPCClientProxyHandler( } reaperScheduledFuture?.cancel(false) - val observablesMap = observableContext.observableMap.asMap() - observablesMap.keys.forEach { key -> + observableContext.observableMap.asMap().forEach { (key, observable) -> observationExecutorPool.run(key) { - try { - observablesMap[key]?.onError(ConnectionFailureException()) - } catch (e: Exception) { - log.error("Unexpected exception when RPC connection failure handling", e) - } + observable?.also(Companion::closeObservable) } } observableContext.observableMap.invalidateAll()