CORDA-2743 - utilities and test to show rpc operations that support disconnects (#5009) (#5015)

(cherry picked from commit 6771386b4b)
This commit is contained in:
Tudor Malene 2019-04-15 10:25:56 +01:00 committed by Katelyn Baker
parent ee884a92de
commit 21adcffd31
5 changed files with 835 additions and 26 deletions

View File

@ -0,0 +1,402 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.*
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.div
import net.corda.core.internal.times
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.utilities.*
import net.corda.nodeapi.exceptions.RejectedCommandException
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException
import rx.Observable
import java.lang.reflect.InvocationHandler
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.lang.reflect.Proxy
import java.time.Duration
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
/**
* Wrapper over [CordaRPCOps] that handles exceptions when the node or the connection to the node fail.
*
* All operations are retried on failure, except flow start operations that die before receiving a valid [FlowHandle], in which case a [CouldNotStartFlowException] is thrown.
*
* When calling methods that return a [DataFeed] like [CordaRPCOps.vaultTrackBy], the returned [DataFeed.updates] object will no longer
* be a usable [rx.Observable] but an instance of [ReconnectingObservable].
* The caller has to explicitly cast to [ReconnectingObservable] and call [ReconnectingObservable.subscribe]. If used as an [rx.Observable] it will just fail.
* The returned [DataFeed.snapshot] is the snapshot as it was when the feed was first retrieved.
*
* Note: There is no guarantee that observations will not be lost.
*
* *This class is not a stable API. Any project that wants to use it, must copy and paste it.*
*/
class ReconnectingCordaRPCOps private constructor(
private val reconnectingRPCConnection: ReconnectingRPCConnection,
private val observersPool: ExecutorService,
private val userPool: Boolean
) : AutoCloseable, CordaRPCOps by proxy(reconnectingRPCConnection, observersPool) {
// Constructors that mirror CordaRPCClient.
constructor(
nodeHostAndPort: NetworkHostAndPort,
username: String,
password: String,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
observersPool: ExecutorService? = null
) : this(
ReconnectingRPCConnection(listOf(nodeHostAndPort), username, password, sslConfiguration, classLoader),
observersPool ?: Executors.newCachedThreadPool(),
observersPool != null)
constructor(
nodeHostAndPorts: List<NetworkHostAndPort>,
username: String,
password: String,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
observersPool: ExecutorService? = null
) : this(
ReconnectingRPCConnection(nodeHostAndPorts, username, password, sslConfiguration, classLoader),
observersPool ?: Executors.newCachedThreadPool(),
observersPool != null)
private companion object {
private val log = contextLogger()
private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): CordaRPCOps {
return Proxy.newProxyInstance(
this::class.java.classLoader,
arrayOf(CordaRPCOps::class.java),
ErrorInterceptingHandler(reconnectingRPCConnection, observersPool)) as CordaRPCOps
}
}
private val retryFlowsPool = Executors.newScheduledThreadPool(1)
/**
* This function runs a flow and retries until it completes successfully.
*
* [runFlow] is a function that starts a flow.
* [hasFlowStarted] is a function that checks if the flow has actually completed by checking some side-effect, for example the vault.
* [onFlowConfirmed] Callback when the flow is confirmed.
* [timeout] Indicative timeout to wait until the flow would create the side-effect. Should be increased if the flow is slow. Note that
* this timeout is calculated after the rpc client has reconnected to the node.
*
* Note that this method does not guarantee 100% that the flow will not be started twice.
*/
fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -> StateMachineRunId, hasFlowStarted: (CordaRPCOps) -> Boolean, onFlowConfirmed: () -> Unit = {}, timeout: Duration = 4.seconds) {
try {
runFlow(this)
onFlowConfirmed()
} catch (e: CouldNotStartFlowException) {
log.error("Couldn't start flow: ${e.message}")
retryFlowsPool.schedule(
{
if (!hasFlowStarted(this)) {
runFlowWithLogicalRetry(runFlow, hasFlowStarted, onFlowConfirmed, timeout)
} else {
onFlowConfirmed()
}
},
timeout.seconds, TimeUnit.SECONDS
)
}
}
/**
* This function is similar to [runFlowWithLogicalRetry] but is blocking and it returns the result of the flow.
*
* [runFlow] - starts a flow and returns the [FlowHandle].
* [hasFlowCompleted] - Runs a vault query and is able to recreate the result of the flow.
*/
fun <T> runFlowAndReturnResultWithLogicalRetry(runFlow: (CordaRPCOps) -> FlowHandle<T>, hasFlowCompleted: (CordaRPCOps) -> T?, timeout: Duration = 4.seconds): T {
return try {
runFlow(this).returnValue.get()
} catch (e: CouldNotStartFlowException) {
log.error("Couldn't start flow: ${e.message}")
Thread.sleep(timeout.toMillis())
hasFlowCompleted(this) ?: runFlowAndReturnResultWithLogicalRetry(runFlow, hasFlowCompleted, timeout)
}
}
/**
* Helper class useful for reconnecting to a Node.
*/
internal data class ReconnectingRPCConnection(
val nodeHostAndPorts: List<NetworkHostAndPort>,
val username: String,
val password: String,
val sslConfiguration: ClientRpcSslOptions? = null,
val classLoader: ClassLoader?
) : RPCConnection<CordaRPCOps> {
private var currentRPCConnection: CordaRPCConnection? = null
init {
connect()
}
enum class CurrentState {
UNCONNECTED, CONNECTED, CONNECTING, CLOSED, DIED
}
private var currentState = CurrentState.UNCONNECTED
private val current: CordaRPCConnection
@Synchronized get() = when (currentState) {
CurrentState.CONNECTED -> currentRPCConnection!!
CurrentState.UNCONNECTED, CurrentState.CLOSED -> {
connect()
currentRPCConnection!!
}
CurrentState.CONNECTING, CurrentState.DIED -> throw IllegalArgumentException("Illegal state")
}
/**
* Called on external error.
* Will block until the connection is established again.
*/
@Synchronized
fun error(e: Throwable) {
currentState = CurrentState.DIED
//TODO - handle error cases
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
connect()
}
private fun connect() {
currentState = CurrentState.CONNECTING
currentRPCConnection = establishConnectionWithRetry()
currentState = CurrentState.CONNECTED
}
private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, nrRetries: Int = 0): CordaRPCConnection {
log.info("Connecting to: $nodeHostAndPorts")
try {
return CordaRPCClient(
nodeHostAndPorts, CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval), sslConfiguration, classLoader
).start(username, password).also {
// Check connection is truly operational before returning it.
require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) {
"Could not establish connection to ${nodeHostAndPorts}."
}
log.debug { "Connection successfully established with: ${nodeHostAndPorts}" }
}
} catch (ex: Exception) {
when (ex) {
is ActiveMQSecurityException -> {
// Happens when incorrect credentials provided.
// It can happen at startup as well when the credentials are correct.
if (nrRetries > 1) throw ex
}
is RPCException -> {
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
log.debug { "Exception upon establishing connection: ${ex.message}" }
}
is ActiveMQConnectionTimedOutException -> {
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
log.debug { "Exception upon establishing connection: ${ex.message}" }
}
is ActiveMQUnBlockedException -> {
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
log.debug { "Exception upon establishing connection: ${ex.message}" }
}
else -> {
log.debug("Unknown exception upon establishing connection.", ex)
}
}
}
// Could not connect this time round - pause before giving another try.
Thread.sleep(retryInterval.toMillis())
return establishConnectionWithRetry((retryInterval * 3) / 2, nrRetries + 1)
}
override val proxy: CordaRPCOps
get() = current.proxy
override val serverProtocolVersion
get() = current.serverProtocolVersion
@Synchronized
override fun notifyServerAndClose() {
currentState = CurrentState.CLOSED
currentRPCConnection?.notifyServerAndClose()
}
@Synchronized
override fun forceClose() {
currentState = CurrentState.CLOSED
currentRPCConnection?.forceClose()
}
@Synchronized
override fun close() {
currentState = CurrentState.CLOSED
currentRPCConnection?.close()
}
}
internal class ReconnectingObservableImpl<T> internal constructor(
val reconnectingRPCConnection: ReconnectingRPCConnection,
val observersPool: ExecutorService,
val initial: DataFeed<*, T>,
val createDataFeed: () -> DataFeed<*, T>
) : Observable<T>(null), ReconnectingObservable<T> {
private var initialStartWith: Iterable<T>? = null
private fun _subscribeWithReconnect(observerHandle: ObserverHandle, onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit, startWithValues: Iterable<T>? = null) {
var subscriptionError: Throwable?
try {
val subscription = initial.updates.let { if (startWithValues != null) it.startWith(startWithValues) else it }
.subscribe(onNext, observerHandle::fail, observerHandle::stop)
subscriptionError = observerHandle.await()
subscription.unsubscribe()
} catch (e: Exception) {
log.error("Failed to register subscriber .", e)
subscriptionError = e
}
// In case there was no exception the observer has finished gracefully.
if (subscriptionError == null) {
onStop()
return
}
onDisconnect()
// Only continue if the subscription failed.
reconnectingRPCConnection.error(subscriptionError)
log.debug { "Recreating data feed." }
val newObservable = createDataFeed().updates as ReconnectingObservableImpl<T>
onReconnect()
return newObservable._subscribeWithReconnect(observerHandle, onNext, onStop, onDisconnect, onReconnect)
}
override fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle {
val observerNotifier = ObserverHandle()
// TODO - change the establish connection method to be non-blocking
observersPool.execute {
_subscribeWithReconnect(observerNotifier, onNext, onStop, onDisconnect, onReconnect, initialStartWith)
}
return observerNotifier
}
override fun startWithValues(values: Iterable<T>): ReconnectingObservable<T> {
initialStartWith = values
return this
}
}
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler {
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
val result: Any? = try {
log.debug { "Invoking RPC $method..." }
method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
log.debug { "RPC $method invoked successfully." }
}
} catch (e: InvocationTargetException) {
fun retry() = if (method.isStartFlow()) {
// Don't retry flows
throw CouldNotStartFlowException(e.targetException)
} else {
this.invoke(proxy, method, args)
}
when (e.targetException) {
is RejectedCommandException -> {
log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
reconnectingRPCConnection.error(e)
this.invoke(proxy, method, args)
}
is ConnectionFailureException -> {
log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
reconnectingRPCConnection.error(e)
retry()
}
is RPCException -> {
log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
reconnectingRPCConnection.error(e)
Thread.sleep(1000) // TODO - explain why this sleep is necessary
retry()
}
else -> {
log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e)
reconnectingRPCConnection.error(e)
retry()
}
}
}
return when (method.returnType) {
DataFeed::class.java -> {
// Intercept the data feed methods and returned a ReconnectingObservable instance
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(result)
val observable = ReconnectingObservableImpl(reconnectingRPCConnection, observersPool, initialFeed) {
// This handles reconnecting and creates new feeds.
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
}
initialFeed.copy(updates = observable)
}
// TODO - add handlers for Observable return types.
else -> result
}
}
}
override fun close() {
if (!userPool) observersPool.shutdown()
retryFlowsPool.shutdown()
reconnectingRPCConnection.forceClose()
}
}
/**
* Returned as the `updates` field when calling methods that return a [DataFeed] on the [ReconnectingCordaRPCOps].
*
* TODO - provide a logical function to know how to retrieve missing events that happened during disconnects.
*/
interface ReconnectingObservable<T> {
fun subscribe(onNext: (T) -> Unit): ObserverHandle = subscribe(onNext, {}, {}, {})
fun subscribe(onNext: (T) -> Unit, onStop: () -> Unit, onDisconnect: () -> Unit, onReconnect: () -> Unit): ObserverHandle
fun startWithValues(values: Iterable<T>): ReconnectingObservable<T>
}
/**
* Utility to externally control a subscribed observer.
*/
class ObserverHandle {
private val terminated = LinkedBlockingQueue<Optional<Throwable>>(1)
fun stop() = terminated.put(Optional.empty())
internal fun fail(e: Throwable) = terminated.put(Optional.of(e))
/**
* Returns null if the observation ended successfully.
*/
internal fun await(duration: Duration = 60.minutes): Throwable? = terminated.poll(duration.seconds, TimeUnit.SECONDS).orElse(null)
}
/**
* Thrown when a flow start command died before receiving a [net.corda.core.messaging.FlowHandle].
* On catching this exception, the typical behaviour is to run a "logical retry", meaning only retry the flow if the expected outcome did not occur.
*/
class CouldNotStartFlowException(cause: Throwable? = null) : RPCException("Could not start flow as connection failed", cause)
/**
* Mainly for Kotlin users.
*/
fun <T> Observable<T>.asReconnecting(): ReconnectingObservable<T> = uncheckedCast(this)
fun <T> Observable<T>.asReconnectingWithInitialValues(values: Iterable<T>): ReconnectingObservable<T> = asReconnecting().startWithValues(values)

View File

@ -352,39 +352,60 @@ When not in ``devMode``, the server will mask exceptions not meant for clients a
This does not expose internal information to clients, strengthening privacy and security. CorDapps can have exceptions implement
``ClientRelevantError`` to allow them to reach RPC clients.
Connection management
---------------------
It is possible to not be able to connect to the server on the first attempt. In that case, the ``CordaRPCClient.start()``
method will throw an exception. The following code snippet is an example of how to write a simple retry mechanism for
such situations:
Reconnecting RPC clients
------------------------
.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt
In the current version of Corda the RPC connection and all the observervables that are created by a client will just throw exceptions and die
when the node or TCP connection become unavailable.
It is the client's responsibility to handle these errors and reconnect once the node is running again. Running RPC commands against a stopped
node will just throw exceptions. Previously created Observables will not emit any events after the node restarts. The client must explicitly re-run the command and
re-subscribe to receive more events.
RPCs which have a side effect, such as starting flows, may have executed on the node even if the return value is not received by the client.
The only way to confirm is to perform a business-level query and retry accordingly. The sample `runFlowWithLogicalRetry` helps with this.
In case users require such a functionality to write a resilient RPC client we have a sample that showcases how this can be implemented and also
a thorough test that demonstrates it works as expected.
The code that performs the reconnecting logic is: `ReconnectingCordaRPCOps.kt <https://github.com/corda/samples/blob/release-V|platform_version|/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt>`_.
.. note:: This sample code is not exposed as an official Corda API, and must be included directly in the client codebase and adjusted.
The usage is showcased in the: `RpcReconnectTests.kt <https://github.com/corda/samples/blob/release-V|platform_version|/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt>`_.
In case resiliency is a requirement, then it is recommended that users will write a similar test.
How to initialize the `ReconnectingCordaRPCOps`:
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
:language: kotlin
:start-after: DOCSTART rpcClientConnectionWithRetry
:end-before: DOCEND rpcClientConnectionWithRetry
:start-after: DOCSTART rpcReconnectingRPC
:end-before: DOCEND rpcReconnectingRPC
.. warning:: The list of ``NetworkHostAndPort`` passed to this function should represent one or more addresses reflecting the number of
instances of a node configured to service the client RPC request. See ``haAddressPool`` in `CordaRPCClient`_ for further information on
using an RPC Client for load balancing and failover.
After a successful connection, it is possible for the server to become unavailable. In this case, all RPC calls will throw
an exception and created observables will no longer receive observations. Below is an example of how to reconnect and
back-fill any data that might have been missed while the connection was down. This is done by using the ``onError`` handler
on the ``Observable`` returned by ``CordaRPCOps``.
How to track the vault :
.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
:language: kotlin
:start-after: DOCSTART rpcClientConnectionRecovery
:end-before: DOCEND rpcClientConnectionRecovery
:start-after: DOCSTART rpcReconnectingRPCVaultTracking
:end-before: DOCEND rpcReconnectingRPCVaultTracking
In this code snippet it is possible to see that the function ``performRpcReconnect`` creates an RPC connection and implements
the error handler upon subscription to an ``Observable``. The call to this ``onError`` handler will be triggered upon failover, at which
point the client will terminate its existing subscription, close its RPC connection and recursively call ``performRpcReconnect``,
which will re-subscribe once the RPC connection is re-established.
Within the body of the ``subscribe`` function itself, the client code receives instances of ``StateMachineInfo``. Upon re-connecting, this code receives
*all* the instances of ``StateMachineInfo``, some of which may already been delivered to the client code prior to previous disconnect.
It is the responsibility of the client code to handle potential duplicated instances of ``StateMachineInfo`` as appropriate.
How to start a flow with a logical retry function that checks for the side effects of the flow:
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
:language: kotlin
:start-after: DOCSTART rpcReconnectingRPCFlowStarting
:end-before: DOCEND rpcReconnectingRPCFlowStarting
Note that, as shown by the test, during reconnecting some events might be lost.
.. literalinclude:: ../../node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt
:language: kotlin
:start-after: DOCSTART missingVaultEvents
:end-before: DOCEND missingVaultEvents
Wire security
-------------

View File

@ -0,0 +1,101 @@
package net.corda.node.services.rpc
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.net.ServerSocket
import java.net.Socket
import java.net.SocketException
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
/**
* Simple proxy that can be restarted and introduces random latencies.
*
* Also acts as a mock load balancer.
*/
class RandomFailingProxy(val serverPort: Int, val remotePort: Int) : AutoCloseable {
private val threadPool = Executors.newCachedThreadPool()
private val stopCopy = AtomicBoolean(false)
private var currentServerSocket: ServerSocket? = null
private val rnd = ThreadLocal.withInitial { Random() }
fun start(): RandomFailingProxy {
stopCopy.set(false)
currentServerSocket = ServerSocket(serverPort)
threadPool.execute {
try {
currentServerSocket.use { serverSocket ->
while (!stopCopy.get() && !serverSocket!!.isClosed) {
handleConnection(serverSocket.accept())
}
}
} catch (e: SocketException) {
// The Server socket could be closed
}
}
return this
}
private fun handleConnection(socket: Socket) {
threadPool.execute {
socket.use { _ ->
try {
Socket("localhost", remotePort).use { target ->
// send message to node
threadPool.execute {
try {
socket.getInputStream().flakeyCopyTo(target.getOutputStream())
} catch (e: IOException) {
// Thrown when the connection to the target server dies.
}
}
target.getInputStream().flakeyCopyTo(socket.getOutputStream())
}
} catch (e: IOException) {
// Thrown when the connection to the target server dies.
}
}
}
}
fun stop(): RandomFailingProxy {
stopCopy.set(true)
currentServerSocket?.close()
return this
}
private val failOneConnection = AtomicBoolean(false)
fun failConnection() {
failOneConnection.set(true)
}
override fun close() {
try {
stop()
threadPool.shutdownNow()
} catch (e: Exception) {
// Nothing can be done.
}
}
private fun InputStream.flakeyCopyTo(out: OutputStream, bufferSize: Int = DEFAULT_BUFFER_SIZE): Long {
var bytesCopied: Long = 0
val buffer = ByteArray(bufferSize)
var bytes = read(buffer)
while (bytes >= 0 && !stopCopy.get()) {
// Introduce intermittent slowness.
if (rnd.get().nextInt().rem(700) == 0) {
Thread.sleep(rnd.get().nextInt(2000).toLong())
}
if (failOneConnection.compareAndSet(true, false)) {
throw IOException("Randomly dropped one connection")
}
out.write(buffer, 0, bytes)
bytesCopied += bytes
bytes = read(buffer)
}
return bytesCopied
}
}

View File

@ -0,0 +1,285 @@
package net.corda.node.services.rpc
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.internal.asReconnecting
import net.corda.core.contracts.Amount
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.builder
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.services.Permissions
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.OutOfProcessImpl
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import org.junit.Test
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.math.absoluteValue
import kotlin.test.assertEquals
import kotlin.test.assertTrue
/**
* This is a slow test!
*/
class RpcReconnectTests {
companion object {
private val log = contextLogger()
}
/**
* This test showcases and stress tests the demo [ReconnectingCordaRPCOps].
*
* Note that during node failure events can be lost and starting flows can become unreliable.
* The only available way to retry failed flows is to attempt a "logical retry" which is also showcased.
*
* This test runs flows in a loop and in the background kills the node or restarts it.
* Also the RPC connection is made through a proxy that introduces random latencies and is also periodically killed.
*/
@Test
fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`() {
val nrOfFlowsToRun = 450 // Takes around 5 minutes.
val nodeRunningTime = { Random().nextInt(12000) + 8000 }
val demoUser = User("demo", "demo", setOf(Permissions.all()))
val nodePort = 20006
val proxyPort = 20007
val tcpProxy = RandomFailingProxy(serverPort = proxyPort, remotePort = nodePort).start()
// When this reaches 0 - the test will end.
val flowsCountdownLatch = CountDownLatch(nrOfFlowsToRun)
// These are the expected progress steps for the CashIssueAndPayFlow.
val expectedProgress = listOf(
"Starting",
"Issuing cash",
"Generating transaction",
"Signing transaction",
"Finalising transaction",
"Broadcasting transaction to participants",
"Paying recipient",
"Generating anonymous identities",
"Generating transaction",
"Signing transaction",
"Finalising transaction",
"Requesting signature by notary service",
"Requesting signature by Notary service",
"Validating response from Notary service",
"Broadcasting transaction to participants",
"Done"
)
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS, startNodesInProcess = false, inMemoryDB = false)) {
fun startBankA() = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to "localhost:$nodePort"))
var (bankA, bankB) = listOf(
startBankA(),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser))
).transpose().getOrThrow()
val notary = defaultNotaryIdentity
val baseAmount = Amount.parseCurrency("0 USD")
val issuerRef = OpaqueBytes.of(0x01)
// Create a reconnecting rpc client through the TCP proxy.
val bankAAddress = bankA.rpcAddress.copy(port = proxyPort)
// DOCSTART rpcReconnectingRPC
val bankAReconnectingRpc = ReconnectingCordaRPCOps(bankAAddress, demoUser.username, demoUser.password)
// DOCEND rpcReconnectingRPC
// Observe the vault and collect the observations.
val vaultEvents = Collections.synchronizedList(mutableListOf<Vault.Update<Cash.State>>())
// DOCSTART rpcReconnectingRPCVaultTracking
val vaultFeed = bankAReconnectingRpc.vaultTrackByWithPagingSpec(
Cash.State::class.java,
QueryCriteria.VaultQueryCriteria(),
PageSpecification(1, 1))
val vaultObserverHandle = vaultFeed.updates.asReconnecting().subscribe { update: Vault.Update<Cash.State> ->
log.info("vault update produced ${update.produced.map { it.state.data.amount }} consumed ${update.consumed.map { it.ref }}")
vaultEvents.add(update)
}
// DOCEND rpcReconnectingRPCVaultTracking
// Observe the stateMachine and collect the observations.
val stateMachineEvents = Collections.synchronizedList(mutableListOf<StateMachineUpdate>())
val stateMachineObserverHandle = bankAReconnectingRpc.stateMachinesFeed().updates.asReconnecting().subscribe { update ->
log.info(update.toString())
stateMachineEvents.add(update)
}
// While the flows are running, randomly apply a different failure scenario.
val nrRestarts = AtomicInteger()
thread(name = "Node killer") {
while (true) {
if (flowsCountdownLatch.count == 0L) break
// Let the node run for a random time interval.
nodeRunningTime().also { ms ->
log.info("Running node for ${ms / 1000} s.")
Thread.sleep(ms.toLong())
}
if (flowsCountdownLatch.count == 0L) break
when (Random().nextInt().rem(6).absoluteValue) {
0 -> {
log.info("Forcefully killing node and proxy.")
(bankA as OutOfProcessImpl).onStopCallback()
(bankA as OutOfProcess).process.destroyForcibly()
tcpProxy.stop()
bankA = startBankA().get()
tcpProxy.start()
}
1 -> {
log.info("Forcefully killing node.")
(bankA as OutOfProcessImpl).onStopCallback()
(bankA as OutOfProcess).process.destroyForcibly()
bankA = startBankA().get()
}
2 -> {
log.info("Shutting down node.")
bankA.stop()
tcpProxy.stop()
bankA = startBankA().get()
tcpProxy.start()
}
3, 4 -> {
log.info("Killing proxy.")
tcpProxy.stop()
Thread.sleep(Random().nextInt(5000).toLong())
tcpProxy.start()
}
5 -> {
log.info("Dropping connection.")
tcpProxy.failConnection()
}
}
nrRestarts.incrementAndGet()
}
}
// Start nrOfFlowsToRun and provide a logical retry function that checks the vault.
val flowProgressEvents = mutableMapOf<StateMachineRunId, MutableList<String>>()
for (amount in (1..nrOfFlowsToRun)) {
// DOCSTART rpcReconnectingRPCFlowStarting
bankAReconnectingRpc.runFlowWithLogicalRetry(
runFlow = { rpc ->
log.info("Starting CashIssueAndPaymentFlow for $amount")
val flowHandle = rpc.startTrackedFlowDynamic(
CashIssueAndPaymentFlow::class.java,
baseAmount.plus(Amount.parseCurrency("$amount USD")),
issuerRef,
bankB.nodeInfo.legalIdentities.first(),
false,
notary
)
val flowId = flowHandle.id
log.info("Started flow $amount with flowId: $flowId")
flowProgressEvents.addEvent(flowId, null)
// No reconnecting possible.
flowHandle.progress.subscribe(
{ prog ->
flowProgressEvents.addEvent(flowId, prog)
log.info("Progress $flowId : $prog")
},
{ error ->
log.error("Error thrown in the flow progress observer", error)
})
flowHandle.id
},
hasFlowStarted = { rpc ->
// Query for a state that is the result of this flow.
val criteria = QueryCriteria.VaultCustomQueryCriteria(builder { CashSchemaV1.PersistentCashState::pennies.equal(amount.toLong() * 100) }, status = Vault.StateStatus.ALL)
val results = rpc.vaultQueryByCriteria(criteria, Cash.State::class.java)
log.info("$amount - Found states ${results.states}")
// The flow has completed if a state is found
results.states.isNotEmpty()
},
onFlowConfirmed = {
flowsCountdownLatch.countDown()
log.info("Flow started for $amount. Remaining flows: ${flowsCountdownLatch.count}")
}
)
// DOCEND rpcReconnectingRPCFlowStarting
Thread.sleep(Random().nextInt(250).toLong())
}
log.info("Started all flows")
// Wait until all flows have been started.
flowsCountdownLatch.await()
log.info("Confirmed all flows.")
// Wait for all events to come in and flows to finish.
Thread.sleep(4000)
val nrFailures = nrRestarts.get()
log.info("Checking results after $nrFailures restarts.")
// The progress status for each flow can only miss the last events, because the node might have been killed.
val missingProgressEvents = flowProgressEvents.filterValues { expectedProgress.subList(0, it.size) != it }
assertTrue(missingProgressEvents.isEmpty(), "The flow progress tracker is missing events: $missingProgressEvents")
// DOCSTART missingVaultEvents
// Check that enough vault events were received.
// This check is fuzzy because events can go missing during node restarts.
// Ideally there should be nrOfFlowsToRun events receive but some might get lost for each restart.
assertTrue(vaultEvents!!.size + nrFailures * 2 >= nrOfFlowsToRun, "Not all vault events were received")
// DOCEND missingVaultEvents
// Query the vault and check that states were created for all flows.
val allCashStates = bankAReconnectingRpc
.vaultQueryByWithPagingSpec(Cash.State::class.java, QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.CONSUMED), PageSpecification(1, 10000))
.states
val allCash = allCashStates.map { it.state.data.amount.quantity }.toSet()
val missingCash = (1..nrOfFlowsToRun).filterNot { allCash.contains(it.toLong() * 100) }
log.info("MISSING: $missingCash")
assertEquals(nrOfFlowsToRun, allCashStates.size, "Not all flows were executed successfully")
// Check that no flow was triggered twice.
val duplicates = allCashStates.groupBy { it.state.data.amount }.filterValues { it.size > 1 }
assertTrue(duplicates.isEmpty(), "${duplicates.size} flows were retried illegally.")
log.info("SM EVENTS: ${stateMachineEvents!!.size}")
// State machine events are very likely to get lost more often because they seem to be sent with a delay.
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > nrOfFlowsToRun / 2, "Too many Added state machine events lost.")
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > nrOfFlowsToRun / 2, "Too many Removed state machine events lost.")
// Stop the observers.
vaultObserverHandle.stop()
stateMachineObserverHandle.stop()
bankAReconnectingRpc.close()
}
tcpProxy.close()
}
@Synchronized
fun MutableMap<StateMachineRunId, MutableList<String>>.addEvent(id: StateMachineRunId, progress: String?): Boolean {
return getOrPut(id) { mutableListOf() }.let { if (progress != null) it.add(progress) else false }
}
}

View File

@ -35,7 +35,7 @@ data class OutOfProcessImpl(
override val useHTTPS: Boolean,
val debugPort: Int?,
override val process: Process,
private val onStopCallback: () -> Unit
val onStopCallback: () -> Unit
) : OutOfProcess, NodeHandleInternal {
override val rpcUsers: List<User> = configuration.rpcUsers.map { User(it.username, it.password, it.permissions) }
override fun stop() {