Use external bridge (#1384)

This commit is contained in:
cburlinchon 2018-09-11 10:22:07 +01:00 committed by GitHub
parent 4ffe2960db
commit 90c5f82514
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 12 deletions

View File

@ -40,6 +40,7 @@ dependencies {
integrationTestCompile project(":test-utils")
integrationTestCompile project(":node-driver")
compile project(":experimental:flow-worker")
compile project(":bridge")
}
jar {

View File

@ -1,5 +1,12 @@
package net.corda.rpcWorker
import net.corda.bridge.FirewallVersionInfo
import net.corda.bridge.internal.FirewallInstance
import net.corda.bridge.services.api.FirewallConfiguration
import net.corda.bridge.services.api.FirewallMode
import net.corda.bridge.services.config.BridgeInboundConfigurationImpl
import net.corda.bridge.services.config.BridgeOutboundConfigurationImpl
import net.corda.bridge.services.config.FirewallConfigurationImpl
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.sign
@ -23,7 +30,6 @@ import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.rpc.ArtemisRpcBroker
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.testing.core.DUMMY_BANK_A_NAME
@ -57,7 +63,7 @@ data class RpcFlowWorkerHandle(val rpcAddress: NetworkHostAndPort)
data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : InternalDriverDSL by driverDSL {
fun startRpcFlowWorker(myLegalName: CordaX500Name, rpcUsers: List<net.corda.testing.node.User>, numberOfFlowWorkers: Int = 1): CordaFuture<RpcFlowWorkerHandle> {
val (config, rpcWorkerConfig, flowWorkerConfigs) = generateConfigs(myLegalName, rpcUsers, numberOfFlowWorkers)
val (config, rpcWorkerConfig, flowWorkerConfigs, bridgeConfig) = generateConfigs(myLegalName, rpcUsers, numberOfFlowWorkers)
val trustRoot = rpcWorkerConfig.p2pSslOptions.trustStore.get().query { getCertificate(X509Utilities.CORDA_ROOT_CA) }
val nodeCa = rpcWorkerConfig.signingCertificateStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_CA) }
@ -65,7 +71,7 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
val ourKeyPair = Crypto.generateKeyPair()
val ourParty = Party(myLegalName, ourKeyPair.public)
val ourPartyAndCertificate = getTestPartyAndCertificate(ourParty)
val myInfo = NodeInfo(listOf(config.messagingServerAddress!!), listOf(ourPartyAndCertificate), 1, 1)
val myInfo = NodeInfo(listOf(bridgeConfig.inboundConfig!!.listeningAddress), listOf(ourPartyAndCertificate), 4, 1)
val nodeInfoAndSigned = NodeInfoAndSigned(myInfo) { _, serialised ->
ourKeyPair.private.sign(serialised.bytes)
@ -89,10 +95,10 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
val (rpcWorker, rpcWorkerServiceHub) = createRpcWorker(rpcWorkerConfig, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa, rpcWorkerBroker.serverControl)
val bridgeControlListener = createBridgeControlListener(config, signedNetworkParameters.networkParameters.maxMessageSize)
val bridge = createBridge(bridgeConfig)
shutdownManager.registerShutdown {
bridgeControlListener.stop()
bridge.stop()
rpcWorker.stop()
flowWorkerBroker.stop()
rpcWorkerBroker.stop()
@ -104,13 +110,17 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
}
}
private fun generateConfigs(myLegalName: CordaX500Name, rpcUsers: List<net.corda.testing.node.User>, numberOfFlowWorkers: Int): Triple<NodeConfiguration, NodeConfiguration, List<NodeConfiguration>> {
private data class ConfigWrapper(val config: NodeConfiguration, val rpcWorkerConfig: NodeConfiguration, val flowWorkerConfigs: List<NodeConfiguration>, val bridgeConfig: FirewallConfiguration)
private fun generateConfigs(myLegalName: CordaX500Name, rpcUsers: List<net.corda.testing.node.User>, numberOfFlowWorkers: Int): ConfigWrapper {
val cordappDirectories = TestCordappDirectories.cached(driverDSL.cordappsForAllNodes).toList()
val rpcWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
val rpcWorkerBrokerAdminAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
val flowWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
val bridgeListeningAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
val baseDirectory = driverDSL.driverDirectory / myLegalName.organisation
baseDirectory.createDirectory()
@ -134,7 +144,12 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
flowWorkerConfig
}
return Triple(config, rpcWorkerConfig, flowWorkerConfigs)
val bridgeConfig = FirewallConfigurationImpl(baseDirectory = baseDirectory / "rpcWorker", crlCheckSoftFail = true,
bridgeInnerConfig = null, keyStorePassword = "pass", trustStorePassword = "pass", firewallMode = FirewallMode.SenderReceiver,
networkParametersPath = baseDirectory / "rpcWorker", outboundConfig = BridgeOutboundConfigurationImpl(flowWorkerBrokerAddress, listOf(), null, null),
inboundConfig = BridgeInboundConfigurationImpl(bridgeListeningAddress, null), enableAMQPPacketTrace = false, floatOuterConfig = null, haConfig = null)
return ConfigWrapper(config, rpcWorkerConfig, flowWorkerConfigs, bridgeConfig)
}
}
@ -143,7 +158,7 @@ private fun genericConfig(): NodeConfigurationImpl {
keyStorePassword = "pass", trustStorePassword = "pass", crlCheckSoftFail = true, dataSourceProperties = Properties(),
rpcUsers = listOf(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
p2pAddress = NetworkHostAndPort("localhost", 1), rpcSettings = NodeRpcSettings(NetworkHostAndPort("localhost", 1), null, ssl = null),
relay = null, messagingServerAddress = null, enterpriseConfiguration = EnterpriseConfiguration(mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0)),
relay = null, messagingServerAddress = null, enterpriseConfiguration = EnterpriseConfiguration(mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0), externalBridge = true),
notary = null)
}
@ -179,8 +194,8 @@ private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, networ
return Pair(flowWorker, flowWorkerServiceHub)
}
private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener {
val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize)
bridgeControlListener.start()
return bridgeControlListener
private fun createBridge(firewallConfiguration: FirewallConfiguration): FirewallInstance {
val bridge = FirewallInstance(firewallConfiguration, FirewallVersionInfo(4, "1.1", "Dummy", "Test"))
bridge.start()
return bridge
}