From 8962d930d4c821689884f7b6ea502f6aa39abd99 Mon Sep 17 00:00:00 2001 From: Dimos Raptis Date: Tue, 16 Jul 2019 11:29:21 +0100 Subject: [PATCH] [CORDA-2923] - Make the RPC client reconnect with gracefulReconnect param (#5244) When set to true the RPC client will: * automatically reconnect when the connection is broken * simple RPC calls will block until connection is established * Observables returned from RPC will automatically resubscribe on reconnect so the client continues to receive events. This doesn't guarantee that events will not be lost during the reconnect. --- .idea/compiler.xml | 6 + .../client/jfx/model/NodeMonitorModel.kt | 15 +- .../rpc/CordaRPCClientReconnectionTest.kt | 147 ++++++++++++++++++ .../net/corda/client/rpc/CordaRPCClient.kt | 80 ++++++++-- .../rpc/internal/ReconnectingCordaRPCOps.kt | 138 +++------------- .../rpc/internal/ReconnectingObservable.kt | 68 ++++++++ .../reconnect/CouldNotStartFlowException.kt | 10 ++ docs/source/changelog.rst | 12 +- docs/source/clientrpc.rst | 63 ++++---- .../node/services/rpc/RpcReconnectTests.kt | 107 ++++++++----- .../net/corda/tools/shell/InteractiveShell.kt | 8 +- 11 files changed, 420 insertions(+), 234 deletions(-) create mode 100644 client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientReconnectionTest.kt create mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt create mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/reconnect/CouldNotStartFlowException.kt diff --git a/.idea/compiler.xml b/.idea/compiler.xml index 3628d6937c..a909f7b4af 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -265,6 +265,10 @@ + + + + @@ -363,6 +367,8 @@ + + diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index 867d42f725..36be2b5632 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ 
b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -1,9 +1,7 @@ package net.corda.client.jfx.model -import javafx.application.Platform import javafx.beans.property.SimpleObjectProperty import net.corda.client.rpc.internal.ReconnectingCordaRPCOps -import net.corda.client.rpc.internal.asReconnectingWithInitialValues import net.corda.core.contracts.ContractState import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party @@ -85,24 +83,19 @@ class NodeMonitorModel : AutoCloseable { }.toSet() val consumedStates = statesSnapshot.states.toSet() - unconsumedStates val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet()) - vaultUpdates.asReconnectingWithInitialValues(listOf(initialVaultUpdate)) - .subscribe( - onNext = vaultUpdatesSubject::onNext, - onDisconnect = { Platform.runLater { proxyObservable.value = null } }, - onReconnect = { Platform.runLater { proxyObservable.value = rpc } }, - onStop = {}) + vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext) // Transactions val (transactions, newTransactions) = rpc.internalVerifiedTransactionsFeed() - newTransactions.asReconnectingWithInitialValues(transactions).subscribe(transactionsSubject::onNext) + newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext) // SM -> TX mapping val (smTxMappings, futureSmTxMappings) = rpc.stateMachineRecordedTransactionMappingFeed() - futureSmTxMappings.asReconnectingWithInitialValues(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext) + futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext) // Parties on network val (parties, futurePartyUpdate) = rpc.networkMapFeed() - futurePartyUpdate.asReconnectingWithInitialValues(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext) + futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext) val 
stateMachines = rpc.stateMachinesSnapshot() diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientReconnectionTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientReconnectionTest.kt new file mode 100644 index 0000000000..08fe413fa5 --- /dev/null +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientReconnectionTest.kt @@ -0,0 +1,147 @@ +package net.corda.client.rpc + +import net.corda.core.messaging.startTrackedFlow +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.getOrThrow +import net.corda.finance.DOLLARS +import net.corda.finance.contracts.asset.Cash +import net.corda.finance.flows.CashIssueFlow +import net.corda.node.services.Permissions +import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.incrementalPortAllocation +import net.corda.testing.node.User +import net.corda.testing.node.internal.FINANCE_CORDAPPS +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class CordaRPCClientReconnectionTest { + + private val portAllocator = incrementalPortAllocation() + + companion object { + val rpcUser = User("user1", "test", permissions = setOf(Permissions.all())) + } + + @Test + fun `rpc client calls and returned observables continue working when the server crashes and restarts`() { + driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) { + val latch = CountDownLatch(2) + val address = NetworkHostAndPort("localhost", portAllocator.nextPort()) + + fun startNode(): NodeHandle { + return startNode( + providedName 
= CHARLIE_NAME, + rpcUsers = listOf(CordaRPCClientTest.rpcUser), + customOverrides = mapOf("rpcSettings.address" to address.toString()) + ).getOrThrow() + } + + val node = startNode() + val client = CordaRPCClient(node.rpcAddress, CordaRPCClientConfiguration.DEFAULT.copy( + maxReconnectAttempts = 5 + )) + + val rpcOps = client.start(rpcUser.username, rpcUser.password, gracefulReconnect = true).proxy + val networkParameters = rpcOps.networkParameters + val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) + cashStatesFeed.updates.subscribe { latch.countDown() } + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + + node.stop() + startNode() + + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + + val networkParametersAfterCrash = rpcOps.networkParameters + assertThat(networkParameters).isEqualTo(networkParametersAfterCrash) + assertTrue { + latch.await(2, TimeUnit.SECONDS) + } + } + } + + @Test + fun `a client can successfully unsubscribe a reconnecting observable`() { + driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) { + val latch = CountDownLatch(2) + val address = NetworkHostAndPort("localhost", portAllocator.nextPort()) + + fun startNode(): NodeHandle { + return startNode( + providedName = CHARLIE_NAME, + rpcUsers = listOf(CordaRPCClientTest.rpcUser), + customOverrides = mapOf("rpcSettings.address" to address.toString()) + ).getOrThrow() + } + + val node = startNode() + val client = CordaRPCClient(node.rpcAddress, CordaRPCClientConfiguration.DEFAULT.copy( + maxReconnectAttempts = 5 + )) + + val rpcOps = client.start(rpcUser.username, rpcUser.password, true).proxy + val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) + val subscription = cashStatesFeed.updates.subscribe { latch.countDown() } + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, 
OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + + node.stop() + startNode() + + subscription.unsubscribe() + + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + + assertFalse { + latch.await(4, TimeUnit.SECONDS) + } + } + } + + @Test + fun `rpc client calls and returned observables continue working when there is failover between servers`() { + driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) { + val latch = CountDownLatch(2) + + fun startNode(address: NetworkHostAndPort): NodeHandle { + return startNode( + providedName = CHARLIE_NAME, + rpcUsers = listOf(CordaRPCClientTest.rpcUser), + customOverrides = mapOf("rpcSettings.address" to address.toString()) + ).getOrThrow() + } + + val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort())) + + val node = startNode(addresses[0]) + val client = CordaRPCClient(addresses, CordaRPCClientConfiguration.DEFAULT.copy( + maxReconnectAttempts = 5 + )) + + val rpcOps = client.start(rpcUser.username, rpcUser.password, true).proxy + val networkParameters = rpcOps.networkParameters + val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) + cashStatesFeed.updates.subscribe { latch.countDown() } + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + + node.stop() + startNode(addresses[1]) + + rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() + + val networkParametersAfterCrash = rpcOps.networkParameters + assertThat(networkParameters).isEqualTo(networkParametersAfterCrash) + assertTrue { + latch.await(2, TimeUnit.SECONDS) + } + } + } + +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt 
b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 3e65fa7628..a60cfdcbf8 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -2,12 +2,15 @@ package net.corda.client.rpc import com.github.benmanes.caffeine.cache.Caffeine import net.corda.client.rpc.internal.RPCClient +import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.core.internal.createInstancesOfClassesImplementing +import net.corda.client.rpc.reconnect.CouldNotStartFlowException +import net.corda.core.CordaInternal import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.identity.CordaX500Name import net.corda.core.internal.PLATFORM_VERSION +import net.corda.core.internal.createInstancesOfClassesImplementing import net.corda.core.internal.messaging.InternalCordaRPCOps import net.corda.core.messaging.ClientRpcSslOptions import net.corda.core.messaging.CordaRPCOps @@ -23,14 +26,39 @@ import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey import net.corda.serialization.internal.amqp.SerializerFactory import java.time.Duration -import java.util.ServiceLoader +import java.util.* /** * This class is essentially just a wrapper for an RPCConnection and can be treated identically. * * @see RPCConnection */ -class CordaRPCConnection internal constructor(connection: RPCConnection) : RPCConnection by connection +class CordaRPCConnection private constructor( + private val oneTimeConnection: RPCConnection?, + private val reconnectingCordaRPCOps: ReconnectingCordaRPCOps? +) : RPCConnection { + internal constructor(connection: RPCConnection?) 
: this(connection, null) + + companion object { + @CordaInternal + internal fun createWithGracefulReconnection(username: String, password: String, addresses: List): CordaRPCConnection { + return CordaRPCConnection(null, ReconnectingCordaRPCOps(addresses, username, password)) + } + } + + override val proxy: CordaRPCOps get() = reconnectingCordaRPCOps ?: oneTimeConnection!!.proxy + + private val actualConnection: RPCConnection + get() = reconnectingCordaRPCOps?.reconnectingRPCConnection ?: oneTimeConnection!! + + override val serverProtocolVersion: Int get() = actualConnection.serverProtocolVersion + + override fun notifyServerAndClose() = actualConnection.notifyServerAndClose() + + override fun forceClose() = actualConnection.forceClose() + + override fun close() = actualConnection.close() +} /** * Can be used to configure the RPC client connection. @@ -235,11 +263,17 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( * observable with another RPC. * * In case of loss of connection to the server, the client will try to reconnect using the settings provided via - * [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw + * [CordaRPCClientConfiguration]. If the client was created using a list of hosts via [haAddressPool], automatic failover will occur + * (the servers have to be started in HA mode). While attempting failover, current and future RPC calls will throw * [RPCException] and previously returned observables will call onError(). * - * If the client was created using a list of hosts, automatic failover will occur (the servers have to be started in - * HA mode). + * If you want to enable a more graceful form of reconnection, you can make use of the gracefulReconnect argument of the [start] method. + * If this is set to true, then: + * - The client will automatically reconnect, when the connection is broken regardless of whether you provided a single or multiple addresses. 
+ * - Simple RPC calls that return data (e.g. [CordaRPCOps.networkParameters]) will **block** and return after the connection has been re-established and the node is up. + * - RPC calls that return [rx.Observable]s (e.g. [CordaRPCOps.vaultTrack]) will automatically reconnect and keep sending events for the subscribed [rx.Observable]s. + * Note: In this approach, some events might be lost during a re-connection and not sent in the subscribed [rx.Observable]s. + * - RPC calls that invoke flows (e.g. [CordaRPCOps.startFlowDynamic]) will fail during a disconnection throwing a [CouldNotStartFlowException]. * * @param hostAndPort The network address to connect to. * @param configuration An optional configuration used to tweak client behaviour. @@ -335,10 +369,12 @@ class CordaRPCClient private constructor( * * @param username The username to authenticate with. * @param password The password to authenticate with. + * @param gracefulReconnect whether the connection will reconnect gracefully. * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. */ - fun start(username: String, password: String): CordaRPCConnection { - return start(username, password, null, null) + @JvmOverloads + fun start(username: String, password: String, gracefulReconnect: Boolean = false): CordaRPCConnection { + return start(username, password, null, null, gracefulReconnect) } /** @@ -350,10 +386,12 @@ class CordaRPCClient private constructor( * @param username The username to authenticate with. * @param password The password to authenticate with. * @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed. + * @param gracefulReconnect whether the connection will reconnect gracefully. * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. 
*/ - fun start(username: String, password: String, targetLegalIdentity: CordaX500Name): CordaRPCConnection { - return start(username, password, null, null, targetLegalIdentity) + @JvmOverloads + fun start(username: String, password: String, targetLegalIdentity: CordaX500Name, gracefulReconnect: Boolean = false): CordaRPCConnection { + return start(username, password, null, null, targetLegalIdentity, gracefulReconnect) } /** @@ -366,10 +404,12 @@ class CordaRPCClient private constructor( * @param password The password to authenticate with. * @param externalTrace external [Trace] for correlation. * @param impersonatedActor the actor on behalf of which all the invocations will be made. + * @param gracefulReconnect whether the connection will reconnect gracefully. * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. */ - fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?): CordaRPCConnection { - return start(username, password, externalTrace, impersonatedActor, null) + @JvmOverloads + fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, gracefulReconnect: Boolean = false): CordaRPCConnection { + return start(username, password, externalTrace, impersonatedActor, null, gracefulReconnect) } /** @@ -383,10 +423,22 @@ class CordaRPCClient private constructor( * @param externalTrace external [Trace] for correlation. * @param impersonatedActor the actor on behalf of which all the invocations will be made. * @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed. + * @param gracefulReconnect whether the connection will reconnect gracefully. * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. 
*/ - fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?): CordaRPCConnection { - return CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity)) + @JvmOverloads + fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: Boolean = false): CordaRPCConnection { + val addresses = if (haAddressPool.isEmpty()) { + listOf(hostAndPort!!) + } else { + haAddressPool + } + + return if (gracefulReconnect) { + CordaRPCConnection.createWithGracefulReconnection(username, password, addresses) + } else { + CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity)) + } } /** diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 48cf2e7e91..a4f1f9212c 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -1,6 +1,7 @@ package net.corda.client.rpc.internal import net.corda.client.rpc.* +import net.corda.client.rpc.reconnect.CouldNotStartFlowException import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.div import net.corda.core.internal.messaging.InternalCordaRPCOps @@ -10,21 +11,21 @@ import net.corda.core.messaging.ClientRpcSslOptions import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.DataFeed import net.corda.core.messaging.FlowHandle -import net.corda.core.utilities.* +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import 
net.corda.core.utilities.seconds import net.corda.nodeapi.exceptions.RejectedCommandException import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException -import rx.Observable import java.lang.reflect.InvocationHandler import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method import java.lang.reflect.Proxy import java.time.Duration -import java.util.* import java.util.concurrent.ExecutorService import java.util.concurrent.Executors -import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit /** @@ -41,8 +42,10 @@ import java.util.concurrent.TimeUnit * * *This class is not a stable API. Any project that wants to use it, must copy and paste it.* */ +// TODO The executor service is not needed. All we need is a single thread that deals with reconnecting and onto which ReconnectingObservables +// and other things can attach themselves as listeners for reconnect events. class ReconnectingCordaRPCOps private constructor( - private val reconnectingRPCConnection: ReconnectingRPCConnection, + val reconnectingRPCConnection: ReconnectingRPCConnection, private val observersPool: ExecutorService, private val userPool: Boolean ) : AutoCloseable, InternalCordaRPCOps by proxy(reconnectingRPCConnection, observersPool) { @@ -118,26 +121,10 @@ class ReconnectingCordaRPCOps private constructor( } } - /** - * This function is similar to [runFlowWithLogicalRetry] but is blocking and it returns the result of the flow. - * - * [runFlow] - starts a flow and returns the [FlowHandle]. - * [hasFlowCompleted] - Runs a vault query and is able to recreate the result of the flow. 
- */ - fun runFlowAndReturnResultWithLogicalRetry(runFlow: (CordaRPCOps) -> FlowHandle, hasFlowCompleted: (CordaRPCOps) -> T?, timeout: Duration = 4.seconds): T { - return try { - runFlow(this).returnValue.get() - } catch (e: CouldNotStartFlowException) { - log.error("Couldn't start flow: ${e.message}") - Thread.sleep(timeout.toMillis()) - hasFlowCompleted(this) ?: runFlowAndReturnResultWithLogicalRetry(runFlow, hasFlowCompleted, timeout) - } - } - /** * Helper class useful for reconnecting to a Node. */ - internal data class ReconnectingRPCConnection( + data class ReconnectingRPCConnection( val nodeHostAndPorts: List, val username: String, val password: String, @@ -169,10 +156,11 @@ class ReconnectingCordaRPCOps private constructor( * Will block until the connection is established again. */ @Synchronized - fun error(e: Throwable) { + fun reconnectOnError(e: Throwable) { currentState = CurrentState.DIED //TODO - handle error cases log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}") + log.debug("", e) connect() } @@ -193,9 +181,9 @@ class ReconnectingCordaRPCOps private constructor( ).start(username, password).also { // Check connection is truly operational before returning it. require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) { - "Could not establish connection to ${nodeHostAndPorts}." + "Could not establish connection to $nodeHostAndPorts." 
} - log.debug { "Connection successfully established with: ${nodeHostAndPorts}" } + log.debug { "Connection successfully established with: $nodeHostAndPorts" } } } catch (ex: Exception) { when (ex) { @@ -256,57 +244,6 @@ class ReconnectingCordaRPCOps private constructor( } } - internal class ReconnectingObservableImpl internal constructor( - val reconnectingRPCConnection: ReconnectingRPCConnection, - val observersPool: ExecutorService, - val initial: DataFeed<*, T>, - val createDataFeed: () -> DataFeed<*, T> - ) : Observable(null), ReconnectingObservable { - - private var initialStartWith: Iterable? = null - private fun _subscribeWithReconnect(observerHandle: ObserverHandle, onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit, startWithValues: Iterable? = null) { - var subscriptionError: Throwable? - try { - val subscription = initial.updates.let { if (startWithValues != null) it.startWith(startWithValues) else it } - .subscribe(onNext, observerHandle::fail, observerHandle::stop) - subscriptionError = observerHandle.await() - subscription.unsubscribe() - } catch (e: Exception) { - log.error("Failed to register subscriber .", e) - subscriptionError = e - } - - // In case there was no exception the observer has finished gracefully. - if (subscriptionError == null) { - onStop() - return - } - - onDisconnect() - // Only continue if the subscription failed. - reconnectingRPCConnection.error(subscriptionError) - log.debug { "Recreating data feed." 
} - - val newObservable = createDataFeed().updates as ReconnectingObservableImpl - onReconnect() - return newObservable._subscribeWithReconnect(observerHandle, onNext, onStop, onDisconnect, onReconnect) - } - - override fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle { - val observerNotifier = ObserverHandle() - // TODO - change the establish connection method to be non-blocking - observersPool.execute { - _subscribeWithReconnect(observerNotifier, onNext, onStop, onDisconnect, onReconnect, initialStartWith) - } - return observerNotifier - } - - override fun startWithValues(values: Iterable): ReconnectingObservable { - initialStartWith = values - return this - } - } - private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler { private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow") @@ -327,23 +264,23 @@ class ReconnectingCordaRPCOps private constructor( when (e.targetException) { is RejectedCommandException -> { log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e) - reconnectingRPCConnection.error(e) + reconnectingRPCConnection.reconnectOnError(e) this.invoke(proxy, method, args) } is ConnectionFailureException -> { log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e) - reconnectingRPCConnection.error(e) + reconnectingRPCConnection.reconnectOnError(e) retry() } is RPCException -> { log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e) - reconnectingRPCConnection.error(e) + reconnectingRPCConnection.reconnectOnError(e) Thread.sleep(1000) // TODO - explain why this sleep is necessary retry() } else -> { log.error("Failed to perform operation ${method.name}. Unknown error. 
Retrying....", e) - reconnectingRPCConnection.error(e) + reconnectingRPCConnection.reconnectOnError(e) retry() } } @@ -353,7 +290,7 @@ class ReconnectingCordaRPCOps private constructor( DataFeed::class.java -> { // Intercept the data feed methods and returned a ReconnectingObservable instance val initialFeed: DataFeed = uncheckedCast(result) - val observable = ReconnectingObservableImpl(reconnectingRPCConnection, observersPool, initialFeed) { + val observable = ReconnectingObservable(reconnectingRPCConnection, observersPool, initialFeed) { // This handles reconnecting and creates new feeds. uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args)) } @@ -371,42 +308,3 @@ class ReconnectingCordaRPCOps private constructor( reconnectingRPCConnection.forceClose() } } - -/** - * Returned as the `updates` field when calling methods that return a [DataFeed] on the [ReconnectingCordaRPCOps]. - * - * TODO - provide a logical function to know how to retrieve missing events that happened during disconnects. - */ -interface ReconnectingObservable { - fun subscribe(onNext: (T) -> Unit): ObserverHandle = subscribe(onNext, {}, {}, {}) - fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle - fun startWithValues(values: Iterable): ReconnectingObservable -} - -/** - * Utility to externally control a subscribed observer. - */ -class ObserverHandle { - private val terminated = LinkedBlockingQueue>(1) - - fun stop() = terminated.put(Optional.empty()) - internal fun fail(e: Throwable) = terminated.put(Optional.of(e)) - - /** - * Returns null if the observation ended successfully. - */ - internal fun await(): Throwable? = terminated.take().orElse(null) -} - -/** - * Thrown when a flow start command died before receiving a [net.corda.core.messaging.FlowHandle]. 
- * On catching this exception, the typical behaviour is to run a "logical retry", meaning only retry the flow if the expected outcome did not occur. - */ -class CouldNotStartFlowException(cause: Throwable? = null) : RPCException("Could not start flow as connection failed", cause) - -/** - * Mainly for Kotlin users. - */ -fun Observable.asReconnecting(): ReconnectingObservable = uncheckedCast(this) - -fun Observable.asReconnectingWithInitialValues(values: Iterable): ReconnectingObservable = asReconnecting().startWithValues(values) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt new file mode 100644 index 0000000000..a8fea5e71f --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt @@ -0,0 +1,68 @@ +package net.corda.client.rpc.internal + +import net.corda.core.messaging.DataFeed +import rx.Observable +import rx.Subscriber +import rx.Subscription +import java.util.concurrent.ExecutorService +import java.util.concurrent.atomic.AtomicReference + +class ReconnectingObservable private constructor(subscriber: ReconnectingSubscriber) : Observable(subscriber) { + + constructor( + reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection, + executor: ExecutorService, + initialDataFeed: DataFeed<*, T>, + createDataFeed: () -> DataFeed<*, T> + ) : this(ReconnectingSubscriber(reconnectingRPCConnection, executor, initialDataFeed, createDataFeed)) + + private class ReconnectingSubscriber( + private val reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection, + private val executor: ExecutorService, + private val initialDataFeed: DataFeed<*, T>, + private val createDataFeed: () -> DataFeed<*, T> + ) : OnSubscribe, Subscription { + + private val subscriber = AtomicReference>() + @Volatile + private var backingSubscription: Subscription? 
= null + @Volatile + private var unsubscribed = false + + override fun unsubscribe() { + backingSubscription?.unsubscribe() + unsubscribed = true + } + + override fun isUnsubscribed(): Boolean = unsubscribed + + override fun call(subscriber: Subscriber) { + if (this.subscriber.compareAndSet(null, subscriber)) { + subscriber.add(this) + subscribeImmediately(initialDataFeed) + } else { + subscriber.onError(IllegalStateException("Only a single subscriber is allowed")) + } + } + + private fun subscribeImmediately(dataFeed: DataFeed<*, T>) { + if (unsubscribed) return + val subscriber = checkNotNull(this.subscriber.get()) + try { + backingSubscription = dataFeed.updates.subscribe(subscriber::onNext, ::scheduleResubscribe, subscriber::onCompleted) + } catch (e: Exception) { + scheduleResubscribe(e) + } + } + + private fun scheduleResubscribe(error: Throwable) { + if (unsubscribed) return + executor.execute { + if (unsubscribed) return@execute + reconnectingRPCConnection.reconnectOnError(error) + val newDataFeed = createDataFeed() + subscribeImmediately(newDataFeed) + } + } + } +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/reconnect/CouldNotStartFlowException.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/reconnect/CouldNotStartFlowException.kt new file mode 100644 index 0000000000..08bbed4887 --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/reconnect/CouldNotStartFlowException.kt @@ -0,0 +1,10 @@ +package net.corda.client.rpc.reconnect + +import net.corda.client.rpc.RPCException + +/** + * Thrown when a flow start command died before receiving a [net.corda.core.messaging.FlowHandle]. + * On catching this exception, the typical behaviour is to run a "logical retry", meaning only retry the flow if the expected outcome did not occur. + */ +class CouldNotStartFlowException(cause: Throwable? 
= null) : RPCException("Could not start flow as connection failed", cause) + diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index fbd5d88e90..314e8a7eb4 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -23,13 +23,19 @@ Version 5.0 * It is now possible to re-record transactions if a node wishes to record as an observer a transaction it has participated in. If this is done, then the node may record new output states that are not relevant to the node. -.. warning:: Nodes may re-record transactions if they have previously recorded them as a participant and wish to record them as an observer. - However, the node cannot resolve the forward chain of transactions if this is done. This means that if you wish to re-record a chain of - transactions and get the new output states to be correctly marked as consumed, the full chain must be sent to the node *in order*. + .. warning:: Nodes may re-record transactions if they have previously recorded them as a participant and wish to record them as an observer. + However, the node cannot resolve the forward chain of transactions if this is done. This means that if you wish to re-record a chain of + transactions and get the new output states to be correctly marked as consumed, the full chain must be sent to the node *in order*. * Added ``nodeDiagnosticInfo`` to the RPC API. The new RPC is also available as the ``run nodeDiagnosticInfo`` command executable from the Corda shell. It retrieves version information about the Corda platform and the CorDapps installed on the node. +* ``CordaRPCClient.start`` has a new ``gracefulReconnect`` parameter. When ``true`` (the default is ``false``) it will cause the RPC client + to try to automatically reconnect to the node on disconnect. Further any ``Observable`` s previously created will continue to vend new + events on reconnect. + + .. note:: This is only best-effort and there are no guarantees of reliability. + .. 
_changelog_v4.2: Version 4.2 diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index bbc62bd9a6..899d9a5b60 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -355,56 +355,45 @@ This does not expose internal information to clients, strengthening privacy and Reconnecting RPC clients ------------------------ -In the current version of Corda the RPC connection and all the observervables that are created by a client will just throw exceptions and die -when the node or TCP connection become unavailable. +In the current version of Corda, an RPC client connected to a node stops functioning when the node becomes unavailable or the associated TCP connection is interrupted. +Running RPC commands against a stopped node will just throw exceptions. Any subscriptions to ``Observable``\s that have been created before the disconnection will stop receiving events after the node restarts. +RPCs which have a side effect, such as starting flows, may or may not have executed on the node depending on when the client was disconnected. -It is the client's responsibility to handle these errors and reconnect once the node is running again. Running RPC commands against a stopped -node will just throw exceptions. Previously created Observables will not emit any events after the node restarts. The client must explicitly re-run the command and -re-subscribe to receive more events. +It is the client's responsibility to handle these errors and reconnect once the node is running again. The client will have to re-subscribe to any ``Observable``\s in order to keep receiving updates. +Before retrying an RPC with side effects, the client will have to inspect the state of the node to determine whether the flow was already executed. -RPCs which have a side effect, such as starting flows, may have executed on the node even if the return value is not received by the client.
-The only way to confirm is to perform a business-level query and retry accordingly. The sample `runFlowWithLogicalRetry` helps with this. +Clients can make use of the options described below in order to take advantage of some automatic reconnection functionality that mitigates some of these issues. -In case users require such a functionality to write a resilient RPC client we have a sample that showcases how this can be implemented and also -a thorough test that demonstrates it works as expected. +Enabling automatic reconnection +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The code that performs the reconnecting logic is: `ReconnectingCordaRPCOps.kt `_. +If you provide a list of addresses via the ``haAddressPool`` argument when instantiating a ``CordaRPCClient``, then automatic reconnection will be performed when the existing connection is dropped. +However, any in-flight calls during reconnection will fail and previously returned observables will call ``onError``. The client code is responsible for waiting for the connection to be established +in order to retry any calls, retrieve new observables and re-subscribe to them. -.. note:: This sample code is not exposed as an official Corda API, and must be included directly in the client codebase and adjusted. +Enabling graceful reconnection +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The usage is showcased in the: `RpcReconnectTests.kt `_. -In case resiliency is a requirement, then it is recommended that users will write a similar test. +A more graceful form of reconnection is also available, which will block all in-flight calls until the connection is re-established and +will also reconnect the existing ``Observable``\s, so that they keep emitting events to the existing subscribers. -How to initialize the `ReconnectingCordaRPCOps`: +.. warning:: In this approach, some events might be lost during a re-connection and not sent from the subscribed ``Observable``\s. -.. 
literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt - :language: kotlin - :start-after: DOCSTART rpcReconnectingRPC - :end-before: DOCEND rpcReconnectingRPC +You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter in the following way: +.. sourcecode:: kotlin -How to track the vault : + val cordaClient = CordaRPCClient(nodeRpcAddress) + val cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect = true).proxy -.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt - :language: kotlin - :start-after: DOCSTART rpcReconnectingRPCVaultTracking - :end-before: DOCEND rpcReconnectingRPCVaultTracking +Logical retries for flow invocation +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +As described above, if you want to retry a flow that failed during a disconnection, you will first need to verify it has not been previously executed. +The only way currently to confirm this is by performing a business-level query. -How to start a flow with a logical retry function that checks for the side effects of the flow: - -.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt - :language: kotlin - :start-after: DOCSTART rpcReconnectingRPCFlowStarting - :end-before: DOCEND rpcReconnectingRPCFlowStarting - - -Note that, as shown by the test, during reconnecting some events might be lost. - -.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt - :language: kotlin - :start-after: DOCSTART missingVaultEvents - :end-before: DOCEND missingVaultEvents +.. note:: Future releases of Corda are expected to contain new APIs for coping with reconnection in a more resilient way providing stricter + safety guarantees. 
Wire security diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt index e4b4bfd6af..8bfbc676f3 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt @@ -1,7 +1,6 @@ package net.corda.node.services.rpc import net.corda.client.rpc.internal.ReconnectingCordaRPCOps -import net.corda.client.rpc.internal.asReconnecting import net.corda.core.contracts.Amount import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.concurrent.transpose @@ -10,6 +9,7 @@ import net.corda.core.node.services.Vault import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.builder +import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow @@ -20,6 +20,7 @@ import net.corda.node.services.Permissions import net.corda.testing.core.DUMMY_BANK_A_NAME import net.corda.testing.core.DUMMY_BANK_B_NAME import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver import net.corda.testing.driver.internal.OutOfProcessImpl @@ -36,16 +37,25 @@ import kotlin.test.assertEquals import kotlin.test.assertTrue /** - * This is a slow test! + * This is a stress test for the RPC reconnection logic, which triggers failures in a probabilistic way. + * + * Change [NUMBER_OF_FLOWS_TO_RUN] to adjust the number of flows to run and, with it, the duration of the test.
*/ class RpcReconnectTests { companion object { + // 150 flows take ~5 minutes + const val NUMBER_OF_FLOWS_TO_RUN = 150 + private val log = contextLogger() } private val portAllocator = incrementalPortAllocation() + private lateinit var proxy: RandomFailingProxy + private lateinit var node: NodeHandle + private lateinit var currentAddressPair: AddressPair + /** * This test showcases and stress tests the demo [ReconnectingCordaRPCOps]. * @@ -57,17 +67,12 @@ class RpcReconnectTests { */ @Test fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`() { - val nrOfFlowsToRun = 150 // Takes around 5 minutes. val nodeRunningTime = { Random().nextInt(12000) + 8000 } val demoUser = User("demo", "demo", setOf(Permissions.all())) - val nodePort = portAllocator.nextPort() - val proxyPort = portAllocator.nextPort() - val tcpProxy = RandomFailingProxy(serverPort = proxyPort, remotePort = nodePort).start() - // When this reaches 0 - the test will end. - val flowsCountdownLatch = CountDownLatch(nrOfFlowsToRun) + val flowsCountdownLatch = CountDownLatch(NUMBER_OF_FLOWS_TO_RUN) // These are the expected progress steps for the CashIssueAndPayFlow. 
val expectedProgress = listOf( @@ -90,21 +95,26 @@ class RpcReconnectTests { ) driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) { - fun startBankA() = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to "localhost:$nodePort")) + fun startBankA(address: NetworkHostAndPort) = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to address.toString())) + fun startProxy(addressPair: AddressPair) = RandomFailingProxy(serverPort = addressPair.proxyAddress.port, remotePort = addressPair.nodeAddress.port).start() - var (bankA, bankB) = listOf( - startBankA(), + val addresses = (1..3).map { getRandomAddressPair() } + currentAddressPair = addresses[0] + + proxy = startProxy(currentAddressPair) + val (bankA, bankB) = listOf( + startBankA(currentAddressPair.nodeAddress), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)) ).transpose().getOrThrow() + node = bankA val notary = defaultNotaryIdentity val baseAmount = Amount.parseCurrency("0 USD") val issuerRef = OpaqueBytes.of(0x01) - // Create a reconnecting rpc client through the TCP proxy. - val bankAAddress = bankA.rpcAddress.copy(port = proxyPort) + val addressesForRpc = addresses.map { it.proxyAddress } // DOCSTART rpcReconnectingRPC - val bankAReconnectingRpc = ReconnectingCordaRPCOps(bankAAddress, demoUser.username, demoUser.password) + val bankAReconnectingRpc = ReconnectingCordaRPCOps(addressesForRpc, demoUser.username, demoUser.password) // DOCEND rpcReconnectingRPC // Observe the vault and collect the observations. 
@@ -114,7 +124,7 @@ class RpcReconnectTests { Cash.State::class.java, QueryCriteria.VaultQueryCriteria(), PageSpecification(1, 1)) - val vaultObserverHandle = vaultFeed.updates.asReconnecting().subscribe { update: Vault.Update -> + val vaultSubscription = vaultFeed.updates.subscribe { update: Vault.Update -> log.info("vault update produced ${update.produced.map { it.state.data.amount }} consumed ${update.consumed.map { it.ref }}") vaultEvents.add(update) } @@ -122,7 +132,7 @@ class RpcReconnectTests { // Observe the stateMachine and collect the observations. val stateMachineEvents = Collections.synchronizedList(mutableListOf()) - val stateMachineObserverHandle = bankAReconnectingRpc.stateMachinesFeed().updates.asReconnecting().subscribe { update -> + val stateMachineSubscription = bankAReconnectingRpc.stateMachinesFeed().updates.subscribe { update -> log.info(update.toString()) stateMachineEvents.add(update) } @@ -141,37 +151,45 @@ class RpcReconnectTests { if (flowsCountdownLatch.count == 0L) break - when (Random().nextInt().rem(6).absoluteValue) { + when (Random().nextInt().rem(7).absoluteValue) { 0 -> { log.info("Forcefully killing node and proxy.") - (bankA as OutOfProcessImpl).onStopCallback() - (bankA as OutOfProcess).process.destroyForcibly() - tcpProxy.stop() - bankA = startBankA().get() - tcpProxy.start() + (node as OutOfProcessImpl).onStopCallback() + (node as OutOfProcess).process.destroyForcibly() + proxy.stop() + node = startBankA(currentAddressPair.nodeAddress).get() + proxy.start() } 1 -> { log.info("Forcefully killing node.") - (bankA as OutOfProcessImpl).onStopCallback() - (bankA as OutOfProcess).process.destroyForcibly() - bankA = startBankA().get() + (node as OutOfProcessImpl).onStopCallback() + (node as OutOfProcess).process.destroyForcibly() + node = startBankA(currentAddressPair.nodeAddress).get() } 2 -> { log.info("Shutting down node.") - bankA.stop() - tcpProxy.stop() - bankA = startBankA().get() - tcpProxy.start() + node.stop() + 
proxy.stop() + node = startBankA(currentAddressPair.nodeAddress).get() + proxy.start() } 3, 4 -> { log.info("Killing proxy.") - tcpProxy.stop() + proxy.stop() Thread.sleep(Random().nextInt(5000).toLong()) - tcpProxy.start() + proxy.start() } 5 -> { log.info("Dropping connection.") - tcpProxy.failConnection() + proxy.failConnection() + } + 6 -> { + log.info("Performing failover to a different node") + node.stop() + proxy.stop() + currentAddressPair = addresses[Random().nextInt(addresses.size)] + node = startBankA(currentAddressPair.nodeAddress).get() + proxy = startProxy(currentAddressPair) } } nrRestarts.incrementAndGet() @@ -180,7 +198,7 @@ class RpcReconnectTests { // Start nrOfFlowsToRun and provide a logical retry function that checks the vault. val flowProgressEvents = mutableMapOf>() - for (amount in (1..nrOfFlowsToRun)) { + for (amount in (1..NUMBER_OF_FLOWS_TO_RUN)) { // DOCSTART rpcReconnectingRPCFlowStarting bankAReconnectingRpc.runFlowWithLogicalRetry( runFlow = { rpc -> @@ -248,16 +266,16 @@ class RpcReconnectTests { var nrRetries = 0 // It might be necessary to wait more for all events to arrive when the node is slow. - while (allCashStates.size < nrOfFlowsToRun && nrRetries++ < 50) { + while (allCashStates.size < NUMBER_OF_FLOWS_TO_RUN && nrRetries++ < 50) { Thread.sleep(2000) allCashStates = readCashStates() } val allCash = allCashStates.map { it.state.data.amount.quantity }.toSet() - val missingCash = (1..nrOfFlowsToRun).filterNot { allCash.contains(it.toLong() * 100) } + val missingCash = (1..NUMBER_OF_FLOWS_TO_RUN).filterNot { allCash.contains(it.toLong() * 100) } log.info("MISSING: $missingCash") - assertEquals(nrOfFlowsToRun, allCashStates.size, "Not all flows were executed successfully") + assertEquals(NUMBER_OF_FLOWS_TO_RUN, allCashStates.size, "Not all flows were executed successfully") // The progress status for each flow can only miss the last events, because the node might have been killed. 
val missingProgressEvents = flowProgressEvents.filterValues { expectedProgress.subList(0, it.size) != it } @@ -267,7 +285,7 @@ class RpcReconnectTests { // Check that enough vault events were received. // This check is fuzzy because events can go missing during node restarts. // Ideally there should be nrOfFlowsToRun events receive but some might get lost for each restart. - assertTrue(vaultEvents!!.size + nrFailures * 3 >= nrOfFlowsToRun, "Not all vault events were received") + assertTrue(vaultEvents!!.size + nrFailures * 3 >= NUMBER_OF_FLOWS_TO_RUN, "Not all vault events were received") // DOCEND missingVaultEvents // Check that no flow was triggered twice. @@ -276,21 +294,26 @@ class RpcReconnectTests { log.info("SM EVENTS: ${stateMachineEvents!!.size}") // State machine events are very likely to get lost more often because they seem to be sent with a delay. - assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > nrOfFlowsToRun / 3, "Too many Added state machine events lost.") - assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > nrOfFlowsToRun / 3, "Too many Removed state machine events lost.") + assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > NUMBER_OF_FLOWS_TO_RUN / 3, "Too many Added state machine events lost.") + assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > NUMBER_OF_FLOWS_TO_RUN / 3, "Too many Removed state machine events lost.") // Stop the observers. 
- vaultObserverHandle.stop() - stateMachineObserverHandle.stop() + vaultSubscription.unsubscribe() + stateMachineSubscription.unsubscribe() bankAReconnectingRpc.close() } - tcpProxy.close() + proxy.close() } @Synchronized fun MutableMap>.addEvent(id: StateMachineRunId, progress: String?): Boolean { return getOrPut(id) { mutableListOf() }.let { if (progress != null) it.add(progress) else false } } + + private fun getRandomAddressPair() = AddressPair(getRandomAddress(), getRandomAddress()) + private fun getRandomAddress() = NetworkHostAndPort("localhost", portAllocator.nextPort()) + + data class AddressPair(val proxyAddress: NetworkHostAndPort, val nodeAddress: NetworkHostAndPort) } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index 769a19ade2..ae9297cc10 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -13,8 +13,6 @@ import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.PermissionException import net.corda.client.rpc.internal.ReconnectingCordaRPCOps -import net.corda.client.rpc.internal.ReconnectingObservable -import net.corda.client.rpc.internal.asReconnectingWithInitialValues import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.UniqueIdentifier @@ -463,11 +461,7 @@ object InteractiveShell { val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed() val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) } val subscriber = FlowWatchPrintingSubscriber(out) - if (stateMachineUpdates is ReconnectingObservable<*>) { - stateMachineUpdates.asReconnectingWithInitialValues(currentStateMachines).subscribe(subscriber::onNext) - } else { - 
stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber) - } + stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber) var result: Any? = subscriber.future if (result is Future<*>) { if (!result.isDone) {