mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
CORDA-2743 - utilities and test to show rpc operations that support disconnects (#5009)
This commit is contained in:
parent
367c98ec7c
commit
6771386b4b
@ -0,0 +1,402 @@
|
||||
package net.corda.client.rpc.internal
|
||||
|
||||
import net.corda.client.rpc.*
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.times
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.ClientRpcSslOptions
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.nodeapi.exceptions.RejectedCommandException
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException
|
||||
import rx.Observable
|
||||
import java.lang.reflect.InvocationHandler
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
import java.lang.reflect.Proxy
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Wrapper over [CordaRPCOps] that handles exceptions when the node or the connection to the node fail.
|
||||
*
|
||||
* All operations are retried on failure, except flow start operations that die before receiving a valid [FlowHandle], in which case a [CouldNotStartFlowException] is thrown.
|
||||
*
|
||||
* When calling methods that return a [DataFeed] like [CordaRPCOps.vaultTrackBy], the returned [DataFeed.updates] object will no longer
|
||||
* be a usable [rx.Observable] but an instance of [ReconnectingObservable].
|
||||
* The caller has to explicitly cast to [ReconnectingObservable] and call [ReconnectingObservable.subscribe]. If used as an [rx.Observable] it will just fail.
|
||||
* The returned [DataFeed.snapshot] is the snapshot as it was when the feed was first retrieved.
|
||||
*
|
||||
* Note: There is no guarantee that observations will not be lost.
|
||||
*
|
||||
* *This class is not a stable API. Any project that wants to use it, must copy and paste it.*
|
||||
*/
|
||||
class ReconnectingCordaRPCOps private constructor(
|
||||
private val reconnectingRPCConnection: ReconnectingRPCConnection,
|
||||
private val observersPool: ExecutorService,
|
||||
private val userPool: Boolean
|
||||
) : AutoCloseable, CordaRPCOps by proxy(reconnectingRPCConnection, observersPool) {
|
||||
|
||||
// Constructors that mirror CordaRPCClient.
|
||||
constructor(
|
||||
nodeHostAndPort: NetworkHostAndPort,
|
||||
username: String,
|
||||
password: String,
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
classLoader: ClassLoader? = null,
|
||||
observersPool: ExecutorService? = null
|
||||
) : this(
|
||||
ReconnectingRPCConnection(listOf(nodeHostAndPort), username, password, sslConfiguration, classLoader),
|
||||
observersPool ?: Executors.newCachedThreadPool(),
|
||||
observersPool != null)
|
||||
|
||||
constructor(
|
||||
nodeHostAndPorts: List<NetworkHostAndPort>,
|
||||
username: String,
|
||||
password: String,
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
classLoader: ClassLoader? = null,
|
||||
observersPool: ExecutorService? = null
|
||||
) : this(
|
||||
ReconnectingRPCConnection(nodeHostAndPorts, username, password, sslConfiguration, classLoader),
|
||||
observersPool ?: Executors.newCachedThreadPool(),
|
||||
observersPool != null)
|
||||
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): CordaRPCOps {
|
||||
return Proxy.newProxyInstance(
|
||||
this::class.java.classLoader,
|
||||
arrayOf(CordaRPCOps::class.java),
|
||||
ErrorInterceptingHandler(reconnectingRPCConnection, observersPool)) as CordaRPCOps
|
||||
}
|
||||
}
|
||||
|
||||
private val retryFlowsPool = Executors.newScheduledThreadPool(1)
|
||||
|
||||
/**
|
||||
* This function runs a flow and retries until it completes successfully.
|
||||
*
|
||||
* [runFlow] is a function that starts a flow.
|
||||
* [hasFlowStarted] is a function that checks if the flow has actually completed by checking some side-effect, for example the vault.
|
||||
* [onFlowConfirmed] Callback when the flow is confirmed.
|
||||
* [timeout] Indicative timeout to wait until the flow would create the side-effect. Should be increased if the flow is slow. Note that
|
||||
* this timeout is calculated after the rpc client has reconnected to the node.
|
||||
*
|
||||
* Note that this method does not guarantee 100% that the flow will not be started twice.
|
||||
*/
|
||||
fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -> StateMachineRunId, hasFlowStarted: (CordaRPCOps) -> Boolean, onFlowConfirmed: () -> Unit = {}, timeout: Duration = 4.seconds) {
|
||||
try {
|
||||
runFlow(this)
|
||||
onFlowConfirmed()
|
||||
} catch (e: CouldNotStartFlowException) {
|
||||
log.error("Couldn't start flow: ${e.message}")
|
||||
retryFlowsPool.schedule(
|
||||
{
|
||||
if (!hasFlowStarted(this)) {
|
||||
runFlowWithLogicalRetry(runFlow, hasFlowStarted, onFlowConfirmed, timeout)
|
||||
} else {
|
||||
onFlowConfirmed()
|
||||
}
|
||||
},
|
||||
timeout.seconds, TimeUnit.SECONDS
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is similar to [runFlowWithLogicalRetry] but is blocking and it returns the result of the flow.
|
||||
*
|
||||
* [runFlow] - starts a flow and returns the [FlowHandle].
|
||||
* [hasFlowCompleted] - Runs a vault query and is able to recreate the result of the flow.
|
||||
*/
|
||||
fun <T> runFlowAndReturnResultWithLogicalRetry(runFlow: (CordaRPCOps) -> FlowHandle<T>, hasFlowCompleted: (CordaRPCOps) -> T?, timeout: Duration = 4.seconds): T {
|
||||
return try {
|
||||
runFlow(this).returnValue.get()
|
||||
} catch (e: CouldNotStartFlowException) {
|
||||
log.error("Couldn't start flow: ${e.message}")
|
||||
Thread.sleep(timeout.toMillis())
|
||||
hasFlowCompleted(this) ?: runFlowAndReturnResultWithLogicalRetry(runFlow, hasFlowCompleted, timeout)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class useful for reconnecting to a Node.
|
||||
*/
|
||||
internal data class ReconnectingRPCConnection(
|
||||
val nodeHostAndPorts: List<NetworkHostAndPort>,
|
||||
val username: String,
|
||||
val password: String,
|
||||
val sslConfiguration: ClientRpcSslOptions? = null,
|
||||
val classLoader: ClassLoader?
|
||||
) : RPCConnection<CordaRPCOps> {
|
||||
private var currentRPCConnection: CordaRPCConnection? = null
|
||||
|
||||
init {
|
||||
connect()
|
||||
}
|
||||
|
||||
enum class CurrentState {
|
||||
UNCONNECTED, CONNECTED, CONNECTING, CLOSED, DIED
|
||||
}
|
||||
|
||||
private var currentState = CurrentState.UNCONNECTED
|
||||
|
||||
private val current: CordaRPCConnection
|
||||
@Synchronized get() = when (currentState) {
|
||||
CurrentState.CONNECTED -> currentRPCConnection!!
|
||||
CurrentState.UNCONNECTED, CurrentState.CLOSED -> {
|
||||
connect()
|
||||
currentRPCConnection!!
|
||||
}
|
||||
CurrentState.CONNECTING, CurrentState.DIED -> throw IllegalArgumentException("Illegal state")
|
||||
}
|
||||
|
||||
/**
|
||||
* Called on external error.
|
||||
* Will block until the connection is established again.
|
||||
*/
|
||||
@Synchronized
|
||||
fun error(e: Throwable) {
|
||||
currentState = CurrentState.DIED
|
||||
//TODO - handle error cases
|
||||
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
|
||||
connect()
|
||||
}
|
||||
|
||||
private fun connect() {
|
||||
currentState = CurrentState.CONNECTING
|
||||
currentRPCConnection = establishConnectionWithRetry()
|
||||
currentState = CurrentState.CONNECTED
|
||||
}
|
||||
|
||||
private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, nrRetries: Int = 0): CordaRPCConnection {
|
||||
log.info("Connecting to: $nodeHostAndPorts")
|
||||
try {
|
||||
return CordaRPCClient(
|
||||
nodeHostAndPorts, CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval), sslConfiguration, classLoader
|
||||
).start(username, password).also {
|
||||
// Check connection is truly operational before returning it.
|
||||
require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) {
|
||||
"Could not establish connection to ${nodeHostAndPorts}."
|
||||
}
|
||||
log.debug { "Connection successfully established with: ${nodeHostAndPorts}" }
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
when (ex) {
|
||||
is ActiveMQSecurityException -> {
|
||||
// Happens when incorrect credentials provided.
|
||||
// It can happen at startup as well when the credentials are correct.
|
||||
if (nrRetries > 1) throw ex
|
||||
}
|
||||
is RPCException -> {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
log.debug { "Exception upon establishing connection: ${ex.message}" }
|
||||
}
|
||||
is ActiveMQConnectionTimedOutException -> {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
log.debug { "Exception upon establishing connection: ${ex.message}" }
|
||||
}
|
||||
is ActiveMQUnBlockedException -> {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
log.debug { "Exception upon establishing connection: ${ex.message}" }
|
||||
}
|
||||
else -> {
|
||||
log.debug("Unknown exception upon establishing connection.", ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Could not connect this time round - pause before giving another try.
|
||||
Thread.sleep(retryInterval.toMillis())
|
||||
return establishConnectionWithRetry((retryInterval * 3) / 2, nrRetries + 1)
|
||||
}
|
||||
|
||||
override val proxy: CordaRPCOps
|
||||
get() = current.proxy
|
||||
|
||||
override val serverProtocolVersion
|
||||
get() = current.serverProtocolVersion
|
||||
|
||||
@Synchronized
|
||||
override fun notifyServerAndClose() {
|
||||
currentState = CurrentState.CLOSED
|
||||
currentRPCConnection?.notifyServerAndClose()
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun forceClose() {
|
||||
currentState = CurrentState.CLOSED
|
||||
currentRPCConnection?.forceClose()
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun close() {
|
||||
currentState = CurrentState.CLOSED
|
||||
currentRPCConnection?.close()
|
||||
}
|
||||
}
|
||||
|
||||
internal class ReconnectingObservableImpl<T> internal constructor(
|
||||
val reconnectingRPCConnection: ReconnectingRPCConnection,
|
||||
val observersPool: ExecutorService,
|
||||
val initial: DataFeed<*, T>,
|
||||
val createDataFeed: () -> DataFeed<*, T>
|
||||
) : Observable<T>(null), ReconnectingObservable<T> {
|
||||
|
||||
private var initialStartWith: Iterable<T>? = null
|
||||
private fun _subscribeWithReconnect(observerHandle: ObserverHandle, onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit, startWithValues: Iterable<T>? = null) {
|
||||
var subscriptionError: Throwable?
|
||||
try {
|
||||
val subscription = initial.updates.let { if (startWithValues != null) it.startWith(startWithValues) else it }
|
||||
.subscribe(onNext, observerHandle::fail, observerHandle::stop)
|
||||
subscriptionError = observerHandle.await()
|
||||
subscription.unsubscribe()
|
||||
} catch (e: Exception) {
|
||||
log.error("Failed to register subscriber .", e)
|
||||
subscriptionError = e
|
||||
}
|
||||
|
||||
// In case there was no exception the observer has finished gracefully.
|
||||
if (subscriptionError == null) {
|
||||
onStop()
|
||||
return
|
||||
}
|
||||
|
||||
onDisconnect()
|
||||
// Only continue if the subscription failed.
|
||||
reconnectingRPCConnection.error(subscriptionError)
|
||||
log.debug { "Recreating data feed." }
|
||||
|
||||
val newObservable = createDataFeed().updates as ReconnectingObservableImpl<T>
|
||||
onReconnect()
|
||||
return newObservable._subscribeWithReconnect(observerHandle, onNext, onStop, onDisconnect, onReconnect)
|
||||
}
|
||||
|
||||
override fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle {
|
||||
val observerNotifier = ObserverHandle()
|
||||
// TODO - change the establish connection method to be non-blocking
|
||||
observersPool.execute {
|
||||
_subscribeWithReconnect(observerNotifier, onNext, onStop, onDisconnect, onReconnect, initialStartWith)
|
||||
}
|
||||
return observerNotifier
|
||||
}
|
||||
|
||||
override fun startWithValues(values: Iterable<T>): ReconnectingObservable<T> {
|
||||
initialStartWith = values
|
||||
return this
|
||||
}
|
||||
}
|
||||
|
||||
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler {
|
||||
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
|
||||
|
||||
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
|
||||
val result: Any? = try {
|
||||
log.debug { "Invoking RPC $method..." }
|
||||
method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
|
||||
log.debug { "RPC $method invoked successfully." }
|
||||
}
|
||||
} catch (e: InvocationTargetException) {
|
||||
fun retry() = if (method.isStartFlow()) {
|
||||
// Don't retry flows
|
||||
throw CouldNotStartFlowException(e.targetException)
|
||||
} else {
|
||||
this.invoke(proxy, method, args)
|
||||
}
|
||||
|
||||
when (e.targetException) {
|
||||
is RejectedCommandException -> {
|
||||
log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
|
||||
reconnectingRPCConnection.error(e)
|
||||
this.invoke(proxy, method, args)
|
||||
}
|
||||
is ConnectionFailureException -> {
|
||||
log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
|
||||
reconnectingRPCConnection.error(e)
|
||||
retry()
|
||||
}
|
||||
is RPCException -> {
|
||||
log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
|
||||
reconnectingRPCConnection.error(e)
|
||||
Thread.sleep(1000) // TODO - explain why this sleep is necessary
|
||||
retry()
|
||||
}
|
||||
else -> {
|
||||
log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e)
|
||||
reconnectingRPCConnection.error(e)
|
||||
retry()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return when (method.returnType) {
|
||||
DataFeed::class.java -> {
|
||||
// Intercept the data feed methods and returned a ReconnectingObservable instance
|
||||
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(result)
|
||||
val observable = ReconnectingObservableImpl(reconnectingRPCConnection, observersPool, initialFeed) {
|
||||
// This handles reconnecting and creates new feeds.
|
||||
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
|
||||
}
|
||||
initialFeed.copy(updates = observable)
|
||||
}
|
||||
// TODO - add handlers for Observable return types.
|
||||
else -> result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
if (!userPool) observersPool.shutdown()
|
||||
retryFlowsPool.shutdown()
|
||||
reconnectingRPCConnection.forceClose()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returned as the `updates` field when calling methods that return a [DataFeed] on the [ReconnectingCordaRPCOps].
|
||||
*
|
||||
* TODO - provide a logical function to know how to retrieve missing events that happened during disconnects.
|
||||
*/
|
||||
interface ReconnectingObservable<T> {
|
||||
fun subscribe(onNext: (T) -> Unit): ObserverHandle = subscribe(onNext, {}, {}, {})
|
||||
fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle
|
||||
fun startWithValues(values: Iterable<T>): ReconnectingObservable<T>
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility to externally control a subscribed observer.
|
||||
*/
|
||||
class ObserverHandle {
|
||||
private val terminated = LinkedBlockingQueue<Optional<Throwable>>(1)
|
||||
|
||||
fun stop() = terminated.put(Optional.empty())
|
||||
internal fun fail(e: Throwable) = terminated.put(Optional.of(e))
|
||||
|
||||
/**
|
||||
* Returns null if the observation ended successfully.
|
||||
*/
|
||||
internal fun await(duration: Duration = 60.minutes): Throwable? = terminated.poll(duration.seconds, TimeUnit.SECONDS).orElse(null)
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown when a flow start command died before receiving a [net.corda.core.messaging.FlowHandle].
|
||||
* On catching this exception, the typical behaviour is to run a "logical retry", meaning only retry the flow if the expected outcome did not occur.
|
||||
*/
|
||||
class CouldNotStartFlowException(cause: Throwable? = null) : RPCException("Could not start flow as connection failed", cause)
|
||||
|
||||
/**
|
||||
* Mainly for Kotlin users.
|
||||
*/
|
||||
fun <T> Observable<T>.asReconnecting(): ReconnectingObservable<T> = uncheckedCast(this)
|
||||
|
||||
fun <T> Observable<T>.asReconnectingWithInitialValues(values: Iterable<T>): ReconnectingObservable<T> = asReconnecting().startWithValues(values)
|
@ -352,39 +352,60 @@ When not in ``devMode``, the server will mask exceptions not meant for clients a
|
||||
This does not expose internal information to clients, strengthening privacy and security. CorDapps can have exceptions implement
|
||||
``ClientRelevantError`` to allow them to reach RPC clients.
|
||||
|
||||
Connection management
|
||||
---------------------
|
||||
It is possible to not be able to connect to the server on the first attempt. In that case, the ``CordaRPCClient.start()``
|
||||
method will throw an exception. The following code snippet is an example of how to write a simple retry mechanism for
|
||||
such situations:
|
||||
Reconnecting RPC clients
|
||||
------------------------
|
||||
|
||||
.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt
|
||||
In the current version of Corda the RPC connection and all the observervables that are created by a client will just throw exceptions and die
|
||||
when the node or TCP connection become unavailable.
|
||||
|
||||
It is the client's responsibility to handle these errors and reconnect once the node is running again. Running RPC commands against a stopped
|
||||
node will just throw exceptions. Previously created Observables will not emit any events after the node restarts. The client must explicitly re-run the command and
|
||||
re-subscribe to receive more events.
|
||||
|
||||
RPCs which have a side effect, such as starting flows, may have executed on the node even if the return value is not received by the client.
|
||||
The only way to confirm is to perform a business-level query and retry accordingly. The sample `runFlowWithLogicalRetry` helps with this.
|
||||
|
||||
In case users require such a functionality to write a resilient RPC client we have a sample that showcases how this can be implemented and also
|
||||
a thorough test that demonstrates it works as expected.
|
||||
|
||||
The code that performs the reconnecting logic is: `ReconnectingCordaRPCOps.kt <https://github.com/corda/samples/blob/release-V|platform_version|/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt>`_.
|
||||
|
||||
.. note:: This sample code is not exposed as an official Corda API, and must be included directly in the client codebase and adjusted.
|
||||
|
||||
The usage is showcased in the: `RpcReconnectTests.kt <https://github.com/corda/samples/blob/release-V|platform_version|/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt>`_.
|
||||
In case resiliency is a requirement, then it is recommended that users will write a similar test.
|
||||
|
||||
How to initialize the `ReconnectingCordaRPCOps`:
|
||||
|
||||
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART rpcClientConnectionWithRetry
|
||||
:end-before: DOCEND rpcClientConnectionWithRetry
|
||||
:start-after: DOCSTART rpcReconnectingRPC
|
||||
:end-before: DOCEND rpcReconnectingRPC
|
||||
|
||||
.. warning:: The list of ``NetworkHostAndPort`` passed to this function should represent one or more addresses reflecting the number of
|
||||
instances of a node configured to service the client RPC request. See ``haAddressPool`` in `CordaRPCClient`_ for further information on
|
||||
using an RPC Client for load balancing and failover.
|
||||
|
||||
After a successful connection, it is possible for the server to become unavailable. In this case, all RPC calls will throw
|
||||
an exception and created observables will no longer receive observations. Below is an example of how to reconnect and
|
||||
back-fill any data that might have been missed while the connection was down. This is done by using the ``onError`` handler
|
||||
on the ``Observable`` returned by ``CordaRPCOps``.
|
||||
How to track the vault :
|
||||
|
||||
.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt
|
||||
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART rpcClientConnectionRecovery
|
||||
:end-before: DOCEND rpcClientConnectionRecovery
|
||||
:start-after: DOCSTART rpcReconnectingRPCVaultTracking
|
||||
:end-before: DOCEND rpcReconnectingRPCVaultTracking
|
||||
|
||||
In this code snippet it is possible to see that the function ``performRpcReconnect`` creates an RPC connection and implements
|
||||
the error handler upon subscription to an ``Observable``. The call to this ``onError`` handler will be triggered upon failover, at which
|
||||
point the client will terminate its existing subscription, close its RPC connection and recursively call ``performRpcReconnect``,
|
||||
which will re-subscribe once the RPC connection is re-established.
|
||||
|
||||
Within the body of the ``subscribe`` function itself, the client code receives instances of ``StateMachineInfo``. Upon re-connecting, this code receives
|
||||
*all* the instances of ``StateMachineInfo``, some of which may already been delivered to the client code prior to previous disconnect.
|
||||
It is the responsibility of the client code to handle potential duplicated instances of ``StateMachineInfo`` as appropriate.
|
||||
How to start a flow with a logical retry function that checks for the side effects of the flow:
|
||||
|
||||
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART rpcReconnectingRPCFlowStarting
|
||||
:end-before: DOCEND rpcReconnectingRPCFlowStarting
|
||||
|
||||
|
||||
Note that, as shown by the test, during reconnecting some events might be lost.
|
||||
|
||||
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART missingVaultEvents
|
||||
:end-before: DOCEND missingVaultEvents
|
||||
|
||||
|
||||
Wire security
|
||||
-------------
|
||||
|
@ -0,0 +1,101 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import java.io.IOException
|
||||
import java.io.InputStream
|
||||
import java.io.OutputStream
|
||||
import java.net.ServerSocket
|
||||
import java.net.Socket
|
||||
import java.net.SocketException
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
/**
|
||||
* Simple proxy that can be restarted and introduces random latencies.
|
||||
*
|
||||
* Also acts as a mock load balancer.
|
||||
*/
|
||||
class RandomFailingProxy(val serverPort: Int, val remotePort: Int) : AutoCloseable {
|
||||
private val threadPool = Executors.newCachedThreadPool()
|
||||
private val stopCopy = AtomicBoolean(false)
|
||||
private var currentServerSocket: ServerSocket? = null
|
||||
private val rnd = ThreadLocal.withInitial { Random() }
|
||||
|
||||
fun start(): RandomFailingProxy {
|
||||
stopCopy.set(false)
|
||||
currentServerSocket = ServerSocket(serverPort)
|
||||
threadPool.execute {
|
||||
try {
|
||||
currentServerSocket.use { serverSocket ->
|
||||
while (!stopCopy.get() && !serverSocket!!.isClosed) {
|
||||
handleConnection(serverSocket.accept())
|
||||
}
|
||||
}
|
||||
} catch (e: SocketException) {
|
||||
// The Server socket could be closed
|
||||
}
|
||||
}
|
||||
return this
|
||||
}
|
||||
|
||||
private fun handleConnection(socket: Socket) {
|
||||
threadPool.execute {
|
||||
socket.use { _ ->
|
||||
try {
|
||||
Socket("localhost", remotePort).use { target ->
|
||||
// send message to node
|
||||
threadPool.execute {
|
||||
try {
|
||||
socket.getInputStream().flakeyCopyTo(target.getOutputStream())
|
||||
} catch (e: IOException) {
|
||||
// Thrown when the connection to the target server dies.
|
||||
}
|
||||
}
|
||||
target.getInputStream().flakeyCopyTo(socket.getOutputStream())
|
||||
}
|
||||
} catch (e: IOException) {
|
||||
// Thrown when the connection to the target server dies.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun stop(): RandomFailingProxy {
|
||||
stopCopy.set(true)
|
||||
currentServerSocket?.close()
|
||||
return this
|
||||
}
|
||||
|
||||
private val failOneConnection = AtomicBoolean(false)
|
||||
fun failConnection() {
|
||||
failOneConnection.set(true)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
try {
|
||||
stop()
|
||||
threadPool.shutdownNow()
|
||||
} catch (e: Exception) {
|
||||
// Nothing can be done.
|
||||
}
|
||||
}
|
||||
|
||||
private fun InputStream.flakeyCopyTo(out: OutputStream, bufferSize: Int = DEFAULT_BUFFER_SIZE): Long {
|
||||
var bytesCopied: Long = 0
|
||||
val buffer = ByteArray(bufferSize)
|
||||
var bytes = read(buffer)
|
||||
while (bytes >= 0 && !stopCopy.get()) {
|
||||
// Introduce intermittent slowness.
|
||||
if (rnd.get().nextInt().rem(700) == 0) {
|
||||
Thread.sleep(rnd.get().nextInt(2000).toLong())
|
||||
}
|
||||
if (failOneConnection.compareAndSet(true, false)) {
|
||||
throw IOException("Randomly dropped one connection")
|
||||
}
|
||||
out.write(buffer, 0, bytes)
|
||||
bytesCopied += bytes
|
||||
bytes = read(buffer)
|
||||
}
|
||||
return bytesCopied
|
||||
}
|
||||
}
|
@ -0,0 +1,285 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
|
||||
import net.corda.client.rpc.internal.asReconnecting
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.messaging.StateMachineUpdate
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.vault.PageSpecification
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.node.services.vault.builder
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.flows.CashIssueAndPaymentFlow
|
||||
import net.corda.finance.schemas.CashSchemaV1
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.OutOfProcess
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.driver.internal.OutOfProcessImpl
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.FINANCE_CORDAPPS
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.math.absoluteValue
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
/**
|
||||
* This is a slow test!
|
||||
*/
|
||||
class RpcReconnectTests {
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
/**
|
||||
* This test showcases and stress tests the demo [ReconnectingCordaRPCOps].
|
||||
*
|
||||
* Note that during node failure events can be lost and starting flows can become unreliable.
|
||||
* The only available way to retry failed flows is to attempt a "logical retry" which is also showcased.
|
||||
*
|
||||
* This test runs flows in a loop and in the background kills the node or restarts it.
|
||||
* Also the RPC connection is made through a proxy that introduces random latencies and is also periodically killed.
|
||||
*/
|
||||
@Test
|
||||
fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`() {
|
||||
val nrOfFlowsToRun = 450 // Takes around 5 minutes.
|
||||
val nodeRunningTime = { Random().nextInt(12000) + 8000 }
|
||||
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.all()))
|
||||
|
||||
val nodePort = 20006
|
||||
val proxyPort = 20007
|
||||
val tcpProxy = RandomFailingProxy(serverPort = proxyPort, remotePort = nodePort).start()
|
||||
|
||||
// When this reaches 0 - the test will end.
|
||||
val flowsCountdownLatch = CountDownLatch(nrOfFlowsToRun)
|
||||
|
||||
// These are the expected progress steps for the CashIssueAndPayFlow.
|
||||
val expectedProgress = listOf(
|
||||
"Starting",
|
||||
"Issuing cash",
|
||||
"Generating transaction",
|
||||
"Signing transaction",
|
||||
"Finalising transaction",
|
||||
"Broadcasting transaction to participants",
|
||||
"Paying recipient",
|
||||
"Generating anonymous identities",
|
||||
"Generating transaction",
|
||||
"Signing transaction",
|
||||
"Finalising transaction",
|
||||
"Requesting signature by notary service",
|
||||
"Requesting signature by Notary service",
|
||||
"Validating response from Notary service",
|
||||
"Broadcasting transaction to participants",
|
||||
"Done"
|
||||
)
|
||||
|
||||
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) {
|
||||
fun startBankA() = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to "localhost:$nodePort"))
|
||||
|
||||
var (bankA, bankB) = listOf(
|
||||
startBankA(),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
val notary = defaultNotaryIdentity
|
||||
val baseAmount = Amount.parseCurrency("0 USD")
|
||||
val issuerRef = OpaqueBytes.of(0x01)
|
||||
|
||||
// Create a reconnecting rpc client through the TCP proxy.
|
||||
val bankAAddress = bankA.rpcAddress.copy(port = proxyPort)
|
||||
// DOCSTART rpcReconnectingRPC
|
||||
val bankAReconnectingRpc = ReconnectingCordaRPCOps(bankAAddress, demoUser.username, demoUser.password)
|
||||
// DOCEND rpcReconnectingRPC
|
||||
|
||||
// Observe the vault and collect the observations.
|
||||
val vaultEvents = Collections.synchronizedList(mutableListOf<Vault.Update<Cash.State>>())
|
||||
// DOCSTART rpcReconnectingRPCVaultTracking
|
||||
val vaultFeed = bankAReconnectingRpc.vaultTrackByWithPagingSpec(
|
||||
Cash.State::class.java,
|
||||
QueryCriteria.VaultQueryCriteria(),
|
||||
PageSpecification(1, 1))
|
||||
val vaultObserverHandle = vaultFeed.updates.asReconnecting().subscribe { update: Vault.Update<Cash.State> ->
|
||||
log.info("vault update produced ${update.produced.map { it.state.data.amount }} consumed ${update.consumed.map { it.ref }}")
|
||||
vaultEvents.add(update)
|
||||
}
|
||||
// DOCEND rpcReconnectingRPCVaultTracking
|
||||
|
||||
// Observe the stateMachine and collect the observations.
|
||||
val stateMachineEvents = Collections.synchronizedList(mutableListOf<StateMachineUpdate>())
|
||||
val stateMachineObserverHandle = bankAReconnectingRpc.stateMachinesFeed().updates.asReconnecting().subscribe { update ->
|
||||
log.info(update.toString())
|
||||
stateMachineEvents.add(update)
|
||||
}
|
||||
|
||||
// While the flows are running, randomly apply a different failure scenario.
|
||||
val nrRestarts = AtomicInteger()
|
||||
thread(name = "Node killer") {
|
||||
while (true) {
|
||||
if (flowsCountdownLatch.count == 0L) break
|
||||
|
||||
// Let the node run for a random time interval.
|
||||
nodeRunningTime().also { ms ->
|
||||
log.info("Running node for ${ms / 1000} s.")
|
||||
Thread.sleep(ms.toLong())
|
||||
}
|
||||
|
||||
if (flowsCountdownLatch.count == 0L) break
|
||||
|
||||
when (Random().nextInt().rem(6).absoluteValue) {
|
||||
0 -> {
|
||||
log.info("Forcefully killing node and proxy.")
|
||||
(bankA as OutOfProcessImpl).onStopCallback()
|
||||
(bankA as OutOfProcess).process.destroyForcibly()
|
||||
tcpProxy.stop()
|
||||
bankA = startBankA().get()
|
||||
tcpProxy.start()
|
||||
}
|
||||
1 -> {
|
||||
log.info("Forcefully killing node.")
|
||||
(bankA as OutOfProcessImpl).onStopCallback()
|
||||
(bankA as OutOfProcess).process.destroyForcibly()
|
||||
bankA = startBankA().get()
|
||||
}
|
||||
2 -> {
|
||||
log.info("Shutting down node.")
|
||||
bankA.stop()
|
||||
tcpProxy.stop()
|
||||
bankA = startBankA().get()
|
||||
tcpProxy.start()
|
||||
}
|
||||
3, 4 -> {
|
||||
log.info("Killing proxy.")
|
||||
tcpProxy.stop()
|
||||
Thread.sleep(Random().nextInt(5000).toLong())
|
||||
tcpProxy.start()
|
||||
}
|
||||
5 -> {
|
||||
log.info("Dropping connection.")
|
||||
tcpProxy.failConnection()
|
||||
}
|
||||
}
|
||||
nrRestarts.incrementAndGet()
|
||||
}
|
||||
}
|
||||
|
||||
// Start nrOfFlowsToRun and provide a logical retry function that checks the vault.
|
||||
val flowProgressEvents = mutableMapOf<StateMachineRunId, MutableList<String>>()
|
||||
for (amount in (1..nrOfFlowsToRun)) {
|
||||
// DOCSTART rpcReconnectingRPCFlowStarting
|
||||
bankAReconnectingRpc.runFlowWithLogicalRetry(
|
||||
runFlow = { rpc ->
|
||||
log.info("Starting CashIssueAndPaymentFlow for $amount")
|
||||
val flowHandle = rpc.startTrackedFlowDynamic(
|
||||
CashIssueAndPaymentFlow::class.java,
|
||||
baseAmount.plus(Amount.parseCurrency("$amount USD")),
|
||||
issuerRef,
|
||||
bankB.nodeInfo.legalIdentities.first(),
|
||||
false,
|
||||
notary
|
||||
)
|
||||
val flowId = flowHandle.id
|
||||
log.info("Started flow $amount with flowId: $flowId")
|
||||
flowProgressEvents.addEvent(flowId, null)
|
||||
|
||||
// No reconnecting possible.
|
||||
flowHandle.progress.subscribe(
|
||||
{ prog ->
|
||||
flowProgressEvents.addEvent(flowId, prog)
|
||||
log.info("Progress $flowId : $prog")
|
||||
},
|
||||
{ error ->
|
||||
log.error("Error thrown in the flow progress observer", error)
|
||||
})
|
||||
flowHandle.id
|
||||
},
|
||||
hasFlowStarted = { rpc ->
|
||||
// Query for a state that is the result of this flow.
|
||||
val criteria = QueryCriteria.VaultCustomQueryCriteria(builder { CashSchemaV1.PersistentCashState::pennies.equal(amount.toLong() * 100) }, status = Vault.StateStatus.ALL)
|
||||
val results = rpc.vaultQueryByCriteria(criteria, Cash.State::class.java)
|
||||
log.info("$amount - Found states ${results.states}")
|
||||
// The flow has completed if a state is found
|
||||
results.states.isNotEmpty()
|
||||
},
|
||||
onFlowConfirmed = {
|
||||
flowsCountdownLatch.countDown()
|
||||
log.info("Flow started for $amount. Remaining flows: ${flowsCountdownLatch.count}")
|
||||
}
|
||||
)
|
||||
// DOCEND rpcReconnectingRPCFlowStarting
|
||||
|
||||
Thread.sleep(Random().nextInt(250).toLong())
|
||||
}
|
||||
|
||||
log.info("Started all flows")
|
||||
|
||||
// Wait until all flows have been started.
|
||||
flowsCountdownLatch.await()
|
||||
|
||||
log.info("Confirmed all flows.")
|
||||
|
||||
// Wait for all events to come in and flows to finish.
|
||||
Thread.sleep(4000)
|
||||
|
||||
val nrFailures = nrRestarts.get()
|
||||
log.info("Checking results after $nrFailures restarts.")
|
||||
|
||||
// The progress status for each flow can only miss the last events, because the node might have been killed.
|
||||
val missingProgressEvents = flowProgressEvents.filterValues { expectedProgress.subList(0, it.size) != it }
|
||||
assertTrue(missingProgressEvents.isEmpty(), "The flow progress tracker is missing events: $missingProgressEvents")
|
||||
|
||||
// DOCSTART missingVaultEvents
|
||||
// Check that enough vault events were received.
|
||||
// This check is fuzzy because events can go missing during node restarts.
|
||||
// Ideally there should be nrOfFlowsToRun events receive but some might get lost for each restart.
|
||||
assertTrue(vaultEvents!!.size + nrFailures * 2 >= nrOfFlowsToRun, "Not all vault events were received")
|
||||
// DOCEND missingVaultEvents
|
||||
|
||||
// Query the vault and check that states were created for all flows.
|
||||
val allCashStates = bankAReconnectingRpc
|
||||
.vaultQueryByWithPagingSpec(Cash.State::class.java, QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.CONSUMED), PageSpecification(1, 10000))
|
||||
.states
|
||||
|
||||
val allCash = allCashStates.map { it.state.data.amount.quantity }.toSet()
|
||||
val missingCash = (1..nrOfFlowsToRun).filterNot { allCash.contains(it.toLong() * 100) }
|
||||
log.info("MISSING: $missingCash")
|
||||
|
||||
assertEquals(nrOfFlowsToRun, allCashStates.size, "Not all flows were executed successfully")
|
||||
|
||||
// Check that no flow was triggered twice.
|
||||
val duplicates = allCashStates.groupBy { it.state.data.amount }.filterValues { it.size > 1 }
|
||||
assertTrue(duplicates.isEmpty(), "${duplicates.size} flows were retried illegally.")
|
||||
|
||||
log.info("SM EVENTS: ${stateMachineEvents!!.size}")
|
||||
// State machine events are very likely to get lost more often because they seem to be sent with a delay.
|
||||
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > nrOfFlowsToRun / 2, "Too many Added state machine events lost.")
|
||||
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > nrOfFlowsToRun / 2, "Too many Removed state machine events lost.")
|
||||
|
||||
// Stop the observers.
|
||||
vaultObserverHandle.stop()
|
||||
stateMachineObserverHandle.stop()
|
||||
|
||||
bankAReconnectingRpc.close()
|
||||
}
|
||||
|
||||
tcpProxy.close()
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
fun MutableMap<StateMachineRunId, MutableList<String>>.addEvent(id: StateMachineRunId, progress: String?): Boolean {
|
||||
return getOrPut(id) { mutableListOf() }.let { if (progress != null) it.add(progress) else false }
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ data class OutOfProcessImpl(
|
||||
override val useHTTPS: Boolean,
|
||||
val debugPort: Int?,
|
||||
override val process: Process,
|
||||
private val onStopCallback: () -> Unit
|
||||
val onStopCallback: () -> Unit
|
||||
) : OutOfProcess, NodeHandleInternal {
|
||||
override val rpcUsers: List<User> = configuration.rpcUsers.map { User(it.username, it.password, it.permissions) }
|
||||
override fun stop() {
|
||||
|
Loading…
Reference in New Issue
Block a user