From 76d87b67cec34bfeaf8af844ce34a793ad5e486f Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Mon, 6 Aug 2018 13:14:53 +0100 Subject: [PATCH] CORDA-1844: Support for high throughput Observables shipped via RPC (#3698) * CORDA-1844: Exposing a problem via Unit test. * CORDA-1844: Unit test update following input from Andras. * CORDA-1844: Add optional parameter to reduce the time it takes to shutdown RPCServer. * CORDA-1844: Add optional parameter to reduce the time it takes to shutdown RPCServer and sensibly default it. Minor changes. --- .../net/corda/client/rpc/AbstractRPCTest.kt | 13 +++--- .../rpc/RPCHighThroughputObservableTests.kt | 43 +++++++++++++++++++ .../node/services/messaging/RPCServer.kt | 11 ++--- .../corda/testing/node/internal/RPCDriver.kt | 17 +++++--- 4 files changed, 65 insertions(+), 19 deletions(-) create mode 100644 client/rpc/src/test/kotlin/net/corda/client/rpc/RPCHighThroughputObservableTests.kt diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt index 014f852262..10cb4409cf 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt @@ -3,6 +3,7 @@ package net.corda.client.rpc import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.map import net.corda.core.messaging.RPCOps +import net.corda.core.utilities.seconds import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.node.User @@ -13,6 +14,7 @@ import net.corda.testing.node.internal.startRpcClient import org.apache.activemq.artemis.api.core.client.ClientSession import org.junit.Rule import org.junit.runners.Parameterized +import java.time.Duration open class AbstractRPCTest { @Rule @@ -44,19 +46,20 @@ open class AbstractRPCTest { ops: I, rpcUser: User = rpcTestUser, clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, - serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT - ): TestProxy { + serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, + queueDrainTimeout: Duration = 5.seconds + ): TestProxy { return when (mode) { RPCTestMode.InVm -> - startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { + startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration, queueDrainTimeout = queueDrainTimeout).flatMap { startInVmRpcClient(rpcUser.username, rpcUser.password, clientConfiguration).map { - TestProxy(it, { startInVmArtemisSession(rpcUser.username, rpcUser.password) }) + TestProxy(it) { startInVmArtemisSession(rpcUser.username, rpcUser.password) } } } RPCTestMode.Netty -> startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { (broker) -> startRpcClient(broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map { - TestProxy(it, { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) }) + TestProxy(it) { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) } } } }.get() diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCHighThroughputObservableTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCHighThroughputObservableTests.kt new file mode 100644 index 0000000000..72013ca955 --- /dev/null +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCHighThroughputObservableTests.kt @@ -0,0 +1,43 @@ +package net.corda.client.rpc + +import net.corda.core.messaging.RPCOps +import net.corda.core.utilities.millis +import net.corda.testing.node.internal.RPCDriverDSL +import net.corda.testing.node.internal.rpcDriver +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import rx.Observable +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals + +@RunWith(Parameterized::class) +class RPCHighThroughputObservableTests : AbstractRPCTest() { + + private fun RPCDriverDSL.testProxy(): TestOps { + return testProxy(TestOpsImpl(), queueDrainTimeout = 10.millis).ops + } + + internal interface TestOps : RPCOps { + + fun makeObservable(): Observable + } + + internal class TestOpsImpl : TestOps { + override val protocolVersion = 1 + + override fun makeObservable(): Observable = Observable.interval(0, TimeUnit.MICROSECONDS).map { it.toInt() + 1 } + } + + @Test + fun `simple observable`() { + rpcDriver { + val proxy = testProxy() + // This tests that the observations are transmitted correctly, also check that server side doesn't try to serialize the whole lot + // till client consumed some of the output produced. + val observations = proxy.makeObservable() + val observationsList = observations.take(4).toBlocking().toIterable().toList() + assertEquals(listOf(1, 2, 3, 4), observationsList) + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index a61c6619f5..eefe70f323 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -18,11 +18,7 @@ import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT import net.corda.core.serialization.deserialize -import net.corda.core.utilities.Try -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.days -import net.corda.core.utilities.debug -import net.corda.core.utilities.seconds +import net.corda.core.utilities.* import net.corda.node.internal.security.AuthorizingSubject import net.corda.node.internal.security.RPCSecurityManager import net.corda.node.serialization.amqp.RpcServerObservableSerializer @@ -247,9 +243,10 @@ class RPCServer( } } - fun close() { + fun close(queueDrainTimeout: Duration = 5.seconds) { + // Putting Stop message onto the queue will eventually make senderThread to stop. sendJobQueue.put(RpcSendJob.Stop) - senderThread?.join() + senderThread?.join(queueDrainTimeout.toMillis()) reaperScheduledFuture?.cancel(false) rpcExecutor?.shutdownNow() reaperExecutor?.shutdownNow() diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index f6d7614cdd..1ce8494c99 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -17,6 +17,7 @@ import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.RPCOps import net.corda.core.node.NetworkParameters import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.seconds import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.services.messaging.RPCServer import net.corda.node.services.messaging.RPCServerConfiguration @@ -53,6 +54,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3 import java.lang.reflect.Method import java.nio.file.Path import java.nio.file.Paths +import java.time.Duration import java.util.* import net.corda.nodeapi.internal.config.User as InternalUser @@ -100,7 +102,6 @@ val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality // Use a global pool so that we can run RPC tests in parallel private val globalPortAllocation = PortAllocation.Incremental(10000) private val globalDebugPortAllocation = PortAllocation.Incremental(5005) -private val globalMonitorPortAllocation = PortAllocation.Incremental(7005) fun rpcDriver( isDebug: Boolean = false, @@ -244,10 +245,11 @@ data class RPCDriverDSL( maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, - ops: I + ops: I, + queueDrainTimeout: Duration = 5.seconds ): CordaFuture { return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker -> - startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker) + startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker, queueDrainTimeout) } } @@ -440,7 +442,7 @@ data class RPCDriverDSL( } } - fun startInVmRpcBroker( + private fun startInVmRpcBroker( rpcUser: User = rpcTestUser, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE @@ -468,7 +470,8 @@ data class RPCDriverDSL( nodeLegalName: CordaX500Name = fakeNodeLegalName, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I, - brokerHandle: RpcBrokerHandle + brokerHandle: RpcBrokerHandle, + queueDrainTimeout: Duration = 5.seconds ): RpcServerHandle { val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply { minLargeMessageSize = MAX_MESSAGE_SIZE @@ -485,7 +488,7 @@ data class RPCDriverDSL( configuration ) driverDSL.shutdownManager.registerShutdown { - rpcServer.close() + rpcServer.close(queueDrainTimeout) locator.close() } rpcServer.start(brokerHandle.serverControl) @@ -520,7 +523,7 @@ class RandomRpcUser { Generator.sequence(method.parameters.map { generatorStore[it.type] ?: throw Exception("No generator for ${it.type}") }).map { arguments -> - Call(method, { method.invoke(handle.proxy, *arguments.toTypedArray()) }) + Call(method) { method.invoke(handle.proxy, *arguments.toTypedArray()) } } } val callGenerator = Generator.choice(callGenerators)