Node verifies the peer it connects to by checking its TLS common name

This commit is contained in:
Shams Asari
2016-12-22 14:48:27 +00:00
parent 32e1c291d1
commit 08e391579c
41 changed files with 939 additions and 1063 deletions

View File

@ -1,11 +1,14 @@
package net.corda.node.services
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.DummyContract
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionType
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.flatMap
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.flows.NotaryError
@ -30,8 +33,7 @@ class RaftNotaryServiceTests : NodeBasedTest() {
@Test
fun `detect double spend`() {
val masterNode = createNotaryCluster()
val alice = startNode("Alice")
val (masterNode, alice) = Futures.allAsList(createNotaryCluster(), startNode("Alice")).getOrThrow()
val notaryParty = alice.netMapCache.getNotary(notaryName)!!
@ -60,7 +62,7 @@ class RaftNotaryServiceTests : NodeBasedTest() {
assertEquals(error.tx, stx.tx)
}
private fun createNotaryCluster(): Node {
private fun createNotaryCluster(): ListenableFuture<Node> {
val notaryService = ServiceInfo(RaftValidatingNotaryService.type, notaryName)
val notaryAddresses = getFreeLocalPorts("localhost", clusterSize).map { it.toString() }
ServiceIdentityGenerator.generateToDisk(
@ -73,16 +75,16 @@ class RaftNotaryServiceTests : NodeBasedTest() {
advertisedServices = setOf(notaryService),
configOverrides = mapOf("notaryNodeAddress" to notaryAddresses[0]))
for (i in 1 until clusterSize) {
val remainingNodes = (1 until clusterSize).map {
startNode(
"Notary$i",
"Notary$it",
advertisedServices = setOf(notaryService),
configOverrides = mapOf(
"notaryNodeAddress" to notaryAddresses[i],
"notaryNodeAddress" to notaryAddresses[it],
"notaryClusterAddresses" to listOf(notaryAddresses[0])))
}
return masterNode
return Futures.allAsList(remainingNodes).flatMap { masterNode }
}
private fun issueState(node: AbstractNode, notary: Party, notaryKey: KeyPair): StateAndRef<*> {

View File

@ -0,0 +1,50 @@
package net.corda.services.messaging
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.testing.messaging.SimpleMQClient
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Test
/**
* Runs the security tests with the attacker pretending to be a node on the network.
*/
class MQSecurityAsNodeTest : MQSecurityTest() {
override fun startAttacker(attacker: SimpleMQClient) {
attacker.start(PEER_USER, PEER_USER) // Login as a peer
}
@Test
fun `send message to RPC requests address`() {
assertSendAttackFails(RPC_REQUESTS_QUEUE)
}
@Test
fun `only the node running the broker can login using the special node user`() {
val attacker = clientTo(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start(NODE_USER, NODE_USER)
}
}
@Test
fun `login as the default cluster user`() {
val attacker = clientTo(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQClusterSecurityException::class.java).isThrownBy {
attacker.start(ActiveMQDefaultConfiguration.getDefaultClusterUser(), ActiveMQDefaultConfiguration.getDefaultClusterPassword())
}
}
@Test
fun `login without a username and password`() {
val attacker = clientTo(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start()
}
}
}

View File

@ -6,7 +6,7 @@ import net.corda.testing.messaging.SimpleMQClient
/**
* Runs the security tests with the attacker being a valid RPC user of Alice.
*/
class RPCSecurityTest : MQSecurityTest() {
class MQSecurityAsRPCTest : MQSecurityTest() {
override val extraRPCUsers = listOf(User("evil", "pass", permissions = emptySet()))
override fun startAttacker(attacker: SimpleMQClient) {

View File

@ -12,8 +12,11 @@ import net.corda.core.random63BitValue
import net.corda.core.seconds
import net.corda.node.internal.Node
import net.corda.node.services.User
import net.corda.node.services.config.NodeSSLConfiguration
import net.corda.node.services.config.configureTestSSL
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_ADDRESS
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX
@ -44,8 +47,8 @@ abstract class MQSecurityTest : NodeBasedTest() {
@Before
fun start() {
alice = startNode("Alice", rpcUsers = extraRPCUsers + rpcUser)
attacker = SimpleMQClient(alice.configuration.artemisAddress)
alice = startNode("Alice", rpcUsers = extraRPCUsers + rpcUser).getOrThrow()
attacker = clientTo(alice.configuration.artemisAddress)
startAttacker(attacker)
}
@ -70,27 +73,31 @@ abstract class MQSecurityTest : NodeBasedTest() {
}
@Test
fun `send message to peer address`() {
fun `send message to address of peer which has been communicated with`() {
val bobParty = startBobAndCommunicateWithAlice()
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
}
@Test
fun `create queue for peer which has not been communciated with`() {
val bob = startNode("Bob").getOrThrow()
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.legalIdentity.owningKey.toBase58String()}")
}
@Test
fun `create queue for unknown peer`() {
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.composite.toBase58String()}"
assertNonTempQueueCreationAttackFails(invalidPeerQueue, durable = true)
assertNonTempQueueCreationAttackFails(invalidPeerQueue, durable = false)
assertTempQueueCreationAttackFails(invalidPeerQueue)
assertAllQueueCreationAttacksFail(invalidPeerQueue)
}
@Test
fun `consume message from network map queue`() {
assertConsumeAttackFails(NETWORK_MAP_ADDRESS.toString())
assertConsumeAttackFails(NETWORK_MAP_QUEUE)
}
@Test
fun `send message to network map address`() {
assertSendAttackFails(NETWORK_MAP_ADDRESS.toString())
assertSendAttackFails(NETWORK_MAP_QUEUE)
}
@Test
@ -133,15 +140,19 @@ abstract class MQSecurityTest : NodeBasedTest() {
}
@Test
fun `create random queue`() {
val randomQueue = random63BitValue().toString()
assertNonTempQueueCreationAttackFails(randomQueue, durable = false)
assertNonTempQueueCreationAttackFails(randomQueue, durable = true)
assertTempQueueCreationAttackFails(randomQueue)
fun `create random internal queue`() {
val randomQueue = "$INTERNAL_PREFIX${random63BitValue()}"
assertAllQueueCreationAttacksFail(randomQueue)
}
fun clientTo(target: HostAndPort): SimpleMQClient {
val client = SimpleMQClient(target)
@Test
fun `create random queue`() {
val randomQueue = random63BitValue().toString()
assertAllQueueCreationAttacksFail(randomQueue)
}
fun clientTo(target: HostAndPort, config: NodeSSLConfiguration = configureTestSSL()): SimpleMQClient {
val client = SimpleMQClient(target, config)
clients += client
return client
}
@ -164,6 +175,12 @@ abstract class MQSecurityTest : NodeBasedTest() {
return rpcClient.session.addressQuery(clientQueueQuery).queueNames.single().toString()
}
fun assertAllQueueCreationAttacksFail(queue: String) {
assertNonTempQueueCreationAttackFails(queue, durable = true)
assertNonTempQueueCreationAttackFails(queue, durable = false)
assertTempQueueCreationAttackFails(queue)
}
fun assertTempQueueCreationAttackFails(queue: String) {
assertAttackFails(queue, "CREATE_NON_DURABLE_QUEUE") {
attacker.session.createTemporaryQueue(queue, queue)
@ -210,7 +227,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
}
private fun startBobAndCommunicateWithAlice(): Party {
val bob = startNode("Bob")
val bob = startNode("Bob").getOrThrow()
bob.services.registerFlowInitiator(SendFlow::class, ::ReceiveFlow)
val bobParty = bob.info.legalIdentity
// Perform a protocol exchange to force the peer queue to be created

View File

@ -1,6 +1,7 @@
package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.Futures
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.flows.FlowLogic
@ -21,9 +22,7 @@ class P2PMessagingTest : NodeBasedTest() {
@Test
fun `network map will work after restart`() {
fun startNodes() {
startNode("NodeA")
startNode("NodeB")
startNode("Notary")
Futures.allAsList(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).getOrThrow()
}
startNodes()
@ -41,7 +40,8 @@ class P2PMessagingTest : NodeBasedTest() {
startNetworkMapNode(advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
networkMapNode.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(it, "Hello") }
val serviceParty = networkMapNode.services.networkMapCache.getAnyNotary()!!
val received = startNode("Alice").services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds)
val alice = startNode("Alice").getOrThrow()
val received = alice.services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds)
assertThat(received).isEqualTo("Hello")
}
@ -63,13 +63,15 @@ class P2PMessagingTest : NodeBasedTest() {
"NetworkMap",
advertisedServices = setOf(distributedService),
configOverrides = mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
val alice = startNode(
val (alice, bob) = Futures.allAsList(
startNode(
"Alice",
advertisedServices = setOf(distributedService),
configOverrides = mapOf(
"notaryNodeAddress" to freeLocalHostAndPort().toString(),
"notaryClusterAddresses" to listOf(notaryClusterAddress.toString())))
val bob = startNode("Bob")
"notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))),
startNode("Bob")
).getOrThrow()
// Setup each node in the distributed service to return back it's Party so that we can know which node is being used
val serviceNodes = listOf(networkMapNode, alice)

View File

@ -1,53 +1,71 @@
package net.corda.services.messaging
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.testing.messaging.SimpleMQClient
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import com.google.common.util.concurrent.ListenableFuture
import kotlinx.support.jdk7.use
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.node.NodeInfo
import net.corda.core.random63BitValue
import net.corda.core.seconds
import net.corda.flows.sendRequest
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.testing.TestNodeConfiguration
import net.corda.testing.node.NodeBasedTest
import net.corda.testing.node.SimpleNode
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.time.Instant
import java.util.concurrent.TimeoutException
/**
* Runs the security tests with the attacker pretending to be a node on the network.
*/
class P2PSecurityTest : MQSecurityTest() {
class P2PSecurityTest : NodeBasedTest() {
override fun startAttacker(attacker: SimpleMQClient) {
attacker.start(PEER_USER, PEER_USER) // Login as a peer
@Test
fun `incorrect legal name for the network map service config`() {
val incorrectNetworkMapName = random63BitValue().toString()
val node = startNode("Bob", configOverrides = mapOf(
"networkMapService" to mapOf(
"address" to networkMapNode.configuration.artemisAddress.toString(),
"legalName" to incorrectNetworkMapName
)
))
// The connection will be rejected as the legal name doesn't match
assertThatThrownBy { node.getOrThrow() }.hasMessageContaining(incorrectNetworkMapName)
}
@Test
fun `send message to RPC requests address`() {
assertSendAttackFails(RPC_REQUESTS_QUEUE)
}
@Test
fun `only the node running the broker can login using the special node user`() {
val attacker = SimpleMQClient(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start(NODE_USER, NODE_USER)
fun `register with the network map service using a legal name different from the TLS CN`() {
startSimpleNode("Attacker").use {
// Register with the network map using a different legal name
val response = it.registerWithNetworkMap("Legit Business")
// We don't expect a response because the network map's host verification will prevent a connection back
// to the attacker as the TLS CN will not match the legal name it has just provided
assertThatExceptionOfType(TimeoutException::class.java).isThrownBy {
response.getOrThrow(2.seconds)
}
}
attacker.stop()
}
@Test
fun `login as the default cluster user`() {
val attacker = SimpleMQClient(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQClusterSecurityException::class.java).isThrownBy {
attacker.start(ActiveMQDefaultConfiguration.getDefaultClusterUser(), ActiveMQDefaultConfiguration.getDefaultClusterPassword())
}
attacker.stop()
private fun startSimpleNode(legalName: String): SimpleNode {
val config = TestNodeConfiguration(
basedir = tempFolder.root.toPath() / legalName,
myLegalName = legalName,
networkMapService = NetworkMapInfo(networkMapNode.configuration.artemisAddress, networkMapNode.info.legalIdentity.name))
config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name
return SimpleNode(config).apply { start() }
}
@Test
fun `login without a username and password`() {
val attacker = SimpleMQClient(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start()
}
attacker.stop()
private fun SimpleNode.registerWithNetworkMap(registrationName: String): ListenableFuture<NetworkMapService.RegistrationResponse> {
val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public))
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
val request = RegistrationRequest(registration.toWire(identity.private), net.myAddress)
return net.sendRequest<NetworkMapService.RegistrationResponse>(REGISTER_FLOW_TOPIC, request, networkMapNode.net.myAddress)
}
}

View File

@ -2,10 +2,7 @@ package net.corda.node
import com.typesafe.config.ConfigException
import joptsimple.OptionParser
import net.corda.core.div
import net.corda.core.randomOrNull
import net.corda.core.rootCause
import net.corda.core.then
import net.corda.core.*
import net.corda.core.utilities.Emoji
import net.corda.node.internal.Node
import net.corda.node.services.config.ConfigHelper
@ -82,32 +79,33 @@ fun main(args: Array<String>) {
}
val dir = conf.basedir.toAbsolutePath().normalize()
log.info("Main class: ${FullNodeConfiguration::class.java.protectionDomain.codeSource.location.toURI().getPath()}")
log.info("Main class: ${FullNodeConfiguration::class.java.protectionDomain.codeSource.location.toURI().path}")
val info = ManagementFactory.getRuntimeMXBean()
log.info("CommandLine Args: ${info.getInputArguments().joinToString(" ")}")
log.info("CommandLine Args: ${info.inputArguments.joinToString(" ")}")
log.info("Application Args: ${args.joinToString(" ")}")
log.info("bootclasspath: ${info.bootClassPath}")
log.info("classpath: ${info.classPath}")
log.info("VM ${info.vmName} ${info.vmVendor} ${info.vmVersion}")
log.info("Machine: ${InetAddress.getLocalHost().hostName}")
log.info("Working Directory: ${dir}")
log.info("Working Directory: $dir")
try {
val dirFile = dir.toFile()
if (!dirFile.exists())
dirFile.mkdirs()
dir.createDirectories()
val node = conf.createNode()
node.start()
printPluginsAndServices(node)
node.networkMapRegistrationFuture.then {
node.networkMapRegistrationFuture.success {
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
printBasicNodeInfo("Node started up and registered in $elapsed sec")
if (renderBasicInfoToConsole)
ANSIProgressObserver(node.smm)
} failure {
log.error("Error during network map registration", it)
exitProcess(1)
}
node.run()
} catch (e: Exception) {

View File

@ -19,7 +19,6 @@ import net.corda.core.utilities.loggerFor
import net.corda.node.services.User
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService
@ -260,7 +259,7 @@ open class DriverDSL(
val isDebug: Boolean
) : DriverDSLInternalInterface {
private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
private val networkMapName = "NetworkMapService"
private val networkMapLegalName = "NetworkMapService"
private val networkMapAddress = portAllocation.nextHostAndPort()
class State {
@ -291,9 +290,7 @@ open class DriverDSL(
override fun shutdown() {
state.locked {
clients.forEach {
it.stop()
}
clients.forEach(NodeMessagingClient::stop)
registeredProcesses.forEach {
it.get().destroy()
}
@ -353,7 +350,10 @@ open class DriverDSL(
"artemisAddress" to messagingAddress.toString(),
"webAddress" to apiAddress.toString(),
"extraAdvertisedServiceIds" to advertisedServices.joinToString(","),
"networkMapAddress" to networkMapAddress.toString(),
"networkMapService" to mapOf(
"address" to networkMapAddress.toString(),
"legalName" to networkMapLegalName
),
"useTestClock" to useTestClock,
"rpcUsers" to rpcUsers.map {
mapOf(
@ -416,12 +416,12 @@ open class DriverDSL(
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val baseDirectory = driverDirectory / networkMapName
val baseDirectory = driverDirectory / networkMapLegalName
val config = ConfigHelper.loadConfig(
baseDirectoryPath = baseDirectory,
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to networkMapName,
"myLegalName" to networkMapLegalName,
"basedir" to baseDirectory.normalize().toString(),
"artemisAddress" to networkMapAddress.toString(),
"webAddress" to apiAddress.toString(),

View File

@ -45,7 +45,7 @@ import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.*
import net.corda.node.services.vault.CashBalanceAsMetricsObserver
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.databaseTransaction
@ -72,8 +72,9 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
// TODO: Where this node is the initial network map service, currently no networkMapService is provided.
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
abstract class AbstractNode(open val configuration: NodeConfiguration, val networkMapService: SingleMessageRecipient?,
val advertisedServices: Set<ServiceInfo>, val platformClock: Clock) : SingletonSerializeAsToken() {
abstract class AbstractNode(open val configuration: NodeConfiguration,
val advertisedServices: Set<ServiceInfo>,
val platformClock: Clock) : SingletonSerializeAsToken() {
companion object {
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
@ -95,6 +96,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
var networkMapSeq: Long = 1
protected abstract val log: Logger
protected abstract val networkMapAddress: SingleMessageRecipient?
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
// low-performance prototyping period.
@ -174,8 +176,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
var isPreviousCheckpointsPresent = false
private set
protected val _networkMapRegistrationFuture: SettableFuture<Unit> = SettableFuture.create()
/** Completes once the node has successfully registered with the network map service */
private val _networkMapRegistrationFuture: SettableFuture<Unit> = SettableFuture.create()
val networkMapRegistrationFuture: ListenableFuture<Unit>
get() = _networkMapRegistrationFuture
@ -259,7 +261,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
}
startMessagingService(CordaRPCOpsImpl(services, smm, database))
runOnStop += Runnable { net.stop() }
_networkMapRegistrationFuture.setFuture(registerWithNetworkMap())
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
smm.start()
// Shut down the SMM so no Fibers are scheduled.
runOnStop += Runnable { smm.stop(acceptableLiveFiberCountOnStop()) }
@ -355,7 +357,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
return serviceList
}
/**
* Run any tasks that are needed to ensure the node is in a correct state before running start().
*/
@ -374,27 +375,43 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
}
}
/**
* Register this node with the network map cache, and load network map from a remote service (and register for
* updates) if one has been supplied.
*/
private fun registerWithNetworkMap(): ListenableFuture<Unit> {
require(networkMapService != null || NetworkMapService.type in advertisedServices.map { it.type }) {
private fun registerWithNetworkMapIfConfigured(): ListenableFuture<Unit> {
require(networkMapAddress != null || NetworkMapService.type in advertisedServices.map { it.type }) {
"Initial network map address must indicate a node that provides a network map service"
}
services.networkMapCache.addNode(info)
// In the unit test environment, we may run without any network map service sometimes.
if (networkMapService == null && inNodeNetworkMapService == null) {
return if (networkMapAddress == null && inNodeNetworkMapService == null) {
services.networkMapCache.runWithoutMapService()
return noNetworkMapConfigured()
noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache
} else {
registerWithNetworkMap()
}
return registerWithNetworkMap(networkMapService ?: info.address)
}
private fun registerWithNetworkMap(networkMapServiceAddress: SingleMessageRecipient): ListenableFuture<Unit> {
/**
* Register this node with the network map cache, and load network map from a remote service (and register for
* updates) if one has been supplied.
*/
protected open fun registerWithNetworkMap(): ListenableFuture<Unit> {
val address = networkMapAddress ?: info.address
// Register for updates, even if we're the one running the network map.
updateRegistration(networkMapServiceAddress, AddOrRemove.ADD)
return services.networkMapCache.addMapService(net, networkMapServiceAddress, true, null)
return sendNetworkMapRegistration(address).flatMap { response ->
check(response.success) { "The network map service rejected our registration request" }
// This Future will complete on the same executor as sendNetworkMapRegistration, namely the one used by net
services.networkMapCache.addMapService(net, address, true, null)
}
}
private fun sendNetworkMapRegistration(networkMapAddress: SingleMessageRecipient): ListenableFuture<RegistrationResponse> {
// Register this node against the network
val instant = platformClock.instant()
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, instant.toEpochMilli(), ADD, expires)
val legalIdentityKey = obtainLegalIdentityKey()
val request = NetworkMapService.RegistrationRequest(reg.toWire(legalIdentityKey.private), net.myAddress)
return net.sendRequest(REGISTER_FLOW_TOPIC, request, networkMapAddress)
}
/** This is overriden by the mock node implementation to enable operation without any network map service */
@ -404,16 +421,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
"has any other map node been configured.")
}
private fun updateRegistration(networkMapAddr: SingleMessageRecipient, type: AddOrRemove): ListenableFuture<RegistrationResponse> {
// Register this node against the network
val instant = platformClock.instant()
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, instant.toEpochMilli(), type, expires)
val legalIdentityKey = obtainLegalIdentityKey()
val request = NetworkMapService.RegistrationRequest(reg.toWire(legalIdentityKey.private), net.myAddress)
return net.sendRequest(REGISTER_FLOW_TOPIC, request, networkMapAddr)
}
protected open fun makeKeyManagementService(): KeyManagementService = PersistentKeyManagementService(partyKeys)
open protected fun makeNetworkMapService() {

View File

@ -1,15 +1,18 @@
package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.flatMap
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.success
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.serialization.NodeClock
@ -17,11 +20,11 @@ import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
@ -53,24 +56,21 @@ import java.util.*
import javax.management.ObjectName
import javax.servlet.*
import kotlin.concurrent.thread
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
class ConfigurationException(message: String) : Exception(message)
/**
* A Node manages a standalone server that takes part in the P2P network. It creates the services found in [ServiceHub],
* loads important data off disk and starts listening for connections.
*
* @param configuration This is typically loaded from a TypeSafe HOCON configuration file.
* @param networkMapAddress An external network map service to use. Should only ever be null when creating the first
* network map service, while bootstrapping a network.
* @param advertisedServices The services this node advertises. This must be a subset of the services it runs,
* but nodes are not required to advertise services they run (hence subset).
* @param clock The clock used within the node and by all flows etc.
*/
class Node(override val configuration: FullNodeConfiguration, networkMapAddress: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>, clock: Clock = NodeClock()) : AbstractNode(configuration, networkMapAddress, advertisedServices, clock) {
class Node(override val configuration: FullNodeConfiguration,
advertisedServices: Set<ServiceInfo>,
clock: Clock = NodeClock()) : AbstractNode(configuration, advertisedServices, clock) {
override val log = loggerFor<Node>()
override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress)
// DISCUSSION
//
@ -125,25 +125,22 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
override fun makeMessagingService(): MessagingServiceInternal {
userService = RPCUserServiceImpl(configuration)
val serverAddr = with(configuration) {
val serverAddress = with(configuration) {
messagingServerAddress ?: {
messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService)
artemisAddress
}()
}
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture)
val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null
return NodeMessagingClient(configuration, serverAddress, myIdentityOrNullIfNetworkMapService, serverThread, database,
networkMapRegistrationFuture)
}
override fun startMessagingService(rpcOps: RPCOps) {
// Start up the embedded MQ server
messageBroker?.apply {
runOnStop += Runnable { messageBroker?.stop() }
runOnStop += Runnable { stop() }
start()
if (networkMapService is NetworkMapAddress) {
deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort)
}
}
// Start up the MQ client.
@ -151,6 +148,15 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
net.start(rpcOps, userService)
}
/**
* Insert an initial step in the registration process which will throw an exception if a non-recoverable error is
* encountered when trying to connect to the network map node.
*/
override fun registerWithNetworkMap(): ListenableFuture<Unit> {
val networkMapConnection = messageBroker?.networkMapConnectionFuture ?: Futures.immediateFuture(Unit)
return networkMapConnection.flatMap { super.registerWithNetworkMap() }
}
// TODO: add flag to enable/disable webserver
private fun initWebServer(localRpc: CordaRPCOps): Server {
// Note that the web server handlers will all run concurrently, and not on the node thread.
@ -308,32 +314,36 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
override fun start(): Node {
alreadyRunningNodeCheck()
super.start()
// Only start the service API requests once the network map registration is complete
thread(name = "WebServer") {
networkMapRegistrationFuture.getOrThrow()
try {
webServer = initWebServer(connectLocalRpcAsNodeUser())
} catch(ex: Exception) {
// TODO: We need to decide if this is a fatal error, given the API is unavailable, or whether the API
// is not critical and we continue anyway.
log.error("Web server startup failed", ex)
// Only start the service API requests once the network map registration is successfully complete
networkMapRegistrationFuture.success {
// This needs to be in a seperate thread so that we can reply to our own request to become RPC clients
thread(name = "WebServer") {
try {
webServer = initWebServer(connectLocalRpcAsNodeUser())
} catch(ex: Exception) {
// TODO: We need to decide if this is a fatal error, given the API is unavailable, or whether the API
// is not critical and we continue anyway.
log.error("Web server startup failed", ex)
}
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("net.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
// Begin exporting our own metrics via JMX.
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("net.corda").
createsObjectNamesWith { type, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
shutdownThread = thread(start = false) {
stop()
}
@ -405,16 +415,16 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
// Servlet filter to wrap API requests with a database transaction.
private class DatabaseTransactionFilter(val database: Database) : Filter {
override fun init(filterConfig: FilterConfig?) {
}
override fun destroy() {
}
override fun doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain) {
databaseTransaction(database) {
chain.doFilter(request, response)
}
}
override fun init(filterConfig: FilterConfig?) {}
override fun destroy() {}
}
}
class ConfigurationException(message: String) : Exception(message)
data class NetworkMapInfo(val address: HostAndPort, val legalName: String)

View File

@ -1,7 +1,7 @@
package net.corda.node.services
import net.corda.core.flows.FlowLogic
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.NodeConfiguration
/**
* Service for retrieving [User] objects representing RPC users who are authorised to use the RPC system. A [User]
@ -15,7 +15,7 @@ interface RPCUserService {
// TODO Store passwords as salted hashes
// TODO Or ditch this and consider something like Apache Shiro
class RPCUserServiceImpl(config: FullNodeConfiguration) : RPCUserService {
class RPCUserServiceImpl(config: NodeConfiguration) : RPCUserService {
private val _users = config.rpcUsers.associateBy(User::username)

View File

@ -119,12 +119,13 @@ private fun NodeSSLConfiguration.configureDevKeyAndTrustStores(myLegalName: Stri
}
// TODO Move this to CoreTestUtils.kt once we can pry this from the explorer
fun configureTestSSL(): NodeSSLConfiguration = object : NodeSSLConfiguration {
@JvmOverloads
fun configureTestSSL(legalName: String = "Mega Corp."): NodeSSLConfiguration = object : NodeSSLConfiguration {
override val certificatesPath = Files.createTempDirectory("certs")
override val keyStorePassword: String get() = "cordacadevpass"
override val trustStorePassword: String get() = "trustpass"
init {
configureDevKeyAndTrustStores("Mega Corp.")
configureDevKeyAndTrustStores(legalName)
}
}

View File

@ -3,12 +3,11 @@ package net.corda.node.services.config
import com.google.common.net.HostAndPort
import com.typesafe.config.Config
import net.corda.core.div
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.ServiceInfo
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.internal.Node
import net.corda.node.serialization.NodeClock
import net.corda.node.services.User
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.TestClock
import java.nio.file.Path
@ -26,10 +25,12 @@ interface NodeConfiguration : NodeSSLConfiguration {
val basedir: Path
override val certificatesPath: Path get() = basedir / "certificates"
val myLegalName: String
val networkMapService: NetworkMapInfo?
val nearestCity: String
val emailAddress: String
val exportJMXto: String
val dataSourceProperties: Properties get() = Properties()
val rpcUsers: List<User> get() = emptyList()
val devMode: Boolean
}
@ -38,12 +39,25 @@ class FullNodeConfiguration(val config: Config) : NodeConfiguration {
override val myLegalName: String by config
override val nearestCity: String by config
override val emailAddress: String by config
override val exportJMXto: String = "http"
override val exportJMXto: String get() = "http"
override val keyStorePassword: String by config
override val trustStorePassword: String by config
override val dataSourceProperties: Properties by config
override val devMode: Boolean by config.getOrElse { false }
val networkMapAddress: HostAndPort? by config.getOrElse { null }
override val networkMapService: NetworkMapInfo? = config.getOptionalConfig("networkMapService")?.run {
NetworkMapInfo(
HostAndPort.fromString(getString("address")),
getString("legalName"))
}
override val rpcUsers: List<User> = config
.getListOrElse<Config>("rpcUsers") { emptyList() }
.map {
val username = it.getString("user")
require(username.matches("\\w+".toRegex())) { "Username $username contains invalid characters" }
val password = it.getString("password")
val permissions = it.getListOrElse<String>("permissions") { emptyList() }.toSet()
User(username, password, permissions)
}
val useHTTPS: Boolean by config
val artemisAddress: HostAndPort by config
val webAddress: HostAndPort by config
@ -51,30 +65,23 @@ class FullNodeConfiguration(val config: Config) : NodeConfiguration {
val extraAdvertisedServiceIds: String by config
val useTestClock: Boolean by config.getOrElse { false }
val notaryNodeAddress: HostAndPort? by config.getOrElse { null }
val notaryClusterAddresses: List<HostAndPort> = config.getListOrElse<String>("notaryClusterAddresses") { emptyList<String>() }.map { HostAndPort.fromString(it) }
val rpcUsers: List<User> =
config.getListOrElse<Config>("rpcUsers") { emptyList() }
.map {
val username = it.getString("user")
require(username.matches("\\w+".toRegex())) { "Username $username contains invalid characters" }
val password = it.getString("password")
val permissions = it.getListOrElse<String>("permissions") { emptyList() }.toSet()
User(username, password, permissions)
}
val notaryClusterAddresses: List<HostAndPort> = config
.getListOrElse<String>("notaryClusterAddresses") { emptyList() }
.map { HostAndPort.fromString(it) }
fun createNode(): Node {
// This is a sanity feature do not remove.
require(!useTestClock || devMode) { "Cannot use test clock outside of dev mode" }
val advertisedServices = mutableSetOf<ServiceInfo>()
if (!extraAdvertisedServiceIds.isNullOrEmpty()) {
for (serviceId in extraAdvertisedServiceIds.split(",")) {
advertisedServices.add(ServiceInfo.parse(serviceId))
}
}
if (networkMapAddress == null) advertisedServices.add(ServiceInfo(NetworkMapService.type))
val networkMapMessageAddress: SingleMessageRecipient? = if (networkMapAddress == null) null else NodeMessagingClient.makeNetworkMapAddress(networkMapAddress!!)
return Node(this, networkMapMessageAddress, advertisedServices, if (useTestClock == true) TestClock() else NodeClock())
val advertisedServices = extraAdvertisedServiceIds
.split(",")
.filter(String::isNotBlank)
.map { ServiceInfo.parse(it) }
.toMutableSet()
if (networkMapService == null) advertisedServices.add(ServiceInfo(NetworkMapService.type))
return Node(this, advertisedServices, if (useTestClock) TestClock() else NodeClock())
}
}
private fun Config.getOptionalConfig(path: String): Config? = if (hasPath(path)) getConfig(path) else null

View File

@ -9,10 +9,10 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.read
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.services.config.NodeSSLConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Inbound
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.FileSystems
import java.nio.file.Path
@ -41,9 +41,9 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
const val RPC_REQUESTS_QUEUE = "rpc.requests"
const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals"
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
const val NETWORK_MAP_QUEUE = "${INTERNAL_PREFIX}networkmap"
@JvmStatic
val NETWORK_MAP_ADDRESS = "${INTERNAL_PREFIX}networkmap"
const val VERIFY_PEER_COMMON_NAME = "corda.verifyPeerCommonName"
/**
* Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should
@ -59,7 +59,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
}
interface ArtemisAddress : MessageRecipients {
val queueName: SimpleString
val queueName: String
}
interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient {
@ -67,7 +67,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
}
data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisPeerAddress {
override val queueName = SimpleString(NETWORK_MAP_ADDRESS)
override val queueName: String get() = NETWORK_MAP_QUEUE
}
/**
@ -75,22 +75,21 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
* may change or evolve and code that relies upon it being a simple host/port may not function correctly.
* For instance it may contain onion routing data.
*
* [NodeAddress] identifies a specific peer node and an associated queue. The queue may be the peer's p2p queue or
* [NodeAddress] identifies a specific peer node and an associated queue. The queue may be the peer's own queue or
* an advertised service's queue.
*
* @param queueName The name of the queue this address is associated with.
* @param hostAndPort The address of the node.
*/
data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisPeerAddress {
data class NodeAddress(override val queueName: String, override val hostAndPort: HostAndPort) : ArtemisPeerAddress {
companion object {
fun asPeer(peerIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
return NodeAddress(SimpleString("$PEERS_PREFIX${peerIdentity.toBase58String()}"), hostAndPort)
return NodeAddress("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort)
}
fun asService(serviceIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
return NodeAddress(SimpleString("$SERVICES_PREFIX${serviceIdentity.toBase58String()}"), hostAndPort)
return NodeAddress("$SERVICES_PREFIX${serviceIdentity.toBase58String()}", hostAndPort)
}
}
override fun toString(): String = "${javaClass.simpleName}(queue = $queueName, $hostAndPort)"
}
/**
@ -103,14 +102,12 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
* @param identity The service identity's owning key.
*/
data class ServiceAddress(val identity: CompositeKey) : ArtemisAddress, MessageRecipientGroup {
override val queueName: SimpleString = SimpleString("$SERVICES_PREFIX${identity.toBase58String()}")
override val queueName: String = "$SERVICES_PREFIX${identity.toBase58String()}"
}
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
abstract val config: NodeSSLConfiguration
protected enum class ConnectionDirection { INBOUND, OUTBOUND }
// Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher.
// Our self-generated certificates all use ECDSA for handshakes, but we allow classical RSA certificates to work
// in case we need to use keytool certificates in some demos
@ -142,8 +139,8 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
config.trustStorePath.expectedOnDefaultFileSystem()
return TransportConfiguration(
when (direction) {
ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
is Inbound -> NettyAcceptorFactory::class.java.name
is Outbound -> VerifyingNettyConnectorFactory::class.java.name
},
mapOf(
// Basic TCP target details
@ -167,9 +164,8 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true
// TODO: Set up the connector's host name verifier logic to ensure we connect to the expected node even in case of MITM or BGP hijacks
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true,
VERIFY_PEER_COMMON_NAME to (direction as? Outbound)?.expectedCommonName
)
)
}
@ -177,4 +173,9 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
protected fun Path.expectedOnDefaultFileSystem() {
require(fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
protected sealed class ConnectionDirection {
object Inbound : ConnectionDirection()
class Outbound(val expectedCommonName: String? = null) : ConnectionDirection()
}
}

View File

@ -1,46 +1,58 @@
package net.corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import io.netty.handler.ssl.SslHandler
import net.corda.core.ThreadBox
import net.corda.core.crypto.AddressFormatException
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.X509Utilities
import net.corda.core.crypto.*
import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_CA
import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.core.crypto.newSecureRandom
import net.corda.core.div
import net.corda.core.minutes
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.seconds
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.services.RPCUserService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.INBOUND
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.OUTBOUND
import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_ROLE
import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.PEER_ROLE
import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.RPC_ROLE
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Inbound
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.remoting.*
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
import org.bouncycastle.asn1.x500.X500Name
import rx.Subscription
import java.io.IOException
import java.math.BigInteger
import java.security.Principal
import java.security.PublicKey
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
import javax.annotation.concurrent.ThreadSafe
import javax.security.auth.Subject
import javax.security.auth.callback.CallbackHandler
@ -52,6 +64,7 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
import javax.security.auth.login.FailedLoginException
import javax.security.auth.login.LoginException
import javax.security.auth.spi.LoginModule
import javax.security.cert.X509Certificate
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later)
@ -81,6 +94,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer
private val _networkMapConnectionFuture = config.networkMapService?.let { SettableFuture.create<Unit>() }
/**
* A [ListenableFuture] which completes when the server successfully connects to the network map node. If a
* non-recoverable error is encountered then the Future will complete with an exception.
*/
val networkMapConnectionFuture: SettableFuture<Unit>? get() = _networkMapConnectionFuture
private var networkChangeHandle: Subscription? = null
init {
@ -88,13 +107,15 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
/**
* The server will make sure the bridge exists on network map changes, see method [destroyOrCreateBridge]
* The server will make sure the bridge exists on network map changes, see method [updateBridgesOnNetworkChange]
* We assume network map will be updated accordingly when the client node register with the network map server.
*/
fun start() = mutex.locked {
if (!running) {
configureAndStartServer()
networkChangeHandle = networkMapCache.changed.subscribe { destroyOrCreateBridges(it) }
// Deploy bridge to the network map service
config.networkMapService?.let { deployBridge(NetworkMapAddress(it.address), it.legalName) }
networkChangeHandle = networkMapCache.changed.subscribe { updateBridgesOnNetworkChange(it) }
running = true
}
}
@ -106,48 +127,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
running = false
}
/**
* The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted.
* The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues.
* This results in message queues up and never get send out. (https://github.com/corda/corda/issues/37)
*
* We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup.
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
*/
private fun destroyOrCreateBridges(change: MapChange) {
fun addAddresses(node: NodeInfo, targets: MutableSet<ArtemisPeerAddress>) {
// Add the node's address with the p2p queue.
val nodeAddress = node.address as ArtemisPeerAddress
targets.add(nodeAddress)
// Add the node's address with service queues, one per service.
node.advertisedServices.forEach {
targets.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort))
}
}
val addressesToCreateBridgesTo = HashSet<ArtemisPeerAddress>()
val addressesToRemoveBridgesFrom = HashSet<ArtemisPeerAddress>()
when (change) {
is MapChange.Modified -> {
addAddresses(change.node, addressesToCreateBridgesTo)
addAddresses(change.previousNode, addressesToRemoveBridgesFrom)
}
is MapChange.Removed -> {
addAddresses(change.node, addressesToRemoveBridgesFrom)
}
is MapChange.Added -> {
addAddresses(change.node, addressesToCreateBridgesTo)
}
}
(addressesToRemoveBridgesFrom - addressesToCreateBridgesTo).forEach {
activeMQServer.destroyBridge(getBridgeName(it.queueName, it.hostAndPort))
}
addressesToCreateBridgesTo.filter { activeMQServer.queueQuery(it.queueName).isExists }.forEach {
deployBridgeIfAbsent(it.queueName, it.hostAndPort)
}
}
private fun configureAndStartServer() {
val config = createArtemisConfig()
val securityManager = createArtemisSecurityManager()
@ -156,57 +135,19 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
registerActivationFailureListener { exception -> throw exception }
// Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem.
registerPostQueueCreationCallback { deployBridgeFromNewQueue(it) }
registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) }
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
}
activeMQServer.start()
printBasicNodeInfo("Node listening on address", myHostPort.toString())
}
private fun maybeDeployBridgeForNode(queueName: SimpleString, nodeInfo: NodeInfo) {
val address = nodeInfo.address
if (address is ArtemisPeerAddress) {
log.debug("Deploying bridge for $queueName to $nodeInfo")
deployBridgeIfAbsent(queueName, address.hostAndPort)
} else {
log.error("Don't know how to deal with $address for queue $queueName")
}
}
private fun deployBridgeFromNewQueue(queueName: SimpleString) {
log.debug { "Queue created: $queueName, deploying bridge(s)" }
when {
queueName.startsWith(PEERS_PREFIX) -> try {
val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length))
val nodeInfo = networkMapCache.getNodeByLegalIdentityKey(identity)
if (nodeInfo != null) {
maybeDeployBridgeForNode(queueName, nodeInfo)
} else {
log.error("Queue created for a peer that we don't know from the network map: $queueName")
}
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName")
}
queueName.startsWith(SERVICES_PREFIX) -> try {
val identity = CompositeKey.parseFromBase58(queueName.substring(SERVICES_PREFIX.length))
val nodeInfos = networkMapCache.getNodesByAdvertisedServiceIdentityKey(identity)
// Create a bridge for each node advertising the service.
for (nodeInfo in nodeInfos) {
maybeDeployBridgeForNode(queueName, nodeInfo)
}
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse service queue name as Base 58: $queueName")
}
}
}
private fun createArtemisConfig(): Configuration = ConfigurationImpl().apply {
val artemisDir = config.basedir / "artemis"
bindingsDirectory = (artemisDir / "bindings").toString()
journalDirectory = (artemisDir / "journal").toString()
largeMessagesDirectory = (artemisDir / "large-messages").toString()
acceptorConfigurations = setOf(tcpTransport(INBOUND, "0.0.0.0", myHostPort.port))
acceptorConfigurations = setOf(tcpTransport(Inbound, "0.0.0.0", myHostPort.port))
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
@ -215,42 +156,32 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
// Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster
// user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its
// password is changed from the default (as warned in the docs). Since we don't need this feature we turn it off
// password be changed from the default (as warned in the docs). Since we don't need this feature we turn it off
// by having its password be an unknown securely random 128-bit value.
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
queueConfigurations.addAll(listOf(
CoreQueueConfiguration().apply {
address = NETWORK_MAP_ADDRESS
name = NETWORK_MAP_ADDRESS
isDurable = true
},
CoreQueueConfiguration().apply {
address = P2P_QUEUE
name = P2P_QUEUE
isDurable = true
},
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
// but these queues are not worth persisting.
CoreQueueConfiguration().apply {
name = RPC_REQUESTS_QUEUE
address = RPC_REQUESTS_QUEUE
isDurable = false
},
// The custom name for the queue is intentional - we may wish other things to subscribe to the
// NOTIFICATIONS_ADDRESS with different filters in future
CoreQueueConfiguration().apply {
name = RPC_QUEUE_REMOVALS_QUEUE
address = NOTIFICATIONS_ADDRESS
isDurable = false
filterString = "_AMQ_NotifType = 1"
}
))
queueConfigurations = listOf(
queueConfig(NETWORK_MAP_QUEUE, durable = true),
queueConfig(P2P_QUEUE, durable = true),
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
// but these queues are not worth persisting.
queueConfig(RPC_REQUESTS_QUEUE, durable = false),
// The custom name for the queue is intentional - we may wish other things to subscribe to the
// NOTIFICATIONS_ADDRESS with different filters in future
queueConfig(RPC_QUEUE_REMOVALS_QUEUE, address = NOTIFICATIONS_ADDRESS, filter = "_AMQ_NotifType = 1", durable = false)
)
configureAddressSecurity()
}
private fun queueConfig(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration {
return CoreQueueConfiguration().apply {
this.name = name
this.address = address
filterString = filter
isDurable = durable
}
}
/**
* Authenticated clients connecting to us fall in one of three groups:
* 1. The node hosting us and any of its logically connected components. These are given full access to all valid queues.
@ -279,45 +210,114 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
val ourRootCAPublicKey = X509Utilities
val rootCAPublicKey = X509Utilities
.loadCertificateFromKeyStore(config.trustStorePath, config.trustStorePassword, CORDA_ROOT_CA)
.publicKey
val ourPublicKey = X509Utilities
val ourCertificate = X509Utilities
.loadCertificateFromKeyStore(config.keyStorePath, config.keyStorePassword, CORDA_CLIENT_CA)
.publicKey
val ourSubjectDN = X500Name(ourCertificate.subjectDN.name)
// This is a sanity check and should not fail unless things have been misconfigured
require(ourSubjectDN.commonName == config.myLegalName) {
"Legal name does not match with our subject CN: $ourSubjectDN"
}
val securityConfig = object : SecurityConfiguration() {
// Override to make it work with our login module
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf(
RPCUserService::class.java.name to userService,
CORDA_ROOT_CA to ourRootCAPublicKey,
CORDA_CLIENT_CA to ourPublicKey)
CORDA_ROOT_CA to rootCAPublicKey,
CORDA_CLIENT_CA to ourCertificate.publicKey)
return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
}
}
return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig)
}
private fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations
private fun deployBridgesFromNewQueue(queueName: String) {
log.debug { "Queue created: $queueName, deploying bridge(s)" }
private fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration(
hostAndPort.toString(),
tcpTransport(OUTBOUND, hostAndPort.hostText, hostAndPort.port)
)
private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name)
fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) {
if (!connectorExists(hostAndPort)) {
addConnector(hostAndPort)
fun deployBridgeToPeer(nodeInfo: NodeInfo) {
log.debug("Deploying bridge for $queueName to $nodeInfo")
val address = nodeInfo.address
if (address is ArtemisPeerAddress) {
deployBridge(queueName, address.hostAndPort, nodeInfo.legalIdentity.name)
} else {
log.error("Don't know how to deal with $address for queue $queueName")
}
}
val bridgeName = getBridgeName(queueName, hostAndPort)
if (!bridgeExists(bridgeName)) {
deployBridge(bridgeName, queueName, hostAndPort)
when {
queueName.startsWith(PEERS_PREFIX) -> try {
val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length))
val nodeInfo = networkMapCache.getNodeByLegalIdentityKey(identity)
if (nodeInfo != null) {
deployBridgeToPeer(nodeInfo)
} else {
log.error("Queue created for a peer that we don't know from the network map: $queueName")
}
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName")
}
queueName.startsWith(SERVICES_PREFIX) -> try {
val identity = CompositeKey.parseFromBase58(queueName.substring(SERVICES_PREFIX.length))
val nodeInfos = networkMapCache.getNodesByAdvertisedServiceIdentityKey(identity)
// Create a bridge for each node advertising the service.
for (nodeInfo in nodeInfos) {
deployBridgeToPeer(nodeInfo)
}
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse service queue name as Base 58: $queueName")
}
}
}
private fun getBridgeName(queueName: SimpleString, hostAndPort: HostAndPort) = "$queueName -> $hostAndPort"
/**
* The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted.
* The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues.
* This results in message queues up and never get send out. (https://github.com/corda/corda/issues/37)
*
* We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup.
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
*/
private fun updateBridgesOnNetworkChange(change: MapChange) {
fun gatherAddresses(node: NodeInfo): Sequence<ArtemisPeerAddress> {
val peerAddress = node.address as ArtemisPeerAddress
val addresses = mutableListOf(peerAddress)
node.advertisedServices.mapTo(addresses) { NodeAddress.asService(it.identity.owningKey, peerAddress.hostAndPort) }
return addresses.asSequence()
}
fun deployBridges(node: NodeInfo) {
gatherAddresses(node)
.filter { queueExists(it.queueName) && !bridgeExists(it.bridgeName) }
.forEach { deployBridge(it, node.legalIdentity.name) }
}
fun destroyBridges(node: NodeInfo) {
gatherAddresses(node).forEach {
activeMQServer.destroyBridge(it.bridgeName)
}
}
when (change) {
is MapChange.Added -> {
deployBridges(change.node)
}
is MapChange.Removed -> {
destroyBridges(change.node)
}
is MapChange.Modified -> {
// TODO Figure out what has actually changed and only destroy those bridges that need to be.
destroyBridges(change.previousNode)
deployBridges(change.node)
}
}
}
private fun deployBridge(address: ArtemisPeerAddress, legalName: String) {
deployBridge(address.queueName, address.hostAndPort, legalName)
}
/**
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
@ -325,14 +325,25 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address.
*/
private fun deployBridge(bridgeName: String, queueName: SimpleString, hostAndPort: HostAndPort) {
private fun deployBridge(queueName: String, target: HostAndPort, legalName: String) {
val tcpTransport = tcpTransport(Outbound(expectedCommonName = legalName), target.hostText, target.port)
tcpTransport.params[ArtemisMessagingServer::class.java.name] = this
// We intentionally overwrite any previous connector config in case the peer legal name changed
activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport)
activeMQServer.deployBridge(BridgeConfiguration().apply {
name = bridgeName
this.queueName = queueName.toString()
name = getBridgeName(queueName, target)
this.queueName = queueName
forwardingAddress = P2P_QUEUE
staticConnectors = listOf(hostAndPort.toString())
staticConnectors = listOf(target.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
// We keep trying until the network map deems the node unreachable and tells us it's been removed at which
// point we destroy the bridge
// TODO Give some thought to the retry settings
retryInterval = 5.seconds.toMillis()
retryIntervalMultiplier = 1.5 // Exponential backoff
maxRetryInterval = 3.minutes.toMillis()
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
// our TLS certificate.
user = PEER_USER
@ -340,114 +351,197 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
})
}
/**
* Clients must connect to us with a username and password and must use TLS. If a someone connects with
* [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate
* is the same as our one in our key store. Then they're given full access to all valid queues. If they connect with
* [ArtemisMessagingComponent.PEER_USER] then we confirm they belong on our P2P network by checking their root CA is
* the same as our root CA. If that's the case the only access they're given is the ablility send to our P2P address.
* In both cases the messages these authenticated nodes send to us are tagged with their subject DN and we assume
* the CN within that is their legal name.
* Otherwise if the username is neither of the above we assume it's an RPC user and authenticate against our list of
* valid RPC users. RPC clients are given permission to perform RPC and nothing else.
*/
class NodeLoginModule : LoginModule {
private fun queueExists(queueName: String): Boolean = activeMQServer.queueQuery(SimpleString(queueName)).isExists
companion object {
// Include forbidden username character to prevent name clash with any RPC usernames
const val PEER_ROLE = "SystemRoles/Peer"
const val NODE_ROLE = "SystemRoles/Node"
const val RPC_ROLE = "SystemRoles/RPC"
private fun bridgeExists(bridgeName: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(bridgeName)
private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
private fun getBridgeName(queueName: String, hostAndPort: HostAndPort): String = "$queueName -> $hostAndPort"
// This is called on one of Artemis' background threads
internal fun hostVerificationFail(peerLegalName: String, expectedCommonName: String) {
log.error("Peer has wrong CN - expected $expectedCommonName but got $peerLegalName. This is either a fatal " +
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!")
if (expectedCommonName == config.networkMapService?.legalName) {
// If the peer that failed host verification was the network map node then we're in big trouble and need to bail!
_networkMapConnectionFuture!!.setException(IOException("${config.networkMapService} failed host verification check"))
}
}
private var loginSucceeded: Boolean = false
private lateinit var subject: Subject
private lateinit var callbackHandler: CallbackHandler
private lateinit var userService: RPCUserService
private lateinit var ourRootCAPublicKey: PublicKey
private lateinit var ourPublicKey: PublicKey
private val principals = ArrayList<Principal>()
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
this.subject = subject
this.callbackHandler = callbackHandler
userService = options[RPCUserService::class.java.name] as RPCUserService
ourRootCAPublicKey = options[CORDA_ROOT_CA] as PublicKey
ourPublicKey = options[CORDA_CLIENT_CA] as PublicKey
}
override fun login(): Boolean {
val nameCallback = NameCallback("Username: ")
val passwordCallback = PasswordCallback("Password: ", false)
val certificateCallback = CertificateCallback()
try {
callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback))
} catch (e: IOException) {
throw LoginException(e.message)
} catch (e: UnsupportedCallbackException) {
throw LoginException("${e.message} not available to obtain information from user")
}
val username = nameCallback.name ?: throw FailedLoginException("Username not provided")
val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided"))
val validatedUser = if (username == PEER_USER || username == NODE_USER) {
val certificates = certificateCallback.certificates ?: throw FailedLoginException("No TLS?")
val peerCertificate = certificates.first()
val role = if (username == NODE_USER) {
if (peerCertificate.publicKey != ourPublicKey) {
throw FailedLoginException("Only the node can login as $NODE_USER")
}
NODE_ROLE
} else {
val theirRootCAPublicKey = certificates.last().publicKey
if (theirRootCAPublicKey != ourRootCAPublicKey) {
throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey")
}
PEER_ROLE // This enables the peer to send to our P2P address
}
principals += RolePrincipal(role)
peerCertificate.subjectDN.name
} else {
// Otherwise assume they're an RPC user
val rpcUser = userService.getUser(username) ?: throw FailedLoginException("User does not exist")
if (password != rpcUser.password) {
// TODO Switch to hashed passwords
// TODO Retrieve client IP address to include in exception message
throw FailedLoginException("Password for user $username does not match")
}
principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests
principals += RolePrincipal("$CLIENTS_PREFIX$username") // This enables the RPC client to receive responses
username
}
principals += UserPrincipal(validatedUser)
loginSucceeded = true
return loginSucceeded
}
override fun commit(): Boolean {
val result = loginSucceeded
if (result) {
subject.principals.addAll(principals)
}
clear()
return result
}
override fun abort(): Boolean {
clear()
return true
}
override fun logout(): Boolean {
subject.principals.removeAll(principals)
return true
}
private fun clear() {
loginSucceeded = false
// This is called on one of Artemis' background threads
internal fun onTcpConnection(peerLegalName: String) {
if (peerLegalName == config.networkMapService?.legalName) {
_networkMapConnectionFuture!!.set(Unit)
}
}
}
class VerifyingNettyConnectorFactory : NettyConnectorFactory() {
override fun createConnector(configuration: MutableMap<String, Any>?,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?): Connector {
return VerifyingNettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool,
protocolManager)
}
}
private class VerifyingNettyConnector(configuration: MutableMap<String, Any>?,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?) :
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager)
{
private val server = configuration?.get(ArtemisMessagingServer::class.java.name) as? ArtemisMessagingServer
private val expectedCommonName = configuration?.get(ArtemisMessagingComponent.VERIFY_PEER_COMMON_NAME) as? String
override fun createConnection(): Connection? {
val connection = super.createConnection() as NettyConnection?
if (connection != null && expectedCommonName != null) {
val peerLegalName = connection
.channel
.pipeline()
.get(SslHandler::class.java)
.engine()
.session
.peerPrincipal
.name
.let(::X500Name)
.commonName
// TODO Verify on the entire principle (subject)
if (peerLegalName != expectedCommonName) {
connection.close()
server!!.hostVerificationFail(peerLegalName, expectedCommonName)
return null // Artemis will keep trying to reconnect until it's told otherwise
} else {
server!!.onTcpConnection(peerLegalName)
}
}
return connection
}
}
/**
* Clients must connect to us with a username and password and must use TLS. If a someone connects with
* [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate
* is the same as our one in our key store. Then they're given full access to all valid queues. If they connect with
* [ArtemisMessagingComponent.PEER_USER] then we confirm they belong on our P2P network by checking their root CA is
* the same as our root CA. If that's the case the only access they're given is the ablility send to our P2P address.
* In both cases the messages these authenticated nodes send to us are tagged with their subject DN and we assume
* the CN within that is their legal name.
* Otherwise if the username is neither of the above we assume it's an RPC user and authenticate against our list of
* valid RPC users. RPC clients are given permission to perform RPC and nothing else.
*/
class NodeLoginModule : LoginModule {
companion object {
// Include forbidden username character to prevent name clash with any RPC usernames
const val PEER_ROLE = "SystemRoles/Peer"
const val NODE_ROLE = "SystemRoles/Node"
const val RPC_ROLE = "SystemRoles/RPC"
}
private var loginSucceeded: Boolean = false
private lateinit var subject: Subject
private lateinit var callbackHandler: CallbackHandler
private lateinit var userService: RPCUserService
private lateinit var ourRootCAPublicKey: PublicKey
private lateinit var ourPublicKey: PublicKey
private val principals = ArrayList<Principal>()
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
this.subject = subject
this.callbackHandler = callbackHandler
userService = options[RPCUserService::class.java.name] as RPCUserService
ourRootCAPublicKey = options[CORDA_ROOT_CA] as PublicKey
ourPublicKey = options[CORDA_CLIENT_CA] as PublicKey
}
override fun login(): Boolean {
val nameCallback = NameCallback("Username: ")
val passwordCallback = PasswordCallback("Password: ", false)
val certificateCallback = CertificateCallback()
try {
callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback))
} catch (e: IOException) {
throw LoginException(e.message)
} catch (e: UnsupportedCallbackException) {
throw LoginException("${e.message} not available to obtain information from user")
}
val username = nameCallback.name ?: throw FailedLoginException("Username not provided")
val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided"))
val validatedUser = if (username == PEER_USER || username == NODE_USER) {
val certificates = certificateCallback.certificates ?: throw FailedLoginException("No TLS?")
authenticateNode(certificates, username)
} else {
// Otherwise assume they're an RPC user
authenticateRpcUser(password, username)
}
principals += UserPrincipal(validatedUser)
loginSucceeded = true
return loginSucceeded
}
private fun authenticateNode(certificates: Array<X509Certificate>, username: String): String {
val peerCertificate = certificates.first()
val role = if (username == NODE_USER) {
if (peerCertificate.publicKey != ourPublicKey) {
throw FailedLoginException("Only the node can login as $NODE_USER")
}
NODE_ROLE
} else {
val theirRootCAPublicKey = certificates.last().publicKey
if (theirRootCAPublicKey != ourRootCAPublicKey) {
throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey")
}
PEER_ROLE // This enables the peer to send to our P2P address
}
principals += RolePrincipal(role)
return peerCertificate.subjectDN.name
}
private fun authenticateRpcUser(password: String, username: String): String {
val rpcUser = userService.getUser(username) ?: throw FailedLoginException("User does not exist")
if (password != rpcUser.password) {
// TODO Switch to hashed passwords
// TODO Retrieve client IP address to include in exception message
throw FailedLoginException("Password for user $username does not match")
}
principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests
principals += RolePrincipal("$CLIENTS_PREFIX$username") // This enables the RPC client to receive responses
return username
}
override fun commit(): Boolean {
val result = loginSucceeded
if (result) {
subject.principals.addAll(principals)
}
clear()
return result
}
override fun abort(): Boolean {
clear()
return true
}
override fun logout(): Boolean {
subject.principals.removeAll(principals)
return true
}
private fun clear() {
loginSucceeded = false
}
}

View File

@ -4,6 +4,7 @@ import com.google.common.net.HostAndPort
import net.corda.core.ThreadBox
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.services.config.NodeSSLConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientSession
@ -35,7 +36,7 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
state.locked {
check(!running)
checkStorePasswords()
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.OUTBOUND, host.hostText, host.port))
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port))
serverLocator.threadPoolMaxSize = 1
// TODO: Configure session reconnection, confirmation window sizes and other Artemis features.
// This will allow reconnection in case of server restart/network outages/IP address changes, etc.

View File

@ -14,6 +14,7 @@ import net.corda.core.utilities.trace
import net.corda.node.services.RPCUserService
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
import net.corda.node.utilities.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
@ -66,13 +67,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// confusion.
const val TOPIC_PROPERTY = "platform-topic"
const val SESSION_ID_PROPERTY = "session-id"
/**
* This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node.
* All other addresses come from the NetworkMapCache, or myAddress below.
* The node will populate with their own identity based address when they register with the NetworkMapService.
*/
fun makeNetworkMapAddress(hostAndPort: HostAndPort): SingleMessageRecipient = NetworkMapAddress(hostAndPort)
}
private class InnerState {
@ -118,7 +112,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
started = true
log.info("Connecting to server: $serverHostPort")
val tcpTransport = tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = tcpTransport(Outbound(), serverHostPort.hostText, serverHostPort.port)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
clientFactory = locator.createSessionFactory()
@ -375,10 +370,10 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
private fun getMQAddress(target: MessageRecipients): SimpleString {
private fun getMQAddress(target: MessageRecipients): String {
return if (target == myAddress) {
// If we are sending to ourselves then route the message directly to our P2P queue.
SimpleString(P2P_QUEUE)
P2P_QUEUE
} else {
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
// broker's job to route the message to the target's P2P queue.
@ -391,9 +386,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
private fun createQueueIfAbsent(queueName: SimpleString) {
private fun createQueueIfAbsent(queueName: String) {
state.alreadyLocked {
val queueQuery = session!!.queueQuery(queueName)
val queueQuery = session!!.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address")
session!!.createQueue(queueName, queueName, true)

View File

@ -32,6 +32,7 @@ import net.corda.flows.CashFlowResult
import net.corda.node.internal.AbstractNode
import net.corda.node.services.User
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.SimpleString
@ -197,18 +198,8 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(NetworkMapCache.MapChange.Added::class.java)
register(NetworkMapCache.MapChange.Removed::class.java)
register(NetworkMapCache.MapChange.Modified::class.java)
register(ArtemisMessagingComponent.NodeAddress::class.java,
read = { kryo, input ->
ArtemisMessagingComponent.NodeAddress(
kryo.readObject(input, SimpleString::class.java),
kryo.readObject(input, HostAndPort::class.java))
},
write = { kryo, output, nodeAddress ->
kryo.writeObject(output, nodeAddress.queueName)
kryo.writeObject(output, nodeAddress.hostAndPort)
}
)
register(NodeMessagingClient.makeNetworkMapAddress(HostAndPort.fromString("localhost:0")).javaClass)
register(ArtemisMessagingComponent.NodeAddress::class.java)
register(NetworkMapAddress::class.java)
register(ServiceInfo::class.java)
register(ServiceType.getServiceType("ab", "ab").javaClass)
register(ServiceType.parse("ab").javaClass)

View File

@ -208,6 +208,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
serviceHub.networkService.addMessageHandler(sessionTopic) { message, reg ->
executor.checkOnThread()
val sessionMessage = message.data.deserialize<SessionMessage>()
// TODO Look up the party with the full X.500 name instead of just the legal name
val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peer.commonName)?.legalIdentity
if (otherParty != null) {
when (sessionMessage) {

View File

@ -23,6 +23,7 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.TestNodeConfiguration
import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
@ -35,7 +36,6 @@ import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.io.Closeable
import java.net.ServerSocket
import java.nio.file.Path
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit.MILLISECONDS
import kotlin.concurrent.thread
@ -68,17 +68,10 @@ class ArtemisMessagingTests {
@Before
fun setUp() {
userService = RPCUserServiceImpl(FullNodeConfiguration(ConfigFactory.empty()))
// TODO: create a base class that provides a default implementation
config = object : NodeConfiguration {
override val basedir: Path = temporaryFolder.newFolder().toPath()
override val myLegalName: String = "me"
override val nearestCity: String = "London"
override val emailAddress: String = ""
override val devMode: Boolean = true
override val exportJMXto: String = ""
override val keyStorePassword: String = "testpass"
override val trustStorePassword: String = "trustpass"
}
config = TestNodeConfiguration(
basedir = temporaryFolder.newFolder().toPath(),
myLegalName = "me",
networkMapService = null)
LogHelper.setLevel(PersistentUniquenessProvider::class)
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
dataSource = dataSourceAndDatabase.first

View File

@ -8,11 +8,10 @@ import net.corda.core.crypto.X509Utilities
import net.corda.core.div
import net.corda.core.exists
import net.corda.core.readLines
import net.corda.node.services.config.NodeConfiguration
import net.corda.testing.TestNodeConfiguration
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.nio.file.Path
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
@ -20,11 +19,10 @@ import kotlin.test.assertTrue
class CertificateSignerTest {
@Rule
@JvmField
val tempFolder: TemporaryFolder = TemporaryFolder()
val tempFolder = TemporaryFolder()
@Test
fun buildKeyStore() {
val id = SecureHash.randomSHA256().toString()
val certs = arrayOf(X509Utilities.createSelfSignedCACert("CORDA_CLIENT_CA").certificate,
@ -36,17 +34,10 @@ class CertificateSignerTest {
on { retrieveCertificates(eq(id)) }.then { certs }
}
val config = object : NodeConfiguration {
override val basedir: Path = tempFolder.root.toPath()
override val myLegalName: String = "me"
override val nearestCity: String = "London"
override val emailAddress: String = ""
override val devMode: Boolean = true
override val exportJMXto: String = ""
override val keyStorePassword: String = "testpass"
override val trustStorePassword: String = "trustpass"
}
val config = TestNodeConfiguration(
basedir = tempFolder.root.toPath(),
myLegalName = "me",
networkMapService = null)
assertFalse(config.keyStorePath.exists())
assertFalse(config.trustStorePath.exists())
@ -76,5 +67,4 @@ class CertificateSignerTest {
assertEquals(id, (config.certificatesPath / "certificate-request-id.txt").readLines { it.findFirst().get() })
}
}