From 17eb30965dd3edae7b8c712f56a55e584023b89a Mon Sep 17 00:00:00 2001 From: szymonsztuka Date: Wed, 7 Mar 2018 17:14:32 +0000 Subject: [PATCH 1/2] CORDA-792 - Standalone Shell - fix test on Windows (#2757) Fix test which uses Paths.get() and doesn't match on Windows. --- .../tools/shell/StandaloneShellArgsParserTest.kt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tools/shell/src/test/kotlin/net/corda/tools/shell/StandaloneShellArgsParserTest.kt b/tools/shell/src/test/kotlin/net/corda/tools/shell/StandaloneShellArgsParserTest.kt index 4ac40cf64c..9c96317e98 100644 --- a/tools/shell/src/test/kotlin/net/corda/tools/shell/StandaloneShellArgsParserTest.kt +++ b/tools/shell/src/test/kotlin/net/corda/tools/shell/StandaloneShellArgsParserTest.kt @@ -9,7 +9,7 @@ import java.io.File class StandaloneShellArgsParserTest { - private val CONFIG_FILE = File(javaClass.classLoader.getResource("config.conf")!!.file) + private val CONFIG_FILE = File(StandaloneShellArgsParserTest::class.java.getResource("/config.conf").toURI()) @Test fun args_to_cmd_options() { @@ -33,8 +33,8 @@ class StandaloneShellArgsParserTest { "--keystore-type", "JKS") val expectedOptions = CommandLineOptions(configFile = "/x/y/z/config.conf", - commandsDirectory = Paths.get("/x/y/commands"), - cordappsDirectory = Paths.get("/x/y/cordapps"), + commandsDirectory = Paths.get("/x/y/commands").normalize().toAbsolutePath(), + cordappsDirectory = Paths.get("/x/y/cordapps").normalize().toAbsolutePath(), host = "alocalhost", port = "1234", user = "demo", @@ -42,11 +42,11 @@ class StandaloneShellArgsParserTest { help = true, loggingLevel = Level.DEBUG, sshdPort = "2223", - sshdHostKeyDirectory = Paths.get("/x/y/ssh"), + sshdHostKeyDirectory = Paths.get("/x/y/ssh").normalize().toAbsolutePath(), keyStorePassword = "pass1", trustStorePassword = "pass2", - keyStoreFile = Paths.get("/x/y/keystore.jks"), - trustStoreFile = Paths.get("/x/y/truststore.jks"), + keyStoreFile = Paths.get("/x/y/keystore.jks").normalize().toAbsolutePath(), + trustStoreFile = Paths.get("/x/y/truststore.jks").normalize().toAbsolutePath(), trustStoreType = "dummy", keyStoreType = "JKS") @@ -125,7 +125,7 @@ class StandaloneShellArgsParserTest { } @Test - fun acmd_options_to_config_from_file() { + fun cmd_options_to_config_from_file() { val options = CommandLineOptions(configFile = CONFIG_FILE.absolutePath, commandsDirectory = null, From 98a6c71480f0bf23664a18ba29d4bab46d3e2b08 Mon Sep 17 00:00:00 2001 From: bpaunescu Date: Thu, 8 Mar 2018 08:21:35 +0000 Subject: [PATCH 2/2] CORDA-296: added handler for connection failover (#2639) * CORDA-296: added handler for connection failover print rpc address during node startup * address PR comments: typo, whitespace, onError observables * reworked rpc client msg buffering to avoid race conditions when failure happens while the request is being prepared to be sent; added unit test for 8 threads sharing same client sending 1000 requests * decreased sleep time in rpc test, code cleanup --- .../net/corda/client/rpc/RPCStabilityTests.kt | 114 ++++++++++++++++++ .../rpc/internal/RPCClientProxyHandler.kt | 40 +++++- 2 files changed, 153 insertions(+), 1 deletion(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 0ac85510f9..5025507f9c 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -7,13 +7,16 @@ import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose import net.corda.core.messaging.RPCOps +import net.corda.core.node.NodeInfo import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.serialize import net.corda.core.utilities.* import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.RPCApi +import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.internal.testThreadFactory +import net.corda.testing.node.User import net.corda.testing.node.internal.* import org.apache.activemq.artemis.api.core.SimpleString import org.junit.After @@ -25,12 +28,15 @@ import rx.Observable import rx.subjects.PublishSubject import rx.subjects.UnicastSubject import java.time.Duration +import java.time.Instant import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLong +import kotlin.concurrent.thread +import kotlin.test.assertNotNull class RPCStabilityTests { @Rule @@ -253,6 +259,114 @@ class RPCStabilityTests { } } + @Test + fun `client reconnects to server and resends buffered messages`() { + rpcDriver(startNodesInProcess = false) { + var nodeInfo: NodeInfo? = null + var nodeTime: Instant? = null + val alice = startNode(providedName = ALICE_NAME, + rpcUsers = listOf(User("alice", "alice", setOf("ALL")))).getOrThrow() + CordaRPCClient(alice.rpcAddress).use("alice", "alice") { connection -> + val proxy = connection.proxy + alice.stop() + val nodeInfoThread = thread { + nodeInfo = proxy.nodeInfo() + } + + val currentTimeThread = thread { + nodeTime = proxy.currentNodeTime() + } + + Thread.sleep(5000) + startNode(providedName = ALICE_NAME, + rpcUsers = listOf(User("alice", "alice", setOf("ALL"))), + customOverrides = mapOf("rpcSettings" to mapOf("address" to "localhost:${alice.rpcAddress.port}"))) + currentTimeThread.join() + nodeInfoThread.join() + assertNotNull(nodeInfo) + assertNotNull(nodeTime) + } + } + } + + @Test + fun `connection failover fails, rpc calls throw`() { + rpcDriver { + val ops = object : ReconnectOps { + override val protocolVersion = 0 + override fun ping() = "pong" + } + + val serverFollower = shutdownManager.follower() + val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! + serverFollower.unfollow() + // Set retry interval to 1s to reduce test duration + val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) + val clientFollower = shutdownManager.follower() + val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() + clientFollower.unfollow() + assertEquals("pong", client.ping()) + serverFollower.shutdown() + try { + client.ping() + } catch (e: Exception) { + assertTrue(e is RPCException) + } + clientFollower.shutdown() // Driver would do this after the new server, causing hang. + } + } + + interface ThreadOps : RPCOps { + fun sendMessage(id: Int, msgNo: Int): String + } + + @Test + fun `multiple threads with 1000 messages for each thread`() { + val messageNo = 1000 + val threadNo = 8 + val ops = object : ThreadOps { + override val protocolVersion = 0 + override fun sendMessage(id: Int, msgNo: Int): String { + return "($id-$msgNo)" + } + } + + rpcDriver(startNodesInProcess = false) { + val serverFollower = shutdownManager.follower() + val serverPort = startRpcServer(rpcUser = User("alice", "alice", setOf("ALL")), + ops = ops).getOrThrow().broker.hostAndPort!! + + serverFollower.unfollow() + val proxy = RPCClient(serverPort).start(ThreadOps::class.java, "alice", "alice").proxy + val expectedMap = mutableMapOf() + val resultsMap = mutableMapOf() + + (1 until threadNo).forEach { nr -> + (1 until messageNo).forEach { msgNo -> + expectedMap[nr] = expectedMap.getOrDefault(nr, StringBuilder()).append("($nr-$msgNo)") + } + } + + val threads = mutableMapOf() + (1 until threadNo).forEach { nr -> + val thread = thread { + (1 until messageNo).forEach { msgNo -> + resultsMap[nr] = resultsMap.getOrDefault(nr, StringBuilder()).append(proxy.sendMessage(nr, msgNo)) + } + } + threads[nr] = thread + } + // give the threads a chance to start sending some messages + Thread.sleep(50) + serverFollower.shutdown() + startRpcServer(rpcUser = User("alice", "alice", setOf("ALL")), + ops = ops, customPort = serverPort).getOrThrow() + threads.values.forEach { it.join() } + (1 until threadNo).forEach { assertEquals(expectedMap[it].toString(), resultsMap[it].toString()) } + } + + } + interface TrackSubscriberOps : RPCOps { fun subscribe(): Observable } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 2f65f21e40..f8cbc370ad 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -161,6 +161,9 @@ class RPCClientProxyHandler( build() } + // Used to buffer client requests if the server is unavailable + private val outgoingRequestBuffer = ConcurrentHashMap() + private var sessionFactory: ClientSessionFactory? = null private var producerSession: ClientSession? = null private var consumerSession: ClientSession? = null @@ -195,6 +198,7 @@ class RPCClientProxyHandler( consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress) rpcConsumer = consumerSession!!.createConsumer(clientAddress) rpcConsumer!!.setMessageHandler(this::artemisMessageHandler) + producerSession!!.addFailoverListener(this::failoverHandler) lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET) consumerSession!!.start() producerSession!!.start() @@ -228,8 +232,14 @@ class RPCClientProxyHandler( require(rpcReplyMap.put(replyId, replyFuture) == null) { "Generated several RPC requests with same ID $replyId" } + + outgoingRequestBuffer[replyId] = request + // try and send the request sendMessage(request) - return replyFuture.getOrThrow() + val result = replyFuture.getOrThrow() + // at this point the server responded, remove the buffered request + outgoingRequestBuffer.remove(replyId) + return result } catch (e: RuntimeException) { // Already an unchecked exception, so just rethrow it throw e @@ -393,6 +403,34 @@ class RPCClientProxyHandler( sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds)) } } + + private fun failoverHandler(event: FailoverEventType) { + when (event) { + FailoverEventType.FAILURE_DETECTED -> { + log.warn("RPC server unavailable. RPC calls are being buffered.") + } + + FailoverEventType.FAILOVER_COMPLETED -> { + log.info("RPC server available. Draining request buffer.") + outgoingRequestBuffer.keys.forEach { replyId -> + outgoingRequestBuffer[replyId]?.let { sendMessage(it) } + } + } + + FailoverEventType.FAILOVER_FAILED -> { + log.error("Could not reconnect to the RPC server. All buffered requests will be discarded and RPC calls " + + "will throw an RPCException.") + rpcReplyMap.forEach { id, replyFuture -> + replyFuture.setException(RPCException("Could not re-connect to RPC server. Failover failed.")) + val observable = observableContext.observableMap.getIfPresent(id) + observable?.onError(RPCException("Could not re-connect to RPC server. Failover failed.")) + } + outgoingRequestBuffer.clear() + rpcReplyMap.clear() + callSiteMap?.clear() + } + } + } } private typealias RpcObservableMap = Cache>>