From 6771386b4befcb9142a30ba7c037ff1602477e65 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Fri, 12 Apr 2019 14:03:38 +0100 Subject: [PATCH] CORDA-2743 - utilities and test to show rpc operations that support disconnects (#5009) --- .../rpc/internal/ReconnectingCordaRPCOps.kt | 402 ++++++++++++++++++ docs/source/clientrpc.rst | 71 ++-- .../node/services/rpc/RandomFailingProxy.kt | 101 +++++ .../node/services/rpc/RpcReconnectTests.kt | 285 +++++++++++++ .../testing/driver/internal/DriverInternal.kt | 2 +- 5 files changed, 835 insertions(+), 26 deletions(-) create mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/rpc/RandomFailingProxy.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt new file mode 100644 index 0000000000..563b9130d4 --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -0,0 +1,402 @@ +package net.corda.client.rpc.internal + +import net.corda.client.rpc.* +import net.corda.core.flows.StateMachineRunId +import net.corda.core.internal.div +import net.corda.core.internal.times +import net.corda.core.internal.uncheckedCast +import net.corda.core.messaging.ClientRpcSslOptions +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.DataFeed +import net.corda.core.messaging.FlowHandle +import net.corda.core.utilities.* +import net.corda.nodeapi.exceptions.RejectedCommandException +import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException +import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException +import rx.Observable +import java.lang.reflect.InvocationHandler +import java.lang.reflect.InvocationTargetException +import java.lang.reflect.Method +import java.lang.reflect.Proxy +import java.time.Duration +import java.util.* +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +/** + * Wrapper over [CordaRPCOps] that handles exceptions when the node or the connection to the node fail. + * + * All operations are retried on failure, except flow start operations that die before receiving a valid [FlowHandle], in which case a [CouldNotStartFlowException] is thrown. + * + * When calling methods that return a [DataFeed] like [CordaRPCOps.vaultTrackBy], the returned [DataFeed.updates] object will no longer + * be a usable [rx.Observable] but an instance of [ReconnectingObservable]. + * The caller has to explicitly cast to [ReconnectingObservable] and call [ReconnectingObservable.subscribe]. If used as an [rx.Observable] it will just fail. + * The returned [DataFeed.snapshot] is the snapshot as it was when the feed was first retrieved. + * + * Note: There is no guarantee that observations will not be lost. + * + * *This class is not a stable API. Any project that wants to use it, must copy and paste it.* + */ +class ReconnectingCordaRPCOps private constructor( + private val reconnectingRPCConnection: ReconnectingRPCConnection, + private val observersPool: ExecutorService, + private val userPool: Boolean +) : AutoCloseable, CordaRPCOps by proxy(reconnectingRPCConnection, observersPool) { + + // Constructors that mirror CordaRPCClient. + constructor( + nodeHostAndPort: NetworkHostAndPort, + username: String, + password: String, + sslConfiguration: ClientRpcSslOptions? = null, + classLoader: ClassLoader? = null, + observersPool: ExecutorService? = null + ) : this( + ReconnectingRPCConnection(listOf(nodeHostAndPort), username, password, sslConfiguration, classLoader), + observersPool ?: Executors.newCachedThreadPool(), + observersPool != null) + + constructor( + nodeHostAndPorts: List, + username: String, + password: String, + sslConfiguration: ClientRpcSslOptions? = null, + classLoader: ClassLoader? = null, + observersPool: ExecutorService? = null + ) : this( + ReconnectingRPCConnection(nodeHostAndPorts, username, password, sslConfiguration, classLoader), + observersPool ?: Executors.newCachedThreadPool(), + observersPool != null) + + private companion object { + private val log = contextLogger() + private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): CordaRPCOps { + return Proxy.newProxyInstance( + this::class.java.classLoader, + arrayOf(CordaRPCOps::class.java), + ErrorInterceptingHandler(reconnectingRPCConnection, observersPool)) as CordaRPCOps + } + } + + private val retryFlowsPool = Executors.newScheduledThreadPool(1) + + /** + * This function runs a flow and retries until it completes successfully. + * + * [runFlow] is a function that starts a flow. + * [hasFlowStarted] is a function that checks if the flow has actually completed by checking some side-effect, for example the vault. + * [onFlowConfirmed] Callback when the flow is confirmed. + * [timeout] Indicative timeout to wait until the flow would create the side-effect. Should be increased if the flow is slow. Note that + * this timeout is calculated after the rpc client has reconnected to the node. + * + * Note that this method does not guarantee 100% that the flow will not be started twice. + */ + fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -> StateMachineRunId, hasFlowStarted: (CordaRPCOps) -> Boolean, onFlowConfirmed: () -> Unit = {}, timeout: Duration = 4.seconds) { + try { + runFlow(this) + onFlowConfirmed() + } catch (e: CouldNotStartFlowException) { + log.error("Couldn't start flow: ${e.message}") + retryFlowsPool.schedule( + { + if (!hasFlowStarted(this)) { + runFlowWithLogicalRetry(runFlow, hasFlowStarted, onFlowConfirmed, timeout) + } else { + onFlowConfirmed() + } + }, + timeout.seconds, TimeUnit.SECONDS + ) + } + } + + /** + * This function is similar to [runFlowWithLogicalRetry] but is blocking and it returns the result of the flow. + * + * [runFlow] - starts a flow and returns the [FlowHandle]. + * [hasFlowCompleted] - Runs a vault query and is able to recreate the result of the flow. + */ + fun runFlowAndReturnResultWithLogicalRetry(runFlow: (CordaRPCOps) -> FlowHandle, hasFlowCompleted: (CordaRPCOps) -> T?, timeout: Duration = 4.seconds): T { + return try { + runFlow(this).returnValue.get() + } catch (e: CouldNotStartFlowException) { + log.error("Couldn't start flow: ${e.message}") + Thread.sleep(timeout.toMillis()) + hasFlowCompleted(this) ?: runFlowAndReturnResultWithLogicalRetry(runFlow, hasFlowCompleted, timeout) + } + } + + /** + * Helper class useful for reconnecting to a Node. + */ + internal data class ReconnectingRPCConnection( + val nodeHostAndPorts: List, + val username: String, + val password: String, + val sslConfiguration: ClientRpcSslOptions? = null, + val classLoader: ClassLoader? + ) : RPCConnection { + private var currentRPCConnection: CordaRPCConnection? = null + + init { + connect() + } + + enum class CurrentState { + UNCONNECTED, CONNECTED, CONNECTING, CLOSED, DIED + } + + private var currentState = CurrentState.UNCONNECTED + + private val current: CordaRPCConnection + @Synchronized get() = when (currentState) { + CurrentState.CONNECTED -> currentRPCConnection!! + CurrentState.UNCONNECTED, CurrentState.CLOSED -> { + connect() + currentRPCConnection!! + } + CurrentState.CONNECTING, CurrentState.DIED -> throw IllegalArgumentException("Illegal state") + } + + /** + * Called on external error. + * Will block until the connection is established again. + */ + @Synchronized + fun error(e: Throwable) { + currentState = CurrentState.DIED + //TODO - handle error cases + log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}") + connect() + } + + private fun connect() { + currentState = CurrentState.CONNECTING + currentRPCConnection = establishConnectionWithRetry() + currentState = CurrentState.CONNECTED + } + + private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, nrRetries: Int = 0): CordaRPCConnection { + log.info("Connecting to: $nodeHostAndPorts") + try { + return CordaRPCClient( + nodeHostAndPorts, CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval), sslConfiguration, classLoader + ).start(username, password).also { + // Check connection is truly operational before returning it. + require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) { + "Could not establish connection to ${nodeHostAndPorts}." + } + log.debug { "Connection successfully established with: ${nodeHostAndPorts}" } + } + } catch (ex: Exception) { + when (ex) { + is ActiveMQSecurityException -> { + // Happens when incorrect credentials provided. + // It can happen at startup as well when the credentials are correct. + if (nrRetries > 1) throw ex + } + is RPCException -> { + // Deliberately not logging full stack trace as it will be full of internal stacktraces. + log.debug { "Exception upon establishing connection: ${ex.message}" } + } + is ActiveMQConnectionTimedOutException -> { + // Deliberately not logging full stack trace as it will be full of internal stacktraces. + log.debug { "Exception upon establishing connection: ${ex.message}" } + } + is ActiveMQUnBlockedException -> { + // Deliberately not logging full stack trace as it will be full of internal stacktraces. + log.debug { "Exception upon establishing connection: ${ex.message}" } + } + else -> { + log.debug("Unknown exception upon establishing connection.", ex) + } + } + } + + // Could not connect this time round - pause before giving another try. + Thread.sleep(retryInterval.toMillis()) + return establishConnectionWithRetry((retryInterval * 3) / 2, nrRetries + 1) + } + + override val proxy: CordaRPCOps + get() = current.proxy + + override val serverProtocolVersion + get() = current.serverProtocolVersion + + @Synchronized + override fun notifyServerAndClose() { + currentState = CurrentState.CLOSED + currentRPCConnection?.notifyServerAndClose() + } + + @Synchronized + override fun forceClose() { + currentState = CurrentState.CLOSED + currentRPCConnection?.forceClose() + } + + @Synchronized + override fun close() { + currentState = CurrentState.CLOSED + currentRPCConnection?.close() + } + } + + internal class ReconnectingObservableImpl internal constructor( + val reconnectingRPCConnection: ReconnectingRPCConnection, + val observersPool: ExecutorService, + val initial: DataFeed<*, T>, + val createDataFeed: () -> DataFeed<*, T> + ) : Observable(null), ReconnectingObservable { + + private var initialStartWith: Iterable? = null + private fun _subscribeWithReconnect(observerHandle: ObserverHandle, onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit, startWithValues: Iterable? = null) { + var subscriptionError: Throwable? + try { + val subscription = initial.updates.let { if (startWithValues != null) it.startWith(startWithValues) else it } + .subscribe(onNext, observerHandle::fail, observerHandle::stop) + subscriptionError = observerHandle.await() + subscription.unsubscribe() + } catch (e: Exception) { + log.error("Failed to register subscriber .", e) + subscriptionError = e + } + + // In case there was no exception the observer has finished gracefully. + if (subscriptionError == null) { + onStop() + return + } + + onDisconnect() + // Only continue if the subscription failed. + reconnectingRPCConnection.error(subscriptionError) + log.debug { "Recreating data feed." } + + val newObservable = createDataFeed().updates as ReconnectingObservableImpl + onReconnect() + return newObservable._subscribeWithReconnect(observerHandle, onNext, onStop, onDisconnect, onReconnect) + } + + override fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle { + val observerNotifier = ObserverHandle() + // TODO - change the establish connection method to be non-blocking + observersPool.execute { + _subscribeWithReconnect(observerNotifier, onNext, onStop, onDisconnect, onReconnect, initialStartWith) + } + return observerNotifier + } + + override fun startWithValues(values: Iterable): ReconnectingObservable { + initialStartWith = values + return this + } + } + + private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler { + private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow") + + override fun invoke(proxy: Any, method: Method, args: Array?): Any? { + val result: Any? = try { + log.debug { "Invoking RPC $method..." } + method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { + log.debug { "RPC $method invoked successfully." } + } + } catch (e: InvocationTargetException) { + fun retry() = if (method.isStartFlow()) { + // Don't retry flows + throw CouldNotStartFlowException(e.targetException) + } else { + this.invoke(proxy, method, args) + } + + when (e.targetException) { + is RejectedCommandException -> { + log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e) + reconnectingRPCConnection.error(e) + this.invoke(proxy, method, args) + } + is ConnectionFailureException -> { + log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e) + reconnectingRPCConnection.error(e) + retry() + } + is RPCException -> { + log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e) + reconnectingRPCConnection.error(e) + Thread.sleep(1000) // TODO - explain why this sleep is necessary + retry() + } + else -> { + log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e) + reconnectingRPCConnection.error(e) + retry() + } + } + } + + return when (method.returnType) { + DataFeed::class.java -> { + // Intercept the data feed methods and returned a ReconnectingObservable instance + val initialFeed: DataFeed = uncheckedCast(result) + val observable = ReconnectingObservableImpl(reconnectingRPCConnection, observersPool, initialFeed) { + // This handles reconnecting and creates new feeds. + uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args)) + } + initialFeed.copy(updates = observable) + } + // TODO - add handlers for Observable return types. + else -> result + } + } + } + + override fun close() { + if (!userPool) observersPool.shutdown() + retryFlowsPool.shutdown() + reconnectingRPCConnection.forceClose() + } +} + +/** + * Returned as the `updates` field when calling methods that return a [DataFeed] on the [ReconnectingCordaRPCOps]. + * + * TODO - provide a logical function to know how to retrieve missing events that happened during disconnects. + */ +interface ReconnectingObservable { + fun subscribe(onNext: (T) -> Unit): ObserverHandle = subscribe(onNext, {}, {}, {}) + fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle + fun startWithValues(values: Iterable): ReconnectingObservable +} + +/** + * Utility to externally control a subscribed observer. + */ +class ObserverHandle { + private val terminated = LinkedBlockingQueue>(1) + + fun stop() = terminated.put(Optional.empty()) + internal fun fail(e: Throwable) = terminated.put(Optional.of(e)) + + /** + * Returns null if the observation ended successfully. + */ + internal fun await(duration: Duration = 60.minutes): Throwable? = terminated.poll(duration.seconds, TimeUnit.SECONDS).orElse(null) +} + +/** + * Thrown when a flow start command died before receiving a [net.corda.core.messaging.FlowHandle]. + * On catching this exception, the typical behaviour is to run a "logical retry", meaning only retry the flow if the expected outcome did not occur. + */ +class CouldNotStartFlowException(cause: Throwable? = null) : RPCException("Could not start flow as connection failed", cause) + +/** + * Mainly for Kotlin users. + */ +fun Observable.asReconnecting(): ReconnectingObservable = uncheckedCast(this) + +fun Observable.asReconnectingWithInitialValues(values: Iterable): ReconnectingObservable = asReconnecting().startWithValues(values) diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index 670768c50a..144fa54ebc 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -352,39 +352,60 @@ When not in ``devMode``, the server will mask exceptions not meant for clients a This does not expose internal information to clients, strengthening privacy and security. CorDapps can have exceptions implement ``ClientRelevantError`` to allow them to reach RPC clients. -Connection management ---------------------- -It is possible to not be able to connect to the server on the first attempt. In that case, the ``CordaRPCClient.start()`` -method will throw an exception. The following code snippet is an example of how to write a simple retry mechanism for -such situations: +Reconnecting RPC clients +------------------------ -.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt +In the current version of Corda the RPC connection and all the observervables that are created by a client will just throw exceptions and die +when the node or TCP connection become unavailable. + +It is the client's responsibility to handle these errors and reconnect once the node is running again. Running RPC commands against a stopped +node will just throw exceptions. Previously created Observables will not emit any events after the node restarts. The client must explicitly re-run the command and +re-subscribe to receive more events. + +RPCs which have a side effect, such as starting flows, may have executed on the node even if the return value is not received by the client. +The only way to confirm is to perform a business-level query and retry accordingly. The sample `runFlowWithLogicalRetry` helps with this. + +In case users require such a functionality to write a resilient RPC client we have a sample that showcases how this can be implemented and also +a thorough test that demonstrates it works as expected. + +The code that performs the reconnecting logic is: `ReconnectingCordaRPCOps.kt `_. + +.. note:: This sample code is not exposed as an official Corda API, and must be included directly in the client codebase and adjusted. + +The usage is showcased in the: `RpcReconnectTests.kt `_. +In case resiliency is a requirement, then it is recommended that users will write a similar test. + +How to initialize the `ReconnectingCordaRPCOps`: + +.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt :language: kotlin - :start-after: DOCSTART rpcClientConnectionWithRetry - :end-before: DOCEND rpcClientConnectionWithRetry + :start-after: DOCSTART rpcReconnectingRPC + :end-before: DOCEND rpcReconnectingRPC -.. warning:: The list of ``NetworkHostAndPort`` passed to this function should represent one or more addresses reflecting the number of - instances of a node configured to service the client RPC request. See ``haAddressPool`` in `CordaRPCClient`_ for further information on - using an RPC Client for load balancing and failover. -After a successful connection, it is possible for the server to become unavailable. In this case, all RPC calls will throw -an exception and created observables will no longer receive observations. Below is an example of how to reconnect and -back-fill any data that might have been missed while the connection was down. This is done by using the ``onError`` handler -on the ``Observable`` returned by ``CordaRPCOps``. +How to track the vault : -.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt +.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt :language: kotlin - :start-after: DOCSTART rpcClientConnectionRecovery - :end-before: DOCEND rpcClientConnectionRecovery + :start-after: DOCSTART rpcReconnectingRPCVaultTracking + :end-before: DOCEND rpcReconnectingRPCVaultTracking -In this code snippet it is possible to see that the function ``performRpcReconnect`` creates an RPC connection and implements -the error handler upon subscription to an ``Observable``. The call to this ``onError`` handler will be triggered upon failover, at which -point the client will terminate its existing subscription, close its RPC connection and recursively call ``performRpcReconnect``, -which will re-subscribe once the RPC connection is re-established. -Within the body of the ``subscribe`` function itself, the client code receives instances of ``StateMachineInfo``. Upon re-connecting, this code receives -*all* the instances of ``StateMachineInfo``, some of which may already been delivered to the client code prior to previous disconnect. -It is the responsibility of the client code to handle potential duplicated instances of ``StateMachineInfo`` as appropriate. +How to start a flow with a logical retry function that checks for the side effects of the flow: + +.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt + :language: kotlin + :start-after: DOCSTART rpcReconnectingRPCFlowStarting + :end-before: DOCEND rpcReconnectingRPCFlowStarting + + +Note that, as shown by the test, during reconnecting some events might be lost. + +.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt + :language: kotlin + :start-after: DOCSTART missingVaultEvents + :end-before: DOCEND missingVaultEvents + Wire security ------------- diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RandomFailingProxy.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RandomFailingProxy.kt new file mode 100644 index 0000000000..4a9a276ea0 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RandomFailingProxy.kt @@ -0,0 +1,101 @@ +package net.corda.node.services.rpc + +import java.io.IOException +import java.io.InputStream +import java.io.OutputStream +import java.net.ServerSocket +import java.net.Socket +import java.net.SocketException +import java.util.* +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicBoolean + +/** + * Simple proxy that can be restarted and introduces random latencies. + * + * Also acts as a mock load balancer. + */ +class RandomFailingProxy(val serverPort: Int, val remotePort: Int) : AutoCloseable { + private val threadPool = Executors.newCachedThreadPool() + private val stopCopy = AtomicBoolean(false) + private var currentServerSocket: ServerSocket? = null + private val rnd = ThreadLocal.withInitial { Random() } + + fun start(): RandomFailingProxy { + stopCopy.set(false) + currentServerSocket = ServerSocket(serverPort) + threadPool.execute { + try { + currentServerSocket.use { serverSocket -> + while (!stopCopy.get() && !serverSocket!!.isClosed) { + handleConnection(serverSocket.accept()) + } + } + } catch (e: SocketException) { + // The Server socket could be closed + } + } + return this + } + + private fun handleConnection(socket: Socket) { + threadPool.execute { + socket.use { _ -> + try { + Socket("localhost", remotePort).use { target -> + // send message to node + threadPool.execute { + try { + socket.getInputStream().flakeyCopyTo(target.getOutputStream()) + } catch (e: IOException) { + // Thrown when the connection to the target server dies. + } + } + target.getInputStream().flakeyCopyTo(socket.getOutputStream()) + } + } catch (e: IOException) { + // Thrown when the connection to the target server dies. + } + } + } + } + + fun stop(): RandomFailingProxy { + stopCopy.set(true) + currentServerSocket?.close() + return this + } + + private val failOneConnection = AtomicBoolean(false) + fun failConnection() { + failOneConnection.set(true) + } + + override fun close() { + try { + stop() + threadPool.shutdownNow() + } catch (e: Exception) { + // Nothing can be done. + } + } + + private fun InputStream.flakeyCopyTo(out: OutputStream, bufferSize: Int = DEFAULT_BUFFER_SIZE): Long { + var bytesCopied: Long = 0 + val buffer = ByteArray(bufferSize) + var bytes = read(buffer) + while (bytes >= 0 && !stopCopy.get()) { + // Introduce intermittent slowness. + if (rnd.get().nextInt().rem(700) == 0) { + Thread.sleep(rnd.get().nextInt(2000).toLong()) + } + if (failOneConnection.compareAndSet(true, false)) { + throw IOException("Randomly dropped one connection") + } + out.write(buffer, 0, bytes) + bytesCopied += bytes + bytes = read(buffer) + } + return bytesCopied + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt new file mode 100644 index 0000000000..41d09e9c7d --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt @@ -0,0 +1,285 @@ +package net.corda.node.services.rpc + +import net.corda.client.rpc.internal.ReconnectingCordaRPCOps +import net.corda.client.rpc.internal.asReconnecting +import net.corda.core.contracts.Amount +import net.corda.core.flows.StateMachineRunId +import net.corda.core.internal.concurrent.transpose +import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.node.services.vault.builder +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.getOrThrow +import net.corda.finance.contracts.asset.Cash +import net.corda.finance.flows.CashIssueAndPaymentFlow +import net.corda.finance.schemas.CashSchemaV1 +import net.corda.node.services.Permissions +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.DUMMY_BANK_B_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.OutOfProcess +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.OutOfProcessImpl +import net.corda.testing.node.User +import net.corda.testing.node.internal.FINANCE_CORDAPPS +import org.junit.Test +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread +import kotlin.math.absoluteValue +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +/** + * This is a slow test! + */ +class RpcReconnectTests { + + companion object { + private val log = contextLogger() + } + + /** + * This test showcases and stress tests the demo [ReconnectingCordaRPCOps]. + * + * Note that during node failure events can be lost and starting flows can become unreliable. + * The only available way to retry failed flows is to attempt a "logical retry" which is also showcased. + * + * This test runs flows in a loop and in the background kills the node or restarts it. + * Also the RPC connection is made through a proxy that introduces random latencies and is also periodically killed. + */ + @Test + fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`() { + val nrOfFlowsToRun = 450 // Takes around 5 minutes. + val nodeRunningTime = { Random().nextInt(12000) + 8000 } + + val demoUser = User("demo", "demo", setOf(Permissions.all())) + + val nodePort = 20006 + val proxyPort = 20007 + val tcpProxy = RandomFailingProxy(serverPort = proxyPort, remotePort = nodePort).start() + + // When this reaches 0 - the test will end. + val flowsCountdownLatch = CountDownLatch(nrOfFlowsToRun) + + // These are the expected progress steps for the CashIssueAndPayFlow. + val expectedProgress = listOf( + "Starting", + "Issuing cash", + "Generating transaction", + "Signing transaction", + "Finalising transaction", + "Broadcasting transaction to participants", + "Paying recipient", + "Generating anonymous identities", + "Generating transaction", + "Signing transaction", + "Finalising transaction", + "Requesting signature by notary service", + "Requesting signature by Notary service", + "Validating response from Notary service", + "Broadcasting transaction to participants", + "Done" + ) + + driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) { + fun startBankA() = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to "localhost:$nodePort")) + + var (bankA, bankB) = listOf( + startBankA(), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)) + ).transpose().getOrThrow() + + val notary = defaultNotaryIdentity + val baseAmount = Amount.parseCurrency("0 USD") + val issuerRef = OpaqueBytes.of(0x01) + + // Create a reconnecting rpc client through the TCP proxy. + val bankAAddress = bankA.rpcAddress.copy(port = proxyPort) + // DOCSTART rpcReconnectingRPC + val bankAReconnectingRpc = ReconnectingCordaRPCOps(bankAAddress, demoUser.username, demoUser.password) + // DOCEND rpcReconnectingRPC + + // Observe the vault and collect the observations. + val vaultEvents = Collections.synchronizedList(mutableListOf>()) + // DOCSTART rpcReconnectingRPCVaultTracking + val vaultFeed = bankAReconnectingRpc.vaultTrackByWithPagingSpec( + Cash.State::class.java, + QueryCriteria.VaultQueryCriteria(), + PageSpecification(1, 1)) + val vaultObserverHandle = vaultFeed.updates.asReconnecting().subscribe { update: Vault.Update -> + log.info("vault update produced ${update.produced.map { it.state.data.amount }} consumed ${update.consumed.map { it.ref }}") + vaultEvents.add(update) + } + // DOCEND rpcReconnectingRPCVaultTracking + + // Observe the stateMachine and collect the observations. + val stateMachineEvents = Collections.synchronizedList(mutableListOf()) + val stateMachineObserverHandle = bankAReconnectingRpc.stateMachinesFeed().updates.asReconnecting().subscribe { update -> + log.info(update.toString()) + stateMachineEvents.add(update) + } + + // While the flows are running, randomly apply a different failure scenario. + val nrRestarts = AtomicInteger() + thread(name = "Node killer") { + while (true) { + if (flowsCountdownLatch.count == 0L) break + + // Let the node run for a random time interval. + nodeRunningTime().also { ms -> + log.info("Running node for ${ms / 1000} s.") + Thread.sleep(ms.toLong()) + } + + if (flowsCountdownLatch.count == 0L) break + + when (Random().nextInt().rem(6).absoluteValue) { + 0 -> { + log.info("Forcefully killing node and proxy.") + (bankA as OutOfProcessImpl).onStopCallback() + (bankA as OutOfProcess).process.destroyForcibly() + tcpProxy.stop() + bankA = startBankA().get() + tcpProxy.start() + } + 1 -> { + log.info("Forcefully killing node.") + (bankA as OutOfProcessImpl).onStopCallback() + (bankA as OutOfProcess).process.destroyForcibly() + bankA = startBankA().get() + } + 2 -> { + log.info("Shutting down node.") + bankA.stop() + tcpProxy.stop() + bankA = startBankA().get() + tcpProxy.start() + } + 3, 4 -> { + log.info("Killing proxy.") + tcpProxy.stop() + Thread.sleep(Random().nextInt(5000).toLong()) + tcpProxy.start() + } + 5 -> { + log.info("Dropping connection.") + tcpProxy.failConnection() + } + } + nrRestarts.incrementAndGet() + } + } + + // Start nrOfFlowsToRun and provide a logical retry function that checks the vault. + val flowProgressEvents = mutableMapOf>() + for (amount in (1..nrOfFlowsToRun)) { + // DOCSTART rpcReconnectingRPCFlowStarting + bankAReconnectingRpc.runFlowWithLogicalRetry( + runFlow = { rpc -> + log.info("Starting CashIssueAndPaymentFlow for $amount") + val flowHandle = rpc.startTrackedFlowDynamic( + CashIssueAndPaymentFlow::class.java, + baseAmount.plus(Amount.parseCurrency("$amount USD")), + issuerRef, + bankB.nodeInfo.legalIdentities.first(), + false, + notary + ) + val flowId = flowHandle.id + log.info("Started flow $amount with flowId: $flowId") + flowProgressEvents.addEvent(flowId, null) + + // No reconnecting possible. + flowHandle.progress.subscribe( + { prog -> + flowProgressEvents.addEvent(flowId, prog) + log.info("Progress $flowId : $prog") + }, + { error -> + log.error("Error thrown in the flow progress observer", error) + }) + flowHandle.id + }, + hasFlowStarted = { rpc -> + // Query for a state that is the result of this flow. + val criteria = QueryCriteria.VaultCustomQueryCriteria(builder { CashSchemaV1.PersistentCashState::pennies.equal(amount.toLong() * 100) }, status = Vault.StateStatus.ALL) + val results = rpc.vaultQueryByCriteria(criteria, Cash.State::class.java) + log.info("$amount - Found states ${results.states}") + // The flow has completed if a state is found + results.states.isNotEmpty() + }, + onFlowConfirmed = { + flowsCountdownLatch.countDown() + log.info("Flow started for $amount. Remaining flows: ${flowsCountdownLatch.count}") + } + ) + // DOCEND rpcReconnectingRPCFlowStarting + + Thread.sleep(Random().nextInt(250).toLong()) + } + + log.info("Started all flows") + + // Wait until all flows have been started. + flowsCountdownLatch.await() + + log.info("Confirmed all flows.") + + // Wait for all events to come in and flows to finish. + Thread.sleep(4000) + + val nrFailures = nrRestarts.get() + log.info("Checking results after $nrFailures restarts.") + + // The progress status for each flow can only miss the last events, because the node might have been killed. + val missingProgressEvents = flowProgressEvents.filterValues { expectedProgress.subList(0, it.size) != it } + assertTrue(missingProgressEvents.isEmpty(), "The flow progress tracker is missing events: $missingProgressEvents") + + // DOCSTART missingVaultEvents + // Check that enough vault events were received. + // This check is fuzzy because events can go missing during node restarts. + // Ideally there should be nrOfFlowsToRun events receive but some might get lost for each restart. + assertTrue(vaultEvents!!.size + nrFailures * 2 >= nrOfFlowsToRun, "Not all vault events were received") + // DOCEND missingVaultEvents + + // Query the vault and check that states were created for all flows. + val allCashStates = bankAReconnectingRpc + .vaultQueryByWithPagingSpec(Cash.State::class.java, QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.CONSUMED), PageSpecification(1, 10000)) + .states + + val allCash = allCashStates.map { it.state.data.amount.quantity }.toSet() + val missingCash = (1..nrOfFlowsToRun).filterNot { allCash.contains(it.toLong() * 100) } + log.info("MISSING: $missingCash") + + assertEquals(nrOfFlowsToRun, allCashStates.size, "Not all flows were executed successfully") + + // Check that no flow was triggered twice. + val duplicates = allCashStates.groupBy { it.state.data.amount }.filterValues { it.size > 1 } + assertTrue(duplicates.isEmpty(), "${duplicates.size} flows were retried illegally.") + + log.info("SM EVENTS: ${stateMachineEvents!!.size}") + // State machine events are very likely to get lost more often because they seem to be sent with a delay. + assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > nrOfFlowsToRun / 2, "Too many Added state machine events lost.") + assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > nrOfFlowsToRun / 2, "Too many Removed state machine events lost.") + + // Stop the observers. + vaultObserverHandle.stop() + stateMachineObserverHandle.stop() + + bankAReconnectingRpc.close() + } + + tcpProxy.close() + } + + @Synchronized + fun MutableMap>.addEvent(id: StateMachineRunId, progress: String?): Boolean { + return getOrPut(id) { mutableListOf() }.let { if (progress != null) it.add(progress) else false } + } +} + diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/DriverInternal.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/DriverInternal.kt index fb4b37f6f9..c761b96498 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/DriverInternal.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/DriverInternal.kt @@ -35,7 +35,7 @@ data class OutOfProcessImpl( override val useHTTPS: Boolean, val debugPort: Int?, override val process: Process, - private val onStopCallback: () -> Unit + val onStopCallback: () -> Unit ) : OutOfProcess, NodeHandleInternal { override val rpcUsers: List = configuration.rpcUsers.map { User(it.username, it.password, it.permissions) } override fun stop() {