mirror of
https://github.com/corda/corda.git
synced 2025-05-31 22:50:53 +00:00
[CORDA-1937]: Fixes to enterprise float and bridge.
This commit is contained in:
parent
fbaa31e9d2
commit
31e58dd2e5
@ -17,12 +17,12 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.internal.*
|
||||
import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.crypto.*
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.curator.test.TestingServer
|
||||
@ -67,22 +67,20 @@ class BridgeSmokeTest {
|
||||
|
||||
@Test
|
||||
fun `Run full features bridge from jar to ensure everything works`() {
|
||||
val artemisConfig = object : NodeSSLConfiguration {
|
||||
override val baseDirectory: Path = tempFolder.root.toPath()
|
||||
override val keyStorePassword: String = "cordacadevpass"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val crlCheckSoftFail: Boolean = true
|
||||
}
|
||||
|
||||
val baseDirectory = tempFolder.root.toPath().createDirectories()
|
||||
val artemisConfig = CertificateStoreStubs.P2P.withBaseDirectory(baseDirectory)
|
||||
|
||||
artemisConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
|
||||
copyBridgeResource("corda-firewall.jar")
|
||||
copyBridgeResource("firewall.conf")
|
||||
createNetworkParams(tempFolder.root.toPath())
|
||||
createNetworkParams(baseDirectory)
|
||||
val (artemisServer, artemisClient) = createArtemis()
|
||||
val zkServer = TestingServer(11105, false)
|
||||
try {
|
||||
installBridgeControlResponder(artemisClient)
|
||||
zkServer.start()
|
||||
val bridge = startBridge(tempFolder.root.toPath())
|
||||
val bridge = startBridge(baseDirectory)
|
||||
waitForBridge(bridge)
|
||||
} finally {
|
||||
zkServer.close()
|
||||
@ -115,18 +113,18 @@ class BridgeSmokeTest {
|
||||
copier.install(baseDirectory)
|
||||
}
|
||||
|
||||
private fun SSLConfiguration.createBridgeKeyStores(legalName: CordaX500Name,
|
||||
rootCert: X509Certificate = DEV_ROOT_CA.certificate,
|
||||
intermediateCa: CertificateAndKeyPair = DEV_INTERMEDIATE_CA) {
|
||||
private fun MutualSslConfiguration.createBridgeKeyStores(legalName: CordaX500Name,
|
||||
rootCert: X509Certificate = DEV_ROOT_CA.certificate,
|
||||
intermediateCa: CertificateAndKeyPair = DEV_INTERMEDIATE_CA) {
|
||||
|
||||
certificatesDirectory.createDirectories()
|
||||
if (!trustStoreFile.exists()) {
|
||||
loadKeyStore(javaClass.classLoader.getResourceAsStream("certificates/$DEV_CA_TRUST_STORE_FILE"), DEV_CA_TRUST_STORE_PASS).save(trustStoreFile, trustStorePassword)
|
||||
if (!trustStore.path.exists()) {
|
||||
val trustStore = trustStore.get(true)
|
||||
loadDevCaTrustStore().copyTo(trustStore)
|
||||
}
|
||||
|
||||
val (nodeCaCert, nodeCaKeyPair) = createDevNodeCa(intermediateCa, legalName)
|
||||
|
||||
val sslKeyStore = loadSslKeyStore(createNew = true)
|
||||
val sslKeyStore = keyStore.get(createNew = true)
|
||||
sslKeyStore.update {
|
||||
val tlsKeyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
|
||||
val tlsCert = X509Utilities.createCertificate(CertificateType.TLS, nodeCaCert, nodeCaKeyPair, legalName.x500Principal, tlsKeyPair.public)
|
||||
@ -203,17 +201,23 @@ class BridgeSmokeTest {
|
||||
}
|
||||
|
||||
private fun createArtemis(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
val baseDirectory = tempFolder.root.toPath()
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory
|
||||
doReturn(baseDirectory).whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(ALICE_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(NetworkHostAndPort("localhost", 11005)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 11005), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", 11005), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", 11005), MAX_MESSAGE_SIZE)
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
return Pair(artemisServer, artemisClient)
|
||||
|
@ -28,6 +28,7 @@ import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.MAX_MESSAGE_SIZE
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.curator.test.TestingServer
|
||||
@ -387,61 +388,76 @@ class BridgeIntegrationTest {
|
||||
|
||||
|
||||
private fun createArtemis(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
val baseDirectory = tempFolder.root.toPath()
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory
|
||||
doReturn(baseDirectory).whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(ALICE_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(NetworkHostAndPort("localhost", 11005)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 11005), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", 11005), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", 11005), MAX_MESSAGE_SIZE)
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
return Pair(artemisServer, artemisClient)
|
||||
}
|
||||
|
||||
private fun createArtemis2(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
val originalCertsFolderPath = tempFolder.root.toPath() / "certificates"
|
||||
val folderPath = tempFolder.root.toPath() / "artemis2"
|
||||
val baseDirectory = tempFolder.root.toPath()
|
||||
val originalCertsFolderPath = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(originalCertsFolderPath)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(originalCertsFolderPath)
|
||||
|
||||
val folderPath = baseDirectory / "artemis2"
|
||||
val newCertsFolderPath = folderPath / "certificates"
|
||||
newCertsFolderPath.createDirectories()
|
||||
(originalCertsFolderPath / "truststore.jks").copyToDirectory(newCertsFolderPath)
|
||||
(originalCertsFolderPath / "sslkeystore.jks").copyToDirectory(newCertsFolderPath)
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(folderPath).whenever(it).baseDirectory
|
||||
doReturn(originalCertsFolderPath).whenever(it).certificatesDirectory
|
||||
doReturn(ALICE_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(NetworkHostAndPort("localhost", 12005)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 12005), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", 12005), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", 12005), MAX_MESSAGE_SIZE)
|
||||
return Pair(artemisServer, artemisClient)
|
||||
}
|
||||
|
||||
private fun createDummyPeerArtemis(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
val originalCertsFolderPath = tempFolder.root.toPath() / "certificates"
|
||||
val folderPath = tempFolder.root.toPath() / "artemis3"
|
||||
val baseDirectory = tempFolder.root.toPath()
|
||||
val originalCertsFolderPath = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(originalCertsFolderPath)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(originalCertsFolderPath)
|
||||
|
||||
val folderPath = baseDirectory / "artemis3"
|
||||
val newCertsFolderPath = folderPath / "certificates"
|
||||
newCertsFolderPath.createDirectories()
|
||||
(originalCertsFolderPath / "truststore.jks").copyToDirectory(newCertsFolderPath)
|
||||
(originalCertsFolderPath / "sslkeystore.jks").copyToDirectory(newCertsFolderPath)
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(folderPath).whenever(it).baseDirectory
|
||||
doReturn(newCertsFolderPath).whenever(it).certificatesDirectory
|
||||
doReturn(DUMMY_BANK_A_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(NetworkHostAndPort("localhost", 7890)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 7890), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", 7890), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", 7890), MAX_MESSAGE_SIZE)
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
artemisClient.started!!.session.createQueue(SimpleString("${P2P_PREFIX}12345"), RoutingType.ANYCAST, SimpleString("${P2P_PREFIX}12345"), true)
|
||||
|
@ -12,6 +12,7 @@ import net.corda.core.internal.div
|
||||
import net.corda.core.internal.readAll
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.config.CertificateStore
|
||||
import net.corda.nodeapi.internal.crypto.X509KeyStore
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
@ -25,7 +26,6 @@ import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.security.KeyStore
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class AMQPListenerTest {
|
||||
@ -58,14 +58,14 @@ class AMQPListenerTest {
|
||||
assertEquals(true, stateFollower.next())
|
||||
assertEquals(true, amqpListenerService.active)
|
||||
assertEquals(false, serverListening("localhost", 10005))
|
||||
val keyStoreBytes = bridgeConfig.sslKeystore.readAll()
|
||||
val trustStoreBytes = bridgeConfig.trustStoreFile.readAll()
|
||||
val keyStoreBytes = bridgeConfig.p2pSslOptions.keyStore.path.readAll()
|
||||
val trustStoreBytes = bridgeConfig.p2pSslOptions.trustStore.path.readAll()
|
||||
// start listening
|
||||
amqpListenerService.provisionKeysAndActivate(keyStoreBytes,
|
||||
bridgeConfig.keyStorePassword.toCharArray(),
|
||||
bridgeConfig.keyStorePassword.toCharArray(),
|
||||
bridgeConfig.p2pSslOptions.keyStore.password.toCharArray(),
|
||||
bridgeConfig.p2pSslOptions.keyStore.password.toCharArray(),
|
||||
trustStoreBytes,
|
||||
bridgeConfig.trustStorePassword.toCharArray())
|
||||
bridgeConfig.p2pSslOptions.trustStore.password.toCharArray())
|
||||
// Fire lots of activity to prove we are good
|
||||
assertEquals(TestAuditService.AuditEvent.STATUS_CHANGE, auditFollower.next())
|
||||
assertEquals(true, amqpListenerService.active)
|
||||
@ -76,12 +76,11 @@ class AMQPListenerTest {
|
||||
assertEquals(TestAuditService.AuditEvent.FAILED_CONNECTION, auditFollower.next())
|
||||
val clientConfig = createAndLoadConfigFromResource(tempFolder.root.toPath() / "client", configResource)
|
||||
clientConfig.createBridgeKeyStores(DUMMY_BANK_B_NAME)
|
||||
val clientKeyStore = clientConfig.loadSslKeyStore().internal
|
||||
val clientTrustStore = clientConfig.loadTrustStore().internal
|
||||
val clientKeyStore = clientConfig.p2pSslOptions.keyStore.get()
|
||||
val clientTrustStore = clientConfig.p2pSslOptions.trustStore.get()
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeyStore
|
||||
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = clientTrustStore
|
||||
override val keyStore = clientKeyStore
|
||||
override val trustStore = clientTrustStore
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
override val trace: Boolean = true
|
||||
}
|
||||
@ -134,26 +133,28 @@ class AMQPListenerTest {
|
||||
val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, maxMessageSize, auditService)
|
||||
amqpListenerService.start()
|
||||
auditService.start()
|
||||
val keyStoreBytes = bridgeConfig.sslKeystore.readAll()
|
||||
val trustStoreBytes = bridgeConfig.trustStoreFile.readAll()
|
||||
val keyStoreBytes = bridgeConfig.p2pSslOptions.keyStore.path.readAll()
|
||||
val trustStoreBytes = bridgeConfig.p2pSslOptions.trustStore.path.readAll()
|
||||
// start listening
|
||||
amqpListenerService.provisionKeysAndActivate(keyStoreBytes,
|
||||
bridgeConfig.keyStorePassword.toCharArray(),
|
||||
bridgeConfig.keyStorePassword.toCharArray(),
|
||||
bridgeConfig.p2pSslOptions.keyStore.password.toCharArray(),
|
||||
bridgeConfig.p2pSslOptions.keyStore.password.toCharArray(),
|
||||
trustStoreBytes,
|
||||
bridgeConfig.trustStorePassword.toCharArray())
|
||||
bridgeConfig.p2pSslOptions.trustStore.password.toCharArray())
|
||||
val connectionFollower = amqpListenerService.onConnection.toBlocking().iterator
|
||||
val auditFollower = auditService.onAuditEvent.toBlocking().iterator
|
||||
val clientKeys = Crypto.generateKeyPair(ECDSA_SECP256R1_SHA256)
|
||||
val clientCert = X509Utilities.createSelfSignedCACertificate(ALICE_NAME.x500Principal, clientKeys)
|
||||
val clientKeyStore = X509KeyStore("password")
|
||||
clientKeyStore.setPrivateKey("TLS_CERT", clientKeys.private, listOf(clientCert))
|
||||
val clientTrustStore = X509KeyStore("password")
|
||||
clientTrustStore.setCertificate("TLS_ROOT", clientCert)
|
||||
val clientKeyStoreRaw = X509KeyStore("password")
|
||||
clientKeyStoreRaw.setPrivateKey("TLS_CERT", clientKeys.private, listOf(clientCert))
|
||||
val clientKeyStore = CertificateStore.of(clientKeyStoreRaw, "password")
|
||||
|
||||
val clientTrustStoreRaw = X509KeyStore("password")
|
||||
clientTrustStoreRaw.setCertificate("TLS_ROOT", clientCert)
|
||||
val clientTrustStore = CertificateStore.of(clientTrustStoreRaw, "password")
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeyStore.internal
|
||||
override val keyStorePrivateKeyPassword: CharArray = "password".toCharArray()
|
||||
override val trustStore: KeyStore = clientTrustStore.internal
|
||||
override val keyStore = clientKeyStore
|
||||
override val trustStore = clientTrustStore
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
override val trace: Boolean = true
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import net.corda.bridge.createAndLoadConfigFromResource
|
||||
import net.corda.bridge.createBridgeKeyStores
|
||||
import net.corda.bridge.createNetworkParams
|
||||
import net.corda.bridge.services.artemis.BridgeArtemisConnectionServiceImpl
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.config.EnterpriseConfiguration
|
||||
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||
@ -15,6 +16,7 @@ import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.MAX_MESSAGE_SIZE
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
@ -84,11 +86,17 @@ class ArtemisConnectionTest {
|
||||
|
||||
|
||||
private fun createArtemis(): ArtemisMessagingServer {
|
||||
|
||||
val baseDirectory = tempFolder.root.toPath()
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
val p2pSslOptions = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory
|
||||
doReturn(baseDirectory).whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(DUMMY_BANK_A_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslOptions).whenever(it).p2pSslOptions
|
||||
doReturn(NetworkHostAndPort("localhost", 11005)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
|
||||
|
@ -2,8 +2,7 @@ package net.corda.bridge.services.api
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
|
||||
import java.nio.file.Path
|
||||
|
||||
@ -28,12 +27,7 @@ enum class FirewallMode {
|
||||
FloatOuter
|
||||
}
|
||||
|
||||
interface BridgeSSLConfiguration : SSLConfiguration {
|
||||
override val keyStorePassword: String
|
||||
override val trustStorePassword: String
|
||||
override val sslKeystore: Path
|
||||
override val trustStoreFile: Path
|
||||
}
|
||||
interface BridgeSSLConfiguration : MutualSslConfiguration
|
||||
|
||||
|
||||
/**
|
||||
@ -91,7 +85,8 @@ interface FloatOuterConfiguration {
|
||||
val customSSLConfiguration: BridgeSSLConfiguration?
|
||||
}
|
||||
|
||||
interface FirewallConfiguration : NodeSSLConfiguration {
|
||||
interface FirewallConfiguration {
|
||||
val baseDirectory: Path
|
||||
val firewallMode: FirewallMode
|
||||
val outboundConfig: BridgeOutboundConfiguration?
|
||||
val inboundConfig: BridgeInboundConfiguration?
|
||||
@ -115,4 +110,6 @@ interface FirewallConfiguration : NodeSSLConfiguration {
|
||||
// This is relevant to bridges, because we push messages into the inbox and use the async acknowledgement responses to reply to sender.
|
||||
val p2pConfirmationWindowSize: Int
|
||||
val whitelistedHeaders: List<String>
|
||||
val crlCheckSoftFail: Boolean
|
||||
val p2pSslOptions: MutualSslConfiguration
|
||||
}
|
@ -1,15 +1,15 @@
|
||||
package net.corda.bridge.services.artemis
|
||||
|
||||
import net.corda.bridge.services.api.*
|
||||
import net.corda.bridge.services.config.BridgeSSLConfigurationImpl
|
||||
import net.corda.bridge.services.util.ServiceStateCombiner
|
||||
import net.corda.bridge.services.util.ServiceStateHelper
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.InternalArtemisTcpTransport
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.FailoverEventType
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||
@ -34,13 +34,13 @@ class BridgeArtemisConnectionServiceImpl(val conf: FirewallConfiguration,
|
||||
}
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val sslConfiguration: BridgeSSLConfiguration
|
||||
private val sslConfiguration: MutualSslConfiguration
|
||||
private val statusFollower: ServiceStateCombiner
|
||||
private var statusSubscriber: Subscription? = null
|
||||
|
||||
init {
|
||||
statusFollower = ServiceStateCombiner(listOf(auditService))
|
||||
sslConfiguration = conf.outboundConfig?.customSSLConfiguration ?: BridgeSSLConfigurationImpl(conf)
|
||||
sslConfiguration = conf.outboundConfig?.customSSLConfiguration ?: conf.p2pSslOptions
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
@ -61,7 +61,7 @@ class BridgeArtemisConnectionServiceImpl(val conf: FirewallConfiguration,
|
||||
log.info("Connecting to message broker: ${outboundConf.artemisBrokerAddress}")
|
||||
val brokerAddresses = listOf(outboundConf.artemisBrokerAddress) + outboundConf.alternateArtemisBrokerAddresses
|
||||
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
||||
val tcpTransports = brokerAddresses.map { ArtemisTcpTransport.p2pConnectorTcpTransport(it, sslConfiguration) }
|
||||
val tcpTransports = brokerAddresses.map { InternalArtemisTcpTransport.p2pConnectorTcpTransport(it, sslConfiguration) }
|
||||
locator = ActiveMQClient.createServerLocatorWithoutHA(*tcpTransports.toTypedArray()).apply {
|
||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||
// would be the default and the two lines below can be deleted.
|
||||
|
@ -6,22 +6,23 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
|
||||
import net.corda.nodeapi.internal.config.SslConfiguration
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.config.parseAs
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
|
||||
|
||||
fun Config.parseAsFirewallConfiguration(): FirewallConfiguration = parseAs<FirewallConfigurationImpl>()
|
||||
|
||||
data class BridgeSSLConfigurationImpl(override val keyStorePassword: String,
|
||||
override val trustStorePassword: String,
|
||||
override val certificatesDirectory: Path = Paths.get("certificates"),
|
||||
override val sslKeystore: Path = certificatesDirectory / "sslkeystore.jks",
|
||||
override val trustStoreFile: Path = certificatesDirectory / "truststore.jks",
|
||||
override val crlCheckSoftFail: Boolean) : BridgeSSLConfiguration {
|
||||
constructor(config: NodeSSLConfiguration) : this(config.keyStorePassword, config.trustStorePassword, config.certificatesDirectory, config.sslKeystore, config.trustStoreFile, config.crlCheckSoftFail)
|
||||
data class BridgeSSLConfigurationImpl(private val sslKeystore: Path,
|
||||
private val keyStorePassword: String,
|
||||
private val trustStoreFile: Path,
|
||||
private val trustStorePassword: String,
|
||||
private val crlCheckSoftFail: Boolean) : BridgeSSLConfiguration {
|
||||
|
||||
override val keyStore = FileBasedCertificateStoreSupplier(sslKeystore, keyStorePassword)
|
||||
override val trustStore = FileBasedCertificateStoreSupplier(trustStoreFile, trustStorePassword)
|
||||
}
|
||||
|
||||
data class BridgeOutboundConfigurationImpl(override val artemisBrokerAddress: NetworkHostAndPort,
|
||||
@ -45,12 +46,12 @@ data class BridgeHAConfigImpl(override val haConnectionString: String, override
|
||||
|
||||
data class FirewallConfigurationImpl(
|
||||
override val baseDirectory: Path,
|
||||
override val certificatesDirectory: Path = baseDirectory / "certificates",
|
||||
override val sslKeystore: Path = certificatesDirectory / "sslkeystore.jks",
|
||||
override val trustStoreFile: Path = certificatesDirectory / "truststore.jks",
|
||||
private val certificatesDirectory: Path = baseDirectory / "certificates",
|
||||
private val sslKeystore: Path = certificatesDirectory / "sslkeystore.jks",
|
||||
private val trustStoreFile: Path = certificatesDirectory / "truststore.jks",
|
||||
override val crlCheckSoftFail: Boolean,
|
||||
override val keyStorePassword: String,
|
||||
override val trustStorePassword: String,
|
||||
private val keyStorePassword: String,
|
||||
private val trustStorePassword: String,
|
||||
override val firewallMode: FirewallMode,
|
||||
override val networkParametersPath: Path,
|
||||
override val outboundConfig: BridgeOutboundConfigurationImpl?,
|
||||
@ -74,6 +75,12 @@ data class FirewallConfigurationImpl(
|
||||
require(inboundConfig != null && floatOuterConfig != null) { "Missing required configuration" }
|
||||
}
|
||||
}
|
||||
|
||||
private val p2pKeystorePath = sslKeystore
|
||||
private val p2pKeyStore = FileBasedCertificateStoreSupplier(p2pKeystorePath, keyStorePassword)
|
||||
private val p2pTrustStoreFilePath = trustStoreFile
|
||||
private val p2pTrustStore = FileBasedCertificateStoreSupplier(p2pTrustStoreFilePath, trustStorePassword)
|
||||
override val p2pSslOptions: MutualSslConfiguration = SslConfiguration.mutual(p2pKeyStore, p2pTrustStore)
|
||||
}
|
||||
|
||||
|
||||
|
@ -7,7 +7,9 @@ import net.corda.bridge.services.api.ServiceStateSupport
|
||||
import net.corda.bridge.services.util.ServiceStateCombiner
|
||||
import net.corda.bridge.services.util.ServiceStateHelper
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.config.CertificateStore
|
||||
import net.corda.nodeapi.internal.crypto.KEYSTORE_TYPE
|
||||
import net.corda.nodeapi.internal.crypto.X509KeyStore
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||
@ -48,13 +50,13 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration,
|
||||
trustStorePassword: CharArray) {
|
||||
require(active) { "AuditService must be active" }
|
||||
require(keyStorePassword !== keyStorePrivateKeyPassword) { "keyStorePassword and keyStorePrivateKeyPassword must reference distinct arrays!" }
|
||||
val keyStore = loadKeyStoreAndWipeKeys(keyStoreBytes, keyStorePassword)
|
||||
val trustStore = loadKeyStoreAndWipeKeys(trustStoreBytes, trustStorePassword)
|
||||
|
||||
val keyStore = CertificateStore.of(loadKeyStore(keyStoreBytes, keyStorePassword), java.lang.String.valueOf(keyStorePrivateKeyPassword)).also { wipeKeys(keyStoreBytes, keyStorePassword) }
|
||||
val trustStore = CertificateStore.of(loadKeyStore(trustStoreBytes, trustStorePassword), java.lang.String.valueOf(trustStorePassword)).also { wipeKeys(trustStoreBytes, trustStorePassword) }
|
||||
val bindAddress = conf.inboundConfig!!.listeningAddress
|
||||
val amqpConfiguration = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = keyStore
|
||||
override val keyStorePrivateKeyPassword: CharArray = keyStorePrivateKeyPassword
|
||||
override val trustStore: KeyStore = trustStore
|
||||
override val keyStore = keyStore
|
||||
override val trustStore = trustStore
|
||||
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
|
||||
override val maxMessageSize: Int = maximumMessageSize
|
||||
override val trace: Boolean = conf.enableAMQPPacketTrace
|
||||
@ -80,15 +82,18 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration,
|
||||
consoleLogger.info(msg)
|
||||
}
|
||||
|
||||
private fun loadKeyStoreAndWipeKeys(keyStoreBytes: ByteArray, keyStorePassword: CharArray): KeyStore {
|
||||
private fun wipeKeys(keyStoreBytes: ByteArray, keyStorePassword: CharArray) {
|
||||
// We overwrite the keys we don't need anymore
|
||||
Arrays.fill(keyStoreBytes, 0xAA.toByte())
|
||||
Arrays.fill(keyStorePassword, 0xAA55.toChar())
|
||||
}
|
||||
|
||||
private fun loadKeyStore(keyStoreBytes: ByteArray, keyStorePassword: CharArray): X509KeyStore {
|
||||
val keyStore = KeyStore.getInstance(KEYSTORE_TYPE)
|
||||
ByteArrayInputStream(keyStoreBytes).use {
|
||||
keyStore.load(it, keyStorePassword)
|
||||
}
|
||||
// We overwrite the keys we don't need anymore
|
||||
Arrays.fill(keyStoreBytes, 0xAA.toByte())
|
||||
Arrays.fill(keyStorePassword, 0xAA55.toChar())
|
||||
return keyStore
|
||||
return X509KeyStore(keyStore, java.lang.String.valueOf(keyStorePassword))
|
||||
}
|
||||
|
||||
override fun wipeKeysAndDeactivate() {
|
||||
|
@ -1,7 +1,6 @@
|
||||
package net.corda.bridge.services.receiver
|
||||
|
||||
import net.corda.bridge.services.api.*
|
||||
import net.corda.bridge.services.config.BridgeSSLConfigurationImpl
|
||||
import net.corda.bridge.services.receiver.FloatControlTopics.FLOAT_DATA_TOPIC
|
||||
import net.corda.bridge.services.util.ServiceStateCombiner
|
||||
import net.corda.bridge.services.util.ServiceStateHelper
|
||||
@ -12,13 +11,13 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
||||
import rx.Subscription
|
||||
import java.security.KeyStore
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
@ -39,7 +38,7 @@ class FloatControlListenerService(val conf: FirewallConfiguration,
|
||||
private var connectSubscriber: Subscription? = null
|
||||
private var receiveSubscriber: Subscription? = null
|
||||
private var amqpControlServer: AMQPServer? = null
|
||||
private val sslConfiguration: BridgeSSLConfiguration
|
||||
private val sslConfiguration: MutualSslConfiguration
|
||||
private val floatControlAddress = conf.floatOuterConfig!!.floatAddress
|
||||
private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject
|
||||
private var activeConnectionInfo: ConnectionChange? = null
|
||||
@ -48,7 +47,7 @@ class FloatControlListenerService(val conf: FirewallConfiguration,
|
||||
|
||||
init {
|
||||
statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener))
|
||||
sslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: BridgeSSLConfigurationImpl(conf)
|
||||
sslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: conf.p2pSslOptions
|
||||
}
|
||||
|
||||
|
||||
@ -68,15 +67,13 @@ class FloatControlListenerService(val conf: FirewallConfiguration,
|
||||
|
||||
private fun startControlListener() {
|
||||
lock.withLock {
|
||||
val keyStore = sslConfiguration.loadSslKeyStore().internal
|
||||
val keyStorePrivateKeyPassword = sslConfiguration.keyStorePassword
|
||||
val trustStore = sslConfiguration.loadTrustStore().internal
|
||||
val keyStore = sslConfiguration.keyStore.get()
|
||||
val trustStore = sslConfiguration.trustStore.get()
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val userName: String? = null
|
||||
override val password: String? = null
|
||||
override val keyStore: KeyStore = keyStore
|
||||
override val keyStorePrivateKeyPassword: CharArray = keyStorePrivateKeyPassword.toCharArray()
|
||||
override val trustStore: KeyStore = trustStore
|
||||
override val keyStore = keyStore
|
||||
override val trustStore = trustStore
|
||||
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
|
||||
override val maxMessageSize: Int = maximumMessageSize
|
||||
override val trace: Boolean = conf.enableAMQPPacketTrace
|
||||
|
@ -5,7 +5,7 @@ import net.corda.bridge.services.util.ServiceStateCombiner
|
||||
import net.corda.bridge.services.util.ServiceStateHelper
|
||||
import net.corda.core.internal.readAll
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
import rx.Subscription
|
||||
|
||||
@ -22,23 +22,23 @@ class InProcessBridgeReceiverService(val conf: FirewallConfiguration,
|
||||
private val statusFollower: ServiceStateCombiner
|
||||
private var statusSubscriber: Subscription? = null
|
||||
private var receiveSubscriber: Subscription? = null
|
||||
private val sslConfiguration: SSLConfiguration
|
||||
private val sslConfiguration: MutualSslConfiguration
|
||||
|
||||
init {
|
||||
statusFollower = ServiceStateCombiner(listOf(auditService, haService, amqpListenerService, filterService))
|
||||
sslConfiguration = conf.inboundConfig?.customSSLConfiguration ?: conf
|
||||
sslConfiguration = conf.inboundConfig?.customSSLConfiguration ?: conf.p2pSslOptions
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
statusSubscriber = statusFollower.activeChange.subscribe({
|
||||
if (it) {
|
||||
val keyStoreBytes = sslConfiguration.sslKeystore.readAll()
|
||||
val trustStoreBytes = sslConfiguration.trustStoreFile.readAll()
|
||||
val keyStoreBytes = sslConfiguration.keyStore.path.readAll()
|
||||
val trustStoreBytes = sslConfiguration.trustStore.path.readAll()
|
||||
amqpListenerService.provisionKeysAndActivate(keyStoreBytes,
|
||||
sslConfiguration.keyStorePassword.toCharArray(),
|
||||
sslConfiguration.keyStorePassword.toCharArray(),
|
||||
sslConfiguration.keyStore.password.toCharArray(),
|
||||
sslConfiguration.keyStore.password.toCharArray(),
|
||||
trustStoreBytes,
|
||||
sslConfiguration.trustStorePassword.toCharArray())
|
||||
sslConfiguration.trustStore.password.toCharArray())
|
||||
} else {
|
||||
if (amqpListenerService.running) {
|
||||
amqpListenerService.wipeKeysAndDeactivate()
|
||||
|
@ -13,7 +13,7 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||
@ -21,7 +21,6 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
||||
import rx.Subscription
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.security.KeyStore
|
||||
import java.security.SecureRandom
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
@ -41,15 +40,15 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration,
|
||||
private var connectSubscriber: Subscription? = null
|
||||
private var receiveSubscriber: Subscription? = null
|
||||
private var amqpControlClient: AMQPClient? = null
|
||||
private val controlLinkSSLConfiguration: SSLConfiguration
|
||||
private val floatListenerSSLConfiguration: SSLConfiguration
|
||||
private val controlLinkSSLConfiguration: MutualSslConfiguration
|
||||
private val floatListenerSSLConfiguration: MutualSslConfiguration
|
||||
private val expectedCertificateSubject: CordaX500Name
|
||||
private val secureRandom: SecureRandom = newSecureRandom()
|
||||
|
||||
init {
|
||||
statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService))
|
||||
controlLinkSSLConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf
|
||||
floatListenerSSLConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf
|
||||
controlLinkSSLConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf.p2pSslOptions
|
||||
floatListenerSSLConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf.p2pSslOptions
|
||||
expectedCertificateSubject = conf.bridgeInnerConfig!!.expectedCertificateSubject
|
||||
}
|
||||
|
||||
@ -58,15 +57,13 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration,
|
||||
statusSubscriber = statusFollower.activeChange.subscribe({
|
||||
if (it) {
|
||||
val floatAddresses = conf.bridgeInnerConfig!!.floatAddresses
|
||||
val controlLinkKeyStore = controlLinkSSLConfiguration.loadSslKeyStore().internal
|
||||
val controLinkKeyStorePrivateKeyPassword = controlLinkSSLConfiguration.keyStorePassword
|
||||
val controlLinkTrustStore = controlLinkSSLConfiguration.loadTrustStore().internal
|
||||
val controlLinkKeyStore = controlLinkSSLConfiguration.keyStore.get()
|
||||
val controlLinkTrustStore = controlLinkSSLConfiguration.trustStore.get()
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val userName: String? = null
|
||||
override val password: String? = null
|
||||
override val keyStore: KeyStore = controlLinkKeyStore
|
||||
override val keyStorePrivateKeyPassword: CharArray = controLinkKeyStorePrivateKeyPassword.toCharArray()
|
||||
override val trustStore: KeyStore = controlLinkTrustStore
|
||||
override val keyStore = controlLinkKeyStore
|
||||
override val trustStore = controlLinkTrustStore
|
||||
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
|
||||
override val maxMessageSize: Int = maximumMessageSize
|
||||
override val trace: Boolean = conf.enableAMQPPacketTrace
|
||||
@ -124,12 +121,12 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration,
|
||||
auditService.statusChangeEvent("Connection change on float control port $connectionChange")
|
||||
if (connectionChange.connected) {
|
||||
val (freshKeyStorePassword, freshKeyStoreKeyPassword, recodedKeyStore) = recodeKeyStore(floatListenerSSLConfiguration)
|
||||
val trustStoreBytes = floatListenerSSLConfiguration.trustStoreFile.readAll()
|
||||
val trustStoreBytes = floatListenerSSLConfiguration.trustStore.path.readAll()
|
||||
val activateMessage = ActivateFloat(recodedKeyStore,
|
||||
freshKeyStorePassword,
|
||||
freshKeyStoreKeyPassword,
|
||||
trustStoreBytes,
|
||||
floatListenerSSLConfiguration.trustStorePassword.toCharArray())
|
||||
floatListenerSSLConfiguration.trustStore.password.toCharArray())
|
||||
val amqpActivateMessage = amqpControlClient!!.createMessage(activateMessage.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes,
|
||||
FLOAT_CONTROL_TOPIC,
|
||||
expectedCertificateSubject.toString(),
|
||||
@ -150,9 +147,9 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration,
|
||||
}
|
||||
|
||||
// Recode KeyStore to use a fresh random password for entries and overall
|
||||
private fun recodeKeyStore(sslConfiguration: SSLConfiguration): Triple<CharArray, CharArray, ByteArray> {
|
||||
val keyStoreOriginal = sslConfiguration.loadSslKeyStore().internal
|
||||
val originalKeyStorePassword = sslConfiguration.keyStorePassword.toCharArray()
|
||||
private fun recodeKeyStore(sslConfiguration: MutualSslConfiguration): Triple<CharArray, CharArray, ByteArray> {
|
||||
val keyStoreOriginal = sslConfiguration.keyStore.get().value.internal
|
||||
val originalKeyStorePassword = sslConfiguration.keyStore.password.toCharArray()
|
||||
val freshKeyStorePassword = CharArray(20) { secureRandom.nextInt(0xD800).toChar() } // Stick to single character Unicode range
|
||||
val freshPrivateKeyPassword = CharArray(20) { secureRandom.nextInt(0xD800).toChar() } // Stick to single character Unicode range
|
||||
for (alias in keyStoreOriginal.aliases()) {
|
||||
|
@ -23,7 +23,7 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration,
|
||||
private val statusFollower: ServiceStateCombiner
|
||||
private var statusSubscriber: Subscription? = null
|
||||
private var listenerActiveSubscriber: Subscription? = null
|
||||
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) })
|
||||
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf.p2pSslOptions, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) })
|
||||
|
||||
init {
|
||||
statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService))
|
||||
|
@ -7,9 +7,8 @@ import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.exists
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.nodeapi.internal.*
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.crypto.*
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
@ -33,7 +32,7 @@ fun createNetworkParams(baseDirectory: Path): Int {
|
||||
maxMessageSize = 10485760,
|
||||
maxTransactionSize = 40000,
|
||||
epoch = 1,
|
||||
whitelistedContractImplementations = emptyMap<String, List<AttachmentId>>()
|
||||
whitelistedContractImplementations = emptyMap()
|
||||
)
|
||||
val copier = NetworkParametersCopier(networkParameters, overwriteFile = true)
|
||||
copier.install(baseDirectory)
|
||||
@ -55,18 +54,22 @@ fun createAndLoadConfigFromResource(baseDirectory: Path, configResource: String)
|
||||
return config
|
||||
}
|
||||
|
||||
fun SSLConfiguration.createBridgeKeyStores(legalName: CordaX500Name,
|
||||
rootCert: X509Certificate = DEV_ROOT_CA.certificate,
|
||||
intermediateCa: CertificateAndKeyPair = DEV_INTERMEDIATE_CA) {
|
||||
fun FirewallConfiguration.createBridgeKeyStores(legalName: CordaX500Name,
|
||||
rootCert: X509Certificate = DEV_ROOT_CA.certificate,
|
||||
intermediateCa: CertificateAndKeyPair = DEV_INTERMEDIATE_CA) = p2pSslOptions.createBridgeKeyStores(legalName, rootCert, intermediateCa)
|
||||
|
||||
certificatesDirectory.createDirectories()
|
||||
if (!trustStoreFile.exists()) {
|
||||
loadKeyStore(javaClass.classLoader.getResourceAsStream("certificates/${DEV_CA_TRUST_STORE_FILE}"), DEV_CA_TRUST_STORE_PASS).save(trustStoreFile, trustStorePassword)
|
||||
fun MutualSslConfiguration.createBridgeKeyStores(legalName: CordaX500Name,
|
||||
rootCert: X509Certificate = DEV_ROOT_CA.certificate,
|
||||
intermediateCa: CertificateAndKeyPair = DEV_INTERMEDIATE_CA) {
|
||||
|
||||
if (!trustStore.path.exists()) {
|
||||
val trustStore = trustStore.get(true)
|
||||
loadDevCaTrustStore().copyTo(trustStore)
|
||||
}
|
||||
|
||||
val (nodeCaCert, nodeCaKeyPair) = createDevNodeCa(intermediateCa, legalName)
|
||||
|
||||
val sslKeyStore = loadSslKeyStore(createNew = true)
|
||||
val sslKeyStore = keyStore.get(createNew = true)
|
||||
sslKeyStore.update {
|
||||
val tlsKeyPair = generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
|
||||
val tlsCert = X509Utilities.createCertificate(CertificateType.TLS, nodeCaCert, nodeCaKeyPair, legalName.x500Principal, tlsKeyPair.public)
|
||||
|
@ -62,23 +62,23 @@ class ConfigTest {
|
||||
fun `Load overridden cert config`() {
|
||||
val configResource = "/net/corda/bridge/custombasecerts/firewall.conf"
|
||||
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
|
||||
assertEquals(Paths.get("customcerts/mysslkeystore.jks"), config.sslKeystore)
|
||||
assertEquals(Paths.get("customcerts/mytruststore.jks"), config.trustStoreFile)
|
||||
assertEquals(Paths.get("customcerts/mysslkeystore.jks"), config.p2pSslOptions.keyStore.path)
|
||||
assertEquals(Paths.get("customcerts/mytruststore.jks"), config.p2pSslOptions.trustStore.path)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Load custom inner certificate config`() {
|
||||
val configResource = "/net/corda/bridge/separatedwithcustomcerts/bridge/firewall.conf"
|
||||
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
|
||||
assertEquals(Paths.get("outboundcerts/outboundkeys.jks"), config.outboundConfig!!.customSSLConfiguration!!.sslKeystore)
|
||||
assertEquals(Paths.get("outboundcerts/outboundtrust.jks"), config.outboundConfig!!.customSSLConfiguration!!.trustStoreFile)
|
||||
assertEquals("outboundkeypassword", config.outboundConfig!!.customSSLConfiguration!!.keyStorePassword)
|
||||
assertEquals("outboundtrustpassword", config.outboundConfig!!.customSSLConfiguration!!.trustStorePassword)
|
||||
assertEquals(Paths.get("outboundcerts/outboundkeys.jks"), config.outboundConfig!!.customSSLConfiguration!!.keyStore.path)
|
||||
assertEquals(Paths.get("outboundcerts/outboundtrust.jks"), config.outboundConfig!!.customSSLConfiguration!!.trustStore.path)
|
||||
assertEquals("outboundkeypassword", config.outboundConfig!!.customSSLConfiguration!!.keyStore.password)
|
||||
assertEquals("outboundtrustpassword", config.outboundConfig!!.customSSLConfiguration!!.trustStore.password)
|
||||
assertNull(config.inboundConfig)
|
||||
assertEquals(Paths.get("tunnelcerts/tunnelkeys.jks"), config.bridgeInnerConfig!!.customSSLConfiguration!!.sslKeystore)
|
||||
assertEquals(Paths.get("tunnelcerts/tunneltrust.jks"), config.bridgeInnerConfig!!.customSSLConfiguration!!.trustStoreFile)
|
||||
assertEquals("tunnelkeypassword", config.bridgeInnerConfig!!.customSSLConfiguration!!.keyStorePassword)
|
||||
assertEquals("tunneltrustpassword", config.bridgeInnerConfig!!.customSSLConfiguration!!.trustStorePassword)
|
||||
assertEquals(Paths.get("tunnelcerts/tunnelkeys.jks"), config.bridgeInnerConfig!!.customSSLConfiguration!!.keyStore.path)
|
||||
assertEquals(Paths.get("tunnelcerts/tunneltrust.jks"), config.bridgeInnerConfig!!.customSSLConfiguration!!.trustStore.path)
|
||||
assertEquals("tunnelkeypassword", config.bridgeInnerConfig!!.customSSLConfiguration!!.keyStore.password)
|
||||
assertEquals("tunneltrustpassword", config.bridgeInnerConfig!!.customSSLConfiguration!!.trustStore.password)
|
||||
assertNull(config.floatOuterConfig)
|
||||
}
|
||||
|
||||
@ -86,15 +86,15 @@ class ConfigTest {
|
||||
fun `Load custom outer certificate config`() {
|
||||
val configResource = "/net/corda/bridge/separatedwithcustomcerts/float/firewall.conf"
|
||||
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
|
||||
assertEquals(Paths.get("inboundcerts/inboundkeys.jks"), config.inboundConfig!!.customSSLConfiguration!!.sslKeystore)
|
||||
assertEquals(Paths.get("inboundcerts/inboundtrust.jks"), config.inboundConfig!!.customSSLConfiguration!!.trustStoreFile)
|
||||
assertEquals("inboundkeypassword", config.inboundConfig!!.customSSLConfiguration!!.keyStorePassword)
|
||||
assertEquals("inboundtrustpassword", config.inboundConfig!!.customSSLConfiguration!!.trustStorePassword)
|
||||
assertEquals(Paths.get("inboundcerts/inboundkeys.jks"), config.inboundConfig!!.customSSLConfiguration!!.keyStore.path)
|
||||
assertEquals(Paths.get("inboundcerts/inboundtrust.jks"), config.inboundConfig!!.customSSLConfiguration!!.trustStore.path)
|
||||
assertEquals("inboundkeypassword", config.inboundConfig!!.customSSLConfiguration!!.keyStore.password)
|
||||
assertEquals("inboundtrustpassword", config.inboundConfig!!.customSSLConfiguration!!.trustStore.password)
|
||||
assertNull(config.outboundConfig)
|
||||
assertEquals(Paths.get("tunnelcerts/tunnelkeys.jks"), config.floatOuterConfig!!.customSSLConfiguration!!.sslKeystore)
|
||||
assertEquals(Paths.get("tunnelcerts/tunneltrust.jks"), config.floatOuterConfig!!.customSSLConfiguration!!.trustStoreFile)
|
||||
assertEquals("tunnelkeypassword", config.floatOuterConfig!!.customSSLConfiguration!!.keyStorePassword)
|
||||
assertEquals("tunneltrustpassword", config.floatOuterConfig!!.customSSLConfiguration!!.trustStorePassword)
|
||||
assertEquals(Paths.get("tunnelcerts/tunnelkeys.jks"), config.floatOuterConfig!!.customSSLConfiguration!!.keyStore.path)
|
||||
assertEquals(Paths.get("tunnelcerts/tunneltrust.jks"), config.floatOuterConfig!!.customSSLConfiguration!!.trustStore.path)
|
||||
assertEquals("tunnelkeypassword", config.floatOuterConfig!!.customSSLConfiguration!!.keyStore.password)
|
||||
assertEquals("tunneltrustpassword", config.floatOuterConfig!!.customSSLConfiguration!!.trustStore.password)
|
||||
assertNull(config.bridgeInnerConfig)
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,6 @@ public class CordaRPCJavaClientTest extends NodeBasedTest {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
node = startNode(ALICE_NAME, 1000, singletonList(rpcUser));
|
||||
client = new CordaRPCClient(requireNonNull(node.getNode().getConfiguration().getRpcOptions().getAddress()));
|
||||
}
|
||||
|
@ -291,7 +291,7 @@ class CordaRPCClient private constructor(
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
classLoader: ClassLoader? = null
|
||||
): CordaRPCClient {
|
||||
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, null, classLoader, haAddressPool)
|
||||
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, classLoader, haAddressPool)
|
||||
}
|
||||
}
|
||||
|
||||
@ -309,7 +309,7 @@ class CordaRPCClient private constructor(
|
||||
|
||||
private fun getRpcClient(): RPCClient<CordaRPCOps> {
|
||||
return when {
|
||||
// Client->RPC broker
|
||||
// Client->RPC broker
|
||||
haAddressPool.isEmpty() -> RPCClient(
|
||||
rpcConnectorTcpTransport(hostAndPort, config = sslConfiguration),
|
||||
configuration,
|
||||
|
@ -16,6 +16,13 @@ fun createCordaRPCClientWithSslAndClassLoader(
|
||||
classLoader: ClassLoader? = null
|
||||
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
|
||||
|
||||
fun createCordaRPCClientWithSslAndClassLoader(
|
||||
haAddressPool: List<NetworkHostAndPort>,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
classLoader: ClassLoader? = null
|
||||
) = CordaRPCClient.createWithSslAndClassLoader(haAddressPool, configuration, sslConfiguration, classLoader)
|
||||
|
||||
fun CordaRPCOps.drainAndShutdown(): Observable<Unit> {
|
||||
|
||||
setFlowsDrainingModeEnabled(true)
|
||||
|
@ -5,6 +5,9 @@ package net.corda.core.internal
|
||||
import net.corda.core.DeleteForDJVM
|
||||
import net.corda.core.KeepForDJVM
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.ServicesForResolution
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.UntrustworthyData
|
||||
|
@ -96,6 +96,12 @@ data class StateMachineTransactionMapping(val stateMachineRunId: StateMachineRun
|
||||
|
||||
/** RPC operations that the node exposes to clients. */
|
||||
interface CordaRPCOps : RPCOps {
|
||||
/**
|
||||
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
|
||||
* to be present.
|
||||
*/
|
||||
override val protocolVersion: Int get() = nodeInfo().platformVersion
|
||||
|
||||
/** Returns a list of currently in-progress state machine infos. */
|
||||
fun stateMachinesSnapshot(): List<StateMachineInfo>
|
||||
|
||||
|
@ -91,8 +91,8 @@ class FlowWorkerTest {
|
||||
// create test certificates
|
||||
config.configureWithDevSSLCertificate()
|
||||
|
||||
val trustRoot = config.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
|
||||
val nodeCa = config.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
|
||||
val trustRoot = config.p2pSslOptions.trustStore.get().query { getCertificate(X509Utilities.CORDA_ROOT_CA) }
|
||||
val nodeCa = config.signingCertificateStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_CA) }
|
||||
|
||||
val broker = createFlowWorkerBroker(config, networkParameters.maxMessageSize)
|
||||
val bridgeControlListener = createBridgeControlListener(config, networkParameters.maxMessageSize)
|
||||
@ -146,8 +146,8 @@ class FlowWorkerTest {
|
||||
// create test certificates
|
||||
bankAConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val bankATrustRoot = bankAConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
|
||||
val bankANodeCa = bankAConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
|
||||
val bankATrustRoot = bankAConfig.p2pSslOptions.trustStore.get().query { getCertificate(X509Utilities.CORDA_ROOT_CA) }
|
||||
val bankANodeCa = bankAConfig.signingCertificateStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_CA) }
|
||||
|
||||
val bankABroker = createFlowWorkerBroker(bankAConfig, networkParameters.maxMessageSize)
|
||||
val bankABridgeControlListener = createBridgeControlListener(bankAConfig, networkParameters.maxMessageSize)
|
||||
@ -166,8 +166,8 @@ class FlowWorkerTest {
|
||||
// create test certificates
|
||||
bankBConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val bankBTrustRoot = bankBConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
|
||||
val bankBNodeCa = bankBConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
|
||||
val bankBTrustRoot = bankBConfig.p2pSslOptions.trustStore.get().query { getCertificate(X509Utilities.CORDA_ROOT_CA) }
|
||||
val bankBNodeCa = bankBConfig.signingCertificateStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_CA) }
|
||||
// NetworkParametersCopier(networkParameters).install(bankBConfig.baseDirectory)
|
||||
|
||||
val bankBBroker = createFlowWorkerBroker(bankBConfig, networkParameters.maxMessageSize)
|
||||
@ -234,13 +234,13 @@ class FlowWorkerTest {
|
||||
}
|
||||
|
||||
private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener {
|
||||
val bridgeControlListener = BridgeControlListener(config, config.messagingServerAddress!!, maxMessageSize)
|
||||
val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize)
|
||||
bridgeControlListener.start()
|
||||
return bridgeControlListener
|
||||
}
|
||||
|
||||
private fun createArtemisClient(config: NodeConfiguration, queueAddress: String): Triple<ClientSession, ClientConsumer, ClientProducer> {
|
||||
val artemisClient = ArtemisMessagingClient(config, config.messagingServerAddress!!, MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(config.p2pSslOptions, config.messagingServerAddress!!, MAX_MESSAGE_SIZE)
|
||||
val started = artemisClient.start()
|
||||
started.session.createQueue(queueAddress, RoutingType.ANYCAST, queueAddress, true)
|
||||
return Triple(started.session, started.session.createConsumer(queueAddress), started.session.createProducer())
|
||||
|
@ -43,7 +43,7 @@ class FlowWorker(flowWorkerId: String, private val flowWorkerServiceHub: FlowWor
|
||||
flowWorkerServiceHub.start()
|
||||
runOnStop += { flowWorkerServiceHub.stop() }
|
||||
|
||||
val flowWorkerMessagingClient = ArtemisMessagingClient(flowWorkerServiceHub.configuration, flowWorkerServiceHub.configuration.messagingServerAddress!!, flowWorkerServiceHub.networkParameters.maxMessageSize)
|
||||
val flowWorkerMessagingClient = ArtemisMessagingClient(flowWorkerServiceHub.configuration.p2pSslOptions, flowWorkerServiceHub.configuration.messagingServerAddress!!, flowWorkerServiceHub.networkParameters.maxMessageSize)
|
||||
runOnStop += { flowWorkerMessagingClient.stop() }
|
||||
|
||||
val session = flowWorkerMessagingClient.start().session
|
||||
|
@ -59,8 +59,8 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
|
||||
fun startRpcFlowWorker(myLegalName: CordaX500Name, rpcUsers: List<net.corda.testing.node.User>, numberOfFlowWorkers: Int = 1): CordaFuture<RpcFlowWorkerHandle> {
|
||||
val (config, rpcWorkerConfig, flowWorkerConfigs) = generateConfigs(myLegalName, rpcUsers, numberOfFlowWorkers)
|
||||
|
||||
val trustRoot = rpcWorkerConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
|
||||
val nodeCa = rpcWorkerConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
|
||||
val trustRoot = rpcWorkerConfig.p2pSslOptions.trustStore.get().query { getCertificate(X509Utilities.CORDA_ROOT_CA) }
|
||||
val nodeCa = rpcWorkerConfig.signingCertificateStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_CA) }
|
||||
|
||||
val ourKeyPair = Crypto.generateKeyPair()
|
||||
val ourParty = Party(myLegalName, ourKeyPair.public)
|
||||
@ -151,9 +151,9 @@ private fun createRpcWorkerBroker(config: NodeConfiguration, maxMessageSize: Int
|
||||
val rpcOptions = config.rpcOptions
|
||||
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(config.rpcUsers))
|
||||
val broker = if (rpcOptions.useSsl) {
|
||||
ArtemisRpcBroker.withSsl(config, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
ArtemisRpcBroker.withSsl(config.p2pSslOptions, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
} else {
|
||||
ArtemisRpcBroker.withoutSsl(config, rpcOptions.address, rpcOptions.adminAddress, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
ArtemisRpcBroker.withoutSsl(config.p2pSslOptions, rpcOptions.address, rpcOptions.adminAddress, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
}
|
||||
broker.start()
|
||||
return broker
|
||||
@ -180,7 +180,7 @@ private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, networ
|
||||
}
|
||||
|
||||
private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener {
|
||||
val bridgeControlListener = BridgeControlListener(config, config.messagingServerAddress!!, maxMessageSize)
|
||||
val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize)
|
||||
bridgeControlListener.start()
|
||||
return bridgeControlListener
|
||||
}
|
@ -56,14 +56,13 @@ class CordaRpcWorkerOps(
|
||||
const val RPC_WORKER_QUEUE_ADDRESS_PREFIX = "${ArtemisMessagingComponent.INTERNAL_PREFIX}rpc.worker."
|
||||
}
|
||||
|
||||
override val protocolVersion: Int = 1000
|
||||
private val flowWorkerQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${services.myInfo.legalIdentities[0].owningKey.toStringShort()}"
|
||||
|
||||
private val rpcWorkerQueueAddress = "$RPC_WORKER_QUEUE_ADDRESS_PREFIX${services.myInfo.legalIdentities[0].owningKey.toStringShort()}"
|
||||
private val rpcWorkerId = UUID.randomUUID().toString()
|
||||
private val rpcWorkerQueueName = "$rpcWorkerQueueAddress.$rpcWorkerId"
|
||||
|
||||
private val artemisClient = ArtemisMessagingClient(services.configuration, services.configuration.messagingServerAddress!!, services.networkParameters.maxMessageSize)
|
||||
private val artemisClient = ArtemisMessagingClient(services.configuration.p2pSslOptions, services.configuration.messagingServerAddress!!, services.networkParameters.maxMessageSize)
|
||||
private lateinit var session: ClientSession
|
||||
private lateinit var producer: ClientProducer
|
||||
|
||||
|
@ -77,8 +77,8 @@ class Main : Runnable {
|
||||
val ourKeyPair = getIdentity()
|
||||
val myInfo = getNodeInfo()
|
||||
|
||||
val trustRoot = rpcWorkerConfig.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
|
||||
val nodeCa = rpcWorkerConfig.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
|
||||
val trustRoot = rpcWorkerConfig.p2pSslOptions.trustStore.get().query { getCertificate(X509Utilities.CORDA_ROOT_CA) }
|
||||
val nodeCa = rpcWorkerConfig.signingCertificateStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_CA) }
|
||||
|
||||
val signedNetworkParameters = NetworkParametersReader(trustRoot, null, rpcWorkerConfig.baseDirectory).read()
|
||||
val rpcWorkerBroker = createRpcWorkerBroker(rpcWorkerConfig, signedNetworkParameters.networkParameters.maxMessageSize)
|
||||
@ -101,9 +101,9 @@ class Main : Runnable {
|
||||
val rpcOptions = config.rpcOptions
|
||||
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(config.rpcUsers))
|
||||
val broker = if (rpcOptions.useSsl) {
|
||||
ArtemisRpcBroker.withSsl(config, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
ArtemisRpcBroker.withSsl(config.p2pSslOptions, rpcOptions.address, rpcOptions.adminAddress, rpcOptions.sslConfig!!, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
} else {
|
||||
ArtemisRpcBroker.withoutSsl(config, rpcOptions.address, rpcOptions.adminAddress, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
ArtemisRpcBroker.withoutSsl(config.p2pSslOptions, rpcOptions.address, rpcOptions.adminAddress, securityManager, maxMessageSize, false, config.baseDirectory / "artemis", false)
|
||||
}
|
||||
broker.start()
|
||||
return broker
|
||||
@ -126,9 +126,9 @@ class RpcWorker(private val rpcWorkerServiceHub: RpcWorkerServiceHub, private va
|
||||
rpcThreadPoolSize = rpcWorkerServiceHub.configuration.enterpriseConfiguration.tuning.rpcThreadPoolSize
|
||||
)
|
||||
val securityManager = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(rpcWorkerServiceHub.configuration.rpcUsers))
|
||||
val nodeName = CordaX500Name.build(rpcWorkerServiceHub.configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal)
|
||||
val nodeName = CordaX500Name.build(rpcWorkerServiceHub.configuration.p2pSslOptions.keyStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal })
|
||||
|
||||
val internalRpcMessagingClient = InternalRPCMessagingClient(rpcWorkerServiceHub.configuration, rpcWorkerServiceHub.configuration.rpcOptions.adminAddress, Node.MAX_RPC_MESSAGE_SIZE, nodeName, rpcServerConfiguration)
|
||||
val internalRpcMessagingClient = InternalRPCMessagingClient(rpcWorkerServiceHub.configuration.p2pSslOptions, rpcWorkerServiceHub.configuration.rpcOptions.adminAddress, Node.MAX_RPC_MESSAGE_SIZE, nodeName, rpcServerConfiguration)
|
||||
internalRpcMessagingClient.init(rpcWorkerServiceHub.rpcOps, securityManager)
|
||||
internalRpcMessagingClient.start(serverControl)
|
||||
|
||||
|
@ -6,10 +6,8 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
|
||||
|
||||
interface ArtemisSessionProvider {
|
||||
fun start(): ArtemisMessagingClient.Started
|
||||
@ -19,12 +17,15 @@ interface ArtemisSessionProvider {
|
||||
|
||||
class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
private val serverAddress: NetworkHostAndPort,
|
||||
private val maxMessageSize: Int) : ArtemisSessionProvider {
|
||||
private val maxMessageSize: Int,
|
||||
private val autoCommitSends: Boolean = true,
|
||||
private val autoCommitAcks: Boolean = true,
|
||||
private val confirmationWindowSize: Int = -1) : ArtemisSessionProvider {
|
||||
companion object {
|
||||
private val log = loggerFor<ArtemisMessagingClient>()
|
||||
}
|
||||
|
||||
class Started(val sessionFactory: ClientSessionFactory, val session: ClientSession, val producer: ClientProducer)
|
||||
class Started(val serverLocator: ServerLocator, val sessionFactory: ClientSessionFactory, val session: ClientSession, val producer: ClientProducer)
|
||||
|
||||
override var started: Started? = null
|
||||
private set
|
||||
@ -41,6 +42,7 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
clientFailureCheckPeriod = 30000
|
||||
minLargeMessageSize = maxMessageSize
|
||||
isUseGlobalPools = nodeSerializationEnv != null
|
||||
confirmationWindowSize = this@ArtemisMessagingClient.confirmationWindowSize
|
||||
addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize))
|
||||
}
|
||||
val sessionFactory = locator.createSessionFactory()
|
||||
@ -48,11 +50,11 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
// using our TLS certificate.
|
||||
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
||||
// size of 1MB is acknowledged.
|
||||
val session = sessionFactory!!.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
val session = sessionFactory!!.createSession(NODE_P2P_USER, NODE_P2P_USER, false, autoCommitSends, autoCommitAcks, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
session.start()
|
||||
// Create a general purpose producer.
|
||||
val producer = session.createProducer()
|
||||
return Started(sessionFactory, session, producer).also { started = it }
|
||||
return Started(locator, sessionFactory, session, producer).also { started = it }
|
||||
}
|
||||
|
||||
override fun stop() = synchronized(this) {
|
||||
@ -62,6 +64,7 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
session.commit()
|
||||
// Closing the factory closes all the sessions it produced as well.
|
||||
sessionFactory.close()
|
||||
serverLocator.close()
|
||||
}
|
||||
started = null
|
||||
}
|
||||
|
@ -38,22 +38,23 @@ import kotlin.concurrent.withLock
|
||||
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||
class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig? = null, maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||
|
||||
private val lock = ReentrantLock()
|
||||
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
||||
|
||||
private class AMQPConfigurationImpl private constructor(override val keyStore: CertificateStore,
|
||||
override val trustStore: CertificateStore,
|
||||
override val socksProxyConfig: SocksProxyConfig?,
|
||||
override val maxMessageSize: Int) : AMQPConfiguration {
|
||||
constructor(config: MutualSslConfiguration, maxMessageSize: Int) : this(config.keyStore.get(), config.trustStore.get(), maxMessageSize)
|
||||
constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.keyStore.get(), config.trustStore.get(), socksProxyConfig, maxMessageSize)
|
||||
}
|
||||
|
||||
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, socksProxyConfig, maxMessageSize)
|
||||
private var sharedEventLoopGroup: EventLoopGroup? = null
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
|
||||
constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int) : this(config, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
|
||||
companion object {
|
||||
private const val NUM_BRIDGE_THREADS = 0 // Default sized pool
|
||||
|
@ -11,30 +11,50 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOT
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.util.*
|
||||
|
||||
class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
socksProxyConfig: SocksProxyConfig? = null,
|
||||
maxMessageSize: Int,
|
||||
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
|
||||
private val bridgeId: String = UUID.randomUUID().toString()
|
||||
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, maxMessageSize, artemisMessageClientFactory)
|
||||
private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
|
||||
private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId"
|
||||
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize,
|
||||
artemisMessageClientFactory)
|
||||
private val validInboundQueues = mutableSetOf<String>()
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
private var controlConsumer: ClientConsumer? = null
|
||||
private var notifyConsumer: ClientConsumer? = null
|
||||
|
||||
constructor(config: MutualSslConfiguration,
|
||||
p2pAddress: NetworkHostAndPort,
|
||||
maxMessageSize: Int) : this(config, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
|
||||
maxMessageSize: Int,
|
||||
socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
val active: Boolean
|
||||
get() = validInboundQueues.isNotEmpty()
|
||||
|
||||
private val _activeChange = PublishSubject.create<Boolean>().toSerialized()
|
||||
val activeChange: Observable<Boolean>
|
||||
get() = _activeChange
|
||||
|
||||
fun start() {
|
||||
stop()
|
||||
bridgeManager.start()
|
||||
@ -43,8 +63,21 @@ class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
artemis.start()
|
||||
val artemisClient = artemis.started!!
|
||||
val artemisSession = artemisClient.session
|
||||
val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
|
||||
artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue)
|
||||
registerBridgeControlListener(artemisSession)
|
||||
registerBridgeDuplicateChecker(artemisSession)
|
||||
val startupMessage = BridgeControl.BridgeToNodeSnapshotRequest(bridgeId).serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
|
||||
val bridgeRequest = artemisSession.createMessage(false)
|
||||
bridgeRequest.writeBodyBufferBytes(startupMessage)
|
||||
artemisClient.producer.send(BRIDGE_NOTIFY, bridgeRequest)
|
||||
}
|
||||
|
||||
private fun registerBridgeControlListener(artemisSession: ClientSession) {
|
||||
try {
|
||||
artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue)
|
||||
} catch (ex: ActiveMQQueueExistsException) {
|
||||
// Ignore if there is a queue still not cleaned up
|
||||
}
|
||||
|
||||
val control = artemisSession.createConsumer(bridgeControlQueue)
|
||||
controlConsumer = control
|
||||
control.setMessageHandler { msg ->
|
||||
@ -54,17 +87,44 @@ class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
log.error("Unable to process bridge control message", ex)
|
||||
}
|
||||
}
|
||||
val startupMessage = BridgeControl.BridgeToNodeSnapshotRequest(bridgeId).serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
|
||||
val bridgeRequest = artemisSession.createMessage(false)
|
||||
bridgeRequest.writeBodyBufferBytes(startupMessage)
|
||||
artemisClient.producer.send(BRIDGE_NOTIFY, bridgeRequest)
|
||||
}
|
||||
|
||||
private fun registerBridgeDuplicateChecker(artemisSession: ClientSession) {
|
||||
try {
|
||||
artemisSession.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue)
|
||||
} catch (ex: ActiveMQQueueExistsException) {
|
||||
// Ignore if there is a queue still not cleaned up
|
||||
}
|
||||
val notify = artemisSession.createConsumer(bridgeNotifyQueue)
|
||||
notifyConsumer = notify
|
||||
notify.setMessageHandler { msg ->
|
||||
try {
|
||||
val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
|
||||
val notifyMessage = data.deserialize<BridgeControl.BridgeToNodeSnapshotRequest>(context = SerializationDefaults.P2P_CONTEXT)
|
||||
if (notifyMessage.bridgeIdentity != bridgeId) {
|
||||
log.error("Fatal Error! Two bridges have been configured simultaneously! Check the enterpriseConfiguration.externalBridge status")
|
||||
System.exit(1)
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
log.error("Unable to process bridge notification message", ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
if (active) {
|
||||
_activeChange.onNext(false)
|
||||
}
|
||||
validInboundQueues.clear()
|
||||
controlConsumer?.close()
|
||||
controlConsumer = null
|
||||
artemis?.stop()
|
||||
notifyConsumer?.close()
|
||||
notifyConsumer = null
|
||||
artemis?.apply {
|
||||
started?.session?.deleteQueue(bridgeControlQueue)
|
||||
started?.session?.deleteQueue(bridgeNotifyQueue)
|
||||
stop()
|
||||
}
|
||||
artemis = null
|
||||
bridgeManager.stop()
|
||||
}
|
||||
@ -100,7 +160,11 @@ class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
for (outQueue in controlMessage.sendQueues) {
|
||||
bridgeManager.deployBridge(outQueue.queueName, outQueue.targets.first(), outQueue.legalNames.toSet())
|
||||
}
|
||||
val wasActive = active
|
||||
validInboundQueues.addAll(controlMessage.inboxQueues)
|
||||
if (!wasActive && active) {
|
||||
_activeChange.onNext(true)
|
||||
}
|
||||
}
|
||||
is BridgeControl.BridgeToNodeSnapshotRequest -> {
|
||||
log.error("Message from Bridge $controlMessage detected on wrong topic!")
|
||||
|
@ -60,7 +60,7 @@ class AMQPServer(val hostName: String,
|
||||
private val conf = parent.configuration
|
||||
|
||||
init {
|
||||
keyManagerFactory.init(conf.keyStore)
|
||||
keyManagerFactory.init(conf.keyStore.value.internal, conf.keyStore.password.toCharArray())
|
||||
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, conf.crlCheckSoftFail))
|
||||
}
|
||||
|
||||
|
@ -12,19 +12,15 @@ import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.services.Permissions.Companion.startFlow
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.internal.toDatabaseSchemaName
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.startNode
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
|
@ -7,14 +7,25 @@ import net.corda.node.services.config.configureDevKeyAndTrustStores
|
||||
import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.internal.toDatabaseSchemaName
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
import javax.security.auth.x500.X500Principal
|
||||
|
||||
class NodeKeystoreCheckTest {
|
||||
class NodeKeystoreCheckTest : IntegrationTest() {
|
||||
companion object {
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas(ALICE_NAME.toDatabaseSchemaName(), DUMMY_NOTARY_NAME.toDatabaseSchemaName())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `starting node in non-dev mode with no key store`() {
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
|
||||
|
@ -4,11 +4,15 @@ import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.EnterpriseConfiguration
|
||||
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
|
||||
import net.corda.nodeapi.internal.bridging.BridgeManager
|
||||
@ -24,11 +28,15 @@ import net.corda.testing.internal.rigorousMock
|
||||
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Ignore
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.util.*
|
||||
import kotlin.system.measureNanoTime
|
||||
import kotlin.system.measureTimeMillis
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class AMQPBridgeTest {
|
||||
@ -167,6 +175,72 @@ class AMQPBridgeTest {
|
||||
artemisServer.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Run only manually to check the throughput of the AMQP bridge")
|
||||
fun `AMQP full bridge throughput`() {
|
||||
val numMessages = 10000
|
||||
// Create local queue
|
||||
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
|
||||
val (artemisServer, artemisClient, bridgeManager) = createArtemis(sourceQueueName)
|
||||
|
||||
val artemis = artemisClient.started!!
|
||||
val queueName = ArtemisMessagingComponent.RemoteInboxAddress(BOB.publicKey).queueName
|
||||
|
||||
val (artemisRecServer, artemisRecClient) = createArtemisReceiver(amqpAddress, "artemisBridge")
|
||||
//artemisBridgeClient.started!!.session.createQueue(SimpleString(queueName), RoutingType.ANYCAST, SimpleString(queueName), true)
|
||||
|
||||
var numReceived = 0
|
||||
|
||||
artemisRecClient.started!!.session.createQueue(SimpleString(queueName), RoutingType.ANYCAST, SimpleString(queueName), true)
|
||||
val artemisConsumer = artemisRecClient.started!!.session.createConsumer(queueName)
|
||||
|
||||
val rubbishPayload = ByteArray(10 * 1024)
|
||||
var timeNanosCreateMessage = 0L
|
||||
var timeNanosSendMessage = 0L
|
||||
var timeMillisRead = 0L
|
||||
val simpleSourceQueueName = SimpleString(sourceQueueName)
|
||||
val totalTimeMillis = measureTimeMillis {
|
||||
repeat(numMessages) {
|
||||
var artemisMessage: ClientMessage? = null
|
||||
timeNanosCreateMessage += measureNanoTime {
|
||||
artemisMessage = artemis.session.createMessage(true).apply {
|
||||
putIntProperty("CountProp", it)
|
||||
writeBodyBufferBytes(rubbishPayload)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
}
|
||||
timeNanosSendMessage += measureNanoTime {
|
||||
artemis.producer.send(simpleSourceQueueName, artemisMessage, {})
|
||||
}
|
||||
}
|
||||
artemisClient.started!!.session.commit()
|
||||
|
||||
|
||||
timeMillisRead = measureTimeMillis {
|
||||
while (numReceived < numMessages) {
|
||||
val current = artemisConsumer.receive()
|
||||
val messageId = current.getIntProperty("CountProp")
|
||||
assertEquals(numReceived, messageId)
|
||||
++numReceived
|
||||
current.acknowledge()
|
||||
}
|
||||
}
|
||||
}
|
||||
println("Creating $numMessages messages took ${timeNanosCreateMessage / (1000 * 1000)} milliseconds")
|
||||
println("Sending $numMessages messages took ${timeNanosSendMessage / (1000 * 1000)} milliseconds")
|
||||
println("Receiving $numMessages messages took $timeMillisRead milliseconds")
|
||||
println("Total took $totalTimeMillis milliseconds")
|
||||
assertEquals(numMessages, numReceived)
|
||||
|
||||
bridgeManager.stop()
|
||||
artemisClient.stop()
|
||||
artemisServer.stop()
|
||||
artemisRecClient.stop()
|
||||
artemisRecServer.stop()
|
||||
}
|
||||
|
||||
|
||||
private fun createArtemis(sourceQueueName: String?): Triple<ArtemisMessagingServer, ArtemisMessagingClient, BridgeManager> {
|
||||
val baseDir = temporaryFolder.root.toPath() / "artemis"
|
||||
val certificatesDirectory = baseDir / "certificates"
|
||||
@ -181,10 +255,13 @@ class AMQPBridgeTest {
|
||||
doReturn(true).whenever(it).crlCheckSoftFail
|
||||
doReturn(artemisAddress).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE)
|
||||
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
val bridgeManager = AMQPBridgeManager(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE)
|
||||
@ -198,6 +275,32 @@ class AMQPBridgeTest {
|
||||
return Triple(artemisServer, artemisClient, bridgeManager)
|
||||
}
|
||||
|
||||
|
||||
private fun createArtemisReceiver(targetAdress: NetworkHostAndPort, workingDir: String): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
val baseDir = temporaryFolder.root.toPath() / workingDir
|
||||
val certificatesDirectory = baseDir / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(baseDir).whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(BOB_NAME).whenever(it).myLegalName
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(targetAdress).whenever(it).p2pAddress
|
||||
doReturn("").whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", targetAdress.port), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, targetAdress, MAX_MESSAGE_SIZE, confirmationWindowSize = 10 * 1024)
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
|
||||
return Pair(artemisServer, artemisClient)
|
||||
|
||||
}
|
||||
|
||||
private fun createAMQPServer(maxMessageSize: Int = MAX_MESSAGE_SIZE): AMQPServer {
|
||||
val baseDir = temporaryFolder.root.toPath() / "server"
|
||||
val certificatesDirectory = baseDir / "certificates"
|
||||
|
@ -9,6 +9,8 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.config.EnterpriseConfiguration
|
||||
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
@ -89,6 +91,22 @@ class ProtonWrapperTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `AMPQ Client fails to connect when crl soft fail check is disabled`() {
|
||||
val amqpServer = createServer(serverPort, CordaX500Name("Rogue 1", "London", "GB"),
|
||||
maxMessageSize = MAX_MESSAGE_SIZE, crlCheckSoftFail = false)
|
||||
amqpServer.use {
|
||||
amqpServer.start()
|
||||
val amqpClient = createClient()
|
||||
amqpClient.use {
|
||||
val clientConnected = amqpClient.onConnection.toFuture()
|
||||
amqpClient.start()
|
||||
val clientConnect = clientConnected.get()
|
||||
assertEquals(false, clientConnect.connected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `AMPQ Client refuses to connect to unexpected server`() {
|
||||
val amqpServer = createServer(serverPort, CordaX500Name("Rogue 1", "London", "GB"))
|
||||
@ -398,6 +416,7 @@ class ProtonWrapperTests {
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||
doReturn(true).whenever(it).crlCheckSoftFail
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
@ -470,7 +489,7 @@ class ProtonWrapperTests {
|
||||
sharedThreadPool = sharedEventGroup)
|
||||
}
|
||||
|
||||
private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME, maxMessageSize: Int = MAX_MESSAGE_SIZE): AMQPServer {
|
||||
private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME, maxMessageSize: Int = MAX_MESSAGE_SIZE, crlCheckSoftFail: Boolean = true): AMQPServer {
|
||||
val baseDirectory = temporaryFolder.root.toPath() / "server"
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
@ -481,7 +500,7 @@ class ProtonWrapperTests {
|
||||
doReturn(name).whenever(it).myLegalName
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(true).whenever(it).crlCheckSoftFail
|
||||
doReturn(crlCheckSoftFail).whenever(it).crlCheckSoftFail
|
||||
}
|
||||
serverConfig.configureWithDevSSLCertificate()
|
||||
|
||||
|
@ -26,6 +26,7 @@ import net.corda.nodeapi.internal.protonwrapper.netty.*
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
@ -33,7 +34,6 @@ import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.security.KeyStore
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class SocksTests {
|
||||
@ -266,11 +266,17 @@ class SocksTests {
|
||||
}
|
||||
|
||||
private fun createArtemisServerAndClient(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
val baseDirectory = temporaryFolder.root.toPath() / "artemis"
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory
|
||||
doReturn(baseDirectory).whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(CHARLIE_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||
@ -278,27 +284,32 @@ class SocksTests {
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), MAX_MESSAGE_SIZE)
|
||||
val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE)
|
||||
val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE)
|
||||
server.start()
|
||||
client.start()
|
||||
return Pair(server, client)
|
||||
}
|
||||
|
||||
private fun createClient(): AMQPClient {
|
||||
val baseDirectory = temporaryFolder.root.toPath() / "client"
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
|
||||
val clientConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(temporaryFolder.root.toPath() / "client").whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(BOB_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
}
|
||||
clientConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val clientTruststore = clientConfig.loadTrustStore().internal
|
||||
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
||||
val clientTruststore = clientConfig.p2pSslOptions.trustStore.get()
|
||||
val clientKeystore = clientConfig.p2pSslOptions.keyStore.get()
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = clientTruststore
|
||||
override val keyStore = clientKeystore
|
||||
override val trustStore = clientTruststore
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||
override val socksProxyConfig: SocksProxyConfig? = SocksProxyConfig(SocksProxyVersion.SOCKS5, NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
||||
@ -312,20 +323,25 @@ class SocksTests {
|
||||
}
|
||||
|
||||
private fun createSharedThreadsClient(sharedEventGroup: EventLoopGroup, id: Int): AMQPClient {
|
||||
val baseDirectory = temporaryFolder.root.toPath() / "client_%$id"
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
|
||||
val clientConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(temporaryFolder.root.toPath() / "client_%$id").whenever(it).baseDirectory
|
||||
doReturn(baseDirectory).whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(CordaX500Name(null, "client $id", "Corda", "London", null, "GB")).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
}
|
||||
clientConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val clientTruststore = clientConfig.loadTrustStore().internal
|
||||
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
||||
val clientTruststore = clientConfig.p2pSslOptions.trustStore.get()
|
||||
val clientKeystore = clientConfig.p2pSslOptions.keyStore.get()
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = clientKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = clientTruststore
|
||||
override val keyStore = clientKeystore
|
||||
override val trustStore = clientTruststore
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||
override val socksProxyConfig: SocksProxyConfig? = SocksProxyConfig(SocksProxyVersion.SOCKS5, NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
||||
@ -339,20 +355,24 @@ class SocksTests {
|
||||
}
|
||||
|
||||
private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME): AMQPServer {
|
||||
val baseDirectory = temporaryFolder.root.toPath() / "server"
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
|
||||
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(temporaryFolder.root.toPath() / "server").whenever(it).baseDirectory
|
||||
doReturn(baseDirectory).whenever(it).baseDirectory
|
||||
doReturn(name).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
}
|
||||
serverConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val serverTruststore = serverConfig.loadTrustStore().internal
|
||||
val serverKeystore = serverConfig.loadSslKeyStore().internal
|
||||
val serverTruststore = serverConfig.p2pSslOptions.trustStore.get()
|
||||
val serverKeystore = serverConfig.p2pSslOptions.keyStore.get()
|
||||
val amqpConfig = object : AMQPConfiguration {
|
||||
override val keyStore: KeyStore = serverKeystore
|
||||
override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray()
|
||||
override val trustStore: KeyStore = serverTruststore
|
||||
override val keyStore = serverKeystore
|
||||
override val trustStore = serverTruststore
|
||||
override val trace: Boolean = true
|
||||
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||
}
|
||||
|
@ -1,8 +1,6 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.messaging.ParametersUpdateInfo
|
||||
@ -10,7 +8,6 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.config.configureDevKeyAndTrustStores
|
||||
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
@ -20,12 +17,14 @@ import net.corda.testing.core.*
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.driver.internal.NodeHandleInternal
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.internal.toDatabaseSchemaName
|
||||
import net.corda.testing.node.internal.*
|
||||
import net.corda.testing.node.internal.network.NetworkMapServer
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
import org.junit.*
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
@ -36,7 +35,7 @@ import java.net.URL
|
||||
import java.time.Instant
|
||||
|
||||
@RunWith(Parameterized::class)
|
||||
class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneParams) {
|
||||
class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneParams) : IntegrationTest() {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
@ -48,6 +47,13 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
private lateinit var compatibilityZone: CompatibilityZoneParams
|
||||
|
||||
companion object {
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas(
|
||||
ALICE_NAME.toDatabaseSchemaName(),
|
||||
BOB_NAME.toDatabaseSchemaName(),
|
||||
DUMMY_NOTARY_NAME.toDatabaseSchemaName())
|
||||
|
||||
@JvmStatic
|
||||
@Parameterized.Parameters(name = "{0}")
|
||||
fun runParams() = listOf(
|
||||
@ -241,17 +247,4 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
}
|
||||
assertThat(rpc.networkMapSnapshot()).containsOnly(*nodes)
|
||||
}
|
||||
}
|
||||
|
||||
private fun DriverDSLImpl.startNode(providedName: CordaX500Name, devMode: Boolean): CordaFuture<NodeHandle> {
|
||||
var customOverrides = emptyMap<String, String>()
|
||||
if (!devMode) {
|
||||
val nodeDir = baseDirectory(providedName)
|
||||
val certificatesDirectory = nodeDir / "certificates"
|
||||
val signingCertStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
val p2pSslConfig = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
p2pSslConfig.configureDevKeyAndTrustStores(providedName, signingCertStore, certificatesDirectory)
|
||||
customOverrides = mapOf("devMode" to "false")
|
||||
}
|
||||
return startNode(providedName = providedName, customOverrides = customOverrides)
|
||||
}
|
||||
}
|
@ -21,9 +21,11 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATI
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.internal.configureTestSSL
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.internal.toDatabaseSchemaName
|
||||
import net.corda.testing.node.internal.NodeBasedTest
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException
|
||||
@ -31,8 +33,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
@ -42,13 +43,20 @@ import kotlin.test.assertEquals
|
||||
* the attacker to [alice].
|
||||
*/
|
||||
abstract class MQSecurityTest : NodeBasedTest() {
|
||||
companion object {
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas(ALICE_NAME.toDatabaseSchemaName(), BOB_NAME.toDatabaseSchemaName())
|
||||
}
|
||||
|
||||
val rpcUser = User("user1", "pass", permissions = emptySet())
|
||||
lateinit var alice: NodeWithInfo
|
||||
lateinit var attacker: SimpleMQClient
|
||||
private val clients = ArrayList<SimpleMQClient>()
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
override fun setUp() {
|
||||
super.init()
|
||||
super.setUp()
|
||||
alice = startNode(ALICE_NAME, rpcUsers = extraRPCUsers + rpcUser)
|
||||
attacker = createAttacker()
|
||||
startAttacker(attacker)
|
||||
@ -60,9 +68,10 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
|
||||
abstract fun startAttacker(attacker: SimpleMQClient)
|
||||
|
||||
@After
|
||||
fun stopClients() {
|
||||
override fun tearDown() {
|
||||
rpcConnections.forEach { it.forceClose() }
|
||||
clients.forEach { it.stop() }
|
||||
super.tearDown()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -105,11 +114,6 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
return CordaRPCClient(target).start(rpcUser.username, rpcUser.password).also { rpcConnections.add(it) }.proxy
|
||||
}
|
||||
|
||||
@After
|
||||
fun closeRPCConnections() {
|
||||
rpcConnections.forEach { it.forceClose() }
|
||||
}
|
||||
|
||||
fun loginToRPCAndGetClientQueue(): String {
|
||||
loginToRPC(alice.node.configuration.rpcOptions.address, rpcUser)
|
||||
val clientQueueQuery = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.*")
|
||||
|
@ -747,7 +747,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
require(nodeCaCertChainRoot == trustRoot) { "Client CA certificate must chain to the trusted root." }
|
||||
|
||||
if (configuration.devMode) {
|
||||
val blacklisted = isCRLDistributionPointBlacklisted(configuration.loadNodeKeyStore().getCertificateChain(X509Utilities.CORDA_CLIENT_CA))
|
||||
val blacklisted = isCRLDistributionPointBlacklisted(configuration.signingCertificateStore.get().query { getCertificateChain(X509Utilities.CORDA_CLIENT_CA) })
|
||||
if (blacklisted) {
|
||||
log.warn("The format of the autogenerated dev. mode certificate this system uses has been deprecated. Please contact support@r3.com for information on how to upgrade.")
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ class NodeWithInfo(val node: Node, val info: NodeInfo) {
|
||||
open class Node(configuration: NodeConfiguration,
|
||||
versionInfo: VersionInfo,
|
||||
private val initialiseSerialization: Boolean = true,
|
||||
cordappLoader: CordappLoader = makeCordappLoader(configuration)
|
||||
cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo)
|
||||
) : AbstractNode<NodeInfo>(
|
||||
configuration,
|
||||
createClock(configuration),
|
||||
@ -134,8 +134,11 @@ open class Node(configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
private val sameVmNodeCounter = AtomicInteger()
|
||||
private fun makeCordappLoader(configuration: NodeConfiguration): CordappLoader {
|
||||
return JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories)
|
||||
|
||||
@JvmStatic
|
||||
protected fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader {
|
||||
|
||||
return JarScanningCordappLoader.fromDirectories(configuration.cordappDirectories, versionInfo)
|
||||
}
|
||||
// TODO: make this configurable.
|
||||
const val MAX_RPC_MESSAGE_SIZE = 10485760
|
||||
@ -195,6 +198,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
nodeExecutor = serverThread,
|
||||
database = database,
|
||||
networkMap = networkMapCache,
|
||||
metricRegistry = metricRegistry,
|
||||
isDrainingModeOn = nodeProperties.flowsDrainingMode::isEnabled,
|
||||
drainingModeWasChangedEvents = nodeProperties.flowsDrainingMode.values
|
||||
)
|
||||
@ -227,10 +231,18 @@ open class Node(configuration: NodeConfiguration,
|
||||
startLocalRpcBroker(securityManager)
|
||||
}
|
||||
|
||||
val bridgeControlListener = BridgeControlListener(configuration.p2pSslOptions, network.serverAddress, networkParameters.maxMessageSize)
|
||||
val externalBridge = configuration.enterpriseConfiguration.externalBridge
|
||||
val bridgeControlListener = if (externalBridge == null || !externalBridge) {
|
||||
BridgeControlListener(configuration.p2pSslOptions, network.serverAddress, networkParameters.maxMessageSize)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
||||
printBasicNodeInfo("Advertised P2P messaging addresses", nodeInfo.addresses.joinToString())
|
||||
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT
|
||||
|
||||
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT.copy(
|
||||
rpcThreadPoolSize = configuration.enterpriseConfiguration.tuning.rpcThreadPoolSize
|
||||
)
|
||||
rpcServerAddresses?.let {
|
||||
internalRpcMessagingClient = InternalRPCMessagingClient(configuration.p2pSslOptions, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.p2pSslOptions.keyStore.get()[X509Utilities.CORDA_CLIENT_TLS].subjectX500Principal), rpcServerConfiguration)
|
||||
printBasicNodeInfo("RPC connection address", it.primary.toString())
|
||||
@ -247,7 +259,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
start()
|
||||
}
|
||||
// Start P2P bridge service
|
||||
bridgeControlListener.apply {
|
||||
bridgeControlListener?.apply {
|
||||
closeOnStop()
|
||||
start()
|
||||
}
|
||||
@ -260,8 +272,9 @@ open class Node(configuration: NodeConfiguration,
|
||||
network.start(
|
||||
myIdentity = nodeInfo.legalIdentities[0].owningKey,
|
||||
serviceIdentity = if (nodeInfo.legalIdentities.size == 1) null else nodeInfo.legalIdentities[1].owningKey,
|
||||
advertisedAddress = nodeInfo.addresses[0],
|
||||
maxMessageSize = networkParameters.maxMessageSize
|
||||
advertisedAddress = nodeInfo.addresses.single(),
|
||||
maxMessageSize = networkParameters.maxMessageSize,
|
||||
legalName = nodeInfo.legalIdentities[0].name.toString()
|
||||
)
|
||||
}
|
||||
|
||||
@ -286,12 +299,16 @@ open class Node(configuration: NodeConfiguration,
|
||||
|
||||
private fun getAdvertisedAddress(): NetworkHostAndPort {
|
||||
return with(configuration) {
|
||||
val host = if (detectPublicIp) {
|
||||
tryDetectIfNotPublicHost(p2pAddress.host) ?: p2pAddress.host
|
||||
if (relay != null) {
|
||||
NetworkHostAndPort(relay!!.relayHost, relay!!.remoteInboundPort)
|
||||
} else {
|
||||
p2pAddress.host
|
||||
val host = if (detectPublicIp) {
|
||||
tryDetectIfNotPublicHost(p2pAddress.host) ?: p2pAddress.host
|
||||
} else {
|
||||
p2pAddress.host
|
||||
}
|
||||
NetworkHostAndPort(host, p2pAddress.port)
|
||||
}
|
||||
NetworkHostAndPort(host, p2pAddress.port)
|
||||
}
|
||||
}
|
||||
|
||||
@ -369,6 +386,8 @@ open class Node(configuration: NodeConfiguration,
|
||||
}
|
||||
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
|
||||
}
|
||||
} else if (databaseUrl != null) {
|
||||
printBasicNodeInfo("Database connection url is", databaseUrl)
|
||||
}
|
||||
|
||||
super.startDatabase()
|
||||
|
@ -17,6 +17,7 @@ import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.config.parseAs
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigTag
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.tools.shell.SSHDConfiguration
|
||||
import org.slf4j.Logger
|
||||
@ -27,6 +28,7 @@ import java.util.*
|
||||
import javax.security.auth.x500.X500Principal
|
||||
|
||||
val Int.MB: Long get() = this * 1024L * 1024L
|
||||
val Int.KB: Long get() = this * 1024L
|
||||
|
||||
private val DEFAULT_FLOW_MONITOR_PERIOD_MILLIS: Duration = Duration.ofMinutes(1)
|
||||
private val DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS: Duration = Duration.ofMinutes(1)
|
||||
@ -53,16 +55,20 @@ interface NodeConfiguration {
|
||||
val rpcOptions: NodeRpcOptions
|
||||
val messagingServerAddress: NetworkHostAndPort?
|
||||
val messagingServerExternal: Boolean
|
||||
val enterpriseConfiguration: EnterpriseConfiguration
|
||||
// TODO Move into DevModeOptions
|
||||
val useTestClock: Boolean get() = false
|
||||
val lazyBridgeStart: Boolean
|
||||
val detectPublicIp: Boolean get() = true
|
||||
val sshd: SSHDConfiguration?
|
||||
val database: DatabaseConfig
|
||||
val relay: RelayConfiguration?
|
||||
val noLocalShell: Boolean get() = false
|
||||
val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize
|
||||
val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize
|
||||
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
|
||||
val graphiteOptions: GraphiteOptions? get() = null
|
||||
|
||||
// do not change this value without syncing it with ScheduledFlowsDrainingModeTest
|
||||
val drainingModePollPeriod: Duration get() = Duration.ofSeconds(5)
|
||||
val extraNetworkMapKeys: List<UUID>
|
||||
@ -110,6 +116,13 @@ enum class JmxReporterType {
|
||||
|
||||
data class DevModeOptions(val disableCheckpointChecker: Boolean = false, val allowCompatibilityZone: Boolean = false)
|
||||
|
||||
data class GraphiteOptions(
|
||||
val server: String,
|
||||
val port: Int,
|
||||
val prefix: String? = null, // defaults to org name and ip address when null
|
||||
val sampleInvervallSeconds: Long = 60
|
||||
)
|
||||
|
||||
fun NodeConfiguration.shouldCheckCheckpoints(): Boolean {
|
||||
return this.devMode && this.devModeOptions?.disableCheckpointChecker != true
|
||||
}
|
||||
@ -122,15 +135,47 @@ data class NotaryConfig(val validating: Boolean,
|
||||
val raft: RaftConfig? = null,
|
||||
val bftSMaRt: BFTSMaRtConfiguration? = null,
|
||||
val custom: Boolean = false,
|
||||
val mysql: MySQLConfiguration? = null,
|
||||
val serviceLegalName: CordaX500Name? = null
|
||||
) {
|
||||
init {
|
||||
require(raft == null || bftSMaRt == null || !custom) {
|
||||
"raft, bftSMaRt, and custom configs cannot be specified together"
|
||||
require(raft == null || bftSMaRt == null || !custom || mysql == null) {
|
||||
"raft, bftSMaRt, custom, and mysql configs cannot be specified together"
|
||||
}
|
||||
}
|
||||
|
||||
val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null
|
||||
val isClusterConfig: Boolean get() = raft != null || bftSMaRt != null || mysql != null
|
||||
}
|
||||
|
||||
data class MySQLConfiguration(
|
||||
val dataSource: Properties,
|
||||
/**
|
||||
* Number of times to attempt to reconnect to the database.
|
||||
*/
|
||||
val connectionRetries: Int = 2, // Default value for a 3 server cluster.
|
||||
/**
|
||||
* Time increment between re-connection attempts.
|
||||
*
|
||||
* The total back-off duration is calculated as: backOffIncrement * backOffBase ^ currentRetryCount
|
||||
*/
|
||||
val backOffIncrement: Int = 500,
|
||||
/** Exponential back-off multiplier base. */
|
||||
val backOffBase: Double = 1.5,
|
||||
/** The maximum number of transactions processed in a single batch. */
|
||||
val maxBatchSize: Int = 500,
|
||||
/** The maximum combined number of input states processed in a single batch. */
|
||||
val maxBatchInputStates: Int = 10_000,
|
||||
/** A batch will be processed after a specified timeout even if it has not yet reached full capacity. */
|
||||
val batchTimeoutMs: Long = 200,
|
||||
/**
|
||||
* The maximum number of commit requests in flight. Once the capacity is reached the service will block on
|
||||
* further commit requests.
|
||||
*/
|
||||
val maxQueueSize: Int = 100_000
|
||||
) {
|
||||
init {
|
||||
require(connectionRetries >= 0) { "connectionRetries cannot be negative" }
|
||||
}
|
||||
}
|
||||
|
||||
data class RaftConfig(val nodeAddress: NetworkHostAndPort, val clusterAddresses: List<NetworkHostAndPort>)
|
||||
@ -200,8 +245,12 @@ data class NodeConfigurationImpl(
|
||||
override val p2pAddress: NetworkHostAndPort,
|
||||
private val rpcAddress: NetworkHostAndPort? = null,
|
||||
private val rpcSettings: NodeRpcSettings,
|
||||
override val relay: RelayConfiguration?,
|
||||
// TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker.
|
||||
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
|
||||
override val messagingServerAddress: NetworkHostAndPort?,
|
||||
override val messagingServerExternal: Boolean = (messagingServerAddress != null),
|
||||
override val enterpriseConfiguration: EnterpriseConfiguration,
|
||||
override val notary: NotaryConfig?,
|
||||
@Suppress("DEPRECATION")
|
||||
@Deprecated("Do not configure")
|
||||
@ -215,10 +264,11 @@ data class NodeConfigurationImpl(
|
||||
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
|
||||
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
|
||||
override val sshd: SSHDConfiguration? = null,
|
||||
override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode, exportHibernateJMXStatistics = devMode),
|
||||
override val database: DatabaseConfig = DatabaseConfig(exportHibernateJMXStatistics = devMode),
|
||||
private val transactionCacheSizeMegaBytes: Int? = null,
|
||||
private val attachmentContentCacheSizeMegaBytes: Int? = null,
|
||||
override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound,
|
||||
override val graphiteOptions: GraphiteOptions? = null,
|
||||
override val extraNetworkMapKeys: List<UUID> = emptyList(),
|
||||
// do not use or remove (breaks DemoBench together with rejection of unknown configuration keys during parsing)
|
||||
private val h2port: Int? = null,
|
||||
@ -323,7 +373,7 @@ data class NodeConfigurationImpl(
|
||||
}
|
||||
}
|
||||
|
||||
// if compatibiliZoneURL is set then it will be copied into the networkServices field and thus skipping
|
||||
// if compatibilityZoneURL is set then it will be copied into the networkServices field and thus skipping
|
||||
// this check by returning above is fine.
|
||||
networkServices?.let {
|
||||
if (devModeOptions?.allowCompatibilityZone != true) {
|
||||
@ -362,6 +412,31 @@ data class NodeConfigurationImpl(
|
||||
require(security == null || rpcUsers.isEmpty()) {
|
||||
"Cannot specify both 'rpcUsers' and 'security' in configuration"
|
||||
}
|
||||
|
||||
// ensure our datasource configuration is sane
|
||||
require(dataSourceProperties.get("autoCommit") != true) { "Datbase auto commit cannot be enabled, Corda requires transactional behaviour" }
|
||||
dataSourceProperties.set("autoCommit", false)
|
||||
if (dataSourceProperties.get("transactionIsolation") == null) {
|
||||
dataSourceProperties["transactionIsolation"] = database.transactionIsolationLevel.jdbcString
|
||||
}
|
||||
|
||||
// enforce that SQLServer does not get sent all strings as Unicode - hibernate handles this "cleverly"
|
||||
val dataSourceUrl = dataSourceProperties.getProperty(DataSourceConfigTag.DATA_SOURCE_URL, "")
|
||||
if (dataSourceUrl.contains(":sqlserver:") && !dataSourceUrl.contains("sendStringParametersAsUnicode", true)) {
|
||||
dataSourceProperties[DataSourceConfigTag.DATA_SOURCE_URL] = dataSourceUrl + ";sendStringParametersAsUnicode=false"
|
||||
}
|
||||
|
||||
// Adjust connection pool size depending on N=flow thread pool size.
|
||||
// If there is no configured pool size set it to N + 1, otherwise check that it's greater than N.
|
||||
val flowThreadPoolSize = enterpriseConfiguration.tuning.flowThreadPoolSize
|
||||
val maxConnectionPoolSize = dataSourceProperties.getProperty("maximumPoolSize")
|
||||
if (maxConnectionPoolSize == null) {
|
||||
dataSourceProperties.setProperty("maximumPoolSize", (flowThreadPoolSize + 1).toString())
|
||||
} else {
|
||||
require(maxConnectionPoolSize.toInt() > flowThreadPoolSize)
|
||||
}
|
||||
|
||||
// Check for usage of deprecated config
|
||||
@Suppress("DEPRECATION")
|
||||
if(certificateChainCheckPolicies.isNotEmpty()) {
|
||||
logger.warn("""You are configuring certificateChainCheckPolicies. This is a setting that is not used, and will be removed in a future version.
|
||||
@ -499,3 +574,10 @@ data class SecurityConfiguration(val authService: SecurityConfiguration.AuthServ
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data class RelayConfiguration(val relayHost: String,
|
||||
val remoteInboundPort: Int,
|
||||
val username: String,
|
||||
val privateKeyFile: Path,
|
||||
val publicKeyFile: Path,
|
||||
val sshPort: Int = 22)
|
||||
|
@ -28,9 +28,6 @@ import net.corda.node.utilities.registration.NodeRegistrationHelper
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.addShutdownHook
|
||||
|
||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.parseAs
|
||||
import net.corda.nodeapi.internal.config.toConfig
|
||||
import net.corda.nodeapi.internal.crypto.X509KeyStore
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
@ -46,6 +43,7 @@ import net.corda.testing.driver.internal.InProcessImpl
|
||||
import net.corda.testing.driver.internal.NodeHandleInternal
|
||||
import net.corda.testing.driver.internal.OutOfProcessImpl
|
||||
import net.corda.testing.internal.setGlobalSerialization
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.testing.node.ClusterSpec
|
||||
import net.corda.testing.node.NotarySpec
|
||||
import net.corda.testing.node.User
|
||||
@ -1172,20 +1170,18 @@ private fun Config.toNodeOnly(): Config {
|
||||
return if (hasPath("webAddress")) withoutPath("webAddress").withoutPath("useHTTPS") else this
|
||||
}
|
||||
|
||||
internal fun DriverParameters.cordappsForAllNodes(): Set<TestCorDapp> = cordappsForAllNodes ?: cordappsInCurrentAndAdditionalPackages(extraCordappPackagesToScan)
|
||||
internal fun DriverParameters.cordappsForAllNodes(): Set<TestCorDapp> = cordappsForAllNodes
|
||||
?: cordappsInCurrentAndAdditionalPackages(extraCordappPackagesToScan)
|
||||
|
||||
fun DriverDSL.startNode(providedName: CordaX500Name, devMode: Boolean, parameters: NodeParameters = NodeParameters()): CordaFuture<NodeHandle> {
|
||||
var customOverrides = emptyMap<String, String>()
|
||||
if (!devMode) {
|
||||
val nodeDir = baseDirectory(providedName)
|
||||
val nodeSslConfig = object : NodeSSLConfiguration {
|
||||
override val baseDirectory = nodeDir
|
||||
override val keyStorePassword = "cordacadevpass"
|
||||
override val trustStorePassword = "trustpass"
|
||||
override val crlCheckSoftFail = true
|
||||
}
|
||||
nodeSslConfig.configureDevKeyAndTrustStores(providedName)
|
||||
val certificatesDirectory = nodeDir / "certificates"
|
||||
val signingCertStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
val p2pSslConfig = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
p2pSslConfig.configureDevKeyAndTrustStores(providedName, signingCertStore, certificatesDirectory)
|
||||
customOverrides = mapOf("devMode" to "false")
|
||||
}
|
||||
return startNode(parameters, providedName = providedName, customOverrides = customOverrides)
|
||||
}
|
||||
}
|
@ -10,15 +10,15 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.EnterpriseNode
|
||||
import net.corda.node.internal.NodeWithInfo
|
||||
import net.corda.node.internal.EnterpriseNode
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.nodeapi.internal.config.toConfig
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.internal.testThreadFactory
|
||||
import net.corda.testing.node.User
|
||||
import org.apache.logging.log4j.Level
|
||||
@ -86,9 +86,9 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
|
||||
|
||||
@JvmOverloads
|
||||
fun initNode(legalName: CordaX500Name,
|
||||
platformVersion: Int = 4,
|
||||
rpcUsers: List<User> = emptyList(),
|
||||
configOverrides: Map<String, Any> = emptyMap()): InProcessNode {
|
||||
platformVersion: Int = 4,
|
||||
rpcUsers: List<User> = emptyList(),
|
||||
configOverrides: Map<String, Any> = emptyMap()): InProcessNode {
|
||||
val baseDirectory = baseDirectory(legalName).createDirectories()
|
||||
val p2pAddress = configOverrides["p2pAddress"] ?: portAllocation.nextHostAndPort().toString()
|
||||
val config = ConfigHelper.loadConfig(
|
||||
|
@ -29,15 +29,11 @@ import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import javax.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM
|
||||
|
||||
// TODO: Make a shared implementation of CordaRPCOps where every method is unimplemented?
|
||||
|
||||
class CordaRPCProxyClient(private val targetHostAndPort: NetworkHostAndPort) : CordaRPCOps {
|
||||
companion object {
|
||||
val log = contextLogger()
|
||||
}
|
||||
|
||||
override val protocolVersion: Int = 1000
|
||||
|
||||
init {
|
||||
try {
|
||||
AMQPClientSerializationScheme.initialiseSerialization()
|
||||
@ -79,51 +75,51 @@ class CordaRPCProxyClient(private val targetHostAndPort: NetworkHostAndPort) : C
|
||||
}
|
||||
|
||||
override fun stateMachinesSnapshot(): List<StateMachineInfo> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): Vault.Page<T> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>): Vault.Page<T> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): Vault.Page<T> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): Vault.Page<T> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrack(contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria): DataFeed<Vault.Page<T>, Vault.Update<T>> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): DataFeed<Vault.Page<T>, Vault.Update<T>> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): DataFeed<Vault.Page<T>, Vault.Update<T>> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun internalVerifiedTransactionsSnapshot(): List<SignedTransaction> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? {
|
||||
@ -131,107 +127,107 @@ class CordaRPCProxyClient(private val targetHostAndPort: NetworkHostAndPort) : C
|
||||
}
|
||||
|
||||
override fun internalVerifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun stateMachineRecordedTransactionMappingSnapshot(): List<StateMachineTransactionMapping> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun acceptNewNetworkParameters(parametersHash: SecureHash) {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun attachmentExists(id: SecureHash): Boolean {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun openAttachment(id: SecureHash): InputStream {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun uploadAttachment(jar: InputStream): SecureHash {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun currentNodeTime(): Instant {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun waitUntilNetworkReady(): CordaFuture<Void?> {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun wellKnownPartyFromAnonymous(party: AbstractParty): Party? {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun partyFromKey(key: PublicKey): Party? {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name): Party? {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun notaryPartyFromX500Name(x500Name: CordaX500Name): Party? {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun nodeInfoFromParty(party: AbstractParty): NodeInfo? {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun clearNetworkMapCache() {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun setFlowsDrainingModeEnabled(enabled: Boolean) {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun isFlowsDrainingModeEnabled(): Boolean {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun shutdown() {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun killFlow(id: StateMachineRunId): Boolean {
|
||||
TODO("not implemented")
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun refreshNetworkMapCache() {
|
||||
|
Loading…
x
Reference in New Issue
Block a user