mirror of
https://github.com/corda/corda.git
synced 2025-06-18 07:08:15 +00:00
Clean up messaging/RPC port configuration and docs (#296)
* Non-ssl artemis acceptor for RPC connection. (#271) * New non-ssl acceptor in artemis server for RPC connection. * Rename artemisAddress with messagingAddress Rename artemisAddress with messagingAddress so that the node configuration file properties match the code variable names. Rename artemisPort to messagingPort in Gradle configuration to match node configuration naming. * Add rpcPort configuration option for Gradle * Update docs to reflect changes to RPC port configuration * Renumber ports in example CorDapp to match numbering used elsewhere * Restructure upgrade guide * added config file checks on corda startup to make the upgrade path a bit smoother.
This commit is contained in:
@ -14,6 +14,9 @@ import org.junit.Test
|
||||
* Runs the security tests with the attacker pretending to be a node on the network.
|
||||
*/
|
||||
class MQSecurityAsNodeTest : MQSecurityTest() {
|
||||
override fun createAttacker(): SimpleMQClient {
|
||||
return clientTo(alice.configuration.p2pAddress)
|
||||
}
|
||||
|
||||
override fun startAttacker(attacker: SimpleMQClient) {
|
||||
attacker.start(PEER_USER, PEER_USER) // Login as a peer
|
||||
@ -26,7 +29,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
|
||||
|
||||
@Test
|
||||
fun `only the node running the broker can login using the special node user`() {
|
||||
val attacker = clientTo(alice.configuration.artemisAddress)
|
||||
val attacker = clientTo(alice.configuration.p2pAddress)
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
attacker.start(NODE_USER, NODE_USER)
|
||||
}
|
||||
@ -34,7 +37,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
|
||||
|
||||
@Test
|
||||
fun `login as the default cluster user`() {
|
||||
val attacker = clientTo(alice.configuration.artemisAddress)
|
||||
val attacker = clientTo(alice.configuration.p2pAddress)
|
||||
assertThatExceptionOfType(ActiveMQClusterSecurityException::class.java).isThrownBy {
|
||||
attacker.start(ActiveMQDefaultConfiguration.getDefaultClusterUser(), ActiveMQDefaultConfiguration.getDefaultClusterPassword())
|
||||
}
|
||||
@ -42,9 +45,25 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
|
||||
|
||||
@Test
|
||||
fun `login without a username and password`() {
|
||||
val attacker = clientTo(alice.configuration.artemisAddress)
|
||||
val attacker = clientTo(alice.configuration.p2pAddress)
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
attacker.start()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `login to a non ssl port as a node user`() {
|
||||
val attacker = clientTo(alice.configuration.rpcAddress!!, sslConfiguration = null)
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
attacker.start(NODE_USER, NODE_USER, enableSSL = false)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `login to a non ssl port as a peer user`() {
|
||||
val attacker = clientTo(alice.configuration.rpcAddress!!, sslConfiguration = null)
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
attacker.start(PEER_USER, PEER_USER, enableSSL = false) // Login as a peer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,14 +2,35 @@ package net.corda.services.messaging
|
||||
|
||||
import net.corda.node.services.User
|
||||
import net.corda.testing.messaging.SimpleMQClient
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.junit.Test
|
||||
|
||||
/**
|
||||
* Runs the security tests with the attacker being a valid RPC user of Alice.
|
||||
*/
|
||||
class MQSecurityAsRPCTest : MQSecurityTest() {
|
||||
override fun createAttacker(): SimpleMQClient {
|
||||
return clientTo(alice.configuration.rpcAddress!!)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send message on logged in user's RPC address`() {
|
||||
val user1Queue = loginToRPCAndGetClientQueue()
|
||||
assertSendAttackFails(user1Queue)
|
||||
}
|
||||
|
||||
override val extraRPCUsers = listOf(User("evil", "pass", permissions = emptySet()))
|
||||
|
||||
override fun startAttacker(attacker: SimpleMQClient) {
|
||||
attacker.loginToRPC(extraRPCUsers[0])
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `login to a ssl port as a RPC user`() {
|
||||
val attacker = clientTo(alice.configuration.p2pAddress)
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
attacker.loginToRPC(extraRPCUsers[0], enableSSL = true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,6 @@ import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.config.SSLConfiguration
|
||||
import net.corda.node.services.config.configureTestSSL
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE
|
||||
@ -24,6 +23,7 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEE
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
|
||||
import net.corda.node.services.messaging.CordaRPCClientImpl
|
||||
import net.corda.testing.configureTestSSL
|
||||
import net.corda.testing.messaging.SimpleMQClient
|
||||
import net.corda.testing.node.NodeBasedTest
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException
|
||||
@ -49,12 +49,14 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
@Before
|
||||
fun start() {
|
||||
alice = startNode("Alice", rpcUsers = extraRPCUsers + rpcUser).getOrThrow()
|
||||
attacker = clientTo(alice.configuration.artemisAddress)
|
||||
attacker = createAttacker()
|
||||
startAttacker(attacker)
|
||||
}
|
||||
|
||||
open val extraRPCUsers: List<User> get() = emptyList()
|
||||
|
||||
abstract fun createAttacker(): SimpleMQClient
|
||||
|
||||
abstract fun startAttacker(attacker: SimpleMQClient)
|
||||
|
||||
@After
|
||||
@ -112,12 +114,6 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
assertConsumeAttackFails(user1Queue)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send message on logged in user's RPC address`() {
|
||||
val user1Queue = loginToRPCAndGetClientQueue()
|
||||
assertSendAttackFails(user1Queue)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for valid RPC user`() {
|
||||
val user1Queue = "$CLIENTS_PREFIX${rpcUser.username}.rpc.${random63BitValue()}"
|
||||
@ -152,26 +148,26 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
assertAllQueueCreationAttacksFail(randomQueue)
|
||||
}
|
||||
|
||||
fun clientTo(target: HostAndPort, config: SSLConfiguration = configureTestSSL()): SimpleMQClient {
|
||||
val client = SimpleMQClient(target, config)
|
||||
fun clientTo(target: HostAndPort, sslConfiguration: SSLConfiguration? = configureTestSSL()): SimpleMQClient {
|
||||
val client = SimpleMQClient(target, sslConfiguration)
|
||||
clients += client
|
||||
return client
|
||||
}
|
||||
|
||||
fun loginToRPC(target: HostAndPort, rpcUser: User): SimpleMQClient {
|
||||
val client = clientTo(target)
|
||||
fun loginToRPC(target: HostAndPort, rpcUser: User, sslConfiguration: SSLConfiguration? = null): SimpleMQClient {
|
||||
val client = clientTo(target, sslConfiguration)
|
||||
client.loginToRPC(rpcUser)
|
||||
return client
|
||||
}
|
||||
|
||||
fun SimpleMQClient.loginToRPC(rpcUser: User): CordaRPCOps {
|
||||
start(rpcUser.username, rpcUser.password)
|
||||
fun SimpleMQClient.loginToRPC(rpcUser: User, enableSSL: Boolean = false): CordaRPCOps {
|
||||
start(rpcUser.username, rpcUser.password, enableSSL)
|
||||
val clientImpl = CordaRPCClientImpl(session, ReentrantLock(), rpcUser.username)
|
||||
return clientImpl.proxyFor(CordaRPCOps::class.java, timeout = 1.seconds)
|
||||
}
|
||||
|
||||
fun loginToRPCAndGetClientQueue(): String {
|
||||
val rpcClient = loginToRPC(alice.configuration.artemisAddress, rpcUser)
|
||||
val rpcClient = loginToRPC(alice.configuration.rpcAddress!!, rpcUser)
|
||||
val clientQueueQuery = SimpleString("$CLIENTS_PREFIX${rpcUser.username}.rpc.*")
|
||||
return rpcClient.session.addressQuery(clientQueueQuery).queueNames.single().toString()
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ class P2PSecurityTest : NodeBasedTest() {
|
||||
val incorrectNetworkMapName = random63BitValue().toString()
|
||||
val node = startNode("Bob", configOverrides = mapOf(
|
||||
"networkMapService" to mapOf(
|
||||
"address" to networkMapNode.configuration.artemisAddress.toString(),
|
||||
"address" to networkMapNode.configuration.p2pAddress.toString(),
|
||||
"legalName" to incorrectNetworkMapName
|
||||
)
|
||||
))
|
||||
@ -57,7 +57,7 @@ class P2PSecurityTest : NodeBasedTest() {
|
||||
val config = TestNodeConfiguration(
|
||||
baseDirectory = tempFolder.root.toPath() / legalName,
|
||||
myLegalName = legalName,
|
||||
networkMapService = NetworkMapInfo(networkMapNode.configuration.artemisAddress, networkMapNode.info.legalIdentity.name))
|
||||
networkMapService = NetworkMapInfo(networkMapNode.configuration.p2pAddress, networkMapNode.info.legalIdentity.name))
|
||||
config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name
|
||||
return SimpleNode(config).apply { start() }
|
||||
}
|
||||
@ -68,4 +68,4 @@ class P2PSecurityTest : NodeBasedTest() {
|
||||
val request = RegistrationRequest(registration.toWire(identity.private), net.myAddress)
|
||||
return net.sendRequest<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.net.myAddress)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
@file:JvmName("Corda")
|
||||
package net.corda.node
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import com.jcabi.manifests.Manifests
|
||||
import com.typesafe.config.ConfigException
|
||||
import joptsimple.OptionException
|
||||
@ -80,7 +81,9 @@ fun main(args: Array<String>) {
|
||||
printBasicNodeInfo("Logs can be found in", System.getProperty("log-path"))
|
||||
|
||||
val conf = try {
|
||||
FullNodeConfiguration(cmdlineOptions.baseDirectory, cmdlineOptions.loadConfig())
|
||||
val conf = cmdlineOptions.loadConfig()
|
||||
checkConfigVersion(conf)
|
||||
FullNodeConfiguration(cmdlineOptions.baseDirectory, conf)
|
||||
} catch (e: ConfigException) {
|
||||
println("Unable to load the configuration file: ${e.rootCause.message}")
|
||||
exitProcess(2)
|
||||
@ -110,7 +113,7 @@ fun main(args: Array<String>) {
|
||||
log.info("VM ${info.vmName} ${info.vmVendor} ${info.vmVersion}")
|
||||
log.info("Machine: ${InetAddress.getLocalHost().hostName}")
|
||||
log.info("Working Directory: ${cmdlineOptions.baseDirectory}")
|
||||
log.info("Starting as node on ${conf.artemisAddress}")
|
||||
log.info("Starting as node on ${conf.p2pAddress}")
|
||||
|
||||
try {
|
||||
cmdlineOptions.baseDirectory.createDirectories()
|
||||
@ -138,6 +141,16 @@ fun main(args: Array<String>) {
|
||||
exitProcess(0)
|
||||
}
|
||||
|
||||
private fun checkConfigVersion(conf: Config) {
|
||||
// TODO: Remove this check in future milestone.
|
||||
if (conf.hasPath("artemisAddress")) {
|
||||
// artemisAddress has been renamed to p2pAddress in M10.
|
||||
println("artemisAddress has been renamed to p2pAddress in M10, please upgrade your configuration file and start Corda node again.")
|
||||
println("Corda will now exit...")
|
||||
exitProcess(1)
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkJavaVersion() {
|
||||
// Check we're not running a version of Java with a known bug: https://github.com/corda/corda/issues/83
|
||||
try {
|
||||
|
@ -105,7 +105,7 @@ data class NodeHandle(
|
||||
val configuration: FullNodeConfiguration,
|
||||
val process: Process
|
||||
) {
|
||||
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.artemisAddress, configuration)
|
||||
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.rpcAddress!!)
|
||||
}
|
||||
|
||||
sealed class PortAllocation {
|
||||
@ -342,16 +342,18 @@ open class DriverDSL(
|
||||
|
||||
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>,
|
||||
rpcUsers: List<User>, customOverrides: Map<String, Any?>): ListenableFuture<NodeHandle> {
|
||||
val messagingAddress = portAllocation.nextHostAndPort()
|
||||
val apiAddress = portAllocation.nextHostAndPort()
|
||||
val p2pAddress = portAllocation.nextHostAndPort()
|
||||
val rpcAddress = portAllocation.nextHostAndPort()
|
||||
val webAddress = portAllocation.nextHostAndPort()
|
||||
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
||||
val name = providedName ?: "${pickA(name)}-${messagingAddress.port}"
|
||||
val name = providedName ?: "${pickA(name)}-${p2pAddress.port}"
|
||||
|
||||
val baseDirectory = driverDirectory / name
|
||||
val configOverrides = mapOf(
|
||||
"myLegalName" to name,
|
||||
"artemisAddress" to messagingAddress.toString(),
|
||||
"webAddress" to apiAddress.toString(),
|
||||
"p2pAddress" to p2pAddress.toString(),
|
||||
"rpcAddress" to rpcAddress.toString(),
|
||||
"webAddress" to webAddress.toString(),
|
||||
"extraAdvertisedServiceIds" to advertisedServices.map { it.toString() },
|
||||
"networkMapService" to mapOf(
|
||||
"address" to networkMapAddress.toString(),
|
||||
@ -379,7 +381,8 @@ open class DriverDSL(
|
||||
val processFuture = startNode(executorService, configuration, quasarJarPath, debugPort)
|
||||
registerProcess(processFuture)
|
||||
return processFuture.flatMap { process ->
|
||||
establishRpc(messagingAddress, configuration).flatMap { rpc ->
|
||||
// We continue to use SSL enabled port for RPC when its for node user.
|
||||
establishRpc(p2pAddress, configuration).flatMap { rpc ->
|
||||
rpc.waitUntilRegisteredWithNetworkMap().map {
|
||||
NodeHandle(rpc.nodeIdentity(), rpc, configuration, process)
|
||||
}
|
||||
@ -465,7 +468,7 @@ open class DriverDSL(
|
||||
// TODO: remove the webAddress as NMS doesn't need to run a web server. This will cause all
|
||||
// node port numbers to be shifted, so all demos and docs need to be updated accordingly.
|
||||
"webAddress" to apiAddress,
|
||||
"artemisAddress" to networkMapAddress.toString(),
|
||||
"p2pAddress" to networkMapAddress.toString(),
|
||||
"useTestClock" to useTestClock
|
||||
)
|
||||
)
|
||||
@ -536,7 +539,7 @@ open class DriverDSL(
|
||||
val process = builder.start()
|
||||
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
|
||||
// the handlers for the advertised services are not yet registered. Needs rethinking.
|
||||
return addressMustBeBound(executorService, nodeConf.artemisAddress).map { process }
|
||||
return addressMustBeBound(executorService, nodeConf.p2pAddress).map { process }
|
||||
}
|
||||
|
||||
private fun startWebserver(
|
||||
@ -554,7 +557,7 @@ open class DriverDSL(
|
||||
emptyList()
|
||||
|
||||
val javaArgs = listOf(path) +
|
||||
listOf("-Dname=node-${nodeConf.artemisAddress}-webserver") + debugPortArg +
|
||||
listOf("-Dname=node-${nodeConf.p2pAddress}-webserver") + debugPortArg +
|
||||
listOf(
|
||||
"-cp", classpath, className,
|
||||
"--base-directory", nodeConf.baseDirectory.toString())
|
||||
|
@ -27,6 +27,7 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.services.transactions.*
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import org.slf4j.Logger
|
||||
import java.io.RandomAccessFile
|
||||
@ -141,9 +142,9 @@ class Node(override val configuration: FullNodeConfiguration,
|
||||
|
||||
private fun makeLocalMessageBroker(): HostAndPort {
|
||||
with(configuration) {
|
||||
val useHost = tryDetectIfNotPublicHost(artemisAddress.hostText)
|
||||
val useAddress = useHost?.let { HostAndPort.fromParts(it, artemisAddress.port) } ?: artemisAddress
|
||||
messageBroker = ArtemisMessagingServer(this, useAddress, services.networkMapCache, userService)
|
||||
val useHost = tryDetectIfNotPublicHost(p2pAddress.hostText)
|
||||
val useAddress = useHost?.let { HostAndPort.fromParts(it, p2pAddress.port) } ?: p2pAddress
|
||||
messageBroker = ArtemisMessagingServer(this, useAddress, rpcAddress, services.networkMapCache, userService)
|
||||
return useAddress
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,6 @@ import net.corda.core.div
|
||||
import net.corda.core.exists
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import java.net.URL
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.time.Instant
|
||||
@ -49,6 +48,9 @@ object ConfigHelper {
|
||||
|
||||
@Suppress("UNCHECKED_CAST", "PLATFORM_CLASS_MAPPED_TO_KOTLIN")
|
||||
operator fun <T> Config.getValue(receiver: Any, metadata: KProperty<*>): T {
|
||||
if (metadata.returnType.isMarkedNullable && !hasPath(metadata.name)) {
|
||||
return null as T
|
||||
}
|
||||
return when (metadata.returnType.javaType) {
|
||||
String::class.java -> getString(metadata.name) as T
|
||||
Int::class.java -> getInt(metadata.name) as T
|
||||
@ -101,7 +103,7 @@ inline fun <reified T : Any> Config.getListOrElse(path: String, default: Config.
|
||||
*/
|
||||
fun NodeConfiguration.configureWithDevSSLCertificate() = configureDevKeyAndTrustStores(myLegalName)
|
||||
|
||||
private fun SSLConfiguration.configureDevKeyAndTrustStores(myLegalName: String) {
|
||||
fun SSLConfiguration.configureDevKeyAndTrustStores(myLegalName: String) {
|
||||
certificatesDirectory.createDirectories()
|
||||
if (!trustStoreFile.exists()) {
|
||||
javaClass.classLoader.getResourceAsStream("net/corda/node/internal/certificates/cordatruststore.jks").copyTo(trustStoreFile)
|
||||
@ -113,15 +115,3 @@ private fun SSLConfiguration.configureDevKeyAndTrustStores(myLegalName: String)
|
||||
X509Utilities.createKeystoreForSSL(keyStoreFile, keyStorePassword, keyStorePassword, caKeyStore, "cordacadevkeypass", myLegalName)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO Move this to CoreTestUtils.kt once we can pry this from the explorer
|
||||
@JvmOverloads
|
||||
fun configureTestSSL(legalName: String = "Mega Corp."): SSLConfiguration = object : SSLConfiguration {
|
||||
override val certificatesDirectory = Files.createTempDirectory("certs")
|
||||
override val keyStorePassword: String get() = "cordacadevpass"
|
||||
override val trustStorePassword: String get() = "trustpass"
|
||||
|
||||
init {
|
||||
configureDevKeyAndTrustStores(legalName)
|
||||
}
|
||||
}
|
||||
|
@ -65,15 +65,16 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config
|
||||
User(username, password, permissions)
|
||||
}
|
||||
val useHTTPS: Boolean by config
|
||||
val artemisAddress: HostAndPort by config
|
||||
val p2pAddress: HostAndPort by config
|
||||
val rpcAddress: HostAndPort? by config
|
||||
val webAddress: HostAndPort by config
|
||||
// TODO This field is slightly redundant as artemisAddress is sufficient to hold the address of the node's MQ broker.
|
||||
// TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker.
|
||||
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
|
||||
val messagingServerAddress: HostAndPort? by config.getOrElse { null }
|
||||
val messagingServerAddress: HostAndPort? by config
|
||||
val extraAdvertisedServiceIds: List<String> = config.getListOrElse<String>("extraAdvertisedServiceIds") { emptyList() }
|
||||
val useTestClock: Boolean by config.getOrElse { false }
|
||||
val notaryNodeId: Int? by config.getOrElse { null }
|
||||
val notaryNodeAddress: HostAndPort? by config.getOrElse { null }
|
||||
val notaryNodeId: Int? by config
|
||||
val notaryNodeAddress: HostAndPort? by config
|
||||
val notaryClusterAddresses: List<HostAndPort> = config
|
||||
.getListOrElse<String>("notaryClusterAddresses") { emptyList() }
|
||||
.map { HostAndPort.fromString(it) }
|
||||
|
@ -22,7 +22,7 @@ import java.security.KeyStore
|
||||
/**
|
||||
* The base class for Artemis services that defines shared data structures and SSL transport configuration.
|
||||
*/
|
||||
abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
|
||||
abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
init {
|
||||
System.setProperty("org.jboss.logging.provider", "slf4j")
|
||||
@ -88,6 +88,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
|
||||
fun asPeer(peerIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
|
||||
return NodeAddress("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort)
|
||||
}
|
||||
|
||||
fun asService(serviceIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
|
||||
return NodeAddress("$SERVICES_PREFIX${serviceIdentity.toBase58String()}", hostAndPort)
|
||||
}
|
||||
@ -137,7 +138,10 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
|
||||
}
|
||||
}
|
||||
|
||||
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int): TransportConfiguration {
|
||||
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true): TransportConfiguration {
|
||||
// Will throw exception if enableSSL = true but config is missing
|
||||
require(config != null || !enableSSL) { "SSL configuration cannot be null when SSL is enabled." }
|
||||
|
||||
val config = config
|
||||
val options = mutableMapOf<String, Any?>(
|
||||
// Basic TCP target details
|
||||
@ -151,7 +155,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
|
||||
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP"
|
||||
)
|
||||
|
||||
if (config != null) {
|
||||
if (config != null && enableSSL) {
|
||||
config.keyStoreFile.expectedOnDefaultFileSystem()
|
||||
config.trustStoreFile.expectedOnDefaultFileSystem()
|
||||
val tlsOptions = mapOf<String, Any?>(
|
||||
|
@ -81,7 +81,8 @@ import javax.security.cert.X509Certificate
|
||||
*/
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
val myHostPort: HostAndPort,
|
||||
val p2pHostPort: HostAndPort,
|
||||
val rpcHostPort: HostAndPort?,
|
||||
val networkMapCache: NetworkMapCache,
|
||||
val userService: RPCUserService) : ArtemisMessagingComponent() {
|
||||
companion object {
|
||||
@ -139,7 +140,10 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
|
||||
}
|
||||
activeMQServer.start()
|
||||
printBasicNodeInfo("Node ${this.config.myLegalName} listening on address", myHostPort.toString())
|
||||
printBasicNodeInfo("Node ${this.config.myLegalName} listening on address", p2pHostPort.toString())
|
||||
if (rpcHostPort != null) {
|
||||
printBasicNodeInfo("Node ${this.config.myLegalName} RPC service listening on address", rpcHostPort.toString())
|
||||
}
|
||||
}
|
||||
|
||||
private fun createArtemisConfig(): Configuration = ConfigurationImpl().apply {
|
||||
@ -147,7 +151,11 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
bindingsDirectory = (artemisDir / "bindings").toString()
|
||||
journalDirectory = (artemisDir / "journal").toString()
|
||||
largeMessagesDirectory = (artemisDir / "large-messages").toString()
|
||||
acceptorConfigurations = setOf(tcpTransport(Inbound, "0.0.0.0", myHostPort.port))
|
||||
val acceptors = mutableSetOf(tcpTransport(Inbound, "0.0.0.0", p2pHostPort.port))
|
||||
if (rpcHostPort != null) {
|
||||
acceptors.add(tcpTransport(Inbound, "0.0.0.0", rpcHostPort.port, enableSSL = false))
|
||||
}
|
||||
acceptorConfigurations = acceptors
|
||||
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
||||
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
|
||||
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
|
||||
@ -160,15 +168,15 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
// by having its password be an unknown securely random 128-bit value.
|
||||
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
|
||||
queueConfigurations = listOf(
|
||||
queueConfig(NETWORK_MAP_QUEUE, durable = true),
|
||||
queueConfig(P2P_QUEUE, durable = true),
|
||||
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
|
||||
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
|
||||
// but these queues are not worth persisting.
|
||||
queueConfig(RPC_REQUESTS_QUEUE, durable = false),
|
||||
// The custom name for the queue is intentional - we may wish other things to subscribe to the
|
||||
// NOTIFICATIONS_ADDRESS with different filters in future
|
||||
queueConfig(RPC_QUEUE_REMOVALS_QUEUE, address = NOTIFICATIONS_ADDRESS, filter = "_AMQ_NotifType = 1", durable = false)
|
||||
queueConfig(NETWORK_MAP_QUEUE, durable = true),
|
||||
queueConfig(P2P_QUEUE, durable = true),
|
||||
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
|
||||
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
|
||||
// but these queues are not worth persisting.
|
||||
queueConfig(RPC_REQUESTS_QUEUE, durable = false),
|
||||
// The custom name for the queue is intentional - we may wish other things to subscribe to the
|
||||
// NOTIFICATIONS_ADDRESS with different filters in future
|
||||
queueConfig(RPC_QUEUE_REMOVALS_QUEUE, address = NOTIFICATIONS_ADDRESS, filter = "_AMQ_NotifType = 1", durable = false)
|
||||
)
|
||||
configureAddressSecurity()
|
||||
}
|
||||
@ -290,8 +298,8 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
|
||||
fun deployBridges(node: NodeInfo) {
|
||||
gatherAddresses(node)
|
||||
.filter { queueExists(it.queueName) && !bridgeExists(it.bridgeName) }
|
||||
.forEach { deployBridge(it, node.legalIdentity.name) }
|
||||
.filter { queueExists(it.queueName) && !bridgeExists(it.bridgeName) }
|
||||
.forEach { deployBridge(it, node.legalIdentity.name) }
|
||||
}
|
||||
|
||||
fun destroyBridges(node: NodeInfo) {
|
||||
@ -397,8 +405,7 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>?,
|
||||
threadPool: Executor?,
|
||||
scheduledThreadPool: ScheduledExecutorService?,
|
||||
protocolManager: ClientProtocolManager?) :
|
||||
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager)
|
||||
{
|
||||
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
|
||||
private val server = configuration?.get(ArtemisMessagingServer::class.java.name) as? ArtemisMessagingServer
|
||||
private val expectedCommonName = configuration?.get(ArtemisMessagingComponent.VERIFY_PEER_COMMON_NAME) as? String
|
||||
|
||||
@ -480,15 +487,15 @@ class NodeLoginModule : LoginModule {
|
||||
|
||||
val username = nameCallback.name ?: throw FailedLoginException("Username not provided")
|
||||
val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided"))
|
||||
val certificates = certificateCallback.certificates
|
||||
|
||||
log.info("Processing login for $username")
|
||||
|
||||
val validatedUser = if (username == PEER_USER || username == NODE_USER) {
|
||||
val certificates = certificateCallback.certificates ?: throw FailedLoginException("No TLS?")
|
||||
authenticateNode(certificates, username)
|
||||
} else {
|
||||
// Otherwise assume they're an RPC user
|
||||
authenticateRpcUser(password, username)
|
||||
val validatedUser = when (determineUserRole(certificates, username)) {
|
||||
PEER_ROLE -> authenticatePeer(certificates)
|
||||
NODE_ROLE -> authenticateNode(certificates)
|
||||
RPC_ROLE -> authenticateRpcUser(password, username)
|
||||
else -> throw FailedLoginException("Peer does not belong on our network")
|
||||
}
|
||||
principals += UserPrincipal(validatedUser)
|
||||
|
||||
@ -496,24 +503,24 @@ class NodeLoginModule : LoginModule {
|
||||
return loginSucceeded
|
||||
}
|
||||
|
||||
private fun authenticateNode(certificates: Array<X509Certificate>, username: String): String {
|
||||
private fun authenticateNode(certificates: Array<X509Certificate>): String {
|
||||
val peerCertificate = certificates.first()
|
||||
val role = if (username == NODE_USER) {
|
||||
if (peerCertificate.publicKey != ourPublicKey) {
|
||||
throw FailedLoginException("Only the node can login as $NODE_USER")
|
||||
}
|
||||
NODE_ROLE
|
||||
} else {
|
||||
val theirRootCAPublicKey = certificates.last().publicKey
|
||||
if (theirRootCAPublicKey != ourRootCAPublicKey) {
|
||||
throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey")
|
||||
}
|
||||
PEER_ROLE // This enables the peer to send to our P2P address
|
||||
if (peerCertificate.publicKey != ourPublicKey) {
|
||||
throw FailedLoginException("Only the node can login as $NODE_USER")
|
||||
}
|
||||
principals += RolePrincipal(role)
|
||||
principals += RolePrincipal(NODE_ROLE)
|
||||
return peerCertificate.subjectDN.name
|
||||
}
|
||||
|
||||
private fun authenticatePeer(certificates: Array<X509Certificate>): String {
|
||||
val theirRootCAPublicKey = certificates.last().publicKey
|
||||
if (theirRootCAPublicKey != ourRootCAPublicKey) {
|
||||
throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey")
|
||||
}
|
||||
principals += RolePrincipal(PEER_ROLE)
|
||||
return certificates.first().subjectDN.name
|
||||
}
|
||||
|
||||
private fun authenticateRpcUser(password: String, username: String): String {
|
||||
val rpcUser = userService.getUser(username) ?: throw FailedLoginException("User does not exist")
|
||||
if (password != rpcUser.password) {
|
||||
@ -526,6 +533,18 @@ class NodeLoginModule : LoginModule {
|
||||
return username
|
||||
}
|
||||
|
||||
private fun determineUserRole(certificates: Array<X509Certificate>?, username: String): String? {
|
||||
return if (username == PEER_USER || username == NODE_USER) {
|
||||
certificates ?: throw FailedLoginException("No TLS?")
|
||||
if (username == PEER_USER) PEER_ROLE else NODE_ROLE
|
||||
} else if (certificates == null) {
|
||||
// Assume they're an RPC user if its from a non-ssl connection
|
||||
RPC_ROLE
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
override fun commit(): Boolean {
|
||||
val result = loginSucceeded
|
||||
if (result) {
|
||||
|
@ -24,10 +24,10 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* useful tasks. See the documentation for [proxy] or review the docsite to learn more about how this API works.
|
||||
*
|
||||
* @param host The hostname and messaging port of the node.
|
||||
* @param config If specified, the SSL configuration to use. If not specified, SSL will be disabled and the node will not be authenticated, nor will RPC traffic be encrypted.
|
||||
* @param config If specified, the SSL configuration to use. If not specified, SSL will be disabled and the node will only be authenticated on non-SSL RPC port, the RPC traffic with not be encrypted when SSL is disabled.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration?, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() {
|
||||
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration? = null, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() {
|
||||
private companion object {
|
||||
val log = loggerFor<CordaRPCClient>()
|
||||
}
|
||||
@ -52,7 +52,7 @@ class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguratio
|
||||
check(!running)
|
||||
log.logElapsedTime("Startup") {
|
||||
checkStorePasswords()
|
||||
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port)).apply {
|
||||
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port, enableSSL = config != null)).apply {
|
||||
// TODO: Put these in config file or make it user configurable?
|
||||
threadPoolMaxSize = 1
|
||||
confirmationWindowSize = 100000 // a guess
|
||||
|
@ -48,6 +48,7 @@ class ArtemisMessagingTests {
|
||||
@Rule @JvmField val temporaryFolder = TemporaryFolder()
|
||||
|
||||
val hostAndPort = freeLocalHostAndPort()
|
||||
val rpcHostAndPort = freeLocalHostAndPort()
|
||||
val topic = "platform.self"
|
||||
val identity = generateKeyPair()
|
||||
|
||||
@ -232,8 +233,8 @@ class ArtemisMessagingTests {
|
||||
}
|
||||
}
|
||||
|
||||
private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(config, local, networkMapCache, userService).apply {
|
||||
private fun createMessagingServer(local: HostAndPort = hostAndPort, rpc: HostAndPort = rpcHostAndPort): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(config, local, rpc, networkMapCache, userService).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
messagingServer = this
|
||||
}
|
||||
|
Reference in New Issue
Block a user