CORDA-3882 - Integrate new start method with reconnecting rpc client (#6607)

* Draft version of integrating start flow with client id with reconnecting rpc client

* Add recursive reconnect

* Execute 'doInvoke' on a different thread, fixes hanging

* Tidy up code and add explanatory comment

* Add test timeout

* Add test asserting 'removeClientId' does retry to connect if the node is down

* Modify test to not create a new node on node restart

* Add test asserting 'reattachFlowWithClientId' tries to reconnect if the node is down

* Add test asserting returned flow exception future continue working on node restart

* Add assertions for reconnecting future callbacks

* Remove unused import

* Remove unused local var

* Remove unneeded line breaks

* Remove unneeded parentheses

* Amend existing test; assert flow result reconnectable future returned from 'reattachFlowWithClientId' works

* Update test names

* Add explanatory comment

* Minor comment update

* Fix accidental methods moved

* Update obsolete comment
This commit is contained in:
Kyriakos Tharrouniatis 2020-08-14 12:00:52 +01:00 committed by GitHub
parent 845ef8d3d1
commit ac9d21f66c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 257 additions and 6 deletions

View File

@ -1,5 +1,6 @@
package net.corda.client.rpcreconnect
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCClientTest
@ -8,10 +9,18 @@ import net.corda.client.rpc.MaxRpcRetryException
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.UnrecoverableRPCException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.concurrent.doOnComplete
import net.corda.core.internal.concurrent.doOnError
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueFlow
@ -24,16 +33,22 @@ import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.rpcDriver
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.lang.IllegalStateException
import java.lang.RuntimeException
import java.lang.Thread.sleep
import java.time.Duration
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue
@ -51,10 +66,6 @@ class CordaRPCClientReconnectionTest {
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
}
@Test(timeout=300_000)
fun `rpc node start when FlowsDrainingModeEnabled throws RejectedCommandException and won't attempt to reconnect`() {
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) {
@ -373,4 +384,200 @@ class CordaRPCClientReconnectionTest {
}
}
}
@Test(timeout=300_000)
fun `rpc returned flow -started with cient id- result future continue working when the node crashes and restarts`() {
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) {
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
fun startNode(): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress, config)
client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val clientId = UUID.randomUUID().toString()
// assert result reconnectable futures returned work from both 'startFlowWithClientId' and 'reattachFlowWithClientId'
val flowHandle0 = rpcOps.startFlowWithClientId(clientId, ::SimpleFlow)
val flowHandle1 = rpcOps.reattachFlowWithClientId<Int>(clientId)
val completedCounter = AtomicInteger(0)
flowHandle0.returnValue.doOnComplete {
completedCounter.incrementAndGet()
}
flowHandle1!!.returnValue.doOnComplete {
completedCounter.incrementAndGet()
}
flowHandle0.returnValue.thenMatch({
completedCounter.incrementAndGet()
}, {})
flowHandle1.returnValue.thenMatch({
completedCounter.incrementAndGet()
}, {})
flowHandle0.returnValue.toCompletableFuture().thenApply {
completedCounter.incrementAndGet()
}
flowHandle1.returnValue.toCompletableFuture().thenApply {
completedCounter.incrementAndGet()
}
node.stop()
thread {
sleep(1000)
startNode()
}
var result1: Int? = null
thread {
result1 = flowHandle1.returnValue.get()
}
val result0 = flowHandle0.returnValue.get()
sleep(1000)
assertEquals(6, completedCounter.get())
assertEquals(5, result0!!)
assertEquals(5, result1!!)
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
}
}
}
@Test(timeout=300_000)
fun `rpc returned flow -started with cient id- exception future continue working when the node crashes and restarts`() {
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) {
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
fun startNode(): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress, config)
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val clientId = UUID.randomUUID().toString()
val flowHandle = rpcOps.startFlowWithClientId(clientId, ::ThrowingFlow)
var erroredCounter = 0
flowHandle.returnValue.doOnError {
erroredCounter++
}
flowHandle.returnValue.toCompletableFuture().exceptionally {
erroredCounter++
}
node.stop()
thread {
sleep(1000)
startNode()
}
assertFailsWith<CordaRuntimeException> {
flowHandle.returnValue.getOrThrow()
}
sleep(1000)
assertEquals(2, erroredCounter)
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
}
}
}
@Test(timeout=300_000)
fun `rpc re attach to flow with client id tries to reconnect when the node is down`() {
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) {
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
fun startNode(): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress, config)
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val clientId = UUID.randomUUID().toString()
rpcOps.startFlowWithClientId(clientId, ::SimpleFlow)
node.stop()
thread {
sleep(1000)
startNode()
}
val flowHandle = rpcOps.reattachFlowWithClientId<Int>(clientId)
val result = flowHandle!!.returnValue.get()
assertEquals(5, result!!)
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
}
}
}
@Test(timeout=300_000)
fun `rpc remove client id tries to reconnect when the node is down`() {
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) {
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
fun startNode(): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress, config)
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val clientId = UUID.randomUUID().toString()
rpcOps.startFlowWithClientId(clientId, ::SimpleFlow).returnValue.getOrThrow()
node.stop()
thread {
sleep(1000)
startNode()
}
val removed = rpcOps.removeClientId(clientId)
assertTrue(removed)
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
}
}
}
@StartableByRPC
class SimpleFlow : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
sleep(10.seconds)
return 5
}
}
@StartableByRPC
class ThrowingFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
sleep(10.seconds)
throw IllegalStateException("bye")
}
}
}

View File

@ -12,4 +12,5 @@ object RPCUtils {
fun RPCApi.ClientToServer.RpcRequest.isShutdownCmd() = isShutdownMethodName(methodName)
fun Method.isShutdown() = isShutdownMethodName(name)
fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
fun Method.isStartFlowWithClientId() = name == "startFlowWithClientId" || name == "startFlowDynamicWithClientId"
}

View File

@ -12,13 +12,18 @@ import net.corda.client.rpc.RPCException
import net.corda.client.rpc.UnrecoverableRPCException
import net.corda.client.rpc.internal.RPCUtils.isShutdown
import net.corda.client.rpc.internal.RPCUtils.isStartFlow
import net.corda.client.rpc.internal.RPCUtils.isStartFlowWithClientId
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CLOSED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTING
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.DIED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.UNCONNECTED
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.internal.min
import net.corda.core.internal.times
@ -27,6 +32,8 @@ import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowHandleWithClientId
import net.corda.core.messaging.FlowHandleWithClientIdImpl
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
@ -295,8 +302,9 @@ class ReconnectingCordaRPCOps private constructor(
}
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
private fun checkIfIsStartFlow(method: Method, e: InvocationTargetException) {
if (method.isStartFlow()) {
// Don't retry flows
if (method.isStartFlow() && !method.isStartFlowWithClientId()) {
// Only retry flows that have started with a client id. For such flows alone it is safe to recall them since,
// on recalling trying to reconnect they will not start a new flow but re-hook to an existing one ,that matches the client id, instead.
throw CouldNotStartFlowException(e.targetException)
}
}
@ -383,10 +391,44 @@ class ReconnectingCordaRPCOps private constructor(
}
initialFeed.copy(updates = observable)
}
FlowHandleWithClientId::class.java -> {
val initialHandle: FlowHandleWithClientId<Any?> = uncheckedCast(doInvoke(method, args,
reconnectingRPCConnection.gracefulReconnect.maxAttempts))
val initialFuture = initialHandle.returnValue
// This is the future that is returned to the client. It will get carried until we reconnect to the node.
val returnFuture = openFuture<Any?>()
tryConnect(initialFuture, returnFuture) {
val handle: FlowHandleWithClientId<Any?> = uncheckedCast(doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxAttempts))
handle.returnValue
}
return (initialHandle as FlowHandleWithClientIdImpl<Any?>).copy(returnValue = returnFuture)
}
// TODO - add handlers for Observable return types.
else -> doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxAttempts)
}
}
private fun tryConnect(currentFuture: CordaFuture<*>, returnFuture: OpenFuture<Any?>, doInvoke: () -> CordaFuture<*>) {
currentFuture.thenMatch(
success = {
returnFuture.set(it)
} ,
failure = {
if (it is ConnectionFailureException) {
reconnectingRPCConnection.observersPool.execute {
val reconnectedFuture = doInvoke()
tryConnect(reconnectedFuture, returnFuture, doInvoke)
}
} else {
returnFuture.setException(it)
}
}
)
}
}
fun close() {
@ -394,3 +436,4 @@ class ReconnectingCordaRPCOps private constructor(
reconnectingRPCConnection.forceClose()
}
}