CORDA-1844: Support for high throughput Observables shipped via RPC (#3698)

* CORDA-1844: Exposing a problem via Unit test.

* CORDA-1844: Unit test update following input from Andras.

* CORDA-1844: Add optional parameter to reduce the time it takes to shutdown RPCServer.

* CORDA-1844: Add optional parameter to reduce the time it takes to shutdown RPCServer and sensibly default it.

Minor changes.
This commit is contained in:
Viktor Kolomeyko 2018-08-06 13:14:53 +01:00 committed by GitHub
parent 441fe78e41
commit 76d87b67ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 19 deletions

View File

@ -3,6 +3,7 @@ package net.corda.client.rpc
import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.node.User import net.corda.testing.node.User
@ -13,6 +14,7 @@ import net.corda.testing.node.internal.startRpcClient
import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSession
import org.junit.Rule import org.junit.Rule
import org.junit.runners.Parameterized import org.junit.runners.Parameterized
import java.time.Duration
open class AbstractRPCTest { open class AbstractRPCTest {
@Rule @Rule
@ -44,19 +46,20 @@ open class AbstractRPCTest {
ops: I, ops: I,
rpcUser: User = rpcTestUser, rpcUser: User = rpcTestUser,
clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
): TestProxy<I> { queueDrainTimeout: Duration = 5.seconds
): TestProxy<I> {
return when (mode) { return when (mode) {
RPCTestMode.InVm -> RPCTestMode.InVm ->
startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { startInVmRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration, queueDrainTimeout = queueDrainTimeout).flatMap {
startInVmRpcClient<I>(rpcUser.username, rpcUser.password, clientConfiguration).map { startInVmRpcClient<I>(rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startInVmArtemisSession(rpcUser.username, rpcUser.password) }) TestProxy(it) { startInVmArtemisSession(rpcUser.username, rpcUser.password) }
} }
} }
RPCTestMode.Netty -> RPCTestMode.Netty ->
startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { (broker) -> startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { (broker) ->
startRpcClient<I>(broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map { startRpcClient<I>(broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) }) TestProxy(it) { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) }
} }
} }
}.get() }.get()

View File

@ -0,0 +1,43 @@
package net.corda.client.rpc
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.millis
import net.corda.testing.node.internal.RPCDriverDSL
import net.corda.testing.node.internal.rpcDriver
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import rx.Observable
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
@RunWith(Parameterized::class)
class RPCHighThroughputObservableTests : AbstractRPCTest() {
private fun RPCDriverDSL.testProxy(): TestOps {
return testProxy<TestOps>(TestOpsImpl(), queueDrainTimeout = 10.millis).ops
}
internal interface TestOps : RPCOps {
fun makeObservable(): Observable<Int>
}
internal class TestOpsImpl : TestOps {
override val protocolVersion = 1
override fun makeObservable(): Observable<Int> = Observable.interval(0, TimeUnit.MICROSECONDS).map { it.toInt() + 1 }
}
@Test
fun `simple observable`() {
rpcDriver {
val proxy = testProxy()
// This tests that the observations are transmitted correctly, also check that server side doesn't try to serialize the whole lot
// till client consumed some of the output produced.
val observations = proxy.makeObservable()
val observationsList = observations.take(4).toBlocking().toIterable().toList()
assertEquals(listOf(1, 2, 3, 4), observationsList)
}
}
}

View File

@ -18,11 +18,7 @@ import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.utilities.Try import net.corda.core.utilities.*
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.days
import net.corda.core.utilities.debug
import net.corda.core.utilities.seconds
import net.corda.node.internal.security.AuthorizingSubject import net.corda.node.internal.security.AuthorizingSubject
import net.corda.node.internal.security.RPCSecurityManager import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.serialization.amqp.RpcServerObservableSerializer import net.corda.node.serialization.amqp.RpcServerObservableSerializer
@ -247,9 +243,10 @@ class RPCServer(
} }
} }
fun close() { fun close(queueDrainTimeout: Duration = 5.seconds) {
// Putting Stop message onto the queue will eventually make senderThread to stop.
sendJobQueue.put(RpcSendJob.Stop) sendJobQueue.put(RpcSendJob.Stop)
senderThread?.join() senderThread?.join(queueDrainTimeout.toMillis())
reaperScheduledFuture?.cancel(false) reaperScheduledFuture?.cancel(false)
rpcExecutor?.shutdownNow() rpcExecutor?.shutdownNow()
reaperExecutor?.shutdownNow() reaperExecutor?.shutdownNow()

View File

@ -17,6 +17,7 @@ import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.services.messaging.RPCServer import net.corda.node.services.messaging.RPCServer
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
@ -53,6 +54,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
import java.lang.reflect.Method import java.lang.reflect.Method
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import java.time.Duration
import java.util.* import java.util.*
import net.corda.nodeapi.internal.config.User as InternalUser import net.corda.nodeapi.internal.config.User as InternalUser
@ -100,7 +102,6 @@ val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality
// Use a global pool so that we can run RPC tests in parallel // Use a global pool so that we can run RPC tests in parallel
private val globalPortAllocation = PortAllocation.Incremental(10000) private val globalPortAllocation = PortAllocation.Incremental(10000)
private val globalDebugPortAllocation = PortAllocation.Incremental(5005) private val globalDebugPortAllocation = PortAllocation.Incremental(5005)
private val globalMonitorPortAllocation = PortAllocation.Incremental(7005)
fun <A> rpcDriver( fun <A> rpcDriver(
isDebug: Boolean = false, isDebug: Boolean = false,
@ -244,10 +245,11 @@ data class RPCDriverDSL(
maxFileSize: Int = MAX_MESSAGE_SIZE, maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
ops: I ops: I,
queueDrainTimeout: Duration = 5.seconds
): CordaFuture<RpcServerHandle> { ): CordaFuture<RpcServerHandle> {
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker -> return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker) startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker, queueDrainTimeout)
} }
} }
@ -440,7 +442,7 @@ data class RPCDriverDSL(
} }
} }
fun startInVmRpcBroker( private fun startInVmRpcBroker(
rpcUser: User = rpcTestUser, rpcUser: User = rpcTestUser,
maxFileSize: Int = MAX_MESSAGE_SIZE, maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE
@ -468,7 +470,8 @@ data class RPCDriverDSL(
nodeLegalName: CordaX500Name = fakeNodeLegalName, nodeLegalName: CordaX500Name = fakeNodeLegalName,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
ops: I, ops: I,
brokerHandle: RpcBrokerHandle brokerHandle: RpcBrokerHandle,
queueDrainTimeout: Duration = 5.seconds
): RpcServerHandle { ): RpcServerHandle {
val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply { val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply {
minLargeMessageSize = MAX_MESSAGE_SIZE minLargeMessageSize = MAX_MESSAGE_SIZE
@ -485,7 +488,7 @@ data class RPCDriverDSL(
configuration configuration
) )
driverDSL.shutdownManager.registerShutdown { driverDSL.shutdownManager.registerShutdown {
rpcServer.close() rpcServer.close(queueDrainTimeout)
locator.close() locator.close()
} }
rpcServer.start(brokerHandle.serverControl) rpcServer.start(brokerHandle.serverControl)
@ -520,7 +523,7 @@ class RandomRpcUser {
Generator.sequence(method.parameters.map { Generator.sequence(method.parameters.map {
generatorStore[it.type] ?: throw Exception("No generator for ${it.type}") generatorStore[it.type] ?: throw Exception("No generator for ${it.type}")
}).map { arguments -> }).map { arguments ->
Call(method, { method.invoke(handle.proxy, *arguments.toTypedArray()) }) Call(method) { method.invoke(handle.proxy, *arguments.toTypedArray()) }
} }
} }
val callGenerator = Generator.choice(callGenerators) val callGenerator = Generator.choice(callGenerators)