Create basic bridge module and capsule build

Basic pieces of bridge, still very rough

Work in progress

Fixes after rebase

Primitive float tunnel implementation

Put explanatory comments on the interfaces. Add support for different SSL KeyStores for the different connections to/from the bridge and float.

Add a couple more comments

Cleanup

Fix some issues

Use a new custom header to relay the login identity from bridges. (Will add more security to this shortly)

Make key protection logic a bit clearer in the comments

Create some basic test and integrationTests

Add a couple of tests of the BridgeAMQPListenerService

Add some basic tests

Correct comment

Fixup after rebase

Fixup after rebase

Fixup after rebase

Explicit parameter types to work on build box.

Address PR comments

Address some of Mike's PR comments.

Re-enable test on enterprise.

Don't sweep up node default config

Remove obsolete config entry

Correct merge mistake

Configurable whitelist headers on bridge

Don't access primary artemis session from different threads used by inbound packet pathway.

Fix unit test
This commit is contained in:
Matthew Nesbit 2018-02-01 16:59:15 +00:00
parent 341e060424
commit d592fb5c49
61 changed files with 3616 additions and 32 deletions

8
.idea/compiler.xml generated
View File

@ -15,6 +15,14 @@
<module name="behave_test" target="1.8" />
<module name="bootstrapper_main" target="1.8" />
<module name="bootstrapper_test" target="1.8" />
<module name="bridge_integrationTest" target="1.8" />
<module name="bridge_main" target="1.8" />
<module name="bridge_test" target="1.8" />
<module name="bridgecapsule_main" target="1.6" />
<module name="bridgecapsule_test" target="1.6" />
<module name="bridges_integrationTest" target="1.8" />
<module name="bridges_main" target="1.8" />
<module name="bridges_test" target="1.8" />
<module name="buildSrc_main" target="1.8" />
<module name="buildSrc_test" target="1.8" />
<module name="business-network-demo_integrationTest" target="1.8" />

View File

@ -0,0 +1,65 @@
/**
* This build.gradle exists to publish our capsule (executable fat jar) to maven. It cannot be placed in the
* bridges project because the bintray plugin cannot publish two modules from one project.
*/
apply plugin: 'net.corda.plugins.publish-utils'
apply plugin: 'us.kirchmeier.capsule'
description 'Corda bridge server capsule'
configurations {
runtimeArtifacts
capsuleRuntime
}
dependencies {
// TypeSafe Config: for simple and human friendly config files.
capsuleRuntime "com.typesafe:config:$typesafe_config_version"
}
// Force the Caplet to target Java 6. This ensures that running 'java -jar corda.jar' on any Java 6 VM upwards
// will get as far as the Capsule version checks, meaning that if your JVM is too old, you will at least get
// a sensible error message telling you what to do rather than a bytecode version exception that doesn't.
// If we introduce .java files into this module that need Java 8+ then we will have to push the caplet into
// its own module so its target can be controlled individually, but for now this suffices.
sourceCompatibility = 1.6
targetCompatibility = 1.6
task buildBridgeServerJar(type: FatCapsule, dependsOn: project(':bridge').jar) {
applicationClass 'net.corda.bridge.Bridge'
archiveName "corda-bridgeserver-${corda_release_version}.jar"
applicationSource = files(
project(':bridge').configurations.runtime,
project(':bridge').jar,
"$rootDir/config/dev/log4j2.xml",
"$rootDir/bridge/build/resources/main/reference.conf"
)
from 'NOTICE' // Copy CDDL notice
from configurations.capsuleRuntime.files.collect { zipTree(it) }
capsuleManifest {
applicationVersion = corda_release_version
javaAgents = []
systemProperties['visualvm.display.name'] = 'Corda Bridge Server'
minJavaVersion = '1.8.0'
minUpdateVersion['1.8'] = java8_minUpdateVersion
caplets = []
// JVM configuration:
// - Constrain to small heap sizes to ease development on low end devices.
// - Switch to the G1 GC which is going to be the default in Java 9 and gives low pause times/string dedup.
jvmArgs = ['-Xmx200m', '-XX:+UseG1GC']
}
}
artifacts {
runtimeArtifacts buildBridgeServerJar
publish buildBridgeServerJar {
classifier ""
}
}
publish {
disableDefaultJar = true
name 'corda-bridgeserver'
}

56
bridge/build.gradle Normal file
View File

@ -0,0 +1,56 @@
apply plugin: 'kotlin'
apply plugin: 'java'
apply plugin: 'net.corda.plugins.publish-utils'
description 'Corda peer bridging components'
configurations {
integrationTestCompile.extendsFrom testCompile
integrationTestRuntime.extendsFrom testRuntime
}
sourceSets {
integrationTest {
kotlin {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integration-test/kotlin')
}
}
}
processResources {
from file("$rootDir/config/dev/log4j2.xml")
}
dependencies {
compile project(':node-api')
// Log4J: logging framework (with SLF4J bindings)
compile "org.apache.logging.log4j:log4j-slf4j-impl:$log4j_version"
compile "org.apache.logging.log4j:log4j-core:$log4j_version"
compile "org.slf4j:jul-to-slf4j:$slf4j_version"
// JOpt: for command line flags.
compile "net.sf.jopt-simple:jopt-simple:$jopt_simple_version"
// Manifests: for reading stuff from the manifest file
compile "com.jcabi:jcabi-manifests:1.1"
integrationTestCompile project(':node-driver')
testCompile "junit:junit:$junit_version"
testCompile project(':test-utils')
}
task integrationTest(type: Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
}
jar {
baseName 'corda-bridge-impl'
}
publish {
name jar.baseName
}

View File

@ -0,0 +1,135 @@
package net.corda.bridge
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.bridge.internal.BridgeInstance
import net.corda.bridge.services.api.BridgeMode
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.EnterpriseConfiguration
import net.corda.node.services.config.MutualExclusionConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.rigorousMock
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
class BridgeIntegrationTest {
@Rule
@JvmField
val tempFolder = TemporaryFolder()
@Rule
@JvmField
val serializationEnvironment = SerializationEnvironmentRule(true)
private abstract class AbstractNodeConfiguration : NodeConfiguration
@Test
fun `Load simple all in one bridge and stand it up`() {
val configResource = "/net/corda/bridge/singleprocess/bridge.conf"
createNetworkParams(tempFolder.root.toPath())
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
assertEquals(BridgeMode.SenderReceiver, config.bridgeMode)
assertEquals(NetworkHostAndPort("localhost", 11005), config.outboundConfig!!.artemisBrokerAddress)
assertEquals(NetworkHostAndPort("0.0.0.0", 10005), config.inboundConfig!!.listeningAddress)
assertNull(config.floatInnerConfig)
assertNull(config.floatOuterConfig)
config.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val (artemisServer, artemisClient) = createArtemis()
try {
val bridge = BridgeInstance(config, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
val stateFollower = bridge.activeChange.toBlocking().iterator
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
bridge.start()
assertEquals(true, stateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, serverListening("localhost", 10005))
bridge.stop()
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
assertEquals(false, serverListening("localhost", 10005))
} finally {
artemisClient.stop()
artemisServer.stop()
}
}
@Test
fun `Load bridge (float inner) and float outer and stand them up`() {
val bridgeFolder = tempFolder.root.toPath()
val bridgeConfigResource = "/net/corda/bridge/withfloat/bridge/bridge.conf"
val bridgeConfig = createAndLoadConfigFromResource(bridgeFolder, bridgeConfigResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
createNetworkParams(bridgeFolder)
assertEquals(BridgeMode.FloatInner, bridgeConfig.bridgeMode)
assertEquals(NetworkHostAndPort("localhost", 11005), bridgeConfig.outboundConfig!!.artemisBrokerAddress)
val floatFolder = tempFolder.root.toPath() / "float"
val floatConfigResource = "/net/corda/bridge/withfloat/float/bridge.conf"
val floatConfig = createAndLoadConfigFromResource(floatFolder, floatConfigResource)
floatConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
createNetworkParams(floatFolder)
assertEquals(BridgeMode.FloatOuter, floatConfig.bridgeMode)
assertEquals(NetworkHostAndPort("0.0.0.0", 10005), floatConfig.inboundConfig!!.listeningAddress)
val (artemisServer, artemisClient) = createArtemis()
try {
val bridge = BridgeInstance(bridgeConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
val bridgeStateFollower = bridge.activeChange.toBlocking().iterator
val float = BridgeInstance(floatConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
val floatStateFollower = float.activeChange.toBlocking().iterator
assertEquals(false, floatStateFollower.next())
float.start()
assertEquals(true, floatStateFollower.next())
assertEquals(true, float.active) // float is running
assertEquals(false, serverListening("localhost", 10005)) // but not activated
assertEquals(false, bridgeStateFollower.next())
bridge.start()
assertEquals(true, bridgeStateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, float.active)
assertEquals(true, serverListening("localhost", 10005)) // now activated
bridge.stop()
assertEquals(false, bridgeStateFollower.next())
assertEquals(false, bridge.active)
assertEquals(true, float.active)
assertEquals(false, serverListening("localhost", 10005)) // now de-activated
float.stop()
assertEquals(false, floatStateFollower.next())
assertEquals(false, bridge.active)
assertEquals(false, float.active)
} finally {
artemisClient.stop()
artemisServer.stop()
}
}
private fun createArtemis(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory
doReturn(ALICE_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(NetworkHostAndPort("localhost", 11005)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
val artemisServer = ArtemisMessagingServer(artemisConfig, 11005, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", 11005), MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()
return Pair(artemisServer, artemisClient)
}
}

View File

@ -0,0 +1,166 @@
package net.corda.bridge.services
import net.corda.bridge.createAndLoadConfigFromResource
import net.corda.bridge.createBridgeKeyStores
import net.corda.bridge.createNetworkParams
import net.corda.bridge.serverListening
import net.corda.bridge.services.receiver.BridgeAMQPListenerServiceImpl
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.Crypto.ECDSA_SECP256R1_SHA256
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.div
import net.corda.core.internal.readAll
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.crypto.X509KeyStore
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import org.junit.Assert.assertArrayEquals
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import kotlin.test.assertEquals
class AMQPListenerTest {
@Rule
@JvmField
val tempFolder = TemporaryFolder()
@Rule
@JvmField
val serializationEnvironment = SerializationEnvironmentRule(true)
@Test
fun `Basic AMPQListenerService lifecycle test`() {
val configResource = "/net/corda/bridge/singleprocess/bridge.conf"
createNetworkParams(tempFolder.root.toPath())
val bridgeConfig = createAndLoadConfigFromResource(tempFolder.root.toPath() / "listener", configResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val auditService = TestAuditService()
val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, auditService)
val stateFollower = amqpListenerService.activeChange.toBlocking().iterator
val connectionFollower = amqpListenerService.onConnection.toBlocking().iterator
val auditFollower = auditService.onAuditEvent.toBlocking().iterator
// Listener doesn't come up yet as not started
assertEquals(false, stateFollower.next())
amqpListenerService.start()
// Listener still not up as audit not ready
assertEquals(false, amqpListenerService.active)
auditService.start()
// Service 'active', but no listening activity yet
assertEquals(true, stateFollower.next())
assertEquals(true, amqpListenerService.active)
assertEquals(false, serverListening("localhost", 10005))
val keyStoreBytes = bridgeConfig.sslKeystore.readAll()
val trustStoreBytes = bridgeConfig.trustStoreFile.readAll()
// start listening
amqpListenerService.provisionKeysAndActivate(keyStoreBytes,
bridgeConfig.keyStorePassword.toCharArray(),
bridgeConfig.keyStorePassword.toCharArray(),
trustStoreBytes,
bridgeConfig.trustStorePassword.toCharArray())
// Fire lots of activity to prove we are good
assertEquals(TestAuditService.AuditEvent.STATUS_CHANGE, auditFollower.next())
assertEquals(true, amqpListenerService.active)
// Definitely a socket tehre
assertEquals(true, serverListening("localhost", 10005))
// But not a valid SSL link
assertEquals(false, connectionFollower.next().connected)
assertEquals(TestAuditService.AuditEvent.FAILED_CONNECTION, auditFollower.next())
val clientConfig = createAndLoadConfigFromResource(tempFolder.root.toPath() / "client", configResource)
clientConfig.createBridgeKeyStores(DUMMY_BANK_B_NAME)
val clientKeyStore = clientConfig.loadSslKeyStore().internal
val clientTrustStore = clientConfig.loadTrustStore().internal
// create and connect a real client
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)),
setOf(DUMMY_BANK_A_NAME),
PEER_USER,
PEER_USER,
clientKeyStore,
clientConfig.keyStorePassword,
clientTrustStore)
amqpClient.start()
// Should see events to show we got a valid connection
val connectedEvent = connectionFollower.next()
assertEquals(true, connectedEvent.connected)
assertEquals(DUMMY_BANK_B_NAME, CordaX500Name.build(connectedEvent.remoteCert!!.subjectX500Principal))
assertEquals(TestAuditService.AuditEvent.SUCCESSFUL_CONNECTION, auditFollower.next())
val receiver = amqpListenerService.onReceive.toBlocking().iterator
// Send a test message
val testMsg = "A test".toByteArray()
val msg = amqpClient.createMessage(testMsg, "${PEERS_PREFIX}fake", DUMMY_BANK_A_NAME.toString(), emptyMap())
amqpClient.write(msg)
val receivedMessage = receiver.next()
// confirm details match
assertEquals(DUMMY_BANK_B_NAME, CordaX500Name.parse(receivedMessage.sourceLegalName))
assertArrayEquals(testMsg, receivedMessage.payload)
receivedMessage.complete(true)
assertEquals(MessageStatus.Acknowledged, msg.onComplete.get())
// Shutdown link
amqpClient.stop()
// verify audit events for disconnect
val disconnectedEvent = connectionFollower.next()
assertEquals(false, disconnectedEvent.connected)
assertEquals(DUMMY_BANK_B_NAME, CordaX500Name.build(disconnectedEvent.remoteCert!!.subjectX500Principal))
assertEquals(TestAuditService.AuditEvent.FAILED_CONNECTION, auditFollower.next())
// tear down listener
amqpListenerService.wipeKeysAndDeactivate()
assertEquals(true, amqpListenerService.active)
assertEquals(false, serverListening("localhost", 10005))
amqpListenerService.stop()
assertEquals(false, stateFollower.next())
assertEquals(false, amqpListenerService.active)
}
@Test
fun `Bad certificate audit check`() {
val configResource = "/net/corda/bridge/singleprocess/bridge.conf"
createNetworkParams(tempFolder.root.toPath())
val bridgeConfig = createAndLoadConfigFromResource(tempFolder.root.toPath() / "listener", configResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val auditService = TestAuditService()
val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, auditService)
amqpListenerService.start()
auditService.start()
val keyStoreBytes = bridgeConfig.sslKeystore.readAll()
val trustStoreBytes = bridgeConfig.trustStoreFile.readAll()
// start listening
amqpListenerService.provisionKeysAndActivate(keyStoreBytes,
bridgeConfig.keyStorePassword.toCharArray(),
bridgeConfig.keyStorePassword.toCharArray(),
trustStoreBytes,
bridgeConfig.trustStorePassword.toCharArray())
val connectionFollower = amqpListenerService.onConnection.toBlocking().iterator
val auditFollower = auditService.onAuditEvent.toBlocking().iterator
val clientKeys = Crypto.generateKeyPair(ECDSA_SECP256R1_SHA256)
val clientCert = X509Utilities.createSelfSignedCACertificate(ALICE_NAME.x500Principal, clientKeys)
val clientKeyStore = X509KeyStore("password")
clientKeyStore.setPrivateKey("TLS_CERT", clientKeys.private, listOf(clientCert))
val clientTrustStore = X509KeyStore("password")
clientTrustStore.setCertificate("TLS_ROOT", clientCert)
// create and connect a real client
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)),
setOf(DUMMY_BANK_A_NAME),
PEER_USER,
PEER_USER,
clientKeyStore.internal,
"password",
clientTrustStore.internal)
amqpClient.start()
val connectionEvent = connectionFollower.next()
assertEquals(false, connectionEvent.connected)
assertEquals(TestAuditService.AuditEvent.FAILED_CONNECTION, auditFollower.next())
amqpClient.stop()
amqpListenerService.wipeKeysAndDeactivate()
amqpListenerService.stop()
}
}

View File

@ -0,0 +1,103 @@
package net.corda.bridge.services
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.bridge.createAndLoadConfigFromResource
import net.corda.bridge.createBridgeKeyStores
import net.corda.bridge.createNetworkParams
import net.corda.bridge.services.artemis.BridgeArtemisConnectionServiceImpl
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.EnterpriseConfiguration
import net.corda.node.services.config.MutualExclusionConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.rigorousMock
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
class ArtemisConnectionTest {
@Rule
@JvmField
val tempFolder = TemporaryFolder()
@Rule
@JvmField
val serializationEnvironment = SerializationEnvironmentRule(true)
private abstract class AbstractNodeConfiguration : NodeConfiguration
@Test
fun `Basic lifecycle test`() {
val configResource = "/net/corda/bridge/singleprocess/bridge.conf"
createNetworkParams(tempFolder.root.toPath())
val bridgeConfig = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val auditService = TestAuditService()
val artemisService = BridgeArtemisConnectionServiceImpl(bridgeConfig, MAX_MESSAGE_SIZE, auditService)
val stateFollower = artemisService.activeChange.toBlocking().iterator
artemisService.start()
assertEquals(false, stateFollower.next())
assertEquals(false, artemisService.active)
assertNull(artemisService.started)
auditService.start()
assertEquals(false, artemisService.active)
assertNull(artemisService.started)
var artemisServer = createArtemis()
try {
assertEquals(true, stateFollower.next())
assertEquals(true, artemisService.active)
assertNotNull(artemisService.started)
auditService.stop()
assertEquals(false, stateFollower.next())
assertEquals(false, artemisService.active)
assertNull(artemisService.started)
auditService.start()
assertEquals(true, stateFollower.next())
assertEquals(true, artemisService.active)
assertNotNull(artemisService.started)
} finally {
artemisServer.stop()
}
assertEquals(false, stateFollower.next())
assertEquals(false, artemisService.active)
assertNull(artemisService.started)
artemisServer = createArtemis()
try {
assertEquals(true, stateFollower.next())
assertEquals(true, artemisService.active)
assertNotNull(artemisService.started)
} finally {
artemisServer.stop()
}
assertEquals(false, stateFollower.next())
assertEquals(false, artemisService.active)
assertNull(artemisService.started)
artemisService.stop()
}
private fun createArtemis(): ArtemisMessagingServer {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory
doReturn(DUMMY_BANK_A_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(NetworkHostAndPort("localhost", 11005)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
val artemisServer = ArtemisMessagingServer(artemisConfig, 11005, MAX_MESSAGE_SIZE)
artemisServer.start()
return artemisServer
}
}

View File

@ -0,0 +1,251 @@
package net.corda.bridge.services
import com.nhaarman.mockito_kotlin.*
import net.corda.bridge.*
import net.corda.bridge.services.api.BridgeAMQPListenerService
import net.corda.bridge.services.api.IncomingMessageFilterService
import net.corda.bridge.services.ha.SingleInstanceMasterService
import net.corda.bridge.services.receiver.FloatControlListenerService
import net.corda.bridge.services.receiver.TunnelingBridgeReceiverService
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.rigorousMock
import org.junit.Assert.assertArrayEquals
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.Observable
import rx.subjects.PublishSubject
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
class TunnelControlTest {
companion object {
val inboxTopic = "${P2P_PREFIX}test"
}
@Rule
@JvmField
val tempFolder = TemporaryFolder()
@Rule
@JvmField
val serializationEnvironment = SerializationEnvironmentRule(true)
private abstract class TestBridgeAMQPListenerService : BridgeAMQPListenerService, TestServiceBase() {
private var _running: Boolean = false
override val running: Boolean
get() = _running
override fun provisionKeysAndActivate(keyStoreBytes: ByteArray, keyStorePassword: CharArray, keyStorePrivateKeyPassword: CharArray, trustStoreBytes: ByteArray, trustStorePassword: CharArray) {
_running = true
}
override fun wipeKeysAndDeactivate() {
_running = false
}
}
private abstract class TestIncomingMessageFilterService : IncomingMessageFilterService, TestServiceBase()
@Test
fun `Basic tunnel life cycle test`() {
val bridgeConfigResource = "/net/corda/bridge/withfloat/bridge/bridge.conf"
val bridgePath = tempFolder.root.toPath() / "bridge"
bridgePath.createDirectories()
createNetworkParams(bridgePath)
val bridgeConfig = createAndLoadConfigFromResource(bridgePath, bridgeConfigResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val bridgeAuditService = TestAuditService()
val haService = SingleInstanceMasterService(bridgeConfig, bridgeAuditService)
val filterService = createPartialMock<TestIncomingMessageFilterService>()
val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, bridgeAuditService, haService, filterService)
val bridgeStateFollower = bridgeProxiedReceiverService.activeChange.toBlocking().iterator
bridgeProxiedReceiverService.start()
assertEquals(false, bridgeStateFollower.next())
assertEquals(false, bridgeProxiedReceiverService.active)
bridgeAuditService.start()
assertEquals(false, bridgeProxiedReceiverService.active)
filterService.start()
assertEquals(false, bridgeProxiedReceiverService.active)
haService.start()
assertEquals(false, bridgeProxiedReceiverService.active)
val floatConfigResource = "/net/corda/bridge/withfloat/float/bridge.conf"
val floatPath = tempFolder.root.toPath() / "float"
floatPath.createDirectories()
createNetworkParams(floatPath)
val floatConfig = createAndLoadConfigFromResource(floatPath, floatConfigResource)
floatConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val floatAuditService = TestAuditService()
val amqpListenerService = createPartialMock<TestBridgeAMQPListenerService>().also {
doReturn(Observable.never<ConnectionChange>()).whenever(it).onConnection
doReturn(Observable.never<ReceivedMessage>()).whenever(it).onReceive
}
val floatControlListener = FloatControlListenerService(floatConfig, floatAuditService, amqpListenerService)
val floatStateFollower = floatControlListener.activeChange.toBlocking().iterator
assertEquals(false, floatStateFollower.next())
assertEquals(false, floatControlListener.active)
floatControlListener.start()
assertEquals(false, floatControlListener.active)
floatAuditService.start()
assertEquals(false, floatControlListener.active)
verify(amqpListenerService, times(0)).wipeKeysAndDeactivate()
verify(amqpListenerService, times(0)).provisionKeysAndActivate(any(), any(), any(), any(), any())
assertEquals(false, serverListening("localhost", 12005))
amqpListenerService.start()
assertEquals(true, floatStateFollower.next())
assertEquals(true, floatControlListener.active)
assertEquals(true, serverListening("localhost", 12005))
assertEquals(true, bridgeStateFollower.next())
assertEquals(true, bridgeProxiedReceiverService.active)
verify(amqpListenerService, times(0)).wipeKeysAndDeactivate()
verify(amqpListenerService, times(1)).provisionKeysAndActivate(any(), any(), any(), any(), any())
haService.stop()
assertEquals(false, bridgeStateFollower.next())
assertEquals(false, bridgeProxiedReceiverService.active)
assertEquals(true, floatControlListener.active)
verify(amqpListenerService, times(1)).wipeKeysAndDeactivate()
verify(amqpListenerService, times(1)).provisionKeysAndActivate(any(), any(), any(), any(), any())
assertEquals(true, serverListening("localhost", 12005))
haService.start()
assertEquals(true, bridgeStateFollower.next())
assertEquals(true, bridgeProxiedReceiverService.active)
assertEquals(true, floatControlListener.active)
verify(amqpListenerService, times(1)).wipeKeysAndDeactivate()
verify(amqpListenerService, times(2)).provisionKeysAndActivate(any(), any(), any(), any(), any())
floatControlListener.stop()
assertEquals(false, floatControlListener.active)
bridgeProxiedReceiverService.stop()
assertEquals(false, bridgeStateFollower.next())
assertEquals(false, bridgeProxiedReceiverService.active)
}
@Test
fun `Inbound message test`() {
val bridgeConfigResource = "/net/corda/bridge/withfloat/bridge/bridge.conf"
val bridgePath = tempFolder.root.toPath() / "bridge"
bridgePath.createDirectories()
createNetworkParams(bridgePath)
val bridgeConfig = createAndLoadConfigFromResource(bridgePath, bridgeConfigResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val bridgeAuditService = TestAuditService()
val haService = SingleInstanceMasterService(bridgeConfig, bridgeAuditService)
val forwardedMessages = PublishSubject.create<ReceivedMessage>()
val filterService = createPartialMock<TestIncomingMessageFilterService>().also {
doAnswer {
val msg = it.arguments[0] as ReceivedMessage
forwardedMessages.onNext(msg)
Unit
}.whenever(it).sendMessageToLocalBroker(any())
}
val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, bridgeAuditService, haService, filterService)
val bridgeStateFollower = bridgeProxiedReceiverService.activeChange.toBlocking().iterator
bridgeProxiedReceiverService.start()
bridgeAuditService.start()
filterService.start()
haService.start()
assertEquals(false, bridgeStateFollower.next())
val floatConfigResource = "/net/corda/bridge/withfloat/float/bridge.conf"
val floatPath = tempFolder.root.toPath() / "float"
floatPath.createDirectories()
createNetworkParams(floatPath)
val floatConfig = createAndLoadConfigFromResource(floatPath, floatConfigResource)
floatConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val floatAuditService = TestAuditService()
val receiveObserver = PublishSubject.create<ReceivedMessage>()
val amqpListenerService = createPartialMock<TestBridgeAMQPListenerService>().also {
doReturn(Observable.never<ConnectionChange>()).whenever(it).onConnection
doReturn(receiveObserver).whenever(it).onReceive
}
val floatControlListener = FloatControlListenerService(floatConfig, floatAuditService, amqpListenerService)
floatControlListener.start()
floatAuditService.start()
amqpListenerService.start()
assertEquals(true, bridgeStateFollower.next())
// Message flows back fine from float to bridge and is then forwarded to the filter service
val receiver = forwardedMessages.toBlocking().iterator
val testPayload = ByteArray(1) { 0x11 }
val receivedMessage = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(true) // ACK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn(NetworkHostAndPort("localhost", 12345)).whenever(it).sourceLink
doReturn(inboxTopic).whenever(it).topic
doReturn(testPayload).whenever(it).payload
doReturn(emptyMap<Any?, Any?>()).whenever(it).applicationProperties
doReturn(DUMMY_BANK_A_NAME.toString()).whenever(it).destinationLegalName
doReturn(NetworkHostAndPort("localhost", 6789)).whenever(it).destinationLink
}
receiveObserver.onNext(receivedMessage)
val messageReceived = receiver.next()
messageReceived.complete(true)
assertArrayEquals(testPayload, messageReceived.payload)
assertEquals(inboxTopic, messageReceived.topic)
assertEquals(DUMMY_BANK_B_NAME.toString(), messageReceived.sourceLegalName)
// Message NAK is propagated backwards
val testPayload2 = ByteArray(1) { 0x22 }
val ackLatch = CountDownLatch(1)
val receivedMessage2 = rigorousMock<ReceivedMessage>().also {
doAnswer {
ackLatch.countDown()
Unit
}.whenever(it).complete(false) // NAK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn(NetworkHostAndPort("localhost", 12345)).whenever(it).sourceLink
doReturn(inboxTopic).whenever(it).topic
doReturn(testPayload2).whenever(it).payload
doReturn(emptyMap<Any?, Any?>()).whenever(it).applicationProperties
doReturn(DUMMY_BANK_A_NAME.toString()).whenever(it).destinationLegalName
doReturn(NetworkHostAndPort("localhost", 6789)).whenever(it).destinationLink
}
receiveObserver.onNext(receivedMessage2)
val messageReceived2 = receiver.next()
messageReceived2.complete(false) // cause NAK to be called
assertArrayEquals(testPayload2, messageReceived2.payload)
assertEquals(inboxTopic, messageReceived2.topic)
assertEquals(DUMMY_BANK_B_NAME.toString(), messageReceived2.sourceLegalName)
ackLatch.await(1, TimeUnit.SECONDS)
verify(receivedMessage2, times(1)).complete(false)
// Message NAK if connection dies, without message acceptance
val ackLatch2 = CountDownLatch(1)
val receivedMessage3 = rigorousMock<ReceivedMessage>().also {
doAnswer {
ackLatch2.countDown()
Unit
}.whenever(it).complete(false) // NAK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn(NetworkHostAndPort("localhost", 12345)).whenever(it).sourceLink
doReturn(inboxTopic).whenever(it).topic
doReturn(testPayload2).whenever(it).payload
doReturn(emptyMap<Any?, Any?>()).whenever(it).applicationProperties
doReturn(DUMMY_BANK_A_NAME.toString()).whenever(it).destinationLegalName
doReturn(NetworkHostAndPort("localhost", 6789)).whenever(it).destinationLink
}
receiveObserver.onNext(receivedMessage3)
receiver.next() // wait message on bridge
bridgeProxiedReceiverService.stop() // drop control link
ackLatch.await(1, TimeUnit.SECONDS)
verify(receivedMessage3, times(1)).complete(false)
floatControlListener.stop()
}
}

View File

@ -0,0 +1,10 @@
@file:JvmName("Bridge")
package net.corda.bridge
import net.corda.bridge.internal.BridgeStartup
import kotlin.system.exitProcess
fun main(args: Array<String>) {
exitProcess(if (BridgeStartup(args).run()) 0 else 1)
}

View File

@ -0,0 +1,68 @@
package net.corda.bridge
import joptsimple.OptionParser
import joptsimple.util.EnumConverter
import net.corda.bridge.services.api.BridgeConfiguration
import net.corda.bridge.services.config.BridgeConfigHelper
import net.corda.bridge.services.config.parseAsBridgeConfiguration
import net.corda.core.internal.div
import org.slf4j.event.Level
import java.io.PrintStream
import java.nio.file.Path
import java.nio.file.Paths
// NOTE: Do not use any logger in this class as args parsing is done before the logger is setup.
class ArgsParser {
private val optionParser = OptionParser()
// The intent of allowing a command line configurable directory and config path is to allow deployment flexibility.
// Other general configuration should live inside the config file unless we regularly need temporary overrides on the command line
private val baseDirectoryArg = optionParser
.accepts("base-directory", "The bridge working directory where all the files are kept")
.withRequiredArg()
.defaultsTo(".")
private val configFileArg = optionParser
.accepts("config-file", "The path to the config file")
.withRequiredArg()
.defaultsTo("bridge.conf")
private val loggerLevel = optionParser
.accepts("logging-level", "Enable logging at this level and higher")
.withRequiredArg()
.withValuesConvertedBy(object : EnumConverter<Level>(Level::class.java) {})
.defaultsTo(Level.INFO)
private val logToConsoleArg = optionParser.accepts("log-to-console", "If set, prints logging to the console as well as to a file.")
private val isVersionArg = optionParser.accepts("version", "Print the version and exit")
private val helpArg = optionParser.accepts("help").forHelp()
fun parse(vararg args: String): CmdLineOptions {
val optionSet = optionParser.parse(*args)
require(!optionSet.has(baseDirectoryArg) || !optionSet.has(configFileArg)) {
"${baseDirectoryArg.options()[0]} and ${configFileArg.options()[0]} cannot be specified together"
}
val baseDirectory = Paths.get(optionSet.valueOf(baseDirectoryArg)).normalize().toAbsolutePath()
val configFile = baseDirectory / optionSet.valueOf(configFileArg)
val help = optionSet.has(helpArg)
val loggingLevel = optionSet.valueOf(loggerLevel)
val logToConsole = optionSet.has(logToConsoleArg)
val isVersion = optionSet.has(isVersionArg)
return CmdLineOptions(baseDirectory,
configFile,
help,
loggingLevel,
logToConsole,
isVersion)
}
fun printHelp(sink: PrintStream) = optionParser.printHelpOn(sink)
}
data class CmdLineOptions(val baseDirectory: Path,
val configFile: Path,
val help: Boolean,
val loggingLevel: Level,
val logToConsole: Boolean,
val isVersion: Boolean) {
fun loadConfig(): BridgeConfiguration {
val config = BridgeConfigHelper.loadConfig(baseDirectory, configFile).parseAsBridgeConfiguration()
return config
}
}

View File

@ -0,0 +1,18 @@
package net.corda.bridge
/**
* Encapsulates various pieces of version information of the bridge.
*/
data class BridgeVersionInfo(
/**
* Platform version of the bridge which is an integer value which increments on any release where any of the public
* API of the entire Corda platform changes. This includes messaging, serialisation, bridge APIs, etc.
*/
val platformVersion: Int,
/** Release version string of the bridge. */
val releaseVersion: String,
/** The exact version control commit ID of the bridge build. */
val revision: String,
/** The bridge vendor */
val vendor: String)

View File

@ -0,0 +1,173 @@
package net.corda.bridge.internal
import net.corda.bridge.BridgeVersionInfo
import net.corda.bridge.services.api.*
import net.corda.bridge.services.audit.LoggingBridgeAuditService
import net.corda.bridge.services.supervisors.BridgeSupervisorServiceImpl
import net.corda.bridge.services.supervisors.FloatSupervisorServiceImpl
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.SignedDataWithCert
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.div
import net.corda.core.internal.exists
import net.corda.core.internal.readObject
import net.corda.core.node.NetworkParameters
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ShutdownHook
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.AMQP_STORAGE_CONTEXT
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import rx.Subscription
import java.util.concurrent.atomic.AtomicBoolean
class BridgeInstance(val conf: BridgeConfiguration,
val versionInfo: BridgeVersionInfo,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : ServiceLifecycleSupport, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
private val shutdown = AtomicBoolean(false)
private var shutdownHook: ShutdownHook? = null
private lateinit var networkParameters: NetworkParameters
private lateinit var bridgeAuditService: BridgeAuditService
private var bridgeSupervisorService: BridgeSupervisorService? = null
private var floatSupervisorService: FloatSupervisorService? = null
private var statusFollower: ServiceStateCombiner? = null
private var statusSubscriber: Subscription? = null
init {
initialiseSerialization()
}
private fun initialiseSerialization() {
val serializationExists = try {
effectiveSerializationEnv
true
} catch (e: IllegalStateException) {
false
}
if (!serializationExists) {
val classloader = this.javaClass.classLoader
nodeSerializationEnv = SerializationEnvironmentImpl(
SerializationFactoryImpl().apply {
registerScheme(AMQPServerSerializationScheme(emptyList()))
},
p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
rpcServerContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader),
checkpointContext = AMQP_P2P_CONTEXT.withClassLoader(classloader))
}
}
override fun start() {
val wasRunning = shutdown.getAndSet(true)
require(!wasRunning) { "Already running" }
shutdownHook = addShutdownHook {
stop()
}
retrieveNetworkParameters()
createServices()
startServices()
}
override fun stop() {
val wasRunning = shutdown.getAndSet(false)
if (!wasRunning) {
return
}
shutdownHook?.cancel()
shutdownHook = null
log.info("Shutting down ...")
stopServices()
_exitFuture.set(this)
log.info("Shutdown complete")
}
private val _exitFuture = openFuture<BridgeInstance>()
val onExit: CordaFuture<BridgeInstance> get() = _exitFuture
private fun retrieveNetworkParameters() {
val trustRoot = conf.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
val networkParamsFile = conf.baseDirectory / NETWORK_PARAMS_FILE_NAME
require(networkParamsFile.exists()) { "No network-parameters file found." }
networkParameters = networkParamsFile.readObject<SignedDataWithCert<NetworkParameters>>().verifiedNetworkMapCert(trustRoot)
log.info("Loaded network parameters: $networkParameters")
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion"
}
}
private fun createServices() {
bridgeAuditService = LoggingBridgeAuditService(conf)
when (conf.bridgeMode) {
// In the SenderReceiver mode the inbound and outbound message paths are run from within a single bridge process.
// The process thus contains components that listen for bridge control messages on Artemis.
// The process can then initiates TLS/AMQP 1.0 connections to remote peers and transfers the outbound messages.
// The process also runs a TLS/AMQP 1.0 server socket, which is can receive connections and messages from peers,
// validate the messages and then forwards the packets to the Artemis inbox queue of the node.
BridgeMode.SenderReceiver -> {
floatSupervisorService = FloatSupervisorServiceImpl(conf, networkParameters.maxMessageSize, bridgeAuditService)
bridgeSupervisorService = BridgeSupervisorServiceImpl(conf, networkParameters.maxMessageSize, bridgeAuditService, floatSupervisorService!!.amqpListenerService)
}
// In the FloatInner mode the process runs the full outbound message path as in the SenderReceiver mode, but the inbound path is split.
// This 'Float Inner/Bridge Controller' process runs the more trusted portion of the inbound path.
// In particular the 'Float Inner/Bridge Controller' has access to the persisted TLS KeyStore, which it provisions dynamically into the 'Float Outer'.
// Also the the 'Float Inner' does more complete validation of inbound messages and ensures that they correspond to legitimate
// node inboxes, before transferring the message to Artemis. Potentially it might carry out deeper checks of received packets.
// However, the 'Float Inner' is not directly exposed to the internet, or peers and does not host the TLS/AMQP 1.0 server socket.
BridgeMode.FloatInner -> {
bridgeSupervisorService = BridgeSupervisorServiceImpl(conf, networkParameters.maxMessageSize, bridgeAuditService, null)
}
// In the FloatOuter mode this process runs a minimal AMQP proxy that is designed to run in a DMZ zone.
// The process holds the minimum data necessary to act as the TLS/AMQP 1.0 receiver socket and tries
// to minimise any state. It specifically does not persist the Node TLS keys anywhere, nor does it hold network map information on peers.
// The 'Float Outer' does not initiate socket connection anywhere, so that attackers can be easily blocked by firewalls
// if they try to invade the system from a compromised 'Float Outer' machine. The 'Float Outer' hosts a control TLS/AMQP 1.0 server socket,
// which receives a connection from the 'Float Inner/Bridge controller' in the trusted zone of the organisation.
// The control channel is ideally authenticated using server/client certificates that are not related to the Corda PKI hierarchy.
// Once the control channel is formed it is used to RPC the methods of the BridgeAMQPListenerService to start the publicly visible
// TLS/AMQP 1.0 server socket of the Corda node. Thus peer connections will directly terminate onto the activate listener socket and
// be validated against the keys/certificates sent across the control tunnel. Inbound messages are given basic checks that do not require
// holding potentially sensitive information and are then forwarded across the control tunnel to the 'Float Inner' process for more
// complete validation checks.
BridgeMode.FloatOuter -> {
floatSupervisorService = FloatSupervisorServiceImpl(conf, networkParameters.maxMessageSize, bridgeAuditService)
}
}
statusFollower = ServiceStateCombiner(listOf(bridgeAuditService, floatSupervisorService, bridgeSupervisorService).filterNotNull())
statusSubscriber = statusFollower!!.activeChange.subscribe {
stateHelper.active = it
}
}
private fun startServices() {
bridgeAuditService.start()
bridgeSupervisorService?.start()
floatSupervisorService?.start()
}
private fun stopServices() {
stateHelper.active = false
floatSupervisorService?.stop()
bridgeSupervisorService?.stop()
bridgeAuditService.stop()
statusSubscriber?.unsubscribe()
statusSubscriber = null
statusFollower = null
}
}

View File

@ -0,0 +1,204 @@
package net.corda.bridge.internal
import com.jcabi.manifests.Manifests
import joptsimple.OptionException
import net.corda.bridge.ArgsParser
import net.corda.bridge.BridgeVersionInfo
import net.corda.bridge.CmdLineOptions
import net.corda.bridge.services.api.BridgeConfiguration
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.addShutdownHook
import org.slf4j.bridge.SLF4JBridgeHandler
import sun.misc.VMSupport
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.nio.file.Path
import java.util.*
import kotlin.system.exitProcess
class BridgeStartup(val args: Array<String>) {
companion object {
// lazy init the logging, because the logging levels aren't configured until we have parsed some options.
private val log by lazy { contextLogger() }
val LOGS_DIRECTORY_NAME = "logs"
}
/**
* @return true if the bridge startup was successful. This value is intended to be the exit code of the process.
*/
fun run(): Boolean {
val startTime = System.currentTimeMillis()
val (argsParser, cmdlineOptions) = parseArguments()
// We do the single bridge check before we initialise logging so that in case of a double-bridge start it
// doesn't mess with the running bridge's logs.
enforceSingleBridgeIsRunning(cmdlineOptions.baseDirectory)
initLogging(cmdlineOptions)
val versionInfo = getVersionInfo()
if (cmdlineOptions.isVersion) {
println("${versionInfo.vendor} ${versionInfo.releaseVersion}")
println("Revision ${versionInfo.revision}")
println("Platform Version ${versionInfo.platformVersion}")
return true
}
// Maybe render command line help.
if (cmdlineOptions.help) {
argsParser.printHelp(System.out)
return true
}
val conf = try {
loadConfigFile(cmdlineOptions)
} catch (e: Exception) {
log.error("Exception during bridge configuration", e)
return false
}
try {
logStartupInfo(versionInfo, cmdlineOptions, conf)
} catch (e: Exception) {
log.error("Exception during bridge registration", e)
return false
}
val bridge = try {
cmdlineOptions.baseDirectory.createDirectories()
startBridge(conf, versionInfo, startTime)
} catch (e: Exception) {
if (e.message?.startsWith("Unknown named curve:") == true) {
log.error("Exception during bridge startup - ${e.message}. " +
"This is a known OpenJDK issue on some Linux distributions, please use OpenJDK from zulu.org or Oracle JDK.")
} else {
log.error("Exception during bridge startup", e)
}
return false
}
if (System.getProperties().containsKey("WAIT_KEY_FOR_EXIT")) {
System.`in`.read() // Inside IntelliJ we can't forward CTRL-C, so debugging shutdown is a nightmare. So allow -DWAIT_KEY_FOR_EXIT flag for key based quit.
} else {
bridge.onExit.get()
}
log.info("bridge shutting down")
bridge.stop()
return true
}
fun logStartupInfo(versionInfo: BridgeVersionInfo, cmdlineOptions: CmdLineOptions, conf: BridgeConfiguration) {
log.info("Vendor: ${versionInfo.vendor}")
log.info("Release: ${versionInfo.releaseVersion}")
log.info("Platform Version: ${versionInfo.platformVersion}")
log.info("Revision: ${versionInfo.revision}")
val info = ManagementFactory.getRuntimeMXBean()
log.info("PID: ${info.name.split("@").firstOrNull()}") // TODO Java 9 has better support for this
log.info("Main class: ${BridgeStartup::class.java.protectionDomain.codeSource.location.toURI().path}")
log.info("CommandLine Args: ${info.inputArguments.joinToString(" ")}")
log.info("Application Args: ${args.joinToString(" ")}")
log.info("bootclasspath: ${info.bootClassPath}")
log.info("classpath: ${info.classPath}")
log.info("VM ${info.vmName} ${info.vmVendor} ${info.vmVersion}")
log.info("Machine: ${lookupMachineNameAndMaybeWarn()}")
log.info("Working Directory: ${cmdlineOptions.baseDirectory}")
val agentProperties = VMSupport.getAgentProperties()
if (agentProperties.containsKey("sun.jdwp.listenerAddress")) {
log.info("Debug port: ${agentProperties.getProperty("sun.jdwp.listenerAddress")}")
}
log.info("Starting as bridge mode of ${conf.bridgeMode}")
}
protected fun loadConfigFile(cmdlineOptions: CmdLineOptions): BridgeConfiguration = cmdlineOptions.loadConfig()
protected fun getVersionInfo(): BridgeVersionInfo {
// Manifest properties are only available if running from the corda jar
fun manifestValue(name: String): String? = if (Manifests.exists(name)) Manifests.read(name) else null
return BridgeVersionInfo(
manifestValue("Corda-Platform-Version")?.toInt() ?: 1,
manifestValue("Corda-Release-Version") ?: "Unknown",
manifestValue("Corda-Revision") ?: "Unknown",
manifestValue("Corda-Vendor") ?: "Unknown"
)
}
private fun enforceSingleBridgeIsRunning(baseDirectory: Path) {
// Write out our process ID (which may or may not resemble a UNIX process id - to us it's just a string) to a
// file that we'll do our best to delete on exit. But if we don't, it'll be overwritten next time. If it already
// exists, we try to take the file lock first before replacing it and if that fails it means we're being started
// twice with the same directory: that's a user error and we should bail out.
val pidFile = (baseDirectory / "bridge-process-id").toFile()
pidFile.createNewFile()
pidFile.deleteOnExit()
val pidFileRw = RandomAccessFile(pidFile, "rw")
val pidFileLock = pidFileRw.channel.tryLock()
if (pidFileLock == null) {
println("It appears there is already a bridge running with the specified data directory $baseDirectory")
println("Shut that other bridge down and try again. It may have process ID ${pidFile.readText()}")
System.exit(1)
}
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
// when our process shuts down, but we try in stop() anyway just to be nice.
addShutdownHook {
pidFileLock.release()
}
val ourProcessID: String = ManagementFactory.getRuntimeMXBean().name.split("@")[0]
pidFileRw.setLength(0)
pidFileRw.write(ourProcessID.toByteArray())
}
private fun lookupMachineNameAndMaybeWarn(): String {
val start = System.currentTimeMillis()
val hostName: String = InetAddress.getLocalHost().hostName
val elapsed = System.currentTimeMillis() - start
if (elapsed > 1000 && hostName.endsWith(".local")) {
// User is probably on macOS and experiencing this problem: http://stackoverflow.com/questions/10064581/how-can-i-eliminate-slow-resolving-loading-of-localhost-virtualhost-a-2-3-secon
//
// Also see https://bugs.openjdk.java.net/browse/JDK-8143378
val messages = listOf(
"Your computer took over a second to resolve localhost due an incorrect configuration. Corda will work but start very slowly until this is fixed. ",
"Please see https://docs.corda.net/troubleshooting.html#slow-localhost-resolution for information on how to fix this. ",
"It will only take a few seconds for you to resolve."
)
log.warn(messages.joinToString(""))
}
return hostName
}
private fun parseArguments(): Pair<ArgsParser, CmdLineOptions> {
val argsParser = ArgsParser()
val cmdlineOptions = try {
argsParser.parse(*args)
} catch (ex: OptionException) {
println("Invalid command line arguments: ${ex.message}")
argsParser.printHelp(System.out)
exitProcess(1)
}
return Pair(argsParser, cmdlineOptions)
}
fun initLogging(cmdlineOptions: CmdLineOptions) {
val loggingLevel = cmdlineOptions.loggingLevel.name.toLowerCase(Locale.ENGLISH)
System.setProperty("defaultLogLevel", loggingLevel) // These properties are referenced from the XML config file.
if (cmdlineOptions.logToConsole) {
System.setProperty("consoleLogLevel", loggingLevel)
}
System.setProperty("log-path", (cmdlineOptions.baseDirectory / LOGS_DIRECTORY_NAME).toString())
SLF4JBridgeHandler.removeHandlersForRootLogger() // The default j.u.l config adds a ConsoleHandler.
SLF4JBridgeHandler.install()
}
fun startBridge(conf: BridgeConfiguration, versionInfo: BridgeVersionInfo, startTime: Long): BridgeInstance {
val bridge = BridgeInstance(conf, versionInfo)
bridge.start()
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
log.info("Bridge started up and registered in $elapsed sec")
return bridge
}
}

View File

@ -0,0 +1,45 @@
package net.corda.bridge.services.api
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
import rx.Observable
import java.security.KeyStore
/**
* This service when activated via [provisionKeysAndActivate] installs an AMQP listening socket,
* which listens on the port specified in the [BridgeConfiguration.inboundConfig] section.
* The service technically runs inside the 'float' portion of the bridge, so that it can be run remotely inside the DMZ.
* As a result it reports as active, whilst not actually listening. Only when the TLS [KeyStore]s are passed to it
* does the service become [running].
*/
interface BridgeAMQPListenerService : ServiceLifecycleSupport {
/**
* Passes in the [KeyStore]s containing the TLS keys and certificates. This data is only to be held in memory
* and will be wiped on close.
*/
fun provisionKeysAndActivate(keyStoreBytes: ByteArray,
keyStorePassword: CharArray,
keyStorePrivateKeyPassword: CharArray,
trustStoreBytes: ByteArray,
trustStorePassword: CharArray)
/**
* Stop listening on the socket and cleanup any private data/keys.
*/
fun wipeKeysAndDeactivate()
/**
* If the service is [running] the AMQP listener is active.
*/
val running: Boolean
/**
* Incoming AMQP packets from remote peers are available on this [Observable].
*/
val onReceive: Observable<ReceivedMessage>
/**
* Any connection, disconnection, or authentication failure is available on this [Observable].
*/
val onConnection: Observable<ConnectionChange>
}

View File

@ -0,0 +1,14 @@
package net.corda.bridge.services.api
import net.corda.nodeapi.internal.ArtemisMessagingClient
/**
* This provides a service to manage connection to the local broker as defined in the [BridgeConfiguration.outboundConfig] section.
* Once started the service will repeatedly attempt to connect to the bus, signalling success by changing to the [active] state.
*/
interface BridgeArtemisConnectionService : ServiceLifecycleSupport {
/**
* When the service becomes [active] this will be non-null and provides access to Artemis management objects.
*/
val started: ArtemisMessagingClient.Started?
}

View File

@ -0,0 +1,17 @@
package net.corda.bridge.services.api
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import java.net.InetSocketAddress
/**
* This service provides centralised facilities for recording business critical events in the bridge.
* Currently the simple implementation just records events to log file, but future implementations may need to post
* security data to an enterprise service.
*/
interface BridgeAuditService : ServiceLifecycleSupport {
fun successfulConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress, certificateSubject: String, msg: String)
fun failedConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress?, certificateSubject: String?, msg: String)
fun packetDropEvent(packet: ReceivedMessage?, msg: String)
fun packetAcceptedEvent(packet: ReceivedMessage)
fun statusChangeEvent(msg: String)
}

View File

@ -0,0 +1,92 @@
package net.corda.bridge.services.api
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.config.SSLConfiguration
import java.nio.file.Path
enum class BridgeMode {
/**
* The Bridge/Float is run as a single process with both AMQP sending and receiving functionality.
*/
SenderReceiver,
/**
* Runs only the trusted bridge side of the system, which has direct TLS access to Artemis.
* The components handles all outgoing aspects of AMQP bridges directly.
* The inbound messages are initially received onto a different [FloatOuter] process and a
* separate AMQP tunnel is used to ship back the inbound data to this [FloatInner] process.
*/
FloatInner,
/**
* A minimal process designed to be run inside a DMZ, which acts an AMQP receiver of inbound peer messages.
* The component carries out basic validation of the TLS sources and AMQP packets, before forwarding to the [FloatInner].
* No keys are stored on disk for the component, but must instead be provisioned from the [FloatInner] using a
* separate AMQP link initiated from the [FloatInner] to the [FloatOuter].
*/
FloatOuter
}
/**
* Details of the local Artemis broker.
* Required in SenderReceiver and FloatInner modes.
*/
interface BridgeOutboundConfiguration {
val artemisBrokerAddress: NetworkHostAndPort
// Allows override of [KeyStore] details for the artemis connection, otherwise the general top level details are used.
val customSSLConfiguration: SSLConfiguration?
}
/**
* Details of the inbound socket binding address, which should be where external peers
* using the node's network map advertised data should route links and directly terminate their TLS connections.
* This configuration is required in SenderReceiver and FloatOuter modes.
*/
interface BridgeInboundConfiguration {
val listeningAddress: NetworkHostAndPort
// Allows override of [KeyStore] details for the AMQP listener port, otherwise the general top level details are used.
val customSSLConfiguration: SSLConfiguration?
}
/**
* Details of the target control ports of available [BridgeMode.FloatOuter] processes from the perspective of the [BridgeMode.FloatInner] process.
* Required for [BridgeMode.FloatInner] mode.
*/
interface FloatInnerConfiguration {
val floatAddresses: List<NetworkHostAndPort>
val expectedCertificateSubject: CordaX500Name
// Allows override of [KeyStore] details for the control port, otherwise the general top level details are used.
// Used for connection to Float in DMZ
val customSSLConfiguration: SSLConfiguration?
// The SSL keystores to provision into the Float in DMZ
val customFloatOuterSSLConfiguration: SSLConfiguration?
}
/**
* Details of the listening port for a [BridgeMode.FloatOuter] process and of the certificate that the [BridgeMode.FloatInner] should present.
* Required for [BridgeMode.FloatOuter] mode.
*/
interface FloatOuterConfiguration {
val floatAddress: NetworkHostAndPort
val expectedCertificateSubject: CordaX500Name
// Allows override of [KeyStore] details for the control port, otherwise the general top level details are used.
val customSSLConfiguration: SSLConfiguration?
}
interface BridgeConfiguration : NodeSSLConfiguration {
val bridgeMode: BridgeMode
val outboundConfig: BridgeOutboundConfiguration?
val inboundConfig: BridgeInboundConfiguration?
val floatInnerConfig: FloatInnerConfiguration?
val floatOuterConfig: FloatOuterConfiguration?
val haConfig: String?
val networkParametersPath: Path
val enableAMQPPacketTrace: Boolean
// Reconnect to artemis after [artemisReconnectionInterval] ms the default value is 5000 ms.
val artemisReconnectionInterval: Int
// The period to wait for clean shutdown of remote components
// e.g links to the Float Outer, or Artemis sessions, before the process continues shutting down anyway.
// Default value is 1000 ms.
val politeShutdownPeriod: Int
val whitelistedHeaders: List<String>
}

View File

@ -0,0 +1,11 @@
package net.corda.bridge.services.api
/**
* This service controls when a bridge may become active and start relaying messages to/from the artemis broker.
* The active flag is the used to gate dependent services, which should hold off connecting to the bus until this service
* has been able to become active.
*/
interface BridgeMasterService : ServiceLifecycleSupport {
// An echo of the active flag that can be used to make the intention of active status checks clearer.
val isMaster: Boolean get() = active
}

View File

@ -0,0 +1,10 @@
package net.corda.bridge.services.api
/**
* The [BridgeReceiverService] is the service responsible for joining together the perhaps remote [BridgeAMQPListenerService]
* and the outgoing [IncomingMessageFilterService] that provides the validation and filtering path into the local Artemis broker.
* It should not become active, or transmit messages until all of the dependencies are themselves active.
*/
interface BridgeReceiverService : ServiceLifecycleSupport {
}

View File

@ -0,0 +1,19 @@
package net.corda.bridge.services.api
import net.corda.core.identity.CordaX500Name
import net.corda.nodeapi.internal.bridging.BridgeControlListener
/**
* This service is responsible for the outgoing path of messages from the local Artemis broker
* to the remote peer using AMQP. It should not become active until the connection to the local Artemis broker is stable
* and the [BridgeMasterService] has allowed this bridge instance to become activated.
* In practice the actual AMQP bridging logic is carried out using an instance of the [BridgeControlListener] class with
* lifecycle support coming from the service.
*/
interface BridgeSenderService : ServiceLifecycleSupport {
/**
* This method is used to check inbound packets against the list of valid inbox addresses registered from the nodes
* via the local Bridge Control Protocol. They may optionally also check this against the source legal name.
*/
fun validateReceiveTopic(topic: String, sourceLegalName: CordaX500Name): Boolean
}

View File

@ -0,0 +1,9 @@
package net.corda.bridge.services.api
/**
* This is the top level service representing the [BridgeMode.FloatInner] service stack. The primary role of this component is to
* create and wire up concrete implementations of the relevant services according to the [BridgeConfiguration] details.
* The possibly proxied path to the [BridgeAMQPListenerService] is typically a constructor input
* as that is a [BridgeMode.FloatOuter] component.
*/
interface BridgeSupervisorService : ServiceLifecycleSupport

View File

@ -0,0 +1,7 @@
package net.corda.bridge.services.api
/**
* This service represent an AMQP socket listener that awaits a remote initiated connection from the [BridgeMode.FloatInner].
* Only one active connection is allowed at a time and it must match the configured requirements in the [BridgeConfiguration.floatInnerConfig].
*/
interface FloatControlService : ServiceLifecycleSupport

View File

@ -0,0 +1,10 @@
package net.corda.bridge.services.api
/**
* This is the top level service responsible for creating and managing the [BridgeMode.FloatOuter] portions of the bridge.
* It exposes a possibly proxied [BridgeAMQPListenerService] component that is used in the [BridgeSupervisorService]
* to wire up the internal portions of the AMQP peer inbound message path.
*/
interface FloatSupervisorService : ServiceLifecycleSupport {
val amqpListenerService: BridgeAMQPListenerService
}

View File

@ -0,0 +1,13 @@
package net.corda.bridge.services.api
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
/**
* This service is responsible for security checking the incoming packets to ensure they are for a legitimate node inbox and
* potentially for any other security related aspects. If the message is badly formed then it will be dropped and an audit event logged.
* Otherwise the message is forwarded to the appropriate node inbox on the local Artemis Broker.
* The service will not be active until the underlying [BridgeArtemisConnectionService] is active.
*/
interface IncomingMessageFilterService : ServiceLifecycleSupport {
fun sendMessageToLocalBroker(inboundMessage: ReceivedMessage)
}

View File

@ -0,0 +1,43 @@
package net.corda.bridge.services.api
import rx.Observable
/**
* Basic interface to represent the dynamic life cycles of services that may be running, but may have to await external dependencies,
* or for HA master state.
* Implementations of this should be implemented in a thread safe fashion.
*/
interface ServiceStateSupport {
/**
* Reads the current dynamic status of the service, which should only become true after the service has been started,
* any dynamic resources have been started/registered and any network connections have been completed.
* Failure to acquire a resource, or manual stop of the service, should return this to false.
*/
val active: Boolean
/**
* This Observer signals changes in the [active] variable, it should not be triggered for events that don't flip the [active] state.
*/
val activeChange: Observable<Boolean>
}
/**
* Simple interface for generic start/stop service lifecycle and the [active] flag indicating runtime ready state.
*/
interface ServiceLifecycleSupport : ServiceStateSupport, AutoCloseable {
/**
* Manual call to allow the service to start the process towards becoming active.
* Note wiring up service dependencies should happen in the constructor phase, unless this is to avoid a circular reference.
* Also, resources allocated as a result of start should be cleaned up as much as possible by stop.
* The [start] method should allow multiple reuse, assuming a [stop] call was made to clear the state.
*/
fun start()
/**
* Release the resources created by [start] and drops the [active] state to false.
*/
fun stop()
override fun close() = stop()
}

View File

@ -0,0 +1,171 @@
package net.corda.bridge.services.artemis
import net.corda.bridge.services.api.BridgeArtemisConnectionService
import net.corda.bridge.services.api.BridgeAuditService
import net.corda.bridge.services.api.BridgeConfiguration
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.internal.ThreadBox
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.config.SSLConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.FailoverEventType
import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Subscription
import java.util.concurrent.CountDownLatch
class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
val maxMessageSize: Int,
val auditService: BridgeAuditService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeArtemisConnectionService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
private class InnerState {
var running = false
var locator: ServerLocator? = null
var started: ArtemisMessagingClient.Started? = null
var connectThread: Thread? = null
}
private val state = ThreadBox(InnerState())
private val sslConfiguration: SSLConfiguration
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
init {
statusFollower = ServiceStateCombiner(listOf(auditService))
sslConfiguration = conf.outboundConfig?.customSSLConfiguration ?: conf
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
if (it) {
startArtemisConnection()
} else {
stopArtemisConnection()
}
}
}
private fun startArtemisConnection() {
state.locked {
check(!running) { "start can't be called twice" }
running = true
log.info("Connecting to message broker: ${conf.outboundConfig!!.artemisBrokerAddress}")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), conf.outboundConfig!!.artemisBrokerAddress, sslConfiguration)
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
connectionTTL = -1
clientFailureCheckPeriod = -1
minLargeMessageSize = maxMessageSize
isUseGlobalPools = nodeSerializationEnv != null
}
connectThread = Thread({ artemisReconnectionLoop() }, "Artemis Connector Thread").apply {
isDaemon = true
}
connectThread!!.start()
}
}
override fun stop() {
stopArtemisConnection()
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
private fun stopArtemisConnection() {
stateHelper.active = false
val connectThread = state.locked {
if (running) {
log.info("Shutdown artemis")
running = false
started?.apply {
producer.close()
session.close()
sessionFactory.close()
}
started = null
locator?.close()
locator = null
val thread = connectThread
connectThread = null
thread
} else null
}
connectThread?.interrupt()
connectThread?.join(conf.politeShutdownPeriod.toLong())
}
override val started: ArtemisMessagingClient.Started?
get() = state.locked { started }
private fun artemisReconnectionLoop() {
while (state.locked { running }) {
val locator = state.locked { locator }
if (locator == null) {
break
}
try {
log.info("Try create session factory")
val newSessionFactory = locator.createSessionFactory()
log.info("Got session factory")
val latch = CountDownLatch(1)
newSessionFactory.connection.addCloseListener {
log.info("Connection close event")
latch.countDown()
}
newSessionFactory.addFailoverListener { evt: FailoverEventType ->
log.info("Session failover Event $evt")
if (evt == FailoverEventType.FAILOVER_FAILED) {
latch.countDown()
}
}
val newSession = newSessionFactory.createSession(ArtemisMessagingComponent.NODE_USER,
ArtemisMessagingComponent.NODE_USER,
false,
true,
true,
locator.isPreAcknowledge,
ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
newSession.start()
log.info("Session created")
val newProducer = newSession.createProducer()
state.locked {
started = ArtemisMessagingClient.Started(locator, newSessionFactory, newSession, newProducer)
}
stateHelper.active = true
latch.await()
stateHelper.active = false
state.locked {
started?.apply {
producer.close()
session.close()
sessionFactory.close()
}
started = null
}
log.info("Session closed")
} catch (ex: Exception) {
log.trace("Caught exception", ex)
}
try {
// Sleep for a short while before attempting reconnect
Thread.sleep(conf.artemisReconnectionInterval.toLong())
} catch (ex: InterruptedException) {
// ignore
}
}
log.info("Ended Artemis Connector Thread")
}
}

View File

@ -0,0 +1,45 @@
package net.corda.bridge.services.audit
import net.corda.bridge.services.api.BridgeAuditService
import net.corda.bridge.services.api.BridgeConfiguration
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import java.net.InetSocketAddress
class LoggingBridgeAuditService(val conf: BridgeConfiguration,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeAuditService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
override fun start() {
stateHelper.active = true
}
override fun stop() {
stateHelper.active = false
}
override fun successfulConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress, certificateSubject: String, msg: String) {
log.info(msg)
}
override fun failedConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress?, certificateSubject: String?, msg: String) {
log.warn(msg)
}
override fun packetDropEvent(packet: ReceivedMessage?, msg: String) {
log.info(msg)
}
override fun packetAcceptedEvent(packet: ReceivedMessage) {
log.trace { "Packet received from ${packet.sourceLegalName} uuid: ${packet.applicationProperties["_AMQ_DUPL_ID"]}" }
}
override fun statusChangeEvent(msg: String) {
log.info(msg)
}
}

View File

@ -0,0 +1,48 @@
package net.corda.bridge.services.config
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigFactory.systemEnvironment
import com.typesafe.config.ConfigFactory.systemProperties
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRenderOptions
import net.corda.core.internal.div
import net.corda.nodeapi.internal.config.toProperties
import org.slf4j.LoggerFactory
import java.nio.file.Path
fun configOf(vararg pairs: Pair<String, Any?>): Config = ConfigFactory.parseMap(mapOf(*pairs))
operator fun Config.plus(overrides: Map<String, Any?>): Config = ConfigFactory.parseMap(overrides).withFallback(this)
object BridgeConfigHelper {
const val BRIDGE_PROPERTY_PREFIX = "bridge."
private val log = LoggerFactory.getLogger(javaClass)
fun loadConfig(baseDirectory: Path,
configFile: Path = baseDirectory / "bridge.conf",
allowMissingConfig: Boolean = false,
configOverrides: Config = ConfigFactory.empty()): Config {
val parseOptions = ConfigParseOptions.defaults()
val defaultConfig = ConfigFactory.parseResources("bridgedefault.conf", parseOptions.setAllowMissing(false))
val appConfig = ConfigFactory.parseFile(configFile.toFile(), parseOptions.setAllowMissing(allowMissingConfig))
val systemOverrides = systemProperties().bridgeEntriesOnly()
val environmentOverrides = systemEnvironment().bridgeEntriesOnly()
val finalConfig = configOverrides
// Add substitution values here
.withFallback(systemOverrides) //for database integration tests
.withFallback(environmentOverrides) //for database integration tests
.withFallback(configOf("baseDirectory" to baseDirectory.toString()))
.withFallback(appConfig)
.withFallback(defaultConfig)
.resolve()
log.info("Config:\n${finalConfig.root().render(ConfigRenderOptions.defaults())}")
return finalConfig
}
private fun Config.bridgeEntriesOnly(): Config {
return ConfigFactory.parseMap(toProperties().filterKeys { (it as String).startsWith(BRIDGE_PROPERTY_PREFIX) }.mapKeys { (it.key as String).removePrefix(BRIDGE_PROPERTY_PREFIX) })
}
}

View File

@ -0,0 +1,61 @@
package net.corda.bridge.services.config
import com.typesafe.config.Config
import net.corda.bridge.services.api.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.nodeapi.internal.config.parseAs
import java.nio.file.Path
fun Config.parseAsBridgeConfiguration(): BridgeConfiguration = parseAs<BridgeConfigurationImpl>()
data class CustomSSLConfiguration(override val keyStorePassword: String,
override val trustStorePassword: String,
override val certificatesDirectory: Path) : SSLConfiguration
data class BridgeOutboundConfigurationImpl(override val artemisBrokerAddress: NetworkHostAndPort,
override val customSSLConfiguration: CustomSSLConfiguration?) : BridgeOutboundConfiguration
data class BridgeInboundConfigurationImpl(override val listeningAddress: NetworkHostAndPort,
override val customSSLConfiguration: CustomSSLConfiguration?) : BridgeInboundConfiguration
data class FloatInnerConfigurationImpl(override val floatAddresses: List<NetworkHostAndPort>,
override val expectedCertificateSubject: CordaX500Name,
override val customSSLConfiguration: CustomSSLConfiguration?,
override val customFloatOuterSSLConfiguration: CustomSSLConfiguration?) : FloatInnerConfiguration
data class FloatOuterConfigurationImpl(override val floatAddress: NetworkHostAndPort,
override val expectedCertificateSubject: CordaX500Name,
override val customSSLConfiguration: CustomSSLConfiguration?) : FloatOuterConfiguration
data class BridgeConfigurationImpl(
override val baseDirectory: Path,
override val keyStorePassword: String,
override val trustStorePassword: String,
override val bridgeMode: BridgeMode,
override val networkParametersPath: Path,
override val outboundConfig: BridgeOutboundConfigurationImpl?,
override val inboundConfig: BridgeInboundConfigurationImpl?,
override val floatInnerConfig: FloatInnerConfigurationImpl?,
override val floatOuterConfig: FloatOuterConfigurationImpl?,
override val haConfig: String?,
override val enableAMQPPacketTrace: Boolean,
override val artemisReconnectionInterval: Int = 5000,
override val politeShutdownPeriod: Int = 1000,
override val whitelistedHeaders: List<String> = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList()
) : BridgeConfiguration {
init {
if (bridgeMode == BridgeMode.SenderReceiver) {
require(inboundConfig != null && outboundConfig != null) { "Missing required configuration" }
} else if (bridgeMode == BridgeMode.FloatInner) {
require(floatInnerConfig != null && outboundConfig != null) { "Missing required configuration" }
} else if (bridgeMode == BridgeMode.FloatOuter) {
require(inboundConfig != null && floatOuterConfig != null) { "Missing required configuration" }
}
}
}

View File

@ -0,0 +1,99 @@
package net.corda.bridge.services.filter
import net.corda.bridge.services.api.*
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import rx.Subscription
class SimpleMessageFilterService(val conf: BridgeConfiguration,
val auditService: BridgeAuditService,
val artemisConnectionService: BridgeArtemisConnectionService,
val bridgeSenderService: BridgeSenderService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : IncomingMessageFilterService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
private val whiteListedAMQPHeaders: Set<String> = conf.whitelistedHeaders.toSet()
private var inboundSession: ClientSession? = null
private var inboundProducer: ClientProducer? = null
init {
statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, bridgeSenderService))
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
if (it) {
inboundSession = artemisConnectionService.started!!.sessionFactory.createSession(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
inboundProducer = inboundSession!!.createProducer()
} else {
inboundProducer?.close()
inboundProducer = null
inboundSession?.close()
inboundSession = null
}
stateHelper.active = it
}
}
override fun stop() {
inboundProducer?.close()
inboundProducer = null
inboundSession?.close()
inboundSession = null
stateHelper.active = false
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
private fun validateMessage(inboundMessage: ReceivedMessage) {
if (!active) {
throw IllegalStateException("Unable to forward message as Service Dependencies down")
}
val sourceLegalName = try {
CordaX500Name.parse(inboundMessage.sourceLegalName)
} catch (ex: IllegalArgumentException) {
throw SecurityException("Invalid Legal Name ${inboundMessage.sourceLegalName}")
}
require(inboundMessage.payload.size > 0) { "No valid payload" }
val validInboxTopic = bridgeSenderService.validateReceiveTopic(inboundMessage.topic, sourceLegalName)
require(validInboxTopic) { "Topic not a legitimate Inbox for a node on this Artemis Broker ${inboundMessage.topic}" }
require(inboundMessage.applicationProperties.keys.all { it!!.toString() in whiteListedAMQPHeaders }) { "Disallowed header present in ${inboundMessage.applicationProperties.keys.map { it.toString() }}" }
}
override fun sendMessageToLocalBroker(inboundMessage: ReceivedMessage) {
try {
validateMessage(inboundMessage)
val session = inboundSession
val producer = inboundProducer
if (session == null || producer == null) {
throw IllegalStateException("No artemis connection to forward message over")
}
val artemisMessage = session.createMessage(true)
for (key in whiteListedAMQPHeaders) {
if (inboundMessage.applicationProperties.containsKey(key)) {
artemisMessage.putObjectProperty(key, inboundMessage.applicationProperties[key])
}
}
artemisMessage.putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(inboundMessage.sourceLegalName))
artemisMessage.writeBodyBufferBytes(inboundMessage.payload)
producer.send(SimpleString(inboundMessage.topic), artemisMessage, { _ -> inboundMessage.complete(true) })
auditService.packetAcceptedEvent(inboundMessage)
} catch (ex: Exception) {
auditService.packetDropEvent(inboundMessage, "Packet Failed validation checks: " + ex.message)
inboundMessage.complete(false)
}
}
}

View File

@ -0,0 +1,27 @@
package net.corda.bridge.services.ha
import net.corda.bridge.services.api.BridgeAuditService
import net.corda.bridge.services.api.BridgeConfiguration
import net.corda.bridge.services.api.BridgeMasterService
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
class SingleInstanceMasterService(val conf: BridgeConfiguration,
val auditService: BridgeAuditService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeMasterService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
override fun start() {
auditService.statusChangeEvent("Single instance master going active immediately.")
stateHelper.active = true
}
override fun stop() {
auditService.statusChangeEvent("Single instance master stopping")
stateHelper.active = false
}
}

View File

@ -0,0 +1,128 @@
package net.corda.bridge.services.receiver
import net.corda.bridge.services.api.BridgeAMQPListenerService
import net.corda.bridge.services.api.BridgeAuditService
import net.corda.bridge.services.api.BridgeConfiguration
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.crypto.KEYSTORE_TYPE
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
import org.slf4j.LoggerFactory
import rx.Observable
import rx.Subscription
import rx.subjects.PublishSubject
import java.io.ByteArrayInputStream
import java.security.KeyStore
import java.util.*
class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
val auditService: BridgeAuditService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeAMQPListenerService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
val consoleLogger = LoggerFactory.getLogger("BasicInfo")
}
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
private var amqpServer: AMQPServer? = null
private var keyStorePrivateKeyPassword: CharArray? = null
private var onConnectSubscription: Subscription? = null
private var onConnectAuditSubscription: Subscription? = null
private var onReceiveSubscription: Subscription? = null
init {
statusFollower = ServiceStateCombiner(listOf(auditService))
}
override fun provisionKeysAndActivate(keyStoreBytes: ByteArray,
keyStorePassword: CharArray,
keyStorePrivateKeyPassword: CharArray,
trustStoreBytes: ByteArray,
trustStorePassword: CharArray) {
require(active) { "AuditService must be active" }
require(keyStorePassword !== keyStorePrivateKeyPassword) { "keyStorePassword and keyStorePrivateKeyPassword must reference distinct arrays!" }
val keyStore = loadKeyStoreAndWipeKeys(keyStoreBytes, keyStorePassword)
val trustStore = loadKeyStoreAndWipeKeys(trustStoreBytes, trustStorePassword)
val bindAddress = conf.inboundConfig!!.listeningAddress
val server = AMQPServer(bindAddress.host, bindAddress.port, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, conf.enableAMQPPacketTrace)
onConnectSubscription = server.onConnection.subscribe(_onConnection)
onConnectAuditSubscription = server.onConnection.subscribe {
if (it.connected) {
auditService.successfulConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Successful AMQP inbound connection")
} else {
auditService.failedConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Failed AMQP inbound connection")
}
}
onReceiveSubscription = server.onReceive.subscribe(_onReceive)
amqpServer = server
server.start()
val msg = "Now listening for incoming connections on $bindAddress"
auditService.statusChangeEvent(msg)
consoleLogger.info(msg)
}
private fun loadKeyStoreAndWipeKeys(keyStoreBytes: ByteArray, keyStorePassword: CharArray): KeyStore {
val keyStore = KeyStore.getInstance(KEYSTORE_TYPE)
ByteArrayInputStream(keyStoreBytes).use {
keyStore.load(it, keyStorePassword)
}
// We overwrite the keys we don't need anymore
Arrays.fill(keyStoreBytes, 0xAA.toByte())
Arrays.fill(keyStorePassword, 0xAA55.toChar())
return keyStore
}
override fun wipeKeysAndDeactivate() {
onReceiveSubscription?.unsubscribe()
onReceiveSubscription = null
onConnectSubscription?.unsubscribe()
onConnectSubscription = null
onConnectAuditSubscription?.unsubscribe()
onConnectAuditSubscription = null
if (running) {
val msg = "AMQP Listener shutting down"
auditService.statusChangeEvent(msg)
consoleLogger.info(msg)
}
amqpServer?.close()
amqpServer = null
if (keyStorePrivateKeyPassword != null) {
// Wipe the old password
Arrays.fill(keyStorePrivateKeyPassword, 0xAA55.toChar())
keyStorePrivateKeyPassword = null
}
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
stateHelper.active = it
}
}
override fun stop() {
stateHelper.active = false
wipeKeysAndDeactivate()
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
override val running: Boolean
get() = amqpServer?.listening ?: false
private val _onReceive = PublishSubject.create<ReceivedMessage>().toSerialized()
override val onReceive: Observable<ReceivedMessage>
get() = _onReceive
private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized()
override val onConnection: Observable<ConnectionChange>
get() = _onConnection
}

View File

@ -0,0 +1,229 @@
package net.corda.bridge.services.receiver
import net.corda.bridge.services.api.*
import net.corda.bridge.services.receiver.FloatControlTopics.FLOAT_DATA_TOPIC
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.identity.CordaX500Name
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
import rx.Subscription
import java.security.KeyStore
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
class FloatControlListenerService(val conf: BridgeConfiguration,
val auditService: BridgeAuditService,
val amqpListener: BridgeAMQPListenerService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatControlService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
private val lock = ReentrantLock()
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
private var incomingMessageSubscriber: Subscription? = null
private var connectSubscriber: Subscription? = null
private var receiveSubscriber: Subscription? = null
private var amqpControlServer: AMQPServer? = null
private val sslConfiguration: SSLConfiguration
private val keyStore: KeyStore
private val keyStorePrivateKeyPassword: String
private val trustStore: KeyStore
private val floatControlAddress = conf.floatOuterConfig!!.floatAddress
private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject
private var activeConnectionInfo: ConnectionChange? = null
private var forwardAddress: NetworkHostAndPort? = null
private var forwardLegalName: String? = null
init {
statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener))
sslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: conf
keyStore = sslConfiguration.loadSslKeyStore().internal
keyStorePrivateKeyPassword = sslConfiguration.keyStorePassword
trustStore = sslConfiguration.loadTrustStore().internal
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
if (it) {
startControlListener()
} else {
stopControlListener()
}
stateHelper.active = it
}
incomingMessageSubscriber = amqpListener.onReceive.subscribe {
forwardReceivedMessage(it)
}
}
private fun startControlListener() {
lock.withLock {
val controlServer = AMQPServer(floatControlAddress.host, floatControlAddress.port, null, null, keyStore, keyStorePrivateKeyPassword, trustStore, conf.enableAMQPPacketTrace)
connectSubscriber = controlServer.onConnection.subscribe { onConnectToControl(it) }
receiveSubscriber = controlServer.onReceive.subscribe { onControlMessage(it) }
amqpControlServer = controlServer
controlServer.start()
}
}
override fun stop() {
lock.withLock {
stateHelper.active = false
stopControlListener()
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
}
private fun stopControlListener() {
lock.withLock {
if (amqpListener.running) {
amqpListener.wipeKeysAndDeactivate()
}
connectSubscriber?.unsubscribe()
connectSubscriber = null
amqpControlServer?.stop()
receiveSubscriber?.unsubscribe()
receiveSubscriber = null
amqpControlServer = null
activeConnectionInfo = null
forwardAddress = null
forwardLegalName = null
incomingMessageSubscriber?.unsubscribe()
incomingMessageSubscriber = null
}
}
private fun onConnectToControl(connectionChange: ConnectionChange) {
auditService.statusChangeEvent("Connection change on float control port $connectionChange")
lock.withLock {
val currentConnection = activeConnectionInfo
if (currentConnection != null) {
// If there is a new valid TLS connection kill old connection.
// Else if this event signals loss of current connection wipe the keys
if (connectionChange.connected || (currentConnection.remoteAddress == connectionChange.remoteAddress)) {
if (amqpListener.running) {
amqpListener.wipeKeysAndDeactivate()
}
amqpControlServer?.dropConnection(currentConnection.remoteAddress)
activeConnectionInfo = null
forwardAddress = null
forwardLegalName = null
}
}
if (connectionChange.connected) {
if (connectionChange.remoteCert != null) {
val certificateSubject = CordaX500Name.parse(connectionChange.remoteCert!!.subjectDN.toString())
if (certificateSubject == floatClientName) {
activeConnectionInfo = connectionChange
} else {
amqpControlServer?.dropConnection(connectionChange.remoteAddress)
}
} else {
amqpControlServer?.dropConnection(connectionChange.remoteAddress)
}
}
}
}
private fun onControlMessage(receivedMessage: ReceivedMessage) {
if (!receivedMessage.checkTunnelControlTopic()) {
auditService.packetDropEvent(receivedMessage, "Invalid control topic packet received on topic ${receivedMessage.topic}!!")
receivedMessage.complete(false)
return
}
val controlMessage = try {
if (CordaX500Name.parse(receivedMessage.sourceLegalName) != floatClientName) {
auditService.packetDropEvent(receivedMessage, "Invalid control source legal name!!")
receivedMessage.complete(false)
return
}
receivedMessage.payload.deserialize<TunnelControlMessage>()
} catch (ex: Exception) {
receivedMessage.complete(false)
return
}
lock.withLock {
when (controlMessage) {
is ActivateFloat -> {
log.info("Received Tunnel Activate message")
amqpListener.provisionKeysAndActivate(controlMessage.keyStoreBytes,
controlMessage.keyStorePassword,
controlMessage.keyStorePrivateKeyPassword,
controlMessage.trustStoreBytes,
controlMessage.trustStorePassword)
forwardAddress = receivedMessage.destinationLink
forwardLegalName = receivedMessage.destinationLegalName
}
is DeactivateFloat -> {
log.info("Received Tunnel Deactivate message")
if (amqpListener.running) {
amqpListener.wipeKeysAndDeactivate()
}
forwardAddress = null
forwardLegalName = null
}
}
}
receivedMessage.complete(true)
}
private fun forwardReceivedMessage(message: ReceivedMessage) {
val amqpControl = lock.withLock {
if (amqpControlServer == null ||
activeConnectionInfo == null ||
forwardLegalName == null ||
forwardAddress == null ||
!stateHelper.active) {
null
} else {
amqpControlServer
}
}
if (amqpControl == null) {
message.complete(false)
return
}
if (!message.topic.startsWith(P2P_PREFIX)) {
auditService.packetDropEvent(message, "Message topic is not a valid peer namespace ${message.topic}")
message.complete(false)
return
}
val appProperties = message.applicationProperties.map { Pair(it.key!!.toString(), it.value) }.toList()
try {
val wrappedMessage = FloatDataPacket(message.topic,
appProperties,
message.payload,
CordaX500Name.parse(message.sourceLegalName),
message.sourceLink,
CordaX500Name.parse(message.destinationLegalName),
message.destinationLink)
val amqpForwardMessage = amqpControl.createMessage(wrappedMessage.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes,
FLOAT_DATA_TOPIC,
forwardLegalName!!,
forwardAddress!!,
emptyMap())
amqpForwardMessage.onComplete.then { message.complete(it.get() == MessageStatus.Acknowledged) }
amqpControl.write(amqpForwardMessage)
} catch (ex: Exception) {
log.error("Failed to forward message", ex)
message.complete(false)
}
}
}

View File

@ -0,0 +1,64 @@
package net.corda.bridge.services.receiver
import net.corda.bridge.services.api.*
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.internal.readAll
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import rx.Subscription
class InProcessBridgeReceiverService(val conf: BridgeConfiguration,
val auditService: BridgeAuditService,
haService: BridgeMasterService,
val amqpListenerService: BridgeAMQPListenerService,
val filterService: IncomingMessageFilterService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeReceiverService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
private var receiveSubscriber: Subscription? = null
private val sslConfiguration: SSLConfiguration
init {
statusFollower = ServiceStateCombiner(listOf(auditService, haService, amqpListenerService, filterService))
sslConfiguration = conf.inboundConfig?.customSSLConfiguration ?: conf
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
if (it) {
val keyStoreBytes = sslConfiguration.sslKeystore.readAll()
val trustStoreBytes = sslConfiguration.trustStoreFile.readAll()
amqpListenerService.provisionKeysAndActivate(keyStoreBytes,
sslConfiguration.keyStorePassword.toCharArray(),
sslConfiguration.keyStorePassword.toCharArray(),
trustStoreBytes,
sslConfiguration.trustStorePassword.toCharArray())
}
stateHelper.active = it
}
receiveSubscriber = amqpListenerService.onReceive.subscribe {
processMessage(it)
}
}
private fun processMessage(receivedMessage: ReceivedMessage) {
filterService.sendMessageToLocalBroker(receivedMessage)
}
override fun stop() {
stateHelper.active = false
if (amqpListenerService.running) {
amqpListenerService.wipeKeysAndDeactivate()
}
receiveSubscriber?.unsubscribe()
receiveSubscriber = null
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
}

View File

@ -0,0 +1,37 @@
package net.corda.bridge.services.receiver
import net.corda.bridge.services.receiver.FloatControlTopics.FLOAT_CONTROL_TOPIC
import net.corda.bridge.services.receiver.FloatControlTopics.FLOAT_DATA_TOPIC
import net.corda.core.identity.CordaX500Name
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
@CordaSerializable
sealed class TunnelControlMessage
object FloatControlTopics {
const val FLOAT_CONTROL_TOPIC = "float.control"
const val FLOAT_DATA_TOPIC = "float.forward"
}
internal class ActivateFloat(val keyStoreBytes: ByteArray,
val keyStorePassword: CharArray,
val keyStorePrivateKeyPassword: CharArray,
val trustStoreBytes: ByteArray,
val trustStorePassword: CharArray) : TunnelControlMessage()
class DeactivateFloat : TunnelControlMessage()
fun ReceivedMessage.checkTunnelControlTopic() = (topic == FLOAT_CONTROL_TOPIC)
@CordaSerializable
internal class FloatDataPacket(val topic: String,
val originalHeaders: List<Pair<String, Any?>>,
val originalPayload: ByteArray,
val sourceLegalName: CordaX500Name,
val sourceLink: NetworkHostAndPort,
val destinationLegalName: CordaX500Name,
val destinationLink: NetworkHostAndPort)
fun ReceivedMessage.checkTunnelDataTopic() = (topic == FLOAT_DATA_TOPIC)

View File

@ -0,0 +1,193 @@
package net.corda.bridge.services.receiver
import net.corda.bridge.services.api.*
import net.corda.bridge.services.receiver.FloatControlTopics.FLOAT_CONTROL_TOPIC
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.crypto.newSecureRandom
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.readAll
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
import rx.Subscription
import java.io.ByteArrayOutputStream
import java.security.KeyStore
import java.security.SecureRandom
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
val auditService: BridgeAuditService,
haService: BridgeMasterService,
val filterService: IncomingMessageFilterService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeReceiverService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
private var connectSubscriber: Subscription? = null
private var receiveSubscriber: Subscription? = null
private var amqpControlClient: AMQPClient? = null
private val controlLinkSSLConfiguration: SSLConfiguration
private val floatListenerSSLConfiguration: SSLConfiguration
private val controlLinkKeyStore: KeyStore
private val controLinkKeyStorePrivateKeyPassword: String
private val controlLinkTrustStore: KeyStore
private val expectedCertificateSubject: CordaX500Name
private val secureRandom: SecureRandom = newSecureRandom()
init {
statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService))
controlLinkSSLConfiguration = conf.floatInnerConfig?.customSSLConfiguration ?: conf
floatListenerSSLConfiguration = conf.floatInnerConfig?.customFloatOuterSSLConfiguration ?: conf
controlLinkKeyStore = controlLinkSSLConfiguration.loadSslKeyStore().internal
controLinkKeyStorePrivateKeyPassword = controlLinkSSLConfiguration.keyStorePassword
controlLinkTrustStore = controlLinkSSLConfiguration.loadTrustStore().internal
expectedCertificateSubject = conf.floatInnerConfig!!.expectedCertificateSubject
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
if (it) {
val floatAddresses = conf.floatInnerConfig!!.floatAddresses
val controlClient = AMQPClient(floatAddresses, setOf(expectedCertificateSubject), null, null, controlLinkKeyStore, controLinkKeyStorePrivateKeyPassword, controlLinkTrustStore, conf.enableAMQPPacketTrace)
connectSubscriber = controlClient.onConnection.subscribe { onConnectToControl(it) }
receiveSubscriber = controlClient.onReceive.subscribe { onFloatMessage(it) }
amqpControlClient = controlClient
controlClient.start()
} else {
stateHelper.active = false
closeAMQPClient()
}
}
}
private fun closeAMQPClient() {
connectSubscriber?.unsubscribe()
connectSubscriber = null
receiveSubscriber?.unsubscribe()
receiveSubscriber = null
amqpControlClient?.apply {
val deactivateMessage = DeactivateFloat()
val amqpDeactivateMessage = amqpControlClient!!.createMessage(deactivateMessage.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes,
FLOAT_CONTROL_TOPIC,
expectedCertificateSubject.toString(),
emptyMap())
try {
amqpControlClient!!.write(amqpDeactivateMessage)
} catch (ex: IllegalStateException) {
// ignore if channel is already closed
}
try {
// Await acknowledgement of the deactivate message, but don't block our shutdown forever.
amqpDeactivateMessage.onComplete.get(conf.politeShutdownPeriod.toLong(), TimeUnit.MILLISECONDS)
} catch (ex: TimeoutException) {
// Ignore
}
stop()
}
amqpControlClient = null
}
override fun stop() {
stateHelper.active = false
closeAMQPClient()
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
private fun onConnectToControl(connectionChange: ConnectionChange) {
auditService.statusChangeEvent("Connection change on float control port $connectionChange")
if (connectionChange.connected) {
val (freshKeyStorePassword, freshKeyStoreKeyPassword, recodedKeyStore) = recodeKeyStore(floatListenerSSLConfiguration)
val trustStoreBytes = floatListenerSSLConfiguration.trustStoreFile.readAll()
val activateMessage = ActivateFloat(recodedKeyStore,
freshKeyStorePassword,
freshKeyStoreKeyPassword,
trustStoreBytes,
floatListenerSSLConfiguration.trustStorePassword.toCharArray())
val amqpActivateMessage = amqpControlClient!!.createMessage(activateMessage.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes,
FLOAT_CONTROL_TOPIC,
expectedCertificateSubject.toString(),
emptyMap())
try {
amqpControlClient!!.write(amqpActivateMessage)
} catch (ex: IllegalStateException) {
stateHelper.active = false // lost the channel
return
}
amqpActivateMessage.onComplete.then {
stateHelper.active = (it.get() == MessageStatus.Acknowledged)
//TODO Retry?
}
} else {
stateHelper.active = false
}
}
// Recode KeyStore to use a fresh random password for entries and overall
private fun recodeKeyStore(sslConfiguration: SSLConfiguration): Triple<CharArray, CharArray, ByteArray> {
val keyStoreOriginal = sslConfiguration.loadSslKeyStore().internal
val originalKeyStorePassword = sslConfiguration.keyStorePassword.toCharArray()
val freshKeyStorePassword = CharArray(20) { secureRandom.nextInt(0xD800).toChar() } // Stick to single character Unicode range
val freshPrivateKeyPassword = CharArray(20) { secureRandom.nextInt(0xD800).toChar() } // Stick to single character Unicode range
for (alias in keyStoreOriginal.aliases()) {
if (keyStoreOriginal.isKeyEntry(alias)) {
// Recode key entries to new password
val privateKey = keyStoreOriginal.getKey(alias, originalKeyStorePassword)
val certs = keyStoreOriginal.getCertificateChain(alias)
keyStoreOriginal.setKeyEntry(alias, privateKey, freshPrivateKeyPassword, certs)
}
}
// Serialize re-keyed KeyStore to ByteArray
val recodedKeyStore = ByteArrayOutputStream().use {
keyStoreOriginal.store(it, freshKeyStorePassword)
it
}.toByteArray()
return Triple(freshKeyStorePassword, freshPrivateKeyPassword, recodedKeyStore)
}
private fun onFloatMessage(receivedMessage: ReceivedMessage) {
if (!receivedMessage.checkTunnelDataTopic()) {
auditService.packetDropEvent(receivedMessage, "Invalid float inbound topic received ${receivedMessage.topic}!!")
receivedMessage.complete(false)
return
}
val innerMessage = try {
receivedMessage.payload.deserialize<FloatDataPacket>()
} catch (ex: Exception) {
auditService.packetDropEvent(receivedMessage, "Unable to decode Float Control message")
receivedMessage.complete(false)
return
}
log.info("Received $innerMessage")
val onwardMessage = object : ReceivedMessage {
override val topic: String = innerMessage.topic
override val applicationProperties: Map<Any?, Any?> = innerMessage.originalHeaders.toMap()
override val payload: ByteArray = innerMessage.originalPayload
override val sourceLegalName: String = innerMessage.sourceLegalName.toString()
override val sourceLink: NetworkHostAndPort = receivedMessage.sourceLink
override fun complete(accepted: Boolean) {
receivedMessage.complete(accepted)
}
override val destinationLegalName: String = innerMessage.destinationLegalName.toString()
override val destinationLink: NetworkHostAndPort = innerMessage.destinationLink
}
filterService.sendMessageToLocalBroker(onwardMessage)
}
}

View File

@ -0,0 +1,68 @@
package net.corda.bridge.services.sender
import net.corda.bridge.services.api.*
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import rx.Subscription
class DirectBridgeSenderService(val conf: BridgeConfiguration,
val auditService: BridgeAuditService,
val haService: BridgeMasterService,
val artemisConnectionService: BridgeArtemisConnectionService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeSenderService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
private var connectionSubscriber: Subscription? = null
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf, { ForwardingArtemisMessageClient(artemisConnectionService) })
init {
statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService))
}
private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider {
override fun start(): ArtemisMessagingClient.Started {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
return artemisConnectionService.started!!
}
override fun stop() {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
}
override val started: ArtemisMessagingClient.Started?
get() = artemisConnectionService.started
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe { ready ->
if (ready) {
bridgeControlListener.start()
stateHelper.active = true
} else {
stateHelper.active = false
bridgeControlListener.stop()
}
}
}
override fun stop() {
stateHelper.active = false
bridgeControlListener.stop()
connectionSubscriber?.unsubscribe()
connectionSubscriber = null
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
override fun validateReceiveTopic(topic: String, sourceLegalName: CordaX500Name): Boolean = bridgeControlListener.validateReceiveTopic(topic)
}

View File

@ -0,0 +1,76 @@
package net.corda.bridge.services.supervisors
import net.corda.bridge.services.api.*
import net.corda.bridge.services.artemis.BridgeArtemisConnectionServiceImpl
import net.corda.bridge.services.filter.SimpleMessageFilterService
import net.corda.bridge.services.ha.SingleInstanceMasterService
import net.corda.bridge.services.receiver.InProcessBridgeReceiverService
import net.corda.bridge.services.receiver.TunnelingBridgeReceiverService
import net.corda.bridge.services.sender.DirectBridgeSenderService
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import org.slf4j.LoggerFactory
import rx.Subscription
class BridgeSupervisorServiceImpl(val conf: BridgeConfiguration,
maxMessageSize: Int,
val auditService: BridgeAuditService,
inProcessAMQPListenerService: BridgeAMQPListenerService?,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeSupervisorService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
val consoleLogger = LoggerFactory.getLogger("BasicInfo")
}
private val haService: BridgeMasterService
private val artemisService: BridgeArtemisConnectionServiceImpl
private val senderService: BridgeSenderService
private val receiverService: BridgeReceiverService
private val filterService: IncomingMessageFilterService
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
init {
if (conf.haConfig.isNullOrEmpty()) {
haService = SingleInstanceMasterService(conf, auditService)
} else {
TODO()
}
artemisService = BridgeArtemisConnectionServiceImpl(conf, maxMessageSize, auditService)
senderService = DirectBridgeSenderService(conf, auditService, haService, artemisService)
filterService = SimpleMessageFilterService(conf, auditService, artemisService, senderService)
receiverService = if (conf.bridgeMode == BridgeMode.SenderReceiver) {
InProcessBridgeReceiverService(conf, auditService, haService, inProcessAMQPListenerService!!, filterService)
} else {
require(inProcessAMQPListenerService == null) { "Should not have an in process instance of the AMQPListenerService" }
TunnelingBridgeReceiverService(conf, auditService, haService, filterService)
}
statusFollower = ServiceStateCombiner(listOf(haService, senderService, receiverService, filterService))
activeChange.subscribe {
consoleLogger.info("BridgeSupervisorService: active = $it")
}
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
stateHelper.active = it
}
artemisService.start()
senderService.start()
receiverService.start()
filterService.start()
haService.start()
}
override fun stop() {
stateHelper.active = false
haService.stop()
senderService.stop()
receiverService.stop()
filterService.stop()
artemisService.stop()
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
}

View File

@ -0,0 +1,54 @@
package net.corda.bridge.services.supervisors
import net.corda.bridge.services.api.*
import net.corda.bridge.services.receiver.BridgeAMQPListenerServiceImpl
import net.corda.bridge.services.receiver.FloatControlListenerService
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import org.slf4j.LoggerFactory
import rx.Subscription
class FloatSupervisorServiceImpl(val conf: BridgeConfiguration,
val maxMessageSize: Int,
val auditService: BridgeAuditService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatSupervisorService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
val consoleLogger = LoggerFactory.getLogger("BasicInfo")
}
override val amqpListenerService: BridgeAMQPListenerService
private val floatControlService: FloatControlService?
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
init {
amqpListenerService = BridgeAMQPListenerServiceImpl(conf, auditService)
floatControlService = if (conf.bridgeMode == BridgeMode.FloatOuter) {
FloatControlListenerService(conf, auditService, amqpListenerService)
} else {
null
}
statusFollower = ServiceStateCombiner(listOf(amqpListenerService, floatControlService).filterNotNull())
activeChange.subscribe {
consoleLogger.info("FloatSupervisorService: active = $it")
}
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe {
stateHelper.active = it
}
amqpListenerService.start()
floatControlService?.start()
}
override fun stop() {
stateHelper.active = false
floatControlService?.stop()
amqpListenerService.stop()
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
}

View File

@ -0,0 +1,44 @@
package net.corda.bridge.services.util
import net.corda.bridge.services.api.ServiceStateSupport
import org.slf4j.Logger
import rx.Observable
import rx.subjects.BehaviorSubject
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* Simple implementation of [ServiceStateSupport] service domino logic using RxObservables.
*/
class ServiceStateHelper(val log: Logger) : ServiceStateSupport {
val lock = ReentrantLock()
private var _active: Boolean = false
override var active: Boolean
get() = lock.withLock { _active }
set(value) {
lock.withLock {
if (value != _active) {
_active = value
log.info("Status change to $value")
_activeChange.onNext(value)
}
}
}
private val _activeChange: BehaviorSubject<Boolean> = BehaviorSubject.create<Boolean>(false)
private val _threadSafeObservable: Observable<Boolean> = _activeChange.serialize().distinctUntilChanged()
override val activeChange: Observable<Boolean>
get() = _threadSafeObservable
}
/**
* Simple implementation of [ServiceStateSupport] where it only reports [active] true when a set of dependencies are all [active] true.
*/
class ServiceStateCombiner(val services: List<ServiceStateSupport>) : ServiceStateSupport {
override val active: Boolean
get() = services.all { it.active }
private val _activeChange = Observable.combineLatest(services.map { it.activeChange }, { x -> x.all { y -> y as Boolean } }).serialize().distinctUntilChanged()
override val activeChange: Observable<Boolean>
get() = _activeChange
}

View File

@ -0,0 +1,5 @@
keyStorePassword = "cordacadevpass"
trustStorePassword = "trustpass"
enableAMQPPacketTrace = false
artemisReconnectionInterval = 5000
politeShutdownPeriod = 1000

View File

@ -0,0 +1,94 @@
package net.corda.bridge
import net.corda.bridge.services.api.BridgeConfiguration
import net.corda.core.crypto.Crypto.generateKeyPair
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.createDirectories
import net.corda.core.internal.exists
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.AttachmentId
import net.corda.nodeapi.internal.*
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.nodeapi.internal.crypto.*
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.TestIdentity
import org.mockito.Mockito
import org.mockito.Mockito.CALLS_REAL_METHODS
import org.mockito.Mockito.withSettings
import java.net.Socket
import java.nio.file.Files
import java.nio.file.Path
import java.security.cert.X509Certificate
import java.time.Instant
fun createNetworkParams(baseDirectory: Path) {
val dummyNotaryParty = TestIdentity(DUMMY_NOTARY_NAME)
val notaryInfo = NotaryInfo(dummyNotaryParty.party, false)
val copier = NetworkParametersCopier(NetworkParameters(
minimumPlatformVersion = 1,
notaries = listOf(notaryInfo),
modifiedTime = Instant.now(),
maxMessageSize = 10485760,
maxTransactionSize = 40000,
epoch = 1,
whitelistedContractImplementations = emptyMap<String, List<AttachmentId>>()
), overwriteFile = true)
copier.install(baseDirectory)
}
fun createAndLoadConfigFromResource(baseDirectory: Path, configResource: String): BridgeConfiguration {
val workspaceFolder = baseDirectory.normalize().toAbsolutePath()
val args = arrayOf("--base-directory", workspaceFolder.toString())
val argsParser = ArgsParser()
val cmdlineOptions = argsParser.parse(*args)
val configFile = cmdlineOptions.configFile
configFile.normalize().parent?.createDirectories()
ConfigTest::class.java.getResourceAsStream(configResource).use {
Files.copy(it, configFile)
}
val config = cmdlineOptions.loadConfig()
return config
}
fun SSLConfiguration.createBridgeKeyStores(legalName: CordaX500Name,
rootCert: X509Certificate = DEV_ROOT_CA.certificate,
intermediateCa: CertificateAndKeyPair = DEV_INTERMEDIATE_CA) {
certificatesDirectory.createDirectories()
if (!trustStoreFile.exists()) {
loadKeyStore(javaClass.classLoader.getResourceAsStream("certificates/${DEV_CA_TRUST_STORE_FILE}"), DEV_CA_TRUST_STORE_PASS).save(trustStoreFile, trustStorePassword)
}
val (nodeCaCert, nodeCaKeyPair) = createDevNodeCa(intermediateCa, legalName)
val sslKeyStore = loadSslKeyStore(createNew = true)
sslKeyStore.update {
val tlsKeyPair = generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val tlsCert = X509Utilities.createCertificate(CertificateType.TLS, nodeCaCert, nodeCaKeyPair, legalName.x500Principal, tlsKeyPair.public)
setPrivateKey(
X509Utilities.CORDA_CLIENT_TLS,
tlsKeyPair.private,
listOf(tlsCert, nodeCaCert, intermediateCa.certificate, rootCert))
}
}
fun serverListening(host: String, port: Int): Boolean {
var s: Socket? = null
try {
s = Socket(host, port)
return true
} catch (e: Exception) {
return false
} finally {
try {
s?.close()
} catch (e: Exception) {
}
}
}
inline fun <reified T> createPartialMock() = Mockito.mock(T::class.java, withSettings().useConstructor().defaultAnswer(CALLS_REAL_METHODS))

View File

@ -0,0 +1,56 @@
package net.corda.bridge
import net.corda.bridge.services.api.BridgeMode
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.core.SerializationEnvironmentRule
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
class ConfigTest {
@Rule
@JvmField
val tempFolder = TemporaryFolder()
@Rule
@JvmField
val serializationEnvironment = SerializationEnvironmentRule()
@Test
fun `Load simple config`() {
val configResource = "/net/corda/bridge/singleprocess/bridge.conf"
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
assertEquals(BridgeMode.SenderReceiver, config.bridgeMode)
assertEquals(NetworkHostAndPort("localhost", 11005), config.outboundConfig!!.artemisBrokerAddress)
assertEquals(NetworkHostAndPort("0.0.0.0", 10005), config.inboundConfig!!.listeningAddress)
assertNull(config.floatInnerConfig)
assertNull(config.floatOuterConfig)
}
@Test
fun `Load simple bridge config`() {
val configResource = "/net/corda/bridge/withfloat/bridge/bridge.conf"
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
assertEquals(BridgeMode.FloatInner, config.bridgeMode)
assertEquals(NetworkHostAndPort("localhost", 11005), config.outboundConfig!!.artemisBrokerAddress)
assertNull(config.inboundConfig)
assertEquals(listOf(NetworkHostAndPort("localhost", 12005)), config.floatInnerConfig!!.floatAddresses)
assertEquals(CordaX500Name.parse("O=Bank A, L=London, C=GB"), config.floatInnerConfig!!.expectedCertificateSubject)
assertNull(config.floatOuterConfig)
}
@Test
fun `Load simple float config`() {
val configResource = "/net/corda/bridge/withfloat/float/bridge.conf"
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
assertEquals(BridgeMode.FloatOuter, config.bridgeMode)
assertNull(config.outboundConfig)
assertEquals(NetworkHostAndPort("0.0.0.0", 10005), config.inboundConfig!!.listeningAddress)
assertNull(config.floatInnerConfig)
assertEquals(NetworkHostAndPort("localhost", 12005), config.floatOuterConfig!!.floatAddress)
assertEquals(CordaX500Name.parse("O=Bank A, L=London, C=GB"), config.floatOuterConfig!!.expectedCertificateSubject)
}
}

View File

@ -0,0 +1,227 @@
package net.corda.bridge
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import org.junit.Assert.assertEquals
import org.junit.Test
import rx.Observable
class ServiceStateTest {
interface ServiceA : ServiceStateSupport {
fun changeStatus(status: Boolean)
}
class ServiceAImpl(private val stateSupport: ServiceStateHelper = ServiceStateHelper(log)) : ServiceA, ServiceStateSupport by stateSupport {
companion object {
val log = contextLogger()
}
override fun changeStatus(status: Boolean) {
stateSupport.active = status
}
}
interface ServiceB : ServiceStateSupport {
fun changeStatus(status: Boolean)
}
class ServiceBImpl(private val stateSupport: ServiceStateHelper = ServiceStateHelper(log)) : ServiceB, ServiceStateSupport by stateSupport {
companion object {
val log = contextLogger()
}
override fun changeStatus(status: Boolean) {
stateSupport.active = status
}
}
interface ServiceC : ServiceStateSupport {
}
class ServiceCImpl(servA: ServiceA, servB: ServiceB) : ServiceC {
private val combiner = ServiceStateCombiner(listOf(servA, servB))
override val active: Boolean
get() = combiner.active
override val activeChange: Observable<Boolean>
get() = combiner.activeChange
}
@Test
fun `Test state helper`() {
val servA = ServiceAImpl()
var upA = 0
var downA = 0
val subsA = servA.activeChange.subscribe {
if (it) ++upA else ++downA
}
assertEquals(0, upA)
assertEquals(1, downA)
servA.changeStatus(true)
assertEquals(true, servA.active)
assertEquals(1, upA)
assertEquals(1, downA)
servA.changeStatus(true)
assertEquals(true, servA.active)
assertEquals(1, upA)
assertEquals(1, downA)
servA.changeStatus(false)
assertEquals(false, servA.active)
assertEquals(1, upA)
assertEquals(2, downA)
servA.changeStatus(false)
assertEquals(false, servA.active)
assertEquals(1, upA)
assertEquals(2, downA)
// Should stop alerting, but keep functioning after unsubscribe
subsA.unsubscribe()
servA.changeStatus(true)
assertEquals(true, servA.active)
assertEquals(1, upA)
assertEquals(2, downA)
servA.changeStatus(false)
assertEquals(false, servA.active)
assertEquals(1, upA)
assertEquals(2, downA)
}
@Test
fun `Test basic domino behaviour of combiner`() {
val servA = ServiceAImpl()
val servB = ServiceBImpl()
val servC = ServiceCImpl(servA, servB)
var upA = 0
var downA = 0
var upB = 0
var downB = 0
var upC = 0
var downC = 0
val subsA = servA.activeChange.subscribe {
if (it) ++upA else ++downA
}
val subsB = servB.activeChange.subscribe {
if (it) ++upB else ++downB
}
val subsC = servC.activeChange.subscribe {
if (it) ++upC else ++downC
}
// Get one automatic down event at subscribe
assertEquals(false, servA.active)
assertEquals(false, servB.active)
assertEquals(false, servC.active)
assertEquals(0, upA)
assertEquals(1, downA)
assertEquals(0, upB)
assertEquals(1, downB)
assertEquals(0, upC)
assertEquals(1, downC)
// Rest of sequence should only signal on change and C should come up if A.active && B.active else it is false
servA.changeStatus(true)
assertEquals(true, servA.active)
assertEquals(false, servB.active)
assertEquals(false, servC.active)
assertEquals(1, upA)
assertEquals(1, downA)
assertEquals(0, upB)
assertEquals(1, downB)
assertEquals(0, upC)
assertEquals(1, downC)
servB.changeStatus(false)
assertEquals(true, servA.active)
assertEquals(false, servB.active)
assertEquals(false, servC.active)
assertEquals(1, upA)
assertEquals(1, downA)
assertEquals(0, upB)
assertEquals(1, downB)
assertEquals(0, upC)
assertEquals(1, downC)
servB.changeStatus(true)
assertEquals(true, servA.active)
assertEquals(true, servB.active)
assertEquals(true, servC.active)
assertEquals(1, upA)
assertEquals(1, downA)
assertEquals(1, upB)
assertEquals(1, downB)
assertEquals(1, upC)
assertEquals(1, downC)
servA.changeStatus(false)
assertEquals(false, servA.active)
assertEquals(true, servB.active)
assertEquals(false, servC.active)
assertEquals(1, upA)
assertEquals(2, downA)
assertEquals(1, upB)
assertEquals(1, downB)
assertEquals(1, upC)
assertEquals(2, downC)
servB.changeStatus(false)
assertEquals(false, servA.active)
assertEquals(false, servB.active)
assertEquals(false, servC.active)
assertEquals(1, upA)
assertEquals(2, downA)
assertEquals(1, upB)
assertEquals(2, downB)
assertEquals(1, upC)
assertEquals(2, downC)
servB.changeStatus(true)
assertEquals(false, servA.active)
assertEquals(true, servB.active)
assertEquals(false, servC.active)
assertEquals(1, upA)
assertEquals(2, downA)
assertEquals(2, upB)
assertEquals(2, downB)
assertEquals(1, upC)
assertEquals(2, downC)
servA.changeStatus(true)
assertEquals(true, servA.active)
assertEquals(true, servB.active)
assertEquals(true, servC.active)
assertEquals(2, upA)
assertEquals(2, downA)
assertEquals(2, upB)
assertEquals(2, downB)
assertEquals(2, upC)
assertEquals(2, downC)
subsC.unsubscribe()
subsA.unsubscribe()
subsB.unsubscribe()
servA.changeStatus(false)
assertEquals(false, servA.active)
assertEquals(true, servB.active)
assertEquals(false, servC.active)
assertEquals(2, upA)
assertEquals(2, downA)
assertEquals(2, upB)
assertEquals(2, downB)
assertEquals(2, upC)
assertEquals(2, downC)
}
}

View File

@ -0,0 +1,215 @@
package net.corda.bridge.services
import com.nhaarman.mockito_kotlin.*
import net.corda.bridge.createPartialMock
import net.corda.bridge.services.api.BridgeArtemisConnectionService
import net.corda.bridge.services.api.BridgeConfiguration
import net.corda.bridge.services.api.BridgeSenderService
import net.corda.bridge.services.filter.SimpleMessageFilterService
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.junit.Test
import org.mockito.ArgumentMatchers
import kotlin.test.assertEquals
class FilterServiceTest {
private abstract class TestBridgeArtemisConnectionService : BridgeArtemisConnectionService, TestServiceBase()
private abstract class TestBridgeSenderService : BridgeSenderService, TestServiceBase()
companion object {
private val inboxTopic = "${P2P_PREFIX}test"
}
@Test
fun `Basic function tests`() {
val conf = rigorousMock<BridgeConfiguration>().also {
doReturn(ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList()).whenever(it).whitelistedHeaders
}
val auditService = TestAuditService()
val dummyMessage = rigorousMock<ClientMessage>().also {
doReturn(it).whenever(it).putStringProperty(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())
doReturn(it).whenever(it).putStringProperty(ArgumentMatchers.any<SimpleString>(), ArgumentMatchers.any<SimpleString>())
doReturn(it).whenever(it).writeBodyBufferBytes(any())
}
val dummyProducer = rigorousMock<ClientProducer>().also {
doNothing().whenever(it).send(any(), eq(dummyMessage), any())
doNothing().whenever(it).close()
}
val dummySession = rigorousMock<ClientSession>().also {
doReturn(dummyMessage).whenever(it).createMessage(true)
doReturn(dummyProducer).whenever(it).createProducer()
doNothing().whenever(it).close()
}
val artemisStarted = ArtemisMessagingClient.Started(
rigorousMock(),
rigorousMock<ClientSessionFactory>().also {
doReturn(dummySession).whenever(it).createSession(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
},
rigorousMock(),
rigorousMock()
)
val artemisService = createPartialMock<TestBridgeArtemisConnectionService>().also {
doReturn(artemisStarted).whenever(it).started
}
val senderService = createPartialMock<TestBridgeSenderService>().also {
doReturn(true).whenever(it).validateReceiveTopic(ArgumentMatchers.anyString(), any())
}
val filterService = SimpleMessageFilterService(conf, auditService, artemisService, senderService)
val stateFollower = filterService.activeChange.toBlocking().iterator
val auditFollower = auditService.onAuditEvent.toBlocking().iterator
filterService.start()
// Not ready so packet dropped
val fakeMessage = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK'd
}
filterService.sendMessageToLocalBroker(fakeMessage)
assertEquals(TestAuditService.AuditEvent.PACKET_DROP, auditFollower.next()) // Dropped as not ready
assertEquals(false, stateFollower.next())
assertEquals(false, filterService.active)
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
auditService.start()
assertEquals(false, filterService.active)
artemisService.start()
assertEquals(false, filterService.active)
senderService.start()
assertEquals(true, stateFollower.next())
assertEquals(true, filterService.active)
// ready so packet forwarded
val goodMessage = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(true) // ACK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload
doReturn(emptyMap<Any?, Any?>()).whenever(it).applicationProperties
}
filterService.sendMessageToLocalBroker(goodMessage)
assertEquals(TestAuditService.AuditEvent.PACKET_ACCEPT, auditFollower.next()) // Accepted the message
verify(dummyProducer, times(1)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // message forwarded
filterService.stop()
assertEquals(false, stateFollower.next())
assertEquals(false, filterService.active)
}
@Test
fun `Rejection tests`() {
val conf = rigorousMock<BridgeConfiguration>().also {
doReturn(ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList()).whenever(it).whitelistedHeaders
}
val auditService = TestAuditService()
val dummyMessage = rigorousMock<ClientMessage>().also {
doReturn(it).whenever(it).putStringProperty(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())
doReturn(it).whenever(it).putStringProperty(ArgumentMatchers.any<SimpleString>(), ArgumentMatchers.any<SimpleString>())
doReturn(it).whenever(it).writeBodyBufferBytes(any())
}
val dummyProducer = rigorousMock<ClientProducer>().also {
doNothing().whenever(it).send(any(), eq(dummyMessage), any())
doNothing().whenever(it).close()
}
val dummySession = rigorousMock<ClientSession>().also {
doReturn(dummyMessage).whenever(it).createMessage(true)
doReturn(dummyProducer).whenever(it).createProducer()
doNothing().whenever(it).close()
}
val artemisStarted = ArtemisMessagingClient.Started(
rigorousMock(),
rigorousMock<ClientSessionFactory>().also {
doReturn(dummySession).whenever(it).createSession(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
},
rigorousMock(),
rigorousMock()
)
val artemisService = createPartialMock<TestBridgeArtemisConnectionService>().also {
doReturn(artemisStarted).whenever(it).started
}
val senderService = createPartialMock<TestBridgeSenderService>().also {
doAnswer {
val topic = it.arguments[0] as String
(topic == inboxTopic)
}.whenever(it).validateReceiveTopic(ArgumentMatchers.anyString(), any())
}
val filterService = SimpleMessageFilterService(conf, auditService, artemisService, senderService)
val auditFollower = auditService.onAuditEvent.toBlocking().iterator
auditService.start()
artemisService.start()
senderService.start()
filterService.start()
assertEquals(true, filterService.active)
// empty legal name
val badMessage1 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doReturn("").whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload
doReturn(emptyMap<Any?, Any?>()).whenever(it).applicationProperties
}
filterService.sendMessageToLocalBroker(badMessage1)
assertEquals(TestAuditService.AuditEvent.PACKET_DROP, auditFollower.next())
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
// bad legal name
val badMessage2 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doReturn("CN=Test").whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload
doReturn(emptyMap<Any?, Any?>()).whenever(it).applicationProperties
}
filterService.sendMessageToLocalBroker(badMessage2)
assertEquals(TestAuditService.AuditEvent.PACKET_DROP, auditFollower.next())
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
// empty payload
val badMessage3 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(0)).whenever(it).payload
doReturn(emptyMap<Any?, Any?>()).whenever(it).applicationProperties
}
filterService.sendMessageToLocalBroker(badMessage3)
assertEquals(TestAuditService.AuditEvent.PACKET_DROP, auditFollower.next())
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
// bad topic
val badMessage4 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn("bridge.control").whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload
doReturn(emptyMap<Any?, Any?>()).whenever(it).applicationProperties
}
filterService.sendMessageToLocalBroker(badMessage4)
assertEquals(TestAuditService.AuditEvent.PACKET_DROP, auditFollower.next())
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
// Non-whitelist header header
val badMessage5 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload
doReturn(mapOf<Any?, Any?>("Suspicious" to "Header")).whenever(it).applicationProperties
}
filterService.sendMessageToLocalBroker(badMessage5)
assertEquals(TestAuditService.AuditEvent.PACKET_DROP, auditFollower.next())
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
// Valid message sent and completed
val goodMessage = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(true) // ACK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload
doReturn(emptyMap<Any?, Any?>()).whenever(it).applicationProperties
}
filterService.sendMessageToLocalBroker(goodMessage)
assertEquals(TestAuditService.AuditEvent.PACKET_ACCEPT, auditFollower.next()) // packet was accepted
verify(dummyProducer, times(1)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
filterService.stop()
}
}

View File

@ -0,0 +1,49 @@
package net.corda.bridge.services
import net.corda.bridge.services.api.BridgeAuditService
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import rx.Observable
import rx.subjects.PublishSubject
import java.net.InetSocketAddress
class TestAuditService() : BridgeAuditService, TestServiceBase() {
enum class AuditEvent {
SUCCESSFUL_CONNECTION,
FAILED_CONNECTION,
PACKET_DROP,
PACKET_ACCEPT,
STATUS_CHANGE
}
var eventCount: Int = 0
private set
private val _onAuditEvent = PublishSubject.create<AuditEvent>().toSerialized()
val onAuditEvent: Observable<AuditEvent>
get() = _onAuditEvent
override fun successfulConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress, certificateSubject: String, msg: String) {
++eventCount
_onAuditEvent.onNext(AuditEvent.SUCCESSFUL_CONNECTION)
}
override fun failedConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress?, certificateSubject: String?, msg: String) {
++eventCount
_onAuditEvent.onNext(AuditEvent.FAILED_CONNECTION)
}
override fun packetDropEvent(packet: ReceivedMessage?, msg: String) {
++eventCount
_onAuditEvent.onNext(AuditEvent.PACKET_DROP)
}
override fun packetAcceptedEvent(packet: ReceivedMessage) {
++eventCount
_onAuditEvent.onNext(AuditEvent.PACKET_ACCEPT)
}
override fun statusChangeEvent(msg: String) {
++eventCount
_onAuditEvent.onNext(AuditEvent.STATUS_CHANGE)
}
}

View File

@ -0,0 +1,24 @@
package net.corda.bridge.services
import net.corda.bridge.services.api.ServiceLifecycleSupport
import net.corda.bridge.services.util.ServiceStateHelper
import org.slf4j.helpers.NOPLogger
import rx.Observable
open class TestServiceBase() : ServiceLifecycleSupport {
private val stateHelper: ServiceStateHelper = ServiceStateHelper(NOPLogger.NOP_LOGGER)
override val active: Boolean
get() = stateHelper.active
override val activeChange: Observable<Boolean>
get() = stateHelper.activeChange
override fun start() {
stateHelper.active = true
}
override fun stop() {
stateHelper.active = false
}
}

View File

@ -0,0 +1,8 @@
bridgeMode = SenderReceiver
outboundConfig : {
artemisBrokerAddress = "localhost:11005"
}
inboundConfig : {
listeningAddress = "0.0.0.0:10005"
}
networkParametersPath = network-parameters

View File

@ -0,0 +1,9 @@
bridgeMode = FloatInner
outboundConfig : {
artemisBrokerAddress = "localhost:11005"
}
floatInnerConfig : {
floatAddresses = [ "localhost:12005" ]
expectedCertificateSubject = "O=Bank A, L=London, C=GB"
}
networkParametersPath = network-parameters

View File

@ -0,0 +1,9 @@
bridgeMode = FloatOuter
inboundConfig : {
listeningAddress = "0.0.0.0:10005"
}
floatOuterConfig : {
floatAddress = "localhost:12005"
expectedCertificateSubject = "O=Bank A, L=London, C=GB"
}
networkParametersPath = network-parameters

View File

@ -33,7 +33,7 @@ class ArtemisMessagingClient(
private val autoCommitSends: Boolean = true,
private val autoCommitAcks: Boolean = true,
private val confirmationWindowSize: Int = -1
): ArtemisSessionProvider {
) : ArtemisSessionProvider {
companion object {
private val log = loggerFor<ArtemisMessagingClient>()
}

View File

@ -59,8 +59,6 @@ class AMQPBridgeTest {
private abstract class AbstractNodeConfiguration : NodeConfiguration
// TODO: revisit upon Matthew Nesbitt return
@Ignore()
@Test
fun `test acked and nacked messages`() {
// Create local queue

View File

@ -178,9 +178,11 @@ open class Node(configuration: NodeConfiguration,
} else {
startLocalRpcBroker(networkParameters)
}
val advertisedAddress = info.addresses[0]
bridgeControlListener = BridgeControlListener(configuration, serverAddress, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE)
val advertisedAddress = info.addresses.single()
val externalBridge = configuration.enterpriseConfiguration.externalBridge
if (externalBridge == null || !externalBridge) {
bridgeControlListener = BridgeControlListener(configuration, serverAddress, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE)
}
printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString())
val rpcServerConfiguration = RPCServerConfiguration.default.copy(
@ -205,6 +207,7 @@ open class Node(configuration: NodeConfiguration,
database,
services.networkMapCache,
services.monitoringService.metrics,
info.legalIdentities[0].name.toString(),
advertisedAddress,
/*networkParameters.maxMessageSize*/MAX_FILE_SIZE,
nodeProperties.flowsDrainingMode::isEnabled,

View File

@ -16,8 +16,8 @@ import net.corda.node.services.statemachine.transitions.StateMachineConfiguratio
data class EnterpriseConfiguration(
val mutualExclusionConfiguration: MutualExclusionConfiguration,
val useMultiThreadedSMM: Boolean = true,
val tuning: PerformanceTuning = PerformanceTuning.default
)
val tuning: PerformanceTuning = PerformanceTuning.default,
val externalBridge: Boolean? = null)
data class MutualExclusionConfiguration(val on: Boolean = false, val machineName: String, val updateInterval: Long, val waitInterval: Long)

View File

@ -48,7 +48,6 @@ interface NodeConfiguration : NodeSSLConfiguration {
val verifierType: VerifierType
val messageRedeliveryDelaySeconds: Int
val notary: NotaryConfig?
val activeMQServer: ActiveMqServerConfiguration
val additionalNodeInfoPollingFrequencyMsec: Long
val p2pAddress: NetworkHostAndPort
val rpcOptions: NodeRpcOptions
@ -139,12 +138,6 @@ data class BFTSMaRtConfiguration(
}
}
data class BridgeConfiguration(val retryIntervalMs: Long,
val maxRetryIntervalMin: Long,
val retryIntervalMultiplier: Double)
data class ActiveMqServerConfiguration(val bridge: BridgeConfiguration)
fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs<NodeConfigurationImpl>()
data class NodeConfigurationImpl(
@ -178,7 +171,6 @@ data class NodeConfigurationImpl(
override val devModeOptions: DevModeOptions? = null,
override val useTestClock: Boolean = false,
override val detectPublicIp: Boolean = true,
override val activeMQServer: ActiveMqServerConfiguration,
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
override val sshd: SSHDConfiguration? = null,

View File

@ -35,7 +35,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl

View File

@ -49,7 +49,8 @@ class MessagingExecutor(
val resolver: AddressToArtemisQueueResolver,
metricRegistry: MetricRegistry,
val ourSenderUUID: String,
queueBound: Int
queueBound: Int,
val myLegalName: String
) {
private sealed class Job {
data class Acknowledge(val message: ClientMessage) : Job()
@ -164,6 +165,7 @@ class MessagingExecutor(
putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion)
putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion)
putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic))
putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(myLegalName))
sendMessageSizeMetric.update(message.data.bytes.size)
writeBodyBufferBytes(message.data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too

View File

@ -106,6 +106,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val database: CordaPersistence,
private val networkMap: NetworkMapCacheInternal,
private val metricRegistry: MetricRegistry,
val legalName: String,
advertisedAddress: NetworkHostAndPort = serverAddress,
private val maxMessageSize: Int,
private val isDrainingModeOn: () -> Boolean,
@ -170,6 +171,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
private val state = ThreadBox(InnerState())
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val externalBridge: Boolean = config.enterpriseConfiguration.externalBridge ?: false
private val handlers = ConcurrentHashMap<String, MessageHandler>()
@ -238,7 +240,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
this@P2PMessagingClient,
metricRegistry,
queueBound = config.enterpriseConfiguration.tuning.maximumMessagingBatchSize,
ourSenderUUID = deduplicator.ourSenderUUID
ourSenderUUID = deduplicator.ourSenderUUID,
myLegalName = legalName
)
this@P2PMessagingClient.messagingExecutor = messagingExecutor
messagingExecutor.start()
@ -384,7 +387,12 @@ class P2PMessagingClient(val config: NodeConfiguration,
private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? {
try {
val topic = message.required(P2PMessagingHeaders.topicProperty) { getStringProperty(it) }
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
val user = requireNotNull(if (externalBridge) {
message.getStringProperty(P2PMessagingHeaders.bridgedCertificateSubject) ?: message.getStringProperty(HDR_VALIDATED_USER)
} else {
message.getStringProperty(HDR_VALIDATED_USER)
}) { "Message is not authenticated" }
val platformVersion = message.required(P2PMessagingHeaders.platformVersionProperty) { getIntProperty(it) }
// Use the magic deduplication property built into Artemis as our message identity too
val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) }

View File

@ -16,13 +16,6 @@ devMode = true
h2port = 0
useTestClock = false
verifierType = InMemory
activeMQServer = {
bridge = {
retryIntervalMs = 5000
retryIntervalMultiplier = 1.5
maxRetryIntervalMin = 3
}
}
enterpriseConfiguration = {
mutualExclusionConfiguration = {
on = false

View File

@ -118,7 +118,6 @@ class NodeConfigurationImplTest {
certificateChainCheckPolicies = emptyList(),
devMode = true,
noLocalShell = false,
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)),
rpcSettings = rpcSettings,
relay = null,
enterpriseConfiguration = EnterpriseConfiguration((MutualExclusionConfiguration(false, "", 20000, 40000)))

View File

@ -216,10 +216,10 @@ class ArtemisMessagingTest {
// Now change the receiver
try {
val messagingClient2 = createMessagingClient()
messagingClient2.addMessageHandler(TOPIC) { message, _, handle ->
messagingClient2.addMessageHandler(TOPIC) { msg, _, handle ->
database.transaction { handle.persistDeduplicationId() }
handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
receivedMessages.add(message)
receivedMessages.add(msg)
}
startNodeMessagingClient()
@ -251,10 +251,10 @@ class ArtemisMessagingTest {
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
val messagingClient3 = createMessagingClient()
messagingClient3.addMessageHandler(TOPIC) { message, _, handle ->
messagingClient3.addMessageHandler(TOPIC) { msg, _, handle ->
database.transaction { handle.persistDeduplicationId() }
handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
receivedMessages.add(message)
receivedMessages.add(msg)
}
startNodeMessagingClient()
@ -305,6 +305,7 @@ class ArtemisMessagingTest {
database,
networkMapCache,
MetricRegistry(),
ALICE_NAME.toString(),
maxMessageSize = maxMessageSize,
isDrainingModeOn = { false },
drainingModeWasChangedEvents = PublishSubject.create()).apply {

View File

@ -19,6 +19,8 @@ include 'docs'
include 'node-api'
include 'node'
include 'node:capsule'
include 'bridge'
include 'bridge:bridgecapsule'
include 'client:jackson'
include 'client:jfx'
include 'client:mock'