ENT-4513: Add checks for closing the RPC client (#5781)

This commit is contained in:
Ryan Fowler 2019-12-03 11:59:00 +00:00 committed by Rick Parker
parent e6f9b46584
commit 2abf22ccf9
4 changed files with 80 additions and 26 deletions

View File

@ -7,6 +7,7 @@ import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.MaxRpcRetryException
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
@ -25,7 +26,10 @@ import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.rpcDriver
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.ClassRule
import org.junit.Test
import java.lang.Thread.sleep
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread
@ -196,4 +200,34 @@ class CordaRPCClientReconnectionTest {
}
}
}
@Test(timeout = 120_000)
fun `RPC connection can be shut down after being disconnected from the node`() {
driver(DriverParameters(cordappsForAllNodes = emptyList())) {
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
fun startNode(): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
val node = startNode()
CordaRPCClient(node.rpcAddress).start(rpcUser.username, rpcUser.password, gracefulReconnect).use {
node.stop()
thread() {
it.proxy.startTrackedFlow(
::CashIssueFlow,
10.DOLLARS,
OpaqueBytes.of(0),
defaultNotaryIdentity
)
}
// This just gives the flow time to get started so the RPC detects a problem
sleep(1000)
it.close()
}
}
}
}

View File

@ -143,6 +143,7 @@ class ReconnectingCordaRPCOps private constructor(
UNCONNECTED, CONNECTED, CONNECTING, CLOSED, DIED
}
@Volatile
private var currentState = UNCONNECTED
init {
@ -151,14 +152,22 @@ class ReconnectingCordaRPCOps private constructor(
private val current: CordaRPCConnection
@Synchronized get() = when (currentState) {
// The first attempt to establish a connection will try every address only once.
UNCONNECTED -> connect(infiniteRetries = false)
CONNECTED -> currentRPCConnection!!
CLOSED -> throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.")
CONNECTING, DIED -> throw IllegalArgumentException("Illegal state: $currentState ")
UNCONNECTED ->
connect(infiniteRetries = false) ?: throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.")
CONNECTED ->
currentRPCConnection!!
CLOSED ->
throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.")
CONNECTING, DIED ->
throw IllegalArgumentException("Illegal state: $currentState ")
}
@Synchronized
private fun doReconnect(e: Throwable, previousConnection: CordaRPCConnection?) {
if (isClosed()) {
// We don't want to reconnect if we purposely closed
return
}
if (previousConnection != currentRPCConnection) {
// We've already done this, skip
return
@ -182,16 +191,20 @@ class ReconnectingCordaRPCOps private constructor(
val previousConnection = currentRPCConnection
doReconnect(e, previousConnection)
}
@Synchronized
private fun connect(infiniteRetries: Boolean): CordaRPCConnection {
private fun connect(infiniteRetries: Boolean): CordaRPCConnection? {
currentState = CONNECTING
currentRPCConnection = if (infiniteRetries) {
establishConnectionWithRetry()
} else {
establishConnectionWithRetry(retries = nodeHostAndPorts.size)
synchronized(this) {
currentRPCConnection = if (infiniteRetries) {
establishConnectionWithRetry()
} else {
establishConnectionWithRetry(retries = nodeHostAndPorts.size)
}
// It's possible we could get closed while waiting for the connection to establish.
if (!isClosed()) {
currentState = CONNECTED
}
}
currentState = CONNECTED
return currentRPCConnection!!
return currentRPCConnection
}
/**
@ -205,7 +218,11 @@ class ReconnectingCordaRPCOps private constructor(
retryInterval: Duration = 1.seconds,
roundRobinIndex: Int = 0,
retries: Int = -1
): CordaRPCConnection {
): CordaRPCConnection? {
if (isClosed()) {
// We've decided to exit for some reason (maybe the client is being shutdown)
return null
}
val attemptedAddress = nodeHostAndPorts[roundRobinIndex]
log.info("Connecting to: $attemptedAddress")
try {
@ -255,16 +272,19 @@ class ReconnectingCordaRPCOps private constructor(
get() = current.proxy
override val serverProtocolVersion
get() = current.serverProtocolVersion
@Synchronized
override fun notifyServerAndClose() {
currentState = CLOSED
currentRPCConnection?.notifyServerAndClose()
synchronized(this) {
currentRPCConnection?.notifyServerAndClose()
}
}
@Synchronized
override fun forceClose() {
currentState = CLOSED
currentRPCConnection?.forceClose()
synchronized(this) {
currentRPCConnection?.forceClose()
}
}
fun isClosed(): Boolean = currentState == CLOSED
}
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
@ -283,6 +303,9 @@ class ReconnectingCordaRPCOps private constructor(
* A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed.
*/
private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfAttempts: Int): Any? {
if (reconnectingRPCConnection.isClosed()) {
throw RPCException("Cannot execute RPC command after client has shut down.")
}
var remainingAttempts = maxNumberOfAttempts
var lastException: Throwable? = null
while (remainingAttempts != 0) {

View File

@ -57,8 +57,10 @@ class ReconnectingObservable<T> private constructor(subscriber: ReconnectingSubs
private fun scheduleResubscribe(error: Throwable) {
if (unsubscribed) return
reconnectingRPCConnection.observersPool.execute {
if (unsubscribed) return@execute
if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
reconnectingRPCConnection.reconnectOnError(error)
// It can take a while to reconnect so we might find that we've shutdown in in the meantime
if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
val newDataFeed = createDataFeed()
subscribeImmediately(newDataFeed)
}

View File

@ -182,7 +182,8 @@
<ID>ComplexMethod:RPCClientProxyHandler.kt$RPCClientProxyHandler$// This is the general function that transforms a client side RPC to internal Artemis messages. override fun invoke(proxy: Any, method: Method, arguments: Array&lt;out Any?&gt;?): Any?</ID>
<ID>ComplexMethod:RPCClientProxyHandler.kt$RPCClientProxyHandler$private fun attemptReconnect()</ID>
<ID>ComplexMethod:RPCServer.kt$RPCServer$private fun clientArtemisMessageHandler(artemisMessage: ClientMessage)</ID>
<ID>ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$ private tailrec fun establishConnectionWithRetry( retryInterval: Duration = 1.seconds, roundRobinIndex: Int = 0, retries: Int = -1 ): CordaRPCConnection</ID>
<ID>ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ErrorInterceptingHandler$ private fun doInvoke(method: Method, args: Array&lt;out Any&gt;?, maxNumberOfAttempts: Int): Any?</ID>
<ID>ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$ private tailrec fun establishConnectionWithRetry( retryInterval: Duration = 1.seconds, roundRobinIndex: Int = 0, retries: Int = -1 ): CordaRPCConnection?</ID>
<ID>ComplexMethod:RemoteTypeCarpenter.kt$SchemaBuildingRemoteTypeCarpenter$override fun carpent(typeInformation: RemoteTypeInformation): Type</ID>
<ID>ComplexMethod:RpcReconnectTests.kt$RpcReconnectTests$ @Test fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`()</ID>
<ID>ComplexMethod:SchemaMigration.kt$SchemaMigration$ private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean</ID>
@ -2516,7 +2517,6 @@
<ID>MaxLineLength:MockServices.kt$MockServices.Companion$makeMockMockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys.toSet(), keyManagementService, schemaService, database)</ID>
<ID>MaxLineLength:MockServices.kt$MockServices.Companion$return object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys.toTypedArray(), keyManagementService) { override var networkParametersService: NetworkParametersService = MockNetworkParametersStorage(networkParameters) override val vaultService: VaultService = makeVaultService(schemaService, persistence, cordappLoader) override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable&lt;SignedTransaction&gt;) { ServiceHubInternal.recordTransactions( statesToRecord, txs as? Collection ?: txs.toList(), validatedTransactions as WritableTransactionStorage, mockStateMachineRecordedTransactionMappingStorage, vaultService as VaultServiceInternal, persistence ) } override fun jdbcSession(): Connection = persistence.createSession() override fun &lt;T : Any?&gt; withEntityManager(block: EntityManager.() -&gt; T): T { return block(contextTransaction.restrictedEntityManager) } override fun withEntityManager(block: Consumer&lt;EntityManager&gt;) { return block.accept(contextTransaction.restrictedEntityManager) } }</ID>
<ID>MaxLineLength:MockServices.kt$MockServices.Companion$val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService, schemaService.internalSchemas())</ID>
<ID>MaxLineLength:MyCustomNotaryService.kt$MyCustomValidatingNotaryService : SinglePartyNotaryService</ID>
<ID>MaxLineLength:Network.kt$Network$node.getWorldMapLocation()?.coordinate?.project(mapPane.width, mapPane.height, 85.0511, -85.0511, -180.0, 180.0) ?: ScreenCoordinate(0.0, 0.0)</ID>
<ID>MaxLineLength:Network.kt$Network$private val peerButtons = peerComponents.filtered { myIdentity.value !in it.nodeInfo.legalIdentitiesAndCerts.map { it.party } }.map { it.button }</ID>
<ID>MaxLineLength:Network.kt$Network$val inputParties = it.inputs.sequence() .map { it as? PartiallyResolvedTransaction.InputResolution.Resolved } .filterNotNull() .map { it.stateAndRef.state.data }.getParties() val outputParties = it.transaction.coreTransaction.let { if (it is WireTransaction) it.outputStates.observable().getParties() // For ContractUpgradeWireTransaction and NotaryChangeWireTransaction the output parties are the same as input parties else inputParties } val signingParties = it.transaction.sigs.map { it.by.toKnownParty() } // Input parties fire a bullets to all output parties, and to the signing parties. !! This is a rough guess of how the message moves in the network. // TODO : Expose artemis queue to get real message information. inputParties.cross(outputParties) + inputParties.cross(signingParties)</ID>
@ -3365,7 +3365,6 @@
<ID>MaxLineLength:ThrowableSerializer.kt$ThrowableSerializer${ try { // TODO: This will need reworking when we have multiple class loaders val clazz = Class.forName(proxy.exceptionClass, false, factory.classloader) // If it is CordaException or CordaRuntimeException, we can seek any constructor and then set the properties // Otherwise we just make a CordaRuntimeException if (CordaThrowable::class.java.isAssignableFrom(clazz) &amp;&amp; Throwable::class.java.isAssignableFrom(clazz)) { val typeInformation = factory.getTypeInformation(clazz) val constructor = typeInformation.constructor val params = constructor.parameters.map { parameter -&gt; proxy.additionalProperties[parameter.name] ?: proxy.additionalProperties[parameter.name.capitalize()] } val throwable = constructor.observedMethod.newInstance(*params.toTypedArray()) (throwable as CordaThrowable).apply { if (this.javaClass.name != proxy.exceptionClass) this.originalExceptionClassName = proxy.exceptionClass this.setMessage(proxy.message) this.setCause(proxy.cause) this.addSuppressed(proxy.suppressed) } return (throwable as Throwable).apply { this.stackTrace = proxy.stackTrace } } } catch (e: Exception) { logger.warn("Unexpected exception de-serializing throwable: ${proxy.exceptionClass}. Converting to CordaRuntimeException.", e) } // If the criteria are not met or we experience an exception constructing the exception, we fall back to our own unchecked exception. return CordaRuntimeException(proxy.exceptionClass, null, null).apply { this.setMessage(proxy.message) this.setCause(proxy.cause) this.stackTrace = proxy.stackTrace this.addSuppressed(proxy.suppressed) } }</ID>
<ID>MaxLineLength:TimedFlowTests.kt$TimedFlowTests$addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)</ID>
<ID>MaxLineLength:TimedFlowTests.kt$TimedFlowTests.Companion$defaultParameters = MockNetworkParameters().withServicePeerAllocationStrategy(InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin())</ID>
<ID>MaxLineLength:TimedFlowTests.kt$TimedFlowTests.TestNotaryService$@Suspendable override</ID>
<ID>MaxLineLength:TimedFlowTests.kt$TimedFlowTests.TestNotaryService$override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic&lt;Void?&gt;</ID>
<ID>MaxLineLength:TimedFlowTests.kt$TimedFlowTests.TestNotaryService$private</ID>
<ID>MaxLineLength:TimedFlowTests.kt$TimedFlowTests.TestNotaryService.&lt;no name provided&gt;$override</ID>
@ -4811,7 +4810,6 @@
<ID>WildcardImport:NotaryServiceFlow.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:NotaryServiceTests.kt$import net.corda.core.crypto.*</ID>
<ID>WildcardImport:NotaryServiceTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:NotaryUtils.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:NotaryWhitelistTests.kt$import net.corda.core.crypto.*</ID>
<ID>WildcardImport:NotaryWhitelistTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:OGSwapPricingExample.kt$import com.opengamma.strata.product.swap.*</ID>
@ -4843,8 +4841,6 @@
<ID>WildcardImport:PersistentIdentityServiceTests.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:PersistentNetworkMapCacheTest.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:PersistentStateServiceTests.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:PersistentUniquenessProvider.kt$import javax.persistence.*</ID>
<ID>WildcardImport:PersistentUniquenessProvider.kt$import net.corda.core.internal.notary.*</ID>
<ID>WildcardImport:Portfolio.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:PortfolioApi.kt$import javax.ws.rs.*</ID>
<ID>WildcardImport:PortfolioState.kt$import net.corda.core.contracts.*</ID>
@ -4869,7 +4865,6 @@
<ID>WildcardImport:RPCSecurityManagerImpl.kt$import org.apache.shiro.authc.*</ID>
<ID>WildcardImport:RPCServer.kt$import net.corda.core.utilities.*</ID>
<ID>WildcardImport:RPCServer.kt$import org.apache.activemq.artemis.api.core.client.*</ID>
<ID>WildcardImport:RaftUniquenessProvider.kt$import javax.persistence.*</ID>
<ID>WildcardImport:ReceiveFinalityFlowTest.kt$import net.corda.node.services.statemachine.StaffedFlowHospital.*</ID>
<ID>WildcardImport:ReceiveFinalityFlowTest.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:ReceiveTransactionFlow.kt$import net.corda.core.contracts.*</ID>