mirror of
https://github.com/corda/corda.git
synced 2025-04-19 08:36:39 +00:00
ENT-2489: Experiment with multi-identity RPC worker (#1377)
* ENT-2489: Trivial change to prep for multiple RpcWorkerServiceHubs (i.e. identities) * ENT-2489: Allow passing `targetLegalIdentity` from Client RPC call. * ENT-2489: Starting RpcWorker with multiple RpcWorkerServiceHubs (unfinished) * ENT-2489: Starting RpcWorker with single (for now) RpcWorkerServiceHub * ENT-2489: Tighten-up integration tests assertions * ENT-2489: Introduce RPCOpsRouting * ENT-2489: Output configs for reference * ENT-2489: Extend test for RpcWorker to operate with multiple identities. * ENT-2489: Remove un-necessary P2P address * ENT-2489: New test for RpcWorker getting paid. * ENT-2489: Make RpcWorkerMultiIdentityTest work * ENT-2489: Use MAX_RPC_MESSAGE_SIZE when configuring RPC broker. * ENT-2489: Add exception clause when client attempts to use the wrong identity. * ENT-2489: Fixes post merge from `master`. * ENT-2489: Fixes post merge from `master`. * ENT-2489: Explicitly specify X500 name in test. * ENT-2489: Use single flow worker and switch anonymity off when making payment. * ENT-2489: Fix for RpcWorkerTest. Add `NetworkMapUpdater` to `FlowWorkerServiceHub` or else no-one will ever send a signal on `networkMapCache.nodeReady` future. Not having `networkMapCache.nodeReady` in completed state, will prevent SMM from operating properly. * ENT-2489: Handle gracefully the fact that session might be already closed. * ENT-2489: Fix incorrect merge from `master`. * ENT-2489: Make `RPCOpsRouting` generic with regard to `RPCOps` following discussion with @mnesbit * ENT-2489: Make `methodTable` uniform for all the legal names. * ENT-2489: Make `ObservableContext` non-generic. * ENT-2489: Tidy-up shutdown sequence post merge from `master` * ENT-2489: Correct exception type thrown * ENT-2489: Generics test compilation fix.
This commit is contained in:
parent
dbee84c01d
commit
4c68b515f8
@ -606,7 +606,7 @@ class RPCStabilityTests {
|
||||
}
|
||||
}
|
||||
|
||||
fun RPCDriverDSL.pollUntilClientNumber(server: RpcServerHandle, expected: Int) {
|
||||
fun <OPS : RPCOps> RPCDriverDSL.pollUntilClientNumber(server: RpcServerHandle<OPS>, expected: Int) {
|
||||
pollUntilTrue("number of RPC clients to become $expected") {
|
||||
val clientAddresses = server.broker.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) }
|
||||
clientAddresses.size == expected
|
||||
|
@ -14,6 +14,8 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.flatMap
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.createDirectory
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.node.NodeInfo
|
||||
@ -22,6 +24,7 @@ import net.corda.core.utilities.seconds
|
||||
import net.corda.flowworker.FlowWorker
|
||||
import net.corda.flowworker.FlowWorkerServiceHub
|
||||
import net.corda.node.internal.NetworkParametersReader
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.artemis.ArtemisBroker
|
||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.services.config.*
|
||||
@ -29,17 +32,16 @@ import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.network.NodeInfoWatcher
|
||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.PLATFORM_VERSION
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.config.toConfig
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.getTestPartyAndCertificate
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.internal.DriverDSLImpl
|
||||
import net.corda.testing.node.internal.InternalDriverDSL
|
||||
import net.corda.testing.node.internal.TestCordappDirectories
|
||||
import net.corda.testing.node.internal.genericDriver
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import net.corda.testing.node.internal.*
|
||||
import org.apache.commons.io.FileUtils
|
||||
import java.nio.file.Paths
|
||||
import java.security.KeyPair
|
||||
@ -62,141 +64,201 @@ data class RpcFlowWorkerHandle(val rpcAddress: NetworkHostAndPort)
|
||||
data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : InternalDriverDSL by driverDSL {
|
||||
|
||||
fun startRpcFlowWorker(myLegalName: CordaX500Name, rpcUsers: List<net.corda.testing.node.User>, numberOfFlowWorkers: Int = 1): CordaFuture<RpcFlowWorkerHandle> {
|
||||
val (config, rpcWorkerConfig, flowWorkerConfigs, bridgeConfig) = generateConfigs(myLegalName, rpcUsers, numberOfFlowWorkers)
|
||||
return startRpcFlowWorker(Collections.singleton(myLegalName), rpcUsers, numberOfFlowWorkers)
|
||||
}
|
||||
|
||||
val trustRoot = rpcWorkerConfig.p2pSslOptions.trustStore.get().query { getCertificate(X509Utilities.CORDA_ROOT_CA) }
|
||||
val nodeCa = rpcWorkerConfig.signingCertificateStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_CA) }
|
||||
fun startRpcFlowWorker(legalNames: Set<CordaX500Name>, rpcUsers: List<net.corda.testing.node.User>, numberOfFlowWorkers: Int = 1): CordaFuture<RpcFlowWorkerHandle> {
|
||||
|
||||
val ourKeyPair = Crypto.generateKeyPair()
|
||||
val ourParty = Party(myLegalName, ourKeyPair.public)
|
||||
val ourPartyAndCertificate = getTestPartyAndCertificate(ourParty)
|
||||
val myInfo = NodeInfo(listOf(bridgeConfig.inboundConfig!!.listeningAddress), listOf(ourPartyAndCertificate), 4, 1)
|
||||
val rpcWorkerServiceHubsFuture = legalNames.map { myLegalName ->
|
||||
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(myInfo) { _, serialised ->
|
||||
ourKeyPair.private.sign(serialised.bytes)
|
||||
val (config, flowWorkerConfigs) = generateNodeAndFlowConfigs(myLegalName, numberOfFlowWorkers)
|
||||
|
||||
val bridgeConfig = generateBridgeConfig(config)
|
||||
|
||||
val trustRoot = config.p2pSslOptions.trustStore.get().query { getCertificate(X509Utilities.CORDA_ROOT_CA) }
|
||||
val nodeCa = config.signingCertificateStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_CA) }
|
||||
|
||||
val ourKeyPair = Crypto.generateKeyPair()
|
||||
val ourParty = Party(myLegalName, ourKeyPair.public)
|
||||
val ourPartyAndCertificate = getTestPartyAndCertificate(ourParty)
|
||||
val myInfo = NodeInfo(listOf(bridgeConfig.inboundConfig!!.listeningAddress), listOf(ourPartyAndCertificate), PLATFORM_VERSION, 1)
|
||||
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(myInfo) { _, serialised ->
|
||||
ourKeyPair.private.sign(serialised.bytes)
|
||||
}
|
||||
NodeInfoWatcher.saveToFile(config.baseDirectory, nodeInfoAndSigned)
|
||||
|
||||
driverDSL.networkMapAvailability.flatMap {
|
||||
val visibilityHandle = driverDSL.networkVisibilityController.register(myLegalName)
|
||||
it!!.networkParametersCopier.install(config.baseDirectory)
|
||||
it.nodeInfosCopier.addConfig(config.baseDirectory)
|
||||
|
||||
val signedNetworkParameters = NetworkParametersReader(trustRoot, null, config.baseDirectory).read()
|
||||
val maxMessageSize = signedNetworkParameters.networkParameters.maxMessageSize
|
||||
|
||||
val flowWorkerBroker = createFlowWorkerBroker(config, maxMessageSize)
|
||||
|
||||
val flowWorkers = flowWorkerConfigs.map {
|
||||
val (flowWorker, _) = createFlowWorker(it, myInfo, ourKeyPair, trustRoot, nodeCa, signedNetworkParameters)
|
||||
flowWorker
|
||||
}
|
||||
|
||||
val rpcWorkerServiceHub = createRpcWorkerServiceHub(config, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa)
|
||||
rpcWorkerServiceHub.start()
|
||||
|
||||
val bridge = createBridge(bridgeConfig, config as NodeConfigurationImpl)
|
||||
|
||||
shutdownManager.registerShutdown {
|
||||
// Gracefully shutdown bottom-up, i.e.: FlowWorker, RPC Worker, Brokers, Bridge
|
||||
flowWorkers.forEach { it.stop() }
|
||||
flowWorkerBroker.stop()
|
||||
rpcWorkerServiceHub.stop()
|
||||
bridge.stop()
|
||||
}
|
||||
|
||||
visibilityHandle.listen(rpcWorkerServiceHub.rpcOps).map { rpcWorkerServiceHub }
|
||||
}
|
||||
}
|
||||
NodeInfoWatcher.saveToFile(rpcWorkerConfig.baseDirectory, nodeInfoAndSigned)
|
||||
|
||||
return driverDSL.networkMapAvailability.flatMap {
|
||||
val visibilityHandle = driverDSL.networkVisibilityController.register(myLegalName)
|
||||
it!!.networkParametersCopier.install(rpcWorkerConfig.baseDirectory)
|
||||
it.nodeInfosCopier.addConfig(rpcWorkerConfig.baseDirectory)
|
||||
val rpcWorkerConfig = generateRpcWorkerConfig(rpcUsers)
|
||||
|
||||
val signedNetworkParameters = NetworkParametersReader(trustRoot, null, rpcWorkerConfig.baseDirectory).read()
|
||||
val rpcWorkerFuture = rpcWorkerServiceHubsFuture.transpose().map { rpcWorkerServiceHubs ->
|
||||
|
||||
val flowWorkerBroker = createFlowWorkerBroker(config, signedNetworkParameters.networkParameters.maxMessageSize)
|
||||
val rpcWorkerBroker = createRpcWorkerBroker(rpcWorkerConfig, signedNetworkParameters.networkParameters.maxMessageSize)
|
||||
|
||||
val flowWorkers = flowWorkerConfigs.map {
|
||||
val (flowWorker, _) = createFlowWorker(it, myInfo, ourKeyPair, trustRoot, nodeCa, signedNetworkParameters)
|
||||
flowWorker
|
||||
}
|
||||
|
||||
val (rpcWorker, rpcWorkerServiceHub) = createRpcWorker(rpcWorkerConfig, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa, rpcWorkerBroker.serverControl)
|
||||
|
||||
val bridge = createBridge(bridgeConfig)
|
||||
val rpcWorkerBroker = createRpcWorkerBroker(rpcWorkerConfig)
|
||||
|
||||
val rpcWorker = RpcWorker(rpcWorkerBroker.serverControl, rpcWorkerConfig, *rpcWorkerServiceHubs.toTypedArray()).start()
|
||||
shutdownManager.registerShutdown {
|
||||
// Gracefully shutdown bottom-up, i.e.: FlowWorker, RPC Worker, Brokers, Bridge
|
||||
flowWorkers.forEach { it.stop() }
|
||||
rpcWorker.stop()
|
||||
flowWorkerBroker.stop()
|
||||
rpcWorkerBroker.stop()
|
||||
bridge.stop()
|
||||
}
|
||||
rpcWorker
|
||||
}
|
||||
|
||||
visibilityHandle.listen(rpcWorkerServiceHub.rpcOps).map {
|
||||
RpcFlowWorkerHandle(rpcWorkerConfig.rpcOptions.address)
|
||||
}
|
||||
return rpcWorkerFuture.map {
|
||||
RpcFlowWorkerHandle(rpcWorkerConfig.rpcOptions.address)
|
||||
}
|
||||
}
|
||||
|
||||
private data class ConfigWrapper(val config: NodeConfiguration, val rpcWorkerConfig: NodeConfiguration, val flowWorkerConfigs: List<NodeConfiguration>, val bridgeConfig: FirewallConfiguration)
|
||||
|
||||
private fun generateConfigs(myLegalName: CordaX500Name, rpcUsers: List<net.corda.testing.node.User>, numberOfFlowWorkers: Int): ConfigWrapper {
|
||||
val cordappDirectories = TestCordappDirectories.cached(driverDSL.cordappsForAllNodes).toList()
|
||||
|
||||
private fun generateRpcWorkerConfig(rpcUsers: List<net.corda.testing.node.User>): NodeConfiguration {
|
||||
val rpcWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
|
||||
val rpcWorkerBrokerAdminAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
|
||||
val flowWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
|
||||
|
||||
val bridgeListeningAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
|
||||
val RPC_WORKER_LEGAL_NAME = CordaX500Name("RpcWorker", "Kiev", "UA")
|
||||
|
||||
val rpcWorkerConfig = genericConfig().copy(
|
||||
myLegalName = RPC_WORKER_LEGAL_NAME,
|
||||
baseDirectory = driverDSL.driverDirectory / "rpcWorker",
|
||||
rpcUsers = rpcUsers.map { User(it.username, it.password, it.permissions) },
|
||||
rpcSettings = NodeRpcSettings(rpcWorkerBrokerAddress, rpcWorkerBrokerAdminAddress, true, false, null))
|
||||
// create test certificates
|
||||
rpcWorkerConfig.configureWithDevSSLCertificate()
|
||||
|
||||
// Write config (for reference)
|
||||
writeConfig(rpcWorkerConfig.baseDirectory, "rpcWorker.conf", rpcWorkerConfig.toConfig())
|
||||
|
||||
return rpcWorkerConfig
|
||||
}
|
||||
|
||||
private fun generateNodeAndFlowConfigs(myLegalName: CordaX500Name, numberOfFlowWorkers: Int): Pair<NodeConfiguration, List<NodeConfiguration>> {
|
||||
val cordappDirectories = TestCordappDirectories.cached(driverDSL.cordappsForAllNodes).toList()
|
||||
|
||||
val flowWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
|
||||
|
||||
val baseDirectory = driverDSL.driverDirectory / myLegalName.organisation
|
||||
baseDirectory.createDirectory()
|
||||
|
||||
val dataSourceProperties = MockServices.makeTestDataSourceProperties()
|
||||
dataSourceProperties.setProperty("maximumPoolSize", "10")
|
||||
|
||||
val config = genericConfig().copy(myLegalName = myLegalName, baseDirectory = baseDirectory,
|
||||
messagingServerAddress = flowWorkerBrokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(),
|
||||
messagingServerAddress = flowWorkerBrokerAddress, dataSourceProperties = dataSourceProperties,
|
||||
cordappDirectories = cordappDirectories)
|
||||
// create test certificates
|
||||
config.configureWithDevSSLCertificate()
|
||||
|
||||
val rpcWorkerConfig = config.copy(baseDirectory = driverDSL.driverDirectory / myLegalName.organisation / "rpcWorker",
|
||||
rpcUsers = rpcUsers.map { User(it.username, it.password, it.permissions) },
|
||||
rpcSettings = NodeRpcSettings(rpcWorkerBrokerAddress, rpcWorkerBrokerAdminAddress, true, false, null))
|
||||
// copy over certificates to RpcWorker
|
||||
FileUtils.copyDirectory(config.certificatesDirectory.toFile(), (rpcWorkerConfig.baseDirectory / "certificates").toFile())
|
||||
|
||||
val flowWorkerConfigs = (1..numberOfFlowWorkers).map {
|
||||
val flowWorkerConfig = config.copy(baseDirectory = driverDSL.driverDirectory / myLegalName.organisation / "flowWorker$it")
|
||||
// copy over certificates to FlowWorker
|
||||
FileUtils.copyDirectory(config.certificatesDirectory.toFile(), (flowWorkerConfig.baseDirectory / "certificates").toFile())
|
||||
|
||||
// Write config (for reference)
|
||||
writeConfig(flowWorkerConfig.baseDirectory, "flowWorker.conf", flowWorkerConfig.toConfig())
|
||||
flowWorkerConfig
|
||||
}
|
||||
|
||||
val bridgeConfig = FirewallConfigurationImpl(baseDirectory = baseDirectory / "rpcWorker", crlCheckSoftFail = true,
|
||||
// Write config (for reference)
|
||||
writeConfig(config.baseDirectory, "node.conf", config.toConfig())
|
||||
|
||||
return Pair(config, flowWorkerConfigs)
|
||||
}
|
||||
|
||||
private fun generateBridgeConfig(nodeConfig: NodeConfiguration): FirewallConfiguration {
|
||||
|
||||
val bridgeListeningAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
|
||||
|
||||
val baseDirectory = driverDSL.driverDirectory / "bridge_${nodeConfig.myLegalName.organisation}"
|
||||
|
||||
val bridgeConfig = FirewallConfigurationImpl(baseDirectory = baseDirectory, crlCheckSoftFail = true,
|
||||
bridgeInnerConfig = null, keyStorePassword = "pass", trustStorePassword = "pass", firewallMode = FirewallMode.SenderReceiver,
|
||||
networkParametersPath = baseDirectory / "rpcWorker", outboundConfig = BridgeOutboundConfigurationImpl(flowWorkerBrokerAddress, listOf(), null, null),
|
||||
networkParametersPath = baseDirectory, outboundConfig = BridgeOutboundConfigurationImpl(nodeConfig.messagingServerAddress!!, listOf(), null, null),
|
||||
inboundConfig = BridgeInboundConfigurationImpl(bridgeListeningAddress, null), enableAMQPPacketTrace = false, floatOuterConfig = null, haConfig = null)
|
||||
|
||||
return ConfigWrapper(config, rpcWorkerConfig, flowWorkerConfigs, bridgeConfig)
|
||||
baseDirectory.createDirectories()
|
||||
// Write config (for reference)
|
||||
writeConfig(bridgeConfig.baseDirectory, "bridge.conf", bridgeConfig.toConfig())
|
||||
return bridgeConfig
|
||||
}
|
||||
}
|
||||
|
||||
private fun genericConfig(): NodeConfigurationImpl {
|
||||
return NodeConfigurationImpl(baseDirectory = Paths.get("."), myLegalName = DUMMY_BANK_A_NAME, emailAddress = "",
|
||||
keyStorePassword = "pass", trustStorePassword = "pass", crlCheckSoftFail = true, dataSourceProperties = Properties(),
|
||||
rpcUsers = listOf(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
|
||||
p2pAddress = NetworkHostAndPort("localhost", 1), rpcSettings = NodeRpcSettings(NetworkHostAndPort("localhost", 1), null, ssl = null),
|
||||
relay = null, messagingServerAddress = null, enterpriseConfiguration = EnterpriseConfiguration(mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0), externalBridge = true),
|
||||
notary = null)
|
||||
}
|
||||
|
||||
private fun createRpcWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker {
|
||||
val rpcOptions = config.rpcOptions
|
||||
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(config.rpcUsers))
|
||||
val broker = if (rpcOptions.useSsl) {
|
||||
ArtemisRpcBroker.withSsl(config.p2pSslOptions, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
} else {
|
||||
ArtemisRpcBroker.withoutSsl(config.p2pSslOptions, rpcOptions.address, rpcOptions.adminAddress, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
private fun genericConfig(): NodeConfigurationImpl {
|
||||
return NodeConfigurationImpl(baseDirectory = Paths.get("."), myLegalName = DUMMY_BANK_A_NAME, emailAddress = "",
|
||||
keyStorePassword = "pass", trustStorePassword = "pass", crlCheckSoftFail = true, dataSourceProperties = Properties(),
|
||||
rpcUsers = listOf(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
|
||||
p2pAddress = NetworkHostAndPort("localhost", 1), rpcSettings = NodeRpcSettings(NetworkHostAndPort("localhost", 1), null, ssl = null),
|
||||
relay = null, messagingServerAddress = null, enterpriseConfiguration = EnterpriseConfiguration(mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0), externalBridge = true),
|
||||
notary = null)
|
||||
}
|
||||
broker.start()
|
||||
return broker
|
||||
}
|
||||
|
||||
private fun createRpcWorker(config: NodeConfiguration, myInfo: NodeInfo, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate, serverControl: ActiveMQServerControl): Pair<RpcWorker, RpcWorkerServiceHub> {
|
||||
val rpcWorkerServiceHub = RpcWorkerServiceHub(config, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa)
|
||||
val rpcWorker = RpcWorker(rpcWorkerServiceHub, serverControl)
|
||||
rpcWorker.start()
|
||||
return Pair(rpcWorker, rpcWorkerServiceHub)
|
||||
}
|
||||
private fun createRpcWorkerBroker(config: NodeConfiguration): ArtemisBroker {
|
||||
val rpcOptions = config.rpcOptions
|
||||
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(config.rpcUsers))
|
||||
val broker = if (rpcOptions.useSsl) {
|
||||
ArtemisRpcBroker.withSsl(config.p2pSslOptions, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager,
|
||||
Node.MAX_RPC_MESSAGE_SIZE, false, config.baseDirectory / "artemis", false)
|
||||
} else {
|
||||
ArtemisRpcBroker.withoutSsl(config.p2pSslOptions, rpcOptions.address, rpcOptions.adminAddress, securityManager,
|
||||
Node.MAX_RPC_MESSAGE_SIZE, false, config.baseDirectory / "artemis", false)
|
||||
}
|
||||
broker.start()
|
||||
return broker
|
||||
}
|
||||
|
||||
private fun createFlowWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker {
|
||||
val broker = ArtemisMessagingServer(config, config.messagingServerAddress!!, maxMessageSize)
|
||||
broker.start()
|
||||
return broker
|
||||
}
|
||||
private fun createRpcWorkerServiceHub(config: NodeConfiguration, myInfo: NodeInfo, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): RpcWorkerServiceHub {
|
||||
return RpcWorkerServiceHub(config, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa)
|
||||
}
|
||||
|
||||
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned): Pair<FlowWorker, FlowWorkerServiceHub> {
|
||||
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, ourKeyPair, trustRoot, nodeCa, signedNetworkParameters)
|
||||
val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub)
|
||||
flowWorker.start()
|
||||
return Pair(flowWorker, flowWorkerServiceHub)
|
||||
}
|
||||
private fun createFlowWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker {
|
||||
val broker = ArtemisMessagingServer(config, config.messagingServerAddress!!, maxMessageSize)
|
||||
broker.start()
|
||||
return broker
|
||||
}
|
||||
|
||||
private fun createBridge(firewallConfiguration: FirewallConfiguration): FirewallInstance {
|
||||
val bridge = FirewallInstance(firewallConfiguration, FirewallVersionInfo(4, "1.1", "Dummy", "Test"))
|
||||
bridge.start()
|
||||
return bridge
|
||||
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, ourKeyPair: KeyPair,
|
||||
trustRoot: X509Certificate, nodeCa: X509Certificate, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned): Pair<FlowWorker, FlowWorkerServiceHub> {
|
||||
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, ourKeyPair, trustRoot, nodeCa, signedNetworkParameters)
|
||||
val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub)
|
||||
flowWorker.start()
|
||||
return Pair(flowWorker, flowWorkerServiceHub)
|
||||
}
|
||||
|
||||
private fun createBridge(firewallConfiguration: FirewallConfiguration, config: NodeConfigurationImpl): FirewallInstance {
|
||||
// Copy key stores
|
||||
val certificatesTarget = firewallConfiguration.baseDirectory / "certificates"
|
||||
FileUtils.copyDirectory(config.certificatesDirectory.toFile(), certificatesTarget.toFile())
|
||||
// Install network parameters
|
||||
NetworkParametersCopier(driverDSL.networkParameters).install(firewallConfiguration.baseDirectory)
|
||||
|
||||
val bridge = FirewallInstance(firewallConfiguration, FirewallVersionInfo(PLATFORM_VERSION, "1.1", "Dummy", "Test"))
|
||||
bridge.start()
|
||||
return bridge
|
||||
}
|
||||
}
|
@ -0,0 +1,48 @@
|
||||
package net.corda.rpcWorker
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.RPCException
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.finance.GBP
|
||||
import net.corda.finance.POUNDS
|
||||
import net.corda.finance.contracts.getCashBalances
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.finance.flows.CashPaymentFlow
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.node.User
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class RpcWorkerMultiIdentityTest {
|
||||
|
||||
@Test
|
||||
fun `cash pay`() {
|
||||
rpcFlowWorkerDriver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
|
||||
val rpcUser = User("username", "password", permissions = setOf("ALL"))
|
||||
val combinedRpcHandle = startRpcFlowWorker(setOf(DUMMY_BANK_A_NAME, DUMMY_BANK_B_NAME), listOf(rpcUser), 1).get()
|
||||
val bankC = startNode(providedName = DUMMY_BANK_C_NAME).get()
|
||||
|
||||
val bankAProxy = CordaRPCClient(combinedRpcHandle.rpcAddress).start(rpcUser.username, rpcUser.password, DUMMY_BANK_A_NAME).proxy
|
||||
val bankBProxy = CordaRPCClient(combinedRpcHandle.rpcAddress).start(rpcUser.username, rpcUser.password, DUMMY_BANK_B_NAME).proxy
|
||||
|
||||
val cashIssueResult = bankAProxy.startFlow(::CashIssueFlow, 11.POUNDS, OpaqueBytes.of(0x01), defaultNotaryIdentity).returnValue.get()
|
||||
assertEquals(11.POUNDS, bankAProxy.getCashBalances()[GBP])
|
||||
|
||||
val cashPayResult = bankAProxy.startFlow(::CashPaymentFlow, 8.POUNDS, bankC.nodeInfo.singleIdentity(), false).returnValue.get()
|
||||
assertEquals(3.POUNDS, bankAProxy.getCashBalances()[GBP])
|
||||
assertEquals(8.POUNDS, bankC.rpc.getCashBalances()[GBP])
|
||||
|
||||
val cashPayResult2 = bankC.rpc.startFlow(::CashPaymentFlow, 2.POUNDS, bankBProxy.nodeInfo().singleIdentity()).returnValue.get()
|
||||
assertEquals(3.POUNDS, bankAProxy.getCashBalances()[GBP])
|
||||
assertEquals(6.POUNDS, bankC.rpc.getCashBalances()[GBP])
|
||||
// assertEquals(2.POUNDS, bankBProxy.getCashBalances()[GBP]) TODO: Investigate race condition, this condition sometimes passes and sometimes not
|
||||
|
||||
Assertions.assertThatThrownBy {
|
||||
CordaRPCClient(combinedRpcHandle.rpcAddress).start(rpcUser.username, rpcUser.password, CHARLIE_NAME).proxy }.isInstanceOf(CordaRuntimeException::class.java)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
package net.corda.rpcWorker
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.finance.GBP
|
||||
import net.corda.finance.POUNDS
|
||||
import net.corda.finance.contracts.getCashBalances
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.finance.flows.CashPaymentFlow
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.node.User
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class RpcWorkerPaidTest {
|
||||
|
||||
@Test
|
||||
fun `cash pay`() {
|
||||
rpcFlowWorkerDriver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
|
||||
val bankAUser = User("username", "password", permissions = setOf("ALL"))
|
||||
val bankA = startRpcFlowWorker(DUMMY_BANK_A_NAME, listOf(bankAUser)).get()
|
||||
val bankB = startNode().get()
|
||||
|
||||
val bankAProxy = CordaRPCClient(bankA.rpcAddress).start("username", "password").proxy
|
||||
|
||||
val cashIssueResult = bankB.rpc.startFlow(::CashIssueFlow, 10.POUNDS, OpaqueBytes.of(0x01), defaultNotaryIdentity).returnValue.get()
|
||||
assertEquals(10.POUNDS, bankB.rpc.getCashBalances()[GBP])
|
||||
|
||||
val cashPayResult = bankB.rpc.startFlow(::CashPaymentFlow, 2.POUNDS, bankAProxy.nodeInfo().singleIdentity(), false).returnValue.get()
|
||||
assertEquals(8.POUNDS, bankB.rpc.getCashBalances()[GBP])
|
||||
assertEquals(2.POUNDS, bankAProxy.getCashBalances()[GBP])
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -8,6 +8,7 @@ import net.corda.finance.contracts.getCashBalances
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.finance.flows.CashPaymentFlow
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_C_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.node.User
|
||||
@ -19,8 +20,8 @@ class RpcWorkerTest {
|
||||
fun `cash pay`() {
|
||||
rpcFlowWorkerDriver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
|
||||
val bankAUser = User("username", "password", permissions = setOf("ALL"))
|
||||
val bankA = startRpcFlowWorker(DUMMY_BANK_A_NAME, listOf(bankAUser), 2).get()
|
||||
val bankB = startNode().get()
|
||||
val bankA = startRpcFlowWorker(DUMMY_BANK_A_NAME, listOf(bankAUser), 1).get()
|
||||
val bankB = startNode(providedName = DUMMY_BANK_C_NAME).get()
|
||||
|
||||
val bankAProxy = CordaRPCClient(bankA.rpcAddress).start("username", "password").proxy
|
||||
|
||||
@ -28,7 +29,7 @@ class RpcWorkerTest {
|
||||
println(cashIssueResult)
|
||||
println(bankAProxy.getCashBalances())
|
||||
|
||||
val cashPayResult = bankAProxy.startFlow(::CashPaymentFlow, 2.DOLLARS, bankB.nodeInfo.singleIdentity()).returnValue.get()
|
||||
val cashPayResult = bankAProxy.startFlow(::CashPaymentFlow, 2.DOLLARS, bankB.nodeInfo.singleIdentity(), false).returnValue.get()
|
||||
println(cashPayResult)
|
||||
println(bankAProxy.getCashBalances())
|
||||
}
|
||||
|
@ -137,6 +137,9 @@ class CordaRpcWorkerOps(
|
||||
sorting: Sort,
|
||||
contractStateType: Class<out T>): Vault.Page<T> {
|
||||
contractStateType.checkIsA<ContractState>()
|
||||
// TODO hack, fix the way we populate contractStateTypeMappings, or else query like `net.corda.finance.contracts.GetBalances.getCashBalances(net.corda.core.messaging.CordaRPCOps)`
|
||||
// are not going to work
|
||||
(services.vaultService as NodeVaultService).bootstrapContractStateTypes()
|
||||
return services.vaultService._queryBy(criteria, paging, sorting, contractStateType)
|
||||
}
|
||||
|
||||
|
@ -12,6 +12,7 @@ import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.SecurityConfiguration
|
||||
import net.corda.node.services.messaging.InternalRPCMessagingClient
|
||||
import net.corda.node.services.messaging.RPCOpsRouting
|
||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
@ -111,31 +112,40 @@ class Main : Runnable {
|
||||
|
||||
private fun createRpcWorker(config: NodeConfiguration, myInfo: NodeInfo, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate, serverControl: ActiveMQServerControl): Pair<RpcWorker, RpcWorkerServiceHub> {
|
||||
val rpcWorkerServiceHub = RpcWorkerServiceHub(config, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa)
|
||||
val rpcWorker = RpcWorker(rpcWorkerServiceHub, serverControl)
|
||||
val rpcWorker = RpcWorker(serverControl, TODO(), rpcWorkerServiceHub)
|
||||
rpcWorker.start()
|
||||
return Pair(rpcWorker, rpcWorkerServiceHub)
|
||||
}
|
||||
}
|
||||
|
||||
class RpcWorker(private val rpcWorkerServiceHub: RpcWorkerServiceHub, private val serverControl: ActiveMQServerControl) {
|
||||
class RpcWorker(private val serverControl: ActiveMQServerControl, private val rpcWorkerConfig: NodeConfiguration, private vararg val rpcWorkerServiceHubs: RpcWorkerServiceHub) {
|
||||
|
||||
private val runOnStop = ArrayList<() -> Any?>()
|
||||
|
||||
fun start() {
|
||||
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT.copy(
|
||||
rpcThreadPoolSize = rpcWorkerServiceHub.configuration.enterpriseConfiguration.tuning.rpcThreadPoolSize
|
||||
)
|
||||
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(rpcWorkerServiceHub.configuration.rpcUsers))
|
||||
val nodeName = CordaX500Name.build(rpcWorkerServiceHub.configuration.p2pSslOptions.keyStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal })
|
||||
fun start(): RpcWorker {
|
||||
|
||||
val internalRpcMessagingClient = InternalRPCMessagingClient(rpcWorkerServiceHub.configuration.p2pSslOptions, rpcWorkerServiceHub.configuration.rpcOptions.adminAddress, Node.MAX_RPC_MESSAGE_SIZE, nodeName, rpcServerConfiguration)
|
||||
internalRpcMessagingClient.init(rpcWorkerServiceHub.rpcOps, securityManager)
|
||||
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT.copy(
|
||||
rpcThreadPoolSize = rpcWorkerConfig.enterpriseConfiguration.tuning.rpcThreadPoolSize
|
||||
)
|
||||
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(rpcWorkerConfig.rpcUsers))
|
||||
val nodeName =
|
||||
if(rpcWorkerServiceHubs.size == 1) {
|
||||
rpcWorkerServiceHubs.single().configuration.myLegalName
|
||||
}
|
||||
else {
|
||||
CordaX500Name.build(rpcWorkerConfig.p2pSslOptions.keyStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal })
|
||||
}
|
||||
|
||||
val internalRpcMessagingClient = InternalRPCMessagingClient<CordaRpcWorkerOps>(rpcWorkerConfig.p2pSslOptions, rpcWorkerConfig.rpcOptions.adminAddress, Node.MAX_RPC_MESSAGE_SIZE, nodeName, rpcServerConfiguration)
|
||||
|
||||
val rpcOpsMap = rpcWorkerServiceHubs.map { Pair(it.myInfo.legalIdentities.single().name, it.rpcOps) }.toMap()
|
||||
|
||||
internalRpcMessagingClient.init(RPCOpsRouting(rpcOpsMap), securityManager)
|
||||
internalRpcMessagingClient.start(serverControl)
|
||||
|
||||
runOnStop += { rpcWorkerServiceHub.stop() }
|
||||
rpcWorkerServiceHub.start()
|
||||
|
||||
runOnStop += { internalRpcMessagingClient.stop() }
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
|
@ -113,7 +113,7 @@ class ArtemisRpcTests {
|
||||
}
|
||||
artemisBroker.use { broker ->
|
||||
broker.start()
|
||||
InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.DEFAULT).use { server ->
|
||||
InternalRPCMessagingClient<TestRpcOps>(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.DEFAULT).use { server ->
|
||||
server.start(TestRpcOpsImpl(), securityManager, broker.serverControl)
|
||||
|
||||
val client = RPCClient<TestRpcOps>(rpcConnectorTcpTransport(broker.addresses.primary, clientSslOptions))
|
||||
@ -126,7 +126,7 @@ class ArtemisRpcTests {
|
||||
}
|
||||
}
|
||||
|
||||
private fun <OPS : RPCOps> InternalRPCMessagingClient.start(ops: OPS, securityManager: RPCSecurityManager, brokerControl: ActiveMQServerControl) {
|
||||
private fun <OPS : RPCOps> InternalRPCMessagingClient<OPS>.start(ops: OPS, securityManager: RPCSecurityManager, brokerControl: ActiveMQServerControl) {
|
||||
apply {
|
||||
init(ops, securityManager)
|
||||
start(brokerControl)
|
||||
|
@ -143,7 +143,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
override val log: Logger get() = staticLog
|
||||
override val transactionVerifierWorkerCount: Int get() = 4
|
||||
|
||||
private var internalRpcMessagingClient: InternalRPCMessagingClient? = null
|
||||
private var internalRpcMessagingClient: InternalRPCMessagingClient<RPCOps>? = null
|
||||
private var rpcBroker: ArtemisBroker? = null
|
||||
|
||||
private var shutdownHook: ShutdownHook? = null
|
||||
|
@ -16,11 +16,15 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
/**
|
||||
* Used by the Node to communicate with the RPC broker.
|
||||
*/
|
||||
class InternalRPCMessagingClient(val sslConfig: MutualSslConfiguration, val serverAddress: NetworkHostAndPort, val maxMessageSize: Int, val nodeName: CordaX500Name, val rpcServerConfiguration: RPCServerConfiguration) : SingletonSerializeAsToken(), AutoCloseable {
|
||||
class InternalRPCMessagingClient<OPS : RPCOps>(val sslConfig: MutualSslConfiguration, val serverAddress: NetworkHostAndPort, val maxMessageSize: Int, val nodeName: CordaX500Name, val rpcServerConfiguration: RPCServerConfiguration) : SingletonSerializeAsToken(), AutoCloseable {
|
||||
private var locator: ServerLocator? = null
|
||||
private var rpcServer: RPCServer? = null
|
||||
private var rpcServer: RPCServer<OPS>? = null
|
||||
|
||||
fun init(rpcOps: RPCOps, securityManager: RPCSecurityManager) = synchronized(this) {
|
||||
fun init(rpcOps: OPS, securityManager: RPCSecurityManager) {
|
||||
init(RPCOpsRouting.singleton(nodeName, rpcOps), securityManager)
|
||||
}
|
||||
|
||||
fun init(rpcOpsRouting: RPCOpsRouting<OPS>, securityManager: RPCSecurityManager) = synchronized(this) {
|
||||
|
||||
val tcpTransport = ArtemisTcpTransport.rpcInternalClientTcpTransport(serverAddress, sslConfig)
|
||||
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||
@ -32,7 +36,7 @@ class InternalRPCMessagingClient(val sslConfig: MutualSslConfiguration, val serv
|
||||
isUseGlobalPools = nodeSerializationEnv != null
|
||||
}
|
||||
|
||||
rpcServer = RPCServer(rpcOps, NODE_RPC_USER, NODE_RPC_USER, locator!!, securityManager, nodeName, rpcServerConfiguration)
|
||||
rpcServer = RPCServer(rpcOpsRouting, NODE_RPC_USER, NODE_RPC_USER, locator!!, securityManager, nodeName, rpcServerConfiguration)
|
||||
}
|
||||
|
||||
fun start(serverControl: ActiveMQServerControl) = synchronized(this) {
|
||||
|
@ -0,0 +1,32 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.utilities.contextLogger
|
||||
|
||||
class RPCOpsRouting<out OPS : RPCOps>(private val rpcOpsMap: Map<CordaX500Name, OPS>) {
|
||||
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
|
||||
fun <OPS : RPCOps> singleton(name: CordaX500Name, rpcOps: OPS): RPCOpsRouting<OPS> {
|
||||
return RPCOpsRouting(mapOf(name to rpcOps))
|
||||
}
|
||||
}
|
||||
|
||||
init {
|
||||
if(rpcOpsMap.isEmpty()) {
|
||||
throw IllegalStateException("RPC Ops mapping cannot be empty")
|
||||
}
|
||||
}
|
||||
|
||||
fun names(): Set<CordaX500Name> = rpcOpsMap.keys
|
||||
|
||||
operator fun get(targetLegalName: CordaX500Name): OPS {
|
||||
return rpcOpsMap[targetLegalName] ?: {
|
||||
val msg = "Cannot find RPC Ops for name: '$targetLegalName'"
|
||||
logger.error(msg)
|
||||
throw IllegalArgumentException(msg)
|
||||
}()
|
||||
}
|
||||
}
|
@ -49,12 +49,7 @@ import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.*
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
private typealias ObservableSubscriptionMap = Cache<InvocationId, ObservableSubscription>
|
||||
@ -84,8 +79,8 @@ data class RPCServerConfiguration(
|
||||
*
|
||||
* The way this is done is similar to that in [RPCClient], we use Kryo and add a context to stores the subscription map.
|
||||
*/
|
||||
class RPCServer(
|
||||
private val ops: RPCOps,
|
||||
class RPCServer<OPS : RPCOps>(
|
||||
private val opsRouting: RPCOpsRouting<OPS>,
|
||||
private val rpcServerUsername: String,
|
||||
private val rpcServerPassword: String,
|
||||
private val serverLocator: ServerLocator,
|
||||
@ -95,6 +90,28 @@ class RPCServer(
|
||||
) {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
|
||||
/*
|
||||
* We construct an observable context on each RPC request. If subsequently a nested Observable is encountered this
|
||||
* same context is propagated by serialization context. This way all observations rooted in a single RPC will be
|
||||
* muxed correctly. Note that the context construction itself is quite cheap.
|
||||
*/
|
||||
class ObservableContext(
|
||||
override val observableMap: ObservableSubscriptionMap,
|
||||
override val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<InvocationId>>,
|
||||
override val deduplicationIdentity: String,
|
||||
override val clientAddress: SimpleString,
|
||||
private val sendJobQueue: BlockingQueue<RpcSendJob>
|
||||
) : ObservableContextInterface {
|
||||
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(
|
||||
observableContext = this,
|
||||
serializationContext = SerializationDefaults.RPC_SERVER_CONTEXT)
|
||||
|
||||
override fun sendMessage(serverToClient: RPCApi.ServerToClient) {
|
||||
sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress,
|
||||
serializationContextWithObservableContext, serverToClient))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private enum class State {
|
||||
@ -139,7 +156,20 @@ class RPCServer(
|
||||
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
|
||||
private var deduplicationIdentity: String? = null
|
||||
|
||||
constructor(
|
||||
ops: OPS,
|
||||
rpcServerUsername: String,
|
||||
rpcServerPassword: String,
|
||||
serverLocator: ServerLocator,
|
||||
securityManager: RPCSecurityManager,
|
||||
nodeLegalName: CordaX500Name,
|
||||
rpcConfiguration: RPCServerConfiguration
|
||||
) : this(RPCOpsRouting.singleton(nodeLegalName, ops), rpcServerUsername, rpcServerPassword, serverLocator, securityManager, nodeLegalName, rpcConfiguration)
|
||||
|
||||
init {
|
||||
// It is assumed that all the identities have the same type of RPCOps associated with them.
|
||||
val ops = opsRouting[opsRouting.names().first()]
|
||||
|
||||
val groupedMethods = ops.javaClass.declaredMethods.groupBy { it.name }
|
||||
groupedMethods.forEach { name, methods ->
|
||||
if (methods.size > 1) {
|
||||
@ -360,16 +390,17 @@ class RPCServer(
|
||||
|
||||
private fun invokeRpc(context: RpcAuthContext, methodName: String, arguments: List<Any?>): Try<Any> {
|
||||
return Try.on {
|
||||
val targetLegalName = context.invocation.actor?.owningLegalIdentity ?: nodeLegalName
|
||||
try {
|
||||
CURRENT_RPC_CONTEXT.set(context)
|
||||
log.trace { "Calling $methodName" }
|
||||
val method = methodTable[methodName] ?:
|
||||
throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?")
|
||||
method.invoke(ops, *arguments.toTypedArray())
|
||||
log.trace { "Calling $methodName for legalName: '$targetLegalName'" }
|
||||
val method = methodTable[methodName]
|
||||
?: throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?")
|
||||
method.invoke(opsRouting[targetLegalName], *arguments.toTypedArray())
|
||||
} catch (e: InvocationTargetException) {
|
||||
throw e.cause ?: RPCException("Caught InvocationTargetException without cause")
|
||||
} catch (e: Exception) {
|
||||
log.warn("Caught exception attempting to invoke RPC $methodName", e)
|
||||
log.warn("Caught exception attempting to invoke RPC $methodName for legalName: '$targetLegalName'", e)
|
||||
throw e
|
||||
} finally {
|
||||
CURRENT_RPC_CONTEXT.remove()
|
||||
@ -387,7 +418,8 @@ class RPCServer(
|
||||
observableMap,
|
||||
clientAddressToObservables,
|
||||
deduplicationIdentity!!,
|
||||
clientAddress
|
||||
clientAddress,
|
||||
sendJobQueue
|
||||
)
|
||||
|
||||
val buffered = bufferIfQueueNotBound(clientAddress, reply, observableContext)
|
||||
@ -434,26 +466,7 @@ class RPCServer(
|
||||
return Pair(Actor(Id(validatedUser), securityManager.id, targetLegalIdentity), securityManager.buildSubject(validatedUser))
|
||||
}
|
||||
|
||||
/*
|
||||
* We construct an observable context on each RPC request. If subsequently a nested Observable is encountered this
|
||||
* same context is propagated by serialization context. This way all observations rooted in a single RPC will be
|
||||
* muxed correctly. Note that the context construction itself is quite cheap.
|
||||
*/
|
||||
inner class ObservableContext(
|
||||
override val observableMap: ObservableSubscriptionMap,
|
||||
override val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<InvocationId>>,
|
||||
override val deduplicationIdentity: String,
|
||||
override val clientAddress: SimpleString
|
||||
) : ObservableContextInterface {
|
||||
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(
|
||||
observableContext = this,
|
||||
serializationContext = SerializationDefaults.RPC_SERVER_CONTEXT)
|
||||
|
||||
override fun sendMessage(serverToClient: RPCApi.ServerToClient) {
|
||||
sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress,
|
||||
serializationContextWithObservableContext, serverToClient))
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RpcSendJob {
|
||||
data class Send(
|
||||
|
@ -91,9 +91,9 @@ data class RpcBrokerHandle(
|
||||
val serverControl: ActiveMQServerControl
|
||||
)
|
||||
|
||||
data class RpcServerHandle(
|
||||
data class RpcServerHandle<I : RPCOps>(
|
||||
val broker: RpcBrokerHandle,
|
||||
val rpcServer: RPCServer
|
||||
val rpcServer: RPCServer<I>
|
||||
)
|
||||
|
||||
val rpcTestUser = User("user1", "test", permissions = emptySet())
|
||||
@ -247,7 +247,7 @@ data class RPCDriverDSL(
|
||||
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
||||
ops: I,
|
||||
queueDrainTimeout: Duration = 5.seconds
|
||||
): CordaFuture<RpcServerHandle> {
|
||||
): CordaFuture<RpcServerHandle<I>> {
|
||||
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
|
||||
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker, queueDrainTimeout)
|
||||
}
|
||||
@ -316,7 +316,7 @@ data class RPCDriverDSL(
|
||||
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
||||
customPort: NetworkHostAndPort? = null,
|
||||
ops: I
|
||||
): CordaFuture<RpcServerHandle> {
|
||||
): CordaFuture<RpcServerHandle<I>> {
|
||||
return startRpcBroker(serverName, rpcUser, maxFileSize, maxBufferedBytesPerClient, customPort).map { broker ->
|
||||
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker)
|
||||
}
|
||||
@ -472,7 +472,7 @@ data class RPCDriverDSL(
|
||||
ops: I,
|
||||
brokerHandle: RpcBrokerHandle,
|
||||
queueDrainTimeout: Duration = 5.seconds
|
||||
): RpcServerHandle {
|
||||
): RpcServerHandle<I> {
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply {
|
||||
minLargeMessageSize = MAX_MESSAGE_SIZE
|
||||
isUseGlobalPools = false
|
||||
|
Loading…
x
Reference in New Issue
Block a user