diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt index ef1e9a71a4..a4a942dd26 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt @@ -7,6 +7,7 @@ import net.corda.client.rpc.GracefulReconnect import net.corda.client.rpc.MaxRpcRetryException import net.corda.client.rpc.RPCException import net.corda.client.rpc.internal.ReconnectingCordaRPCOps +import net.corda.core.internal.concurrent.doneFuture import net.corda.core.messaging.startTrackedFlow import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.OpaqueBytes @@ -25,7 +26,10 @@ import net.corda.testing.node.internal.FINANCE_CORDAPPS import net.corda.testing.node.internal.rpcDriver import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.ClassRule import org.junit.Test +import java.lang.Thread.sleep +import java.time.Duration import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import kotlin.concurrent.thread @@ -196,4 +200,34 @@ class CordaRPCClientReconnectionTest { } } -} \ No newline at end of file + @Test(timeout = 120_000) + fun `RPC connection can be shut down after being disconnected from the node`() { + driver(DriverParameters(cordappsForAllNodes = emptyList())) { + val address = NetworkHostAndPort("localhost", portAllocator.nextPort()) + fun startNode(): NodeHandle { + return startNode( + providedName = CHARLIE_NAME, + rpcUsers = listOf(CordaRPCClientTest.rpcUser), + customOverrides = mapOf("rpcSettings.address" to address.toString()) + ).getOrThrow() + } + + val node = startNode() + CordaRPCClient(node.rpcAddress).start(rpcUser.username, rpcUser.password, gracefulReconnect).use { + node.stop() + thread() { + it.proxy.startTrackedFlow( + ::CashIssueFlow, + 10.DOLLARS, + OpaqueBytes.of(0), + defaultNotaryIdentity + ) + } + // This just gives the flow time to get started so the RPC detects a problem + sleep(1000) + it.close() + } + } + } + +} diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 65d0cd6abf..afe09497a7 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -143,6 +143,7 @@ class ReconnectingCordaRPCOps private constructor( UNCONNECTED, CONNECTED, CONNECTING, CLOSED, DIED } + @Volatile private var currentState = UNCONNECTED init { @@ -151,14 +152,22 @@ class ReconnectingCordaRPCOps private constructor( private val current: CordaRPCConnection @Synchronized get() = when (currentState) { // The first attempt to establish a connection will try every address only once. - UNCONNECTED -> connect(infiniteRetries = false) - CONNECTED -> currentRPCConnection!! - CLOSED -> throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.") - CONNECTING, DIED -> throw IllegalArgumentException("Illegal state: $currentState ") + UNCONNECTED -> + connect(infiniteRetries = false) ?: throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.") + CONNECTED -> + currentRPCConnection!! + CLOSED -> + throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.") + CONNECTING, DIED -> + throw IllegalArgumentException("Illegal state: $currentState ") } @Synchronized private fun doReconnect(e: Throwable, previousConnection: CordaRPCConnection?) { + if (isClosed()) { + // We don't want to reconnect if we purposely closed + return + } if (previousConnection != currentRPCConnection) { // We've already done this, skip return @@ -182,16 +191,20 @@ class ReconnectingCordaRPCOps private constructor( val previousConnection = currentRPCConnection doReconnect(e, previousConnection) } - @Synchronized - private fun connect(infiniteRetries: Boolean): CordaRPCConnection { + private fun connect(infiniteRetries: Boolean): CordaRPCConnection? { currentState = CONNECTING - currentRPCConnection = if (infiniteRetries) { - establishConnectionWithRetry() - } else { - establishConnectionWithRetry(retries = nodeHostAndPorts.size) + synchronized(this) { + currentRPCConnection = if (infiniteRetries) { + establishConnectionWithRetry() + } else { + establishConnectionWithRetry(retries = nodeHostAndPorts.size) + } + // It's possible we could get closed while waiting for the connection to establish. + if (!isClosed()) { + currentState = CONNECTED + } } - currentState = CONNECTED - return currentRPCConnection!! + return currentRPCConnection } /** @@ -205,7 +218,11 @@ class ReconnectingCordaRPCOps private constructor( retryInterval: Duration = 1.seconds, roundRobinIndex: Int = 0, retries: Int = -1 - ): CordaRPCConnection { + ): CordaRPCConnection? { + if (isClosed()) { + // We've decided to exit for some reason (maybe the client is being shutdown) + return null + } val attemptedAddress = nodeHostAndPorts[roundRobinIndex] log.info("Connecting to: $attemptedAddress") try { @@ -255,16 +272,19 @@ class ReconnectingCordaRPCOps private constructor( get() = current.proxy override val serverProtocolVersion get() = current.serverProtocolVersion - @Synchronized override fun notifyServerAndClose() { currentState = CLOSED - currentRPCConnection?.notifyServerAndClose() + synchronized(this) { + currentRPCConnection?.notifyServerAndClose() + } } - @Synchronized override fun forceClose() { currentState = CLOSED - currentRPCConnection?.forceClose() + synchronized(this) { + currentRPCConnection?.forceClose() + } } + fun isClosed(): Boolean = currentState == CLOSED } private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler { private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow") @@ -283,6 +303,9 @@ class ReconnectingCordaRPCOps private constructor( * A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed. */ private fun doInvoke(method: Method, args: Array?, maxNumberOfAttempts: Int): Any? { + if (reconnectingRPCConnection.isClosed()) { + throw RPCException("Cannot execute RPC command after client has shut down.") + } var remainingAttempts = maxNumberOfAttempts var lastException: Throwable? = null while (remainingAttempts != 0) { diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt index 102b0a953c..b5052d9bbf 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt @@ -57,8 +57,10 @@ class ReconnectingObservable private constructor(subscriber: ReconnectingSubs private fun scheduleResubscribe(error: Throwable) { if (unsubscribed) return reconnectingRPCConnection.observersPool.execute { - if (unsubscribed) return@execute + if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute reconnectingRPCConnection.reconnectOnError(error) + // It can take a while to reconnect so we might find that we've shutdown in in the meantime + if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute val newDataFeed = createDataFeed() subscribeImmediately(newDataFeed) } diff --git a/detekt-baseline.xml b/detekt-baseline.xml index 53f7b49139..d87a2ce4d2 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -182,7 +182,8 @@ ComplexMethod:RPCClientProxyHandler.kt$RPCClientProxyHandler$// This is the general function that transforms a client side RPC to internal Artemis messages. override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? ComplexMethod:RPCClientProxyHandler.kt$RPCClientProxyHandler$private fun attemptReconnect() ComplexMethod:RPCServer.kt$RPCServer$private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) - ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$ private tailrec fun establishConnectionWithRetry( retryInterval: Duration = 1.seconds, roundRobinIndex: Int = 0, retries: Int = -1 ): CordaRPCConnection + ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ErrorInterceptingHandler$ private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfAttempts: Int): Any? + ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$ private tailrec fun establishConnectionWithRetry( retryInterval: Duration = 1.seconds, roundRobinIndex: Int = 0, retries: Int = -1 ): CordaRPCConnection? ComplexMethod:RemoteTypeCarpenter.kt$SchemaBuildingRemoteTypeCarpenter$override fun carpent(typeInformation: RemoteTypeInformation): Type ComplexMethod:RpcReconnectTests.kt$RpcReconnectTests$ @Test fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`() ComplexMethod:SchemaMigration.kt$SchemaMigration$ private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean @@ -2516,7 +2517,6 @@ MaxLineLength:MockServices.kt$MockServices.Companion$makeMockMockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys.toSet(), keyManagementService, schemaService, database) MaxLineLength:MockServices.kt$MockServices.Companion$return object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys.toTypedArray(), keyManagementService) { override var networkParametersService: NetworkParametersService = MockNetworkParametersStorage(networkParameters) override val vaultService: VaultService = makeVaultService(schemaService, persistence, cordappLoader) override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) { ServiceHubInternal.recordTransactions( statesToRecord, txs as? Collection ?: txs.toList(), validatedTransactions as WritableTransactionStorage, mockStateMachineRecordedTransactionMappingStorage, vaultService as VaultServiceInternal, persistence ) } override fun jdbcSession(): Connection = persistence.createSession() override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T { return block(contextTransaction.restrictedEntityManager) } override fun withEntityManager(block: Consumer<EntityManager>) { return block.accept(contextTransaction.restrictedEntityManager) } } MaxLineLength:MockServices.kt$MockServices.Companion$val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService, schemaService.internalSchemas()) - MaxLineLength:MyCustomNotaryService.kt$MyCustomValidatingNotaryService : SinglePartyNotaryService MaxLineLength:Network.kt$Network$node.getWorldMapLocation()?.coordinate?.project(mapPane.width, mapPane.height, 85.0511, -85.0511, -180.0, 180.0) ?: ScreenCoordinate(0.0, 0.0) MaxLineLength:Network.kt$Network$private val peerButtons = peerComponents.filtered { myIdentity.value !in it.nodeInfo.legalIdentitiesAndCerts.map { it.party } }.map { it.button } MaxLineLength:Network.kt$Network$val inputParties = it.inputs.sequence() .map { it as? PartiallyResolvedTransaction.InputResolution.Resolved } .filterNotNull() .map { it.stateAndRef.state.data }.getParties() val outputParties = it.transaction.coreTransaction.let { if (it is WireTransaction) it.outputStates.observable().getParties() // For ContractUpgradeWireTransaction and NotaryChangeWireTransaction the output parties are the same as input parties else inputParties } val signingParties = it.transaction.sigs.map { it.by.toKnownParty() } // Input parties fire a bullets to all output parties, and to the signing parties. !! This is a rough guess of how the message moves in the network. // TODO : Expose artemis queue to get real message information. inputParties.cross(outputParties) + inputParties.cross(signingParties) @@ -3365,7 +3365,6 @@ MaxLineLength:ThrowableSerializer.kt$ThrowableSerializer${ try { // TODO: This will need reworking when we have multiple class loaders val clazz = Class.forName(proxy.exceptionClass, false, factory.classloader) // If it is CordaException or CordaRuntimeException, we can seek any constructor and then set the properties // Otherwise we just make a CordaRuntimeException if (CordaThrowable::class.java.isAssignableFrom(clazz) && Throwable::class.java.isAssignableFrom(clazz)) { val typeInformation = factory.getTypeInformation(clazz) val constructor = typeInformation.constructor val params = constructor.parameters.map { parameter -> proxy.additionalProperties[parameter.name] ?: proxy.additionalProperties[parameter.name.capitalize()] } val throwable = constructor.observedMethod.newInstance(*params.toTypedArray()) (throwable as CordaThrowable).apply { if (this.javaClass.name != proxy.exceptionClass) this.originalExceptionClassName = proxy.exceptionClass this.setMessage(proxy.message) this.setCause(proxy.cause) this.addSuppressed(proxy.suppressed) } return (throwable as Throwable).apply { this.stackTrace = proxy.stackTrace } } } catch (e: Exception) { logger.warn("Unexpected exception de-serializing throwable: ${proxy.exceptionClass}. Converting to CordaRuntimeException.", e) } // If the criteria are not met or we experience an exception constructing the exception, we fall back to our own unchecked exception. return CordaRuntimeException(proxy.exceptionClass, null, null).apply { this.setMessage(proxy.message) this.setCause(proxy.cause) this.stackTrace = proxy.stackTrace this.addSuppressed(proxy.suppressed) } } MaxLineLength:TimedFlowTests.kt$TimedFlowTests$addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) MaxLineLength:TimedFlowTests.kt$TimedFlowTests.Companion$defaultParameters = MockNetworkParameters().withServicePeerAllocationStrategy(InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin()) - MaxLineLength:TimedFlowTests.kt$TimedFlowTests.TestNotaryService$@Suspendable override MaxLineLength:TimedFlowTests.kt$TimedFlowTests.TestNotaryService$override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> MaxLineLength:TimedFlowTests.kt$TimedFlowTests.TestNotaryService$private MaxLineLength:TimedFlowTests.kt$TimedFlowTests.TestNotaryService.<no name provided>$override @@ -4811,7 +4810,6 @@ WildcardImport:NotaryServiceFlow.kt$import net.corda.core.flows.* WildcardImport:NotaryServiceTests.kt$import net.corda.core.crypto.* WildcardImport:NotaryServiceTests.kt$import net.corda.testing.node.internal.* - WildcardImport:NotaryUtils.kt$import net.corda.core.flows.* WildcardImport:NotaryWhitelistTests.kt$import net.corda.core.crypto.* WildcardImport:NotaryWhitelistTests.kt$import net.corda.testing.node.internal.* WildcardImport:OGSwapPricingExample.kt$import com.opengamma.strata.product.swap.* @@ -4843,8 +4841,6 @@ WildcardImport:PersistentIdentityServiceTests.kt$import net.corda.testing.core.* WildcardImport:PersistentNetworkMapCacheTest.kt$import net.corda.testing.core.* WildcardImport:PersistentStateServiceTests.kt$import net.corda.core.contracts.* - WildcardImport:PersistentUniquenessProvider.kt$import javax.persistence.* - WildcardImport:PersistentUniquenessProvider.kt$import net.corda.core.internal.notary.* WildcardImport:Portfolio.kt$import net.corda.core.contracts.* WildcardImport:PortfolioApi.kt$import javax.ws.rs.* WildcardImport:PortfolioState.kt$import net.corda.core.contracts.* @@ -4869,7 +4865,6 @@ WildcardImport:RPCSecurityManagerImpl.kt$import org.apache.shiro.authc.* WildcardImport:RPCServer.kt$import net.corda.core.utilities.* WildcardImport:RPCServer.kt$import org.apache.activemq.artemis.api.core.client.* - WildcardImport:RaftUniquenessProvider.kt$import javax.persistence.* WildcardImport:ReceiveFinalityFlowTest.kt$import net.corda.node.services.statemachine.StaffedFlowHospital.* WildcardImport:ReceiveFinalityFlowTest.kt$import net.corda.testing.node.internal.* WildcardImport:ReceiveTransactionFlow.kt$import net.corda.core.contracts.*