ENT-4822: Don't attempt RPC reconnect for NotSerializableException (#2991) (#5870)

* ENT-4822: Don't attempt RPC reconnect for NotSerializableException

* ENT-4822: Don't attempt RPC reconnect for NotSerializableException
This commit is contained in:
Ryan Fowler 2020-01-22 11:37:38 +00:00 committed by Christian Sailer
parent 9a1d46ddd1
commit 597658c4ab
2 changed files with 82 additions and 2 deletions

View File

@ -1,14 +1,30 @@
package net.corda.client.rpc
import net.corda.core.contracts.BelongsToContract
import net.corda.core.contracts.CommandData
import net.corda.core.contracts.Contract
import net.corda.core.contracts.ContractState
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.vaultQueryBy
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.internal._driverSerializationEnv
import net.corda.core.serialization.internal._rpcClientSerializationEnv
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.enclosedCordapp
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Test
import java.io.NotSerializableException
class RpcCustomSerializersTest {
@ -21,8 +37,8 @@ class RpcCustomSerializersTest {
val client = CordaRPCClient(hostAndPort = server.rpcAddress)
val serializers = client.getRegisteredCustomSerializers()
assertThat(serializers).hasSize(1)
assertThat(serializers).hasOnlyElementsOfType(MySerializer::class.java)
assertThat(serializers).hasSize(2)
assertThat(serializers).hasOnlyElementsOfTypes(MySerializer::class.java, TestStateSerializer::class.java)
}
}
}
@ -56,6 +72,31 @@ class RpcCustomSerializersTest {
}
}
@Test
fun `when a custom serializer is missing from the rpc client the resulting exception progagtes and client does not reconnect`() {
driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val server = startNode(providedName = ALICE_NAME).get()
var numReconnects = 0
val gracefulReconnect = GracefulReconnect(onReconnect = {++numReconnects})
withSerializationEnvironmentsReset {
val client = CordaRPCClient(hostAndPort = server.rpcAddress, customSerializers = emptySet())
(client.start(server.rpcUsers.first().username, server.rpcUsers.first().password, gracefulReconnect)).use {
val rpcOps = it.proxy
rpcOps.startFlow(::InsertTestStateFlow, defaultNotaryIdentity).returnValue.getOrThrow()
assertThatExceptionOfType(RPCException::class.java).isThrownBy {
rpcOps.vaultQueryBy<TestState>().states
}.withCauseInstanceOf(NotSerializableException::class.java)
}
assertThat(numReconnects).isEqualTo(0)
}
}
}
/**
* This is done to avoid re-using the serialization environment setup by the driver the same way
* it will happen when a client is initialised outside a node.
@ -93,4 +134,34 @@ class RpcCustomSerializersTest {
}
class MyMagicClass(val someField: String)
class TestContract: Contract {
override fun verify(tx: LedgerTransaction) { }
class Insert : CommandData
}
@BelongsToContract(TestContract::class)
class TestState(parties: List<AbstractParty>) : ContractState {
override val participants = parties
}
@StartableByRPC
class InsertTestStateFlow(private val notary: Party): FlowLogic<Unit>() {
override fun call() {
val myKey = serviceHub.myInfo.legalIdentities.first().owningKey
val tx = TransactionBuilder(notary)
.addCommand(TestContract.Insert(), myKey)
.addOutputState(TestState(serviceHub.myInfo.legalIdentities))
val stx = serviceHub.signInitialTransaction(tx, myKey)
subFlow(FinalityFlow(stx, emptyList()))
}
}
@Suppress("UNUSED")
class TestStateSerializer: SerializationCustomSerializer<TestState, TestStateSerializer.Proxy> {
data class Proxy(val participants: List<AbstractParty>, val b: Int)
override fun toProxy(obj: TestState): Proxy = Proxy(obj.participants, 1)
override fun fromProxy(proxy: Proxy): TestState = TestState(proxy.participants)
}
}

View File

@ -32,6 +32,7 @@ import net.corda.nodeapi.exceptions.RejectedCommandException
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException
import java.io.NotSerializableException
import java.lang.reflect.InvocationHandler
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
@ -324,6 +325,8 @@ class ReconnectingCordaRPCOps private constructor(
checkIfIsStartFlow(method, e)
}
is RPCException -> {
rethrowIfUnrecoverable(e.targetException as RPCException)
log.warn("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e)
Thread.sleep(1000) // TODO - explain why this sleep is necessary
@ -352,6 +355,12 @@ class ReconnectingCordaRPCOps private constructor(
}
}
private fun rethrowIfUnrecoverable(e: RPCException) {
if (e.cause is NotSerializableException) { // Do not try to reconnect when we can't serialize
throw e
}
}
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
return when (method.returnType) {
DataFeed::class.java -> {