[CORDA-2923] - Make the RPC client reconnect with gracefulReconnect param (#5244)

When set to true, the RPC client will:

* automatically reconnect when the connection is broken
* block simple RPC calls until the connection is re-established
* automatically resubscribe Observables returned from RPC on reconnect, so the client continues to receive events; this does not guarantee that events will not be lost during the reconnect (see the sketch below)
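A minimal opt-in sketch (the address and credentials below are placeholders, not taken from this change):

import net.corda.client.rpc.CordaRPCClient
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.finance.contracts.asset.Cash

fun main() {
    // Placeholder RPC address and credentials; substitute your node's settings.
    val client = CordaRPCClient(NetworkHostAndPort("localhost", 10006))
    // gracefulReconnect = true opts in to the behaviour described above.
    val connection = client.start("user1", "test", gracefulReconnect = true)
    val rpcOps = connection.proxy
    // Simple RPC calls block across reconnects instead of failing.
    println(rpcOps.networkParameters)
    // Observables returned from RPC resubscribe automatically after a reconnect.
    rpcOps.vaultTrack(Cash.State::class.java).updates.subscribe { println(it) }
    connection.notifyServerAndClose()
}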
This commit is contained in:
Dimos Raptis 2019-07-16 11:29:21 +01:00 committed by Shams Asari
parent e96dcedfc6
commit 8962d930d4
11 changed files with 420 additions and 234 deletions

6
.idea/compiler.xml generated
View File

@ -265,6 +265,10 @@
<module name="cordformation_test" target="1.8" />
<module name="core-deterministic_main" target="1.8" />
<module name="core-deterministic_test" target="1.8" />
<module name="core-tests_integrationTest" target="1.8" />
<module name="core-tests_main" target="1.8" />
<module name="core-tests_smokeTest" target="1.8" />
<module name="core-tests_test" target="1.8" />
<module name="core_extraResource" target="1.8" />
<module name="core_integrationTest" target="1.8" />
<module name="core_main" target="1.8" />
@ -363,6 +367,8 @@
<module name="netparams_test" target="1.8" />
<module name="network-bootstrapper_main" target="1.8" />
<module name="network-bootstrapper_test" target="1.8" />
<module name="network-builder_main" target="1.8" />
<module name="network-builder_test" target="1.8" />
<module name="network-verifier-contracts_main" target="1.8" />
<module name="network-verifier-contracts_test" target="1.8" />
<module name="network-verifier-workflows_main" target="1.8" />

View File

@ -1,9 +1,7 @@
package net.corda.client.jfx.model
import javafx.application.Platform
import javafx.beans.property.SimpleObjectProperty
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.internal.asReconnectingWithInitialValues
import net.corda.core.contracts.ContractState
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
@ -85,24 +83,19 @@ class NodeMonitorModel : AutoCloseable {
}.toSet()
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet())
vaultUpdates.asReconnectingWithInitialValues(listOf(initialVaultUpdate))
.subscribe(
onNext = vaultUpdatesSubject::onNext,
onDisconnect = { Platform.runLater { proxyObservable.value = null } },
onReconnect = { Platform.runLater { proxyObservable.value = rpc } },
onStop = {})
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext)
// Transactions
val (transactions, newTransactions) = rpc.internalVerifiedTransactionsFeed()
newTransactions.asReconnectingWithInitialValues(transactions).subscribe(transactionsSubject::onNext)
newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext)
// SM -> TX mapping
val (smTxMappings, futureSmTxMappings) = rpc.stateMachineRecordedTransactionMappingFeed()
futureSmTxMappings.asReconnectingWithInitialValues(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext)
futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext)
// Parties on network
val (parties, futurePartyUpdate) = rpc.networkMapFeed()
futurePartyUpdate.asReconnectingWithInitialValues(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext)
futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext)
val stateMachines = rpc.stateMachinesSnapshot()

View File

@ -0,0 +1,147 @@
package net.corda.client.rpc
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueFlow
import net.corda.node.services.Permissions
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class CordaRPCClientReconnectionTest {
private val portAllocator = incrementalPortAllocation()
companion object {
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
}
@Test
fun `rpc client calls and returned observables continue working when the server crashes and restarts`() {
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) {
val latch = CountDownLatch(2)
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
fun startNode(): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress, CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 5
))
val rpcOps = client.start(rpcUser.username, rpcUser.password, gracefulReconnect = true).proxy
val networkParameters = rpcOps.networkParameters
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
cashStatesFeed.updates.subscribe { latch.countDown() }
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
node.stop()
startNode()
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
val networkParametersAfterCrash = rpcOps.networkParameters
assertThat(networkParameters).isEqualTo(networkParametersAfterCrash)
assertTrue {
latch.await(2, TimeUnit.SECONDS)
}
}
}
@Test
fun `a client can successfully unsubscribe a reconnecting observable`() {
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) {
val latch = CountDownLatch(2)
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
fun startNode(): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress, CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 5
))
val rpcOps = client.start(rpcUser.username, rpcUser.password, true).proxy
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
val subscription = cashStatesFeed.updates.subscribe { latch.countDown() }
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
node.stop()
startNode()
subscription.unsubscribe()
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
assertFalse {
latch.await(4, TimeUnit.SECONDS)
}
}
}
@Test
fun `rpc client calls and returned observables continue working when there is failover between servers`() {
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) {
val latch = CountDownLatch(2)
fun startNode(address: NetworkHostAndPort): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort()))
val node = startNode(addresses[0])
val client = CordaRPCClient(addresses, CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 5
))
val rpcOps = client.start(rpcUser.username, rpcUser.password, true).proxy
val networkParameters = rpcOps.networkParameters
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
cashStatesFeed.updates.subscribe { latch.countDown() }
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
node.stop()
startNode(addresses[1])
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
val networkParametersAfterCrash = rpcOps.networkParameters
assertThat(networkParameters).isEqualTo(networkParametersAfterCrash)
assertTrue {
latch.await(2, TimeUnit.SECONDS)
}
}
}
}

View File

@ -2,12 +2,15 @@ package net.corda.client.rpc
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.internal.createInstancesOfClassesImplementing
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.CordaInternal
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.createInstancesOfClassesImplementing
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
@ -23,14 +26,39 @@ import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey
import net.corda.serialization.internal.amqp.SerializerFactory
import java.time.Duration
import java.util.ServiceLoader
import java.util.*
/**
* This class is essentially just a wrapper for an RPCConnection<CordaRPCOps> and can be treated identically.
*
* @see RPCConnection
*/
class CordaRPCConnection internal constructor(connection: RPCConnection<CordaRPCOps>) : RPCConnection<CordaRPCOps> by connection
class CordaRPCConnection private constructor(
private val oneTimeConnection: RPCConnection<CordaRPCOps>?,
private val reconnectingCordaRPCOps: ReconnectingCordaRPCOps?
) : RPCConnection<CordaRPCOps> {
internal constructor(connection: RPCConnection<CordaRPCOps>?) : this(connection, null)
companion object {
@CordaInternal
internal fun createWithGracefulReconnection(username: String, password: String, addresses: List<NetworkHostAndPort>): CordaRPCConnection {
return CordaRPCConnection(null, ReconnectingCordaRPCOps(addresses, username, password))
}
}
override val proxy: CordaRPCOps get() = reconnectingCordaRPCOps ?: oneTimeConnection!!.proxy
private val actualConnection: RPCConnection<CordaRPCOps>
get() = reconnectingCordaRPCOps?.reconnectingRPCConnection ?: oneTimeConnection!!
override val serverProtocolVersion: Int get() = actualConnection.serverProtocolVersion
override fun notifyServerAndClose() = actualConnection.notifyServerAndClose()
override fun forceClose() = actualConnection.forceClose()
override fun close() = actualConnection.close()
}
/**
* Can be used to configure the RPC client connection.
@ -235,11 +263,17 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
* observable with another RPC.
*
* In case of loss of connection to the server, the client will try to reconnect using the settings provided via
* [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
* [CordaRPCClientConfiguration]. If the client was created using a list of hosts via [haAddressPool], automatic failover will occur
* (the servers have to be started in HA mode). While attempting failover, current and future RPC calls will throw
* [RPCException] and previously returned observables will call onError().
*
* If the client was created using a list of hosts, automatic failover will occur (the servers have to be started in
* HA mode).
* If you want to enable a more graceful form of reconnection, you can make use of the gracefulReconnect argument of the [start] method.
* If this is set to true, then:
* - The client will automatically reconnect when the connection is broken, regardless of whether you provided a single or multiple addresses.
* - Simple RPC calls that return data (e.g. [CordaRPCOps.networkParameters]) will **block** and return after the connection has been re-established and the node is up.
* - RPC calls that return [rx.Observable]s (e.g. [CordaRPCOps.vaultTrack]) will automatically reconnect and keep sending events for the subscribed [rx.Observable]s.
* Note: In this approach, some events might be lost during a re-connection and not sent in the subscribed [rx.Observable]s.
* - RPC calls that invoke flows (e.g. [CordaRPCOps.startFlowDynamic]) will fail during a disconnection throwing a [CouldNotStartFlowException].
*
* @param hostAndPort The network address to connect to.
* @param configuration An optional configuration used to tweak client behaviour.
@ -335,10 +369,12 @@ class CordaRPCClient private constructor(
*
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param gracefulReconnect whether the connection will reconnect gracefully.
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String): CordaRPCConnection {
return start(username, password, null, null)
@JvmOverloads
fun start(username: String, password: String, gracefulReconnect: Boolean = false): CordaRPCConnection {
return start(username, password, null, null, gracefulReconnect)
}
/**
@ -350,10 +386,12 @@ class CordaRPCClient private constructor(
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed.
* @param gracefulReconnect whether the connection will reconnect gracefully.
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String, targetLegalIdentity: CordaX500Name): CordaRPCConnection {
return start(username, password, null, null, targetLegalIdentity)
@JvmOverloads
fun start(username: String, password: String, targetLegalIdentity: CordaX500Name, gracefulReconnect: Boolean = false): CordaRPCConnection {
return start(username, password, null, null, targetLegalIdentity, gracefulReconnect)
}
/**
@ -366,10 +404,12 @@ class CordaRPCClient private constructor(
* @param password The password to authenticate with.
* @param externalTrace external [Trace] for correlation.
* @param impersonatedActor the actor on behalf of which all the invocations will be made.
* @param gracefulReconnect whether the connection will reconnect gracefully.
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?): CordaRPCConnection {
return start(username, password, externalTrace, impersonatedActor, null)
@JvmOverloads
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, gracefulReconnect: Boolean = false): CordaRPCConnection {
return start(username, password, externalTrace, impersonatedActor, null, gracefulReconnect)
}
/**
@ -383,10 +423,22 @@ class CordaRPCClient private constructor(
* @param externalTrace external [Trace] for correlation.
* @param impersonatedActor the actor on behalf of which all the invocations will be made.
* @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed.
* @param gracefulReconnect whether the connection will reconnect gracefully.
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?): CordaRPCConnection {
return CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity))
@JvmOverloads
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: Boolean = false): CordaRPCConnection {
val addresses = if (haAddressPool.isEmpty()) {
listOf(hostAndPort!!)
} else {
haAddressPool
}
return if (gracefulReconnect) {
CordaRPCConnection.createWithGracefulReconnection(username, password, addresses)
} else {
CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity))
}
}
/**

View File

@ -1,6 +1,7 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.*
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.div
import net.corda.core.internal.messaging.InternalCordaRPCOps
@ -10,21 +11,21 @@ import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.utilities.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.seconds
import net.corda.nodeapi.exceptions.RejectedCommandException
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException
import rx.Observable
import java.lang.reflect.InvocationHandler
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.lang.reflect.Proxy
import java.time.Duration
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
/**
@ -41,8 +42,10 @@ import java.util.concurrent.TimeUnit
*
* *This class is not a stable API. Any project that wants to use it, must copy and paste it.*
*/
// TODO The executor service is not needed. All we need is a single thread that deals with reconnecting and onto which ReconnectingObservables
// and other things can attach themselves as listeners for reconnect events.
class ReconnectingCordaRPCOps private constructor(
private val reconnectingRPCConnection: ReconnectingRPCConnection,
val reconnectingRPCConnection: ReconnectingRPCConnection,
private val observersPool: ExecutorService,
private val userPool: Boolean
) : AutoCloseable, InternalCordaRPCOps by proxy(reconnectingRPCConnection, observersPool) {
@ -118,26 +121,10 @@ class ReconnectingCordaRPCOps private constructor(
}
}
/**
* This function is similar to [runFlowWithLogicalRetry] but is blocking and it returns the result of the flow.
*
* [runFlow] - starts a flow and returns the [FlowHandle].
* [hasFlowCompleted] - Runs a vault query and is able to recreate the result of the flow.
*/
fun <T> runFlowAndReturnResultWithLogicalRetry(runFlow: (CordaRPCOps) -> FlowHandle<T>, hasFlowCompleted: (CordaRPCOps) -> T?, timeout: Duration = 4.seconds): T {
return try {
runFlow(this).returnValue.get()
} catch (e: CouldNotStartFlowException) {
log.error("Couldn't start flow: ${e.message}")
Thread.sleep(timeout.toMillis())
hasFlowCompleted(this) ?: runFlowAndReturnResultWithLogicalRetry(runFlow, hasFlowCompleted, timeout)
}
}
/**
* Helper class useful for reconnecting to a Node.
*/
internal data class ReconnectingRPCConnection(
data class ReconnectingRPCConnection(
val nodeHostAndPorts: List<NetworkHostAndPort>,
val username: String,
val password: String,
@ -169,10 +156,11 @@ class ReconnectingCordaRPCOps private constructor(
* Will block until the connection is established again.
*/
@Synchronized
fun error(e: Throwable) {
fun reconnectOnError(e: Throwable) {
currentState = CurrentState.DIED
//TODO - handle error cases
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.debug("", e)
connect()
}
@ -193,9 +181,9 @@ class ReconnectingCordaRPCOps private constructor(
).start(username, password).also {
// Check connection is truly operational before returning it.
require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) {
"Could not establish connection to ${nodeHostAndPorts}."
"Could not establish connection to $nodeHostAndPorts."
}
log.debug { "Connection successfully established with: ${nodeHostAndPorts}" }
log.debug { "Connection successfully established with: $nodeHostAndPorts" }
}
} catch (ex: Exception) {
when (ex) {
@ -256,57 +244,6 @@ class ReconnectingCordaRPCOps private constructor(
}
}
internal class ReconnectingObservableImpl<T> internal constructor(
val reconnectingRPCConnection: ReconnectingRPCConnection,
val observersPool: ExecutorService,
val initial: DataFeed<*, T>,
val createDataFeed: () -> DataFeed<*, T>
) : Observable<T>(null), ReconnectingObservable<T> {
private var initialStartWith: Iterable<T>? = null
private fun _subscribeWithReconnect(observerHandle: ObserverHandle, onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit, startWithValues: Iterable<T>? = null) {
var subscriptionError: Throwable?
try {
val subscription = initial.updates.let { if (startWithValues != null) it.startWith(startWithValues) else it }
.subscribe(onNext, observerHandle::fail, observerHandle::stop)
subscriptionError = observerHandle.await()
subscription.unsubscribe()
} catch (e: Exception) {
log.error("Failed to register subscriber .", e)
subscriptionError = e
}
// In case there was no exception the observer has finished gracefully.
if (subscriptionError == null) {
onStop()
return
}
onDisconnect()
// Only continue if the subscription failed.
reconnectingRPCConnection.error(subscriptionError)
log.debug { "Recreating data feed." }
val newObservable = createDataFeed().updates as ReconnectingObservableImpl<T>
onReconnect()
return newObservable._subscribeWithReconnect(observerHandle, onNext, onStop, onDisconnect, onReconnect)
}
override fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle {
val observerNotifier = ObserverHandle()
// TODO - change the establish connection method to be non-blocking
observersPool.execute {
_subscribeWithReconnect(observerNotifier, onNext, onStop, onDisconnect, onReconnect, initialStartWith)
}
return observerNotifier
}
override fun startWithValues(values: Iterable<T>): ReconnectingObservable<T> {
initialStartWith = values
return this
}
}
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler {
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
@ -327,23 +264,23 @@ class ReconnectingCordaRPCOps private constructor(
when (e.targetException) {
is RejectedCommandException -> {
log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
reconnectingRPCConnection.error(e)
reconnectingRPCConnection.reconnectOnError(e)
this.invoke(proxy, method, args)
}
is ConnectionFailureException -> {
log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
reconnectingRPCConnection.error(e)
reconnectingRPCConnection.reconnectOnError(e)
retry()
}
is RPCException -> {
log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
reconnectingRPCConnection.error(e)
reconnectingRPCConnection.reconnectOnError(e)
Thread.sleep(1000) // TODO - explain why this sleep is necessary
retry()
}
else -> {
log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e)
reconnectingRPCConnection.error(e)
reconnectingRPCConnection.reconnectOnError(e)
retry()
}
}
@ -353,7 +290,7 @@ class ReconnectingCordaRPCOps private constructor(
DataFeed::class.java -> {
// Intercept the data feed methods and returned a ReconnectingObservable instance
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(result)
val observable = ReconnectingObservableImpl(reconnectingRPCConnection, observersPool, initialFeed) {
val observable = ReconnectingObservable(reconnectingRPCConnection, observersPool, initialFeed) {
// This handles reconnecting and creates new feeds.
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
}
@ -371,42 +308,3 @@ class ReconnectingCordaRPCOps private constructor(
reconnectingRPCConnection.forceClose()
}
}
/**
* Returned as the `updates` field when calling methods that return a [DataFeed] on the [ReconnectingCordaRPCOps].
*
* TODO - provide a logical function to know how to retrieve missing events that happened during disconnects.
*/
interface ReconnectingObservable<T> {
fun subscribe(onNext: (T) -> Unit): ObserverHandle = subscribe(onNext, {}, {}, {})
fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle
fun startWithValues(values: Iterable<T>): ReconnectingObservable<T>
}
/**
* Utility to externally control a subscribed observer.
*/
class ObserverHandle {
private val terminated = LinkedBlockingQueue<Optional<Throwable>>(1)
fun stop() = terminated.put(Optional.empty())
internal fun fail(e: Throwable) = terminated.put(Optional.of(e))
/**
* Returns null if the observation ended successfully.
*/
internal fun await(): Throwable? = terminated.take().orElse(null)
}
/**
* Thrown when a flow start command died before receiving a [net.corda.core.messaging.FlowHandle].
* On catching this exception, the typical behaviour is to run a "logical retry", meaning only retry the flow if the expected outcome did not occur.
*/
class CouldNotStartFlowException(cause: Throwable? = null) : RPCException("Could not start flow as connection failed", cause)
/**
* Mainly for Kotlin users.
*/
fun <T> Observable<T>.asReconnecting(): ReconnectingObservable<T> = uncheckedCast(this)
fun <T> Observable<T>.asReconnectingWithInitialValues(values: Iterable<T>): ReconnectingObservable<T> = asReconnecting().startWithValues(values)

View File

@ -0,0 +1,68 @@
package net.corda.client.rpc.internal
import net.corda.core.messaging.DataFeed
import rx.Observable
import rx.Subscriber
import rx.Subscription
import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicReference
class ReconnectingObservable<T> private constructor(subscriber: ReconnectingSubscriber<T>) : Observable<T>(subscriber) {
constructor(
reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection,
executor: ExecutorService,
initialDataFeed: DataFeed<*, T>,
createDataFeed: () -> DataFeed<*, T>
) : this(ReconnectingSubscriber(reconnectingRPCConnection, executor, initialDataFeed, createDataFeed))
private class ReconnectingSubscriber<T>(
private val reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection,
private val executor: ExecutorService,
private val initialDataFeed: DataFeed<*, T>,
private val createDataFeed: () -> DataFeed<*, T>
) : OnSubscribe<T>, Subscription {
private val subscriber = AtomicReference<Subscriber<in T>>()
@Volatile
private var backingSubscription: Subscription? = null
@Volatile
private var unsubscribed = false
override fun unsubscribe() {
backingSubscription?.unsubscribe()
unsubscribed = true
}
override fun isUnsubscribed(): Boolean = unsubscribed
override fun call(subscriber: Subscriber<in T>) {
if (this.subscriber.compareAndSet(null, subscriber)) {
subscriber.add(this)
subscribeImmediately(initialDataFeed)
} else {
subscriber.onError(IllegalStateException("Only a single subscriber is allowed"))
}
}
private fun subscribeImmediately(dataFeed: DataFeed<*, T>) {
if (unsubscribed) return
val subscriber = checkNotNull(this.subscriber.get())
try {
backingSubscription = dataFeed.updates.subscribe(subscriber::onNext, ::scheduleResubscribe, subscriber::onCompleted)
} catch (e: Exception) {
scheduleResubscribe(e)
}
}
private fun scheduleResubscribe(error: Throwable) {
if (unsubscribed) return
executor.execute {
if (unsubscribed) return@execute
reconnectingRPCConnection.reconnectOnError(error)
val newDataFeed = createDataFeed()
subscribeImmediately(newDataFeed)
}
}
}
}

View File

@ -0,0 +1,10 @@
package net.corda.client.rpc.reconnect
import net.corda.client.rpc.RPCException
/**
* Thrown when a flow start command died before receiving a [net.corda.core.messaging.FlowHandle].
* On catching this exception, the typical behaviour is to run a "logical retry", meaning only retry the flow if the expected outcome did not occur.
*/
class CouldNotStartFlowException(cause: Throwable? = null) : RPCException("Could not start flow as connection failed", cause)

View File

@ -23,13 +23,19 @@ Version 5.0
* It is now possible to re-record transactions if a node wishes to record as an observer a transaction it has participated in. If this is
done, then the node may record new output states that are not relevant to the node.
.. warning:: Nodes may re-record transactions if they have previously recorded them as a participant and wish to record them as an observer.
However, the node cannot resolve the forward chain of transactions if this is done. This means that if you wish to re-record a chain of
transactions and get the new output states to be correctly marked as consumed, the full chain must be sent to the node *in order*.
* Added ``nodeDiagnosticInfo`` to the RPC API. The new RPC is also available as the ``run nodeDiagnosticInfo`` command executable from
the Corda shell. It retrieves version information about the Corda platform and the CorDapps installed on the node.
* ``CordaRPCClient.start`` has a new ``gracefulReconnect`` parameter. When ``true`` (the default is ``false``) it will cause the RPC client
to try to automatically reconnect to the node on disconnect. Furthermore, any ``Observable``\s previously created will continue to vend new
events on reconnect.
.. note:: This is only best-effort and there are no guarantees of reliability.
.. _changelog_v4.2:
Version 4.2

View File

@ -355,56 +355,45 @@ This does not expose internal information to clients, strengthening privacy and
Reconnecting RPC clients
------------------------
In the current version of Corda the RPC connection and all the observables that are created by a client will just throw exceptions and die
when the node or the TCP connection becomes unavailable.
In the current version of Corda, an RPC client connected to a node stops functioning when the node becomes unavailable or the associated TCP connection is interrupted.
Running RPC commands against a stopped node will just throw exceptions. Any subscriptions to ``Observable``\s that have been created before the disconnection will stop receiving events after the node restarts.
RPCs which have a side effect, such as starting flows, may or may not have executed on the node depending on when the client was disconnected.
It is the client's responsibility to handle these errors and reconnect once the node is running again. Running RPC commands against a stopped
node will just throw exceptions. Previously created Observables will not emit any events after the node restarts. The client must explicitly re-run the command and
re-subscribe to receive more events.
It is the client's responsibility to handle these errors and reconnect once the node is running again. The client will have to re-subscribe to any ``Observable``\s in order to keep receiving updates.
With regard to RPCs with side effects, the client will have to inspect the state of the node to infer whether the flow was executed before retrying it.
RPCs which have a side effect, such as starting flows, may have executed on the node even if the return value is not received by the client.
The only way to confirm is to perform a business-level query and retry accordingly. The sample `runFlowWithLogicalRetry` helps with this.
Clients can make use of the options described below in order to take advantage of some automatic reconnection functionality that mitigates some of these issues.
In case users require such functionality to write a resilient RPC client, we have a sample that showcases how this can be implemented, along with
a thorough test that demonstrates it works as expected.
Enabling automatic reconnection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The code that performs the reconnection logic is `ReconnectingCordaRPCOps.kt <https://github.com/corda/corda/blob/master/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt>`_.
If you provide a list of addresses via the ``haAddressPool`` argument when instantiating a ``CordaRPCClient``, then automatic reconnection will be performed when the existing connection is dropped.
However, any in-flight calls during reconnection will fail and previously returned observables will call ``onError``. The client code is responsible for waiting for the connection to be established
in order to retry any calls, retrieve new observables and re-subscribe to them.
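For example, a client with an address pool can be constructed as follows (a minimal sketch; the addresses and credentials are placeholders):

.. sourcecode:: kotlin

    val addresses = listOf(NetworkHostAndPort("node1.example.com", 10006), NetworkHostAndPort("node2.example.com", 10006))
    val client = CordaRPCClient(addresses, CordaRPCClientConfiguration.DEFAULT)
    val rpcOps = client.start("user1", "password").proxy
    // If the current connection drops, the client fails over to one of the other addresses.
    // In-flight calls throw RPCException and previously returned observables call onError().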
.. note:: This sample code is not exposed as an official Corda API, and must be included directly in the client codebase and adjusted.
Enabling graceful reconnection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The usage is showcased in `RpcReconnectTests.kt <https://github.com/corda/corda/blob/master/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt>`_.
If resiliency is a requirement, it is recommended that users write a similar test.
A more graceful form of reconnection is also available, which will block all in-flight calls until the connection is re-established and
will also reconnect the existing ``Observable``\s, so that they keep emitting events to the existing subscribers.
How to initialize the `ReconnectingCordaRPCOps`:
.. warning:: In this approach, some events might be lost during a re-connection and not sent from the subscribed ``Observable``\s.
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
    :language: kotlin
    :start-after: DOCSTART rpcReconnectingRPC
    :end-before: DOCEND rpcReconnectingRPC
You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter in the following way:
.. sourcecode:: kotlin

    val cordaClient = CordaRPCClient(nodeRpcAddress)
    val cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect = true).proxy

How to track the vault:
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
    :language: kotlin
    :start-after: DOCSTART rpcReconnectingRPCVaultTracking
    :end-before: DOCEND rpcReconnectingRPCVaultTracking
Logical retries for flow invocation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
As described above, if you want to retry a flow that failed during a disconnection, you will first need to verify it has not been previously executed.
The only way currently to confirm this is by performing a business-level query.
How to start a flow with a logical retry function that checks for the side effects of the flow:
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
    :language: kotlin
    :start-after: DOCSTART rpcReconnectingRPCFlowStarting
    :end-before: DOCEND rpcReconnectingRPCFlowStarting
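For illustration only, a hand-rolled logical retry might look roughly like the following sketch (the flow, vault query and issuer reference below are assumptions used as an example, not part of the documented API):

.. sourcecode:: kotlin

    import net.corda.client.rpc.reconnect.CouldNotStartFlowException
    import net.corda.core.identity.Party
    import net.corda.core.messaging.CordaRPCOps
    import net.corda.core.messaging.startFlow
    import net.corda.core.node.services.vault.QueryCriteria
    import net.corda.core.utilities.OpaqueBytes
    import net.corda.core.utilities.getOrThrow
    import net.corda.finance.DOLLARS
    import net.corda.finance.contracts.asset.Cash
    import net.corda.finance.flows.CashIssueFlow

    // Only restart the flow if its expected side effect (a Cash state carrying this
    // issuer reference) cannot be found, i.e. a "logical retry".
    fun issueCashWithLogicalRetry(rpc: CordaRPCOps, issuerRef: OpaqueBytes, notary: Party) {
        try {
            rpc.startFlow(::CashIssueFlow, 10.DOLLARS, issuerRef, notary).returnValue.getOrThrow()
        } catch (e: CouldNotStartFlowException) {
            // The connection died before a FlowHandle was received, so the flow may or may
            // not have run. Check for its side effect before retrying.
            val alreadyIssued = rpc.vaultQueryByCriteria(QueryCriteria.VaultQueryCriteria(), Cash.State::class.java)
                    .states.any { it.state.data.amount.token.issuer.reference == issuerRef }
            if (!alreadyIssued) issueCashWithLogicalRetry(rpc, issuerRef, notary)
        }
    }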
Note that, as shown by the test, some events might be lost during reconnection.
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
    :language: kotlin
    :start-after: DOCSTART missingVaultEvents
    :end-before: DOCEND missingVaultEvents
.. note:: Future releases of Corda are expected to contain new APIs for coping with reconnection in a more resilient way providing stricter
safety guarantees.
Wire security

View File

@ -1,7 +1,6 @@
package net.corda.node.services.rpc
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.internal.asReconnecting
import net.corda.core.contracts.Amount
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.concurrent.transpose
@ -10,6 +9,7 @@ import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.builder
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
@ -20,6 +20,7 @@ import net.corda.node.services.Permissions
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.OutOfProcessImpl
@ -36,16 +37,25 @@ import kotlin.test.assertEquals
import kotlin.test.assertTrue
/**
* This is a slow test!
* This is a stress test for the rpc reconnection logic, which triggers failures in a probabilistic way.
*
* You can adjust the variable [NUMBER_OF_FLOWS_TO_RUN] to change the number of flows to run and thus the duration of the test.
*/
class RpcReconnectTests {
companion object {
// 150 flows take ~5 minutes
const val NUMBER_OF_FLOWS_TO_RUN = 150
private val log = contextLogger()
}
private val portAllocator = incrementalPortAllocation()
private lateinit var proxy: RandomFailingProxy
private lateinit var node: NodeHandle
private lateinit var currentAddressPair: AddressPair
/**
* This test showcases and stress tests the demo [ReconnectingCordaRPCOps].
*
@ -57,17 +67,12 @@ class RpcReconnectTests {
*/
@Test
fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`() {
val nrOfFlowsToRun = 150 // Takes around 5 minutes.
val nodeRunningTime = { Random().nextInt(12000) + 8000 }
val demoUser = User("demo", "demo", setOf(Permissions.all()))
val nodePort = portAllocator.nextPort()
val proxyPort = portAllocator.nextPort()
val tcpProxy = RandomFailingProxy(serverPort = proxyPort, remotePort = nodePort).start()
// When this reaches 0 - the test will end.
val flowsCountdownLatch = CountDownLatch(nrOfFlowsToRun)
val flowsCountdownLatch = CountDownLatch(NUMBER_OF_FLOWS_TO_RUN)
// These are the expected progress steps for the CashIssueAndPayFlow.
val expectedProgress = listOf(
@ -90,21 +95,26 @@ class RpcReconnectTests {
)
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) {
fun startBankA() = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to "localhost:$nodePort"))
fun startBankA(address: NetworkHostAndPort) = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to address.toString()))
fun startProxy(addressPair: AddressPair) = RandomFailingProxy(serverPort = addressPair.proxyAddress.port, remotePort = addressPair.nodeAddress.port).start()
var (bankA, bankB) = listOf(
startBankA(),
val addresses = (1..3).map { getRandomAddressPair() }
currentAddressPair = addresses[0]
proxy = startProxy(currentAddressPair)
val (bankA, bankB) = listOf(
startBankA(currentAddressPair.nodeAddress),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser))
).transpose().getOrThrow()
node = bankA
val notary = defaultNotaryIdentity
val baseAmount = Amount.parseCurrency("0 USD")
val issuerRef = OpaqueBytes.of(0x01)
// Create a reconnecting rpc client through the TCP proxy.
val bankAAddress = bankA.rpcAddress.copy(port = proxyPort)
val addressesForRpc = addresses.map { it.proxyAddress }
// DOCSTART rpcReconnectingRPC
val bankAReconnectingRpc = ReconnectingCordaRPCOps(bankAAddress, demoUser.username, demoUser.password)
val bankAReconnectingRpc = ReconnectingCordaRPCOps(addressesForRpc, demoUser.username, demoUser.password)
// DOCEND rpcReconnectingRPC
// Observe the vault and collect the observations.
@ -114,7 +124,7 @@ class RpcReconnectTests {
Cash.State::class.java,
QueryCriteria.VaultQueryCriteria(),
PageSpecification(1, 1))
val vaultObserverHandle = vaultFeed.updates.asReconnecting().subscribe { update: Vault.Update<Cash.State> ->
val vaultSubscription = vaultFeed.updates.subscribe { update: Vault.Update<Cash.State> ->
log.info("vault update produced ${update.produced.map { it.state.data.amount }} consumed ${update.consumed.map { it.ref }}")
vaultEvents.add(update)
}
@ -122,7 +132,7 @@ class RpcReconnectTests {
// Observe the stateMachine and collect the observations.
val stateMachineEvents = Collections.synchronizedList(mutableListOf<StateMachineUpdate>())
val stateMachineObserverHandle = bankAReconnectingRpc.stateMachinesFeed().updates.asReconnecting().subscribe { update ->
val stateMachineSubscription = bankAReconnectingRpc.stateMachinesFeed().updates.subscribe { update ->
log.info(update.toString())
stateMachineEvents.add(update)
}
@ -141,37 +151,45 @@ class RpcReconnectTests {
if (flowsCountdownLatch.count == 0L) break
when (Random().nextInt().rem(6).absoluteValue) {
when (Random().nextInt().rem(7).absoluteValue) {
0 -> {
log.info("Forcefully killing node and proxy.")
(bankA as OutOfProcessImpl).onStopCallback()
(bankA as OutOfProcess).process.destroyForcibly()
tcpProxy.stop()
bankA = startBankA().get()
tcpProxy.start()
(node as OutOfProcessImpl).onStopCallback()
(node as OutOfProcess).process.destroyForcibly()
proxy.stop()
node = startBankA(currentAddressPair.nodeAddress).get()
proxy.start()
}
1 -> {
log.info("Forcefully killing node.")
(bankA as OutOfProcessImpl).onStopCallback()
(bankA as OutOfProcess).process.destroyForcibly()
bankA = startBankA().get()
(node as OutOfProcessImpl).onStopCallback()
(node as OutOfProcess).process.destroyForcibly()
node = startBankA(currentAddressPair.nodeAddress).get()
}
2 -> {
log.info("Shutting down node.")
bankA.stop()
tcpProxy.stop()
bankA = startBankA().get()
tcpProxy.start()
node.stop()
proxy.stop()
node = startBankA(currentAddressPair.nodeAddress).get()
proxy.start()
}
3, 4 -> {
log.info("Killing proxy.")
tcpProxy.stop()
proxy.stop()
Thread.sleep(Random().nextInt(5000).toLong())
tcpProxy.start()
proxy.start()
}
5 -> {
log.info("Dropping connection.")
tcpProxy.failConnection()
proxy.failConnection()
}
6 -> {
log.info("Performing failover to a different node")
node.stop()
proxy.stop()
currentAddressPair = addresses[Random().nextInt(addresses.size)]
node = startBankA(currentAddressPair.nodeAddress).get()
proxy = startProxy(currentAddressPair)
}
}
nrRestarts.incrementAndGet()
@ -180,7 +198,7 @@ class RpcReconnectTests {
// Start nrOfFlowsToRun and provide a logical retry function that checks the vault.
val flowProgressEvents = mutableMapOf<StateMachineRunId, MutableList<String>>()
for (amount in (1..nrOfFlowsToRun)) {
for (amount in (1..NUMBER_OF_FLOWS_TO_RUN)) {
// DOCSTART rpcReconnectingRPCFlowStarting
bankAReconnectingRpc.runFlowWithLogicalRetry(
runFlow = { rpc ->
@ -248,16 +266,16 @@ class RpcReconnectTests {
var nrRetries = 0
// It might be necessary to wait more for all events to arrive when the node is slow.
while (allCashStates.size < nrOfFlowsToRun && nrRetries++ < 50) {
while (allCashStates.size < NUMBER_OF_FLOWS_TO_RUN && nrRetries++ < 50) {
Thread.sleep(2000)
allCashStates = readCashStates()
}
val allCash = allCashStates.map { it.state.data.amount.quantity }.toSet()
val missingCash = (1..nrOfFlowsToRun).filterNot { allCash.contains(it.toLong() * 100) }
val missingCash = (1..NUMBER_OF_FLOWS_TO_RUN).filterNot { allCash.contains(it.toLong() * 100) }
log.info("MISSING: $missingCash")
assertEquals(nrOfFlowsToRun, allCashStates.size, "Not all flows were executed successfully")
assertEquals(NUMBER_OF_FLOWS_TO_RUN, allCashStates.size, "Not all flows were executed successfully")
// The progress status for each flow can only miss the last events, because the node might have been killed.
val missingProgressEvents = flowProgressEvents.filterValues { expectedProgress.subList(0, it.size) != it }
@ -267,7 +285,7 @@ class RpcReconnectTests {
// Check that enough vault events were received.
// This check is fuzzy because events can go missing during node restarts.
// Ideally there should be nrOfFlowsToRun events received, but some might get lost for each restart.
assertTrue(vaultEvents!!.size + nrFailures * 3 >= nrOfFlowsToRun, "Not all vault events were received")
assertTrue(vaultEvents!!.size + nrFailures * 3 >= NUMBER_OF_FLOWS_TO_RUN, "Not all vault events were received")
// DOCEND missingVaultEvents
// Check that no flow was triggered twice.
@ -276,21 +294,26 @@ class RpcReconnectTests {
log.info("SM EVENTS: ${stateMachineEvents!!.size}")
// State machine events are very likely to get lost more often because they seem to be sent with a delay.
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > nrOfFlowsToRun / 3, "Too many Added state machine events lost.")
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > nrOfFlowsToRun / 3, "Too many Removed state machine events lost.")
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > NUMBER_OF_FLOWS_TO_RUN / 3, "Too many Added state machine events lost.")
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > NUMBER_OF_FLOWS_TO_RUN / 3, "Too many Removed state machine events lost.")
// Stop the observers.
vaultObserverHandle.stop()
stateMachineObserverHandle.stop()
vaultSubscription.unsubscribe()
stateMachineSubscription.unsubscribe()
bankAReconnectingRpc.close()
}
tcpProxy.close()
proxy.close()
}
@Synchronized
fun MutableMap<StateMachineRunId, MutableList<String>>.addEvent(id: StateMachineRunId, progress: String?): Boolean {
return getOrPut(id) { mutableListOf() }.let { if (progress != null) it.add(progress) else false }
}
private fun getRandomAddressPair() = AddressPair(getRandomAddress(), getRandomAddress())
private fun getRandomAddress() = NetworkHostAndPort("localhost", portAllocator.nextPort())
data class AddressPair(val proxyAddress: NetworkHostAndPort, val nodeAddress: NetworkHostAndPort)
}

View File

@ -13,8 +13,6 @@ import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.internal.ReconnectingObservable
import net.corda.client.rpc.internal.asReconnectingWithInitialValues
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
@ -463,11 +461,7 @@ object InteractiveShell {
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
val subscriber = FlowWatchPrintingSubscriber(out)
if (stateMachineUpdates is ReconnectingObservable<*>) {
stateMachineUpdates.asReconnectingWithInitialValues(currentStateMachines).subscribe(subscriber::onNext)
} else {
stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber)
}
stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber)
var result: Any? = subscriber.future
if (result is Future<*>) {
if (!result.isDone) {