From e6af60edda7be2a55b18b2ef625afa9e7566a16a Mon Sep 17 00:00:00 2001 From: Tamas Veingartner Date: Fri, 7 Aug 2020 09:18:09 +0100 Subject: [PATCH] =?UTF-8?q?NOTICK=20Migrate=20recent=20RPC=20related=20cha?= =?UTF-8?q?nges=20to=20OS,=20as=20these=20were=20initially=20imp=E2=80=A6?= =?UTF-8?q?=20(#6532)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Migrate recent RPC related changes to OS, as these were initially implemented in ENT only * tests cleanup * cleanup imports --- .../CordaRPCClientReconnectionTest.kt | 28 ++++++- .../rpc/internal/RPCClientProxyHandler.kt | 75 ++++++++++--------- .../rpc/internal/ReconnectingCordaRPCOps.kt | 5 +- 3 files changed, 67 insertions(+), 41 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt index fbf55e194b..5a074204c6 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt @@ -6,6 +6,7 @@ import net.corda.client.rpc.CordaRPCClientTest import net.corda.client.rpc.GracefulReconnect import net.corda.client.rpc.MaxRpcRetryException import net.corda.client.rpc.RPCException +import net.corda.client.rpc.UnrecoverableRPCException import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.core.messaging.startTrackedFlow import net.corda.core.utilities.NetworkHostAndPort @@ -82,6 +83,29 @@ class CordaRPCClientReconnectionTest { } + @Test(timeout=300_000) + fun `minimum server protocol version should cause exception if higher than allowed`() { + driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) { + val address = NetworkHostAndPort("localhost", portAllocator.nextPort()) + + fun startNode(): NodeHandle { + return startNode( + providedName = CHARLIE_NAME, + rpcUsers = listOf(CordaRPCClientTest.rpcUser), + customOverrides = mapOf("rpcSettings.address" to address.toString()) + ).getOrThrow() + } + + assertThatThrownBy { + val node = startNode () + val client = CordaRPCClient(node.rpcAddress, config.copy(minimumServerProtocolVersion = 100, maxReconnectAttempts = 1)) + client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect) + } + .isInstanceOf(UnrecoverableRPCException::class.java) + .hasMessageStartingWith("Requested minimum protocol version (100) is higher than the server's supported protocol version ") + } + } + @Test(timeout=300_000) fun `rpc client calls and returned observables continue working when the server crashes and restarts`() { driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) { @@ -292,7 +316,7 @@ class CordaRPCClientReconnectionTest { val node = startNode() CordaRPCClient(node.rpcAddress, config).start(rpcUser.username, rpcUser.password, gracefulReconnect).use { node.stop() - thread() { + thread { it.proxy.startTrackedFlow( ::CashIssueFlow, 10.DOLLARS, @@ -349,4 +373,4 @@ class CordaRPCClientReconnectionTest { } } } -} +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index b0965f29b4..678abe5338 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -46,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientProducer import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSessionFactory +import org.apache.activemq.artemis.api.core.client.FailoverEventListener import org.apache.activemq.artemis.api.core.client.FailoverEventType import org.apache.activemq.artemis.api.core.client.ServerLocator import rx.Notification @@ -124,7 +125,7 @@ internal class RPCClientProxyHandler( val toStringMethod: Method = Object::toString.javaMethod!! val equalsMethod: Method = Object::equals.javaMethod!! val hashCodeMethod: Method = Object::hashCode.javaMethod!! - + var terminating = false private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: CallSite) { var currentThrowable = throwable while (true) { @@ -217,7 +218,7 @@ internal class RPCClientProxyHandler( .weakValues() .removalListener(onObservableRemove) .executor(MoreExecutors.directExecutor()), - "RpcClientProxyHandler_rpcObservable" + "RpcClientProxyHandler_rpcObservable" ) } @@ -233,6 +234,22 @@ internal class RPCClientProxyHandler( private val sendingEnabled = AtomicBoolean(true) // Used to interrupt failover thread (i.e. client is closed while failing over). private var haFailoverThread: Thread? = null + private val haFailoverHandler: FailoverHandler = FailoverHandler( + detected = { log.warn("Connection failure. Attempting to reconnect using back-up addresses.") + cleanUpOnConnectionLoss() + sessionFactory?.apply { + connection.destroy() + cleanup() + close() + } + haFailoverThread = Thread.currentThread() + attemptReconnect() + }) + private val defaultFailoverHandler: FailoverHandler = FailoverHandler( + detected = { cleanUpOnConnectionLoss() }, + completed = { sendingEnabled.set(true) + log.info("RPC server available.")}, + failed = { log.error("Could not reconnect to the RPC server.")}) /** * Start the client. This creates the per-client queue, starts the consumer session and the reaper. @@ -265,15 +282,27 @@ internal class RPCClientProxyHandler( } // Depending on how the client is constructed, connection failure is treated differently if (serverLocator.staticTransportConfigurations.size == 1) { - sessionFactory!!.addFailoverListener(this::failoverHandler) + sessionFactory!!.addFailoverListener(defaultFailoverHandler) } else { - sessionFactory!!.addFailoverListener(this::haFailoverHandler) + sessionFactory!!.addFailoverListener(haFailoverHandler) } initSessions() lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET) startSessions() } + class FailoverHandler(private val detected: () -> Unit = {}, + private val completed: () -> Unit = {}, + private val failed: () -> Unit = {}): FailoverEventListener { + override fun failoverEvent(eventType: FailoverEventType?) { + when (eventType) { + FailoverEventType.FAILURE_DETECTED -> { detected() } + FailoverEventType.FAILOVER_COMPLETED -> { completed() } + FailoverEventType.FAILOVER_FAILED -> { if (!terminating) failed() } + } + } + } + // This is the general function that transforms a client side RPC to internal Artemis messages. override fun invoke(proxy: Any, method: Method, arguments: Array?): Any? { lifeCycle.requireState { it == State.STARTED || it == State.SERVER_VERSION_NOT_SET } @@ -313,6 +342,10 @@ internal class RPCClientProxyHandler( "Generated several RPC requests with same ID $replyId" } + if (request.methodName.equals("terminate", true)){ + terminating = true + } + sendMessage(request) return replyFuture.getOrThrow() } catch (e: RuntimeException) { @@ -573,7 +606,7 @@ internal class RPCClientProxyHandler( log.debug { "Connected successfully after $reconnectAttempt attempts using ${transport.params}." } log.info("RPC server available.") - sessionFactory!!.addFailoverListener(this::haFailoverHandler) + sessionFactory!!.addFailoverListener(haFailoverHandler) initSessions() startSessions() sendingEnabled.set(true) @@ -602,38 +635,6 @@ internal class RPCClientProxyHandler( producerSession!!.start() } - private fun haFailoverHandler(event: FailoverEventType) { - if (event == FailoverEventType.FAILURE_DETECTED) { - log.warn("Connection failure. Attempting to reconnect using back-up addresses.") - cleanUpOnConnectionLoss() - sessionFactory?.apply { - connection.destroy() - cleanup() - close() - } - haFailoverThread = Thread.currentThread() - attemptReconnect() - } - // Other events are not considered as reconnection is not done by Artemis. - } - - private fun failoverHandler(event: FailoverEventType) { - when (event) { - FailoverEventType.FAILURE_DETECTED -> { - cleanUpOnConnectionLoss() - } - - FailoverEventType.FAILOVER_COMPLETED -> { - sendingEnabled.set(true) - log.info("RPC server available.") - } - - FailoverEventType.FAILOVER_FAILED -> { - log.error("Could not reconnect to the RPC server.") - } - } - } - private fun cleanUpOnConnectionLoss() { sendingEnabled.set(false) log.warn("Terminating observables.") diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 641d2323ca..4b4945f4f6 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -9,6 +9,7 @@ import net.corda.client.rpc.MaxRpcRetryException import net.corda.client.rpc.PermissionException import net.corda.client.rpc.RPCConnection import net.corda.client.rpc.RPCException +import net.corda.client.rpc.UnrecoverableRPCException import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CLOSED import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTED import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTING @@ -211,7 +212,7 @@ class ReconnectingCordaRPCOps private constructor( * Establishes a connection by automatically retrying if the attempt to establish a connection fails. * * @param retryInterval the interval between retries. - * @param roundRobinIndex index of the address that will be used for the connection. + * @param roundRobinIndex the index of the address that will be used for the connection. * @param retries the number of retries remaining. A negative value implies infinite retries. */ private tailrec fun establishConnectionWithRetry( @@ -240,7 +241,7 @@ class ReconnectingCordaRPCOps private constructor( } } catch (ex: Exception) { when (ex) { - is ActiveMQSecurityException -> { + is UnrecoverableRPCException, is ActiveMQSecurityException -> { log.error("Failed to login to node.", ex) throw ex }