From 70e8e69a873d684bb1010de3b3f5caabf141d486 Mon Sep 17 00:00:00 2001 From: Dimos Raptis Date: Fri, 14 Feb 2020 09:58:32 +0000 Subject: [PATCH] NOTICK - Add tests for handling of user errors in reconnecting observables (#5932) * Add tests for handling of user errors in reconnecting observables * detekt --- .../CordaRPCClientReconnectionTest.kt | 42 +++++++++++++++++++ .../rpc/internal/ReconnectingObservable.kt | 29 +++++++++---- detekt-baseline.xml | 1 + 3 files changed, 65 insertions(+), 7 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt index 18e4f878cb..031b990b0a 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt @@ -26,6 +26,7 @@ import net.corda.testing.node.internal.rpcDriver import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.Test +import java.lang.RuntimeException import java.lang.Thread.sleep import java.time.Duration import java.util.concurrent.CountDownLatch @@ -163,6 +164,47 @@ class CordaRPCClientReconnectionTest { } } + @Test(timeout=300_000) + fun `when user code throws an error on a reconnecting observable, then onError is invoked and observable is unsubscribed successfully`() { + driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) { + val normalLatch = CountDownLatch(1) + val errorLatch = CountDownLatch(1) + var observedEvents = 0 + + fun startNode(address: NetworkHostAndPort): NodeHandle { + return startNode( + providedName = CHARLIE_NAME, + rpcUsers = listOf(CordaRPCClientTest.rpcUser), + customOverrides = mapOf("rpcSettings.address" to address.toString()) + ).getOrThrow() + } + + val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort())) + + startNode(addresses[0]) + val client = CordaRPCClient(addresses) + + (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use { + val rpcOps = it.proxy as ReconnectingCordaRPCOps + val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) + val subscription = cashStatesFeed.updates.subscribe ({ + normalLatch.countDown() + observedEvents++ + throw RuntimeException() + }, { + errorLatch.countDown() + }) + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + + assertTrue { normalLatch.await(2, TimeUnit.SECONDS) } + assertTrue { errorLatch.await(2, TimeUnit.SECONDS) } + assertThat(subscription.isUnsubscribed).isTrue() + assertThat(observedEvents).isEqualTo(1) + } + } + } + @Test(timeout=300_000) fun `an RPC call fails, when the maximum number of attempts is exceeded`() { driver(DriverParameters(cordappsForAllNodes = emptyList())) { diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt index b5052d9bbf..fe3c03f453 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt @@ -1,5 +1,6 @@ package net.corda.client.rpc.internal +import net.corda.client.rpc.ConnectionFailureException import net.corda.core.messaging.DataFeed import rx.Observable import rx.Subscriber @@ -54,16 +55,30 @@ class ReconnectingObservable private constructor(subscriber: ReconnectingSubs } } + /** + * Depending on the type of error, the reaction is different: + * - If the error is coming from a connection disruption, we establish a new connection and re-wire the observable + * without letting the client notice at all. + * - In any other case, we let the error propagate to the client's observable. Both of the observables + * (this one and the client's one) will be automatically unsubscribed, since that's the semantics of onError. + */ private fun scheduleResubscribe(error: Throwable) { if (unsubscribed) return - reconnectingRPCConnection.observersPool.execute { - if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute - reconnectingRPCConnection.reconnectOnError(error) - // It can take a while to reconnect so we might find that we've shutdown in in the meantime - if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute - val newDataFeed = createDataFeed() - subscribeImmediately(newDataFeed) + + if (error is ConnectionFailureException) { + reconnectingRPCConnection.observersPool.execute { + if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute + reconnectingRPCConnection.reconnectOnError(error) + // It can take a while to reconnect so we might find that we've shutdown in in the meantime + if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute + val newDataFeed = createDataFeed() + subscribeImmediately(newDataFeed) + } + } else { + val subscriber = checkNotNull(this.subscriber.get()) + subscriber.onError(error) } + } } } \ No newline at end of file diff --git a/detekt-baseline.xml b/detekt-baseline.xml index 2c6cf777c8..39017a0c2b 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -1656,6 +1656,7 @@ TooGenericExceptionThrown:ClassLoadingUtilsTest.kt$ClassLoadingUtilsTest$throw RuntimeException() TooGenericExceptionThrown:CommandParsers.kt$AzureParser.RegionConverter$throw Error("Unknown azure region: $value") TooGenericExceptionThrown:ContractHierarchyTest.kt$ContractHierarchyTest.IndirectContractParent$throw RuntimeException("Boom!") + TooGenericExceptionThrown:CordaRPCClientReconnectionTest.kt$CordaRPCClientReconnectionTest$throw RuntimeException() TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated exit of ${request.amount} from $issuer, however there is no cash to exit!" ) TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from $issuer, " + "however they only have $issuerQuantity!" ) TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from ${node.mainIdentity}, " + "however there is no cash from $issuer!" )