mirror of
https://github.com/corda/corda.git
synced 2025-06-18 07:08:15 +00:00
Add :node-api, factor out some artemis code
This commit is contained in:
@ -2,6 +2,7 @@ package net.corda.services.messaging
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.config.SSLConfiguration
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.composite
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
@ -13,7 +14,6 @@ import net.corda.core.seconds
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.config.SSLConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE
|
||||
|
@ -8,6 +8,7 @@ import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import net.corda.config.SSLConfiguration
|
||||
import net.corda.core.*
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
@ -18,7 +19,6 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.config.ConfigHelper
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.config.SSLConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent
|
||||
import net.corda.node.services.messaging.CordaRPCClient
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
|
@ -3,11 +3,11 @@
|
||||
|
||||
package net.corda.node.services.config
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import com.typesafe.config.ConfigParseOptions
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import net.corda.config.SSLConfiguration
|
||||
import net.corda.core.copyTo
|
||||
import net.corda.core.createDirectories
|
||||
import net.corda.core.crypto.X509Utilities
|
||||
@ -15,13 +15,8 @@ import net.corda.core.div
|
||||
import net.corda.core.exists
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import java.net.URL
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.time.Instant
|
||||
import java.time.LocalDate
|
||||
import java.util.*
|
||||
import kotlin.reflect.KProperty
|
||||
import kotlin.reflect.jvm.javaType
|
||||
|
||||
object ConfigHelper {
|
||||
private val log = loggerFor<ConfigHelper>()
|
||||
@ -46,57 +41,6 @@ object ConfigHelper {
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST", "PLATFORM_CLASS_MAPPED_TO_KOTLIN")
|
||||
operator fun <T> Config.getValue(receiver: Any, metadata: KProperty<*>): T {
|
||||
if (metadata.returnType.isMarkedNullable && !hasPath(metadata.name)) {
|
||||
return null as T
|
||||
}
|
||||
return when (metadata.returnType.javaType) {
|
||||
String::class.java -> getString(metadata.name) as T
|
||||
Int::class.java -> getInt(metadata.name) as T
|
||||
Integer::class.java -> getInt(metadata.name) as T
|
||||
Long::class.java -> getLong(metadata.name) as T
|
||||
Double::class.java -> getDouble(metadata.name) as T
|
||||
Boolean::class.java -> getBoolean(metadata.name) as T
|
||||
LocalDate::class.java -> LocalDate.parse(getString(metadata.name)) as T
|
||||
Instant::class.java -> Instant.parse(getString(metadata.name)) as T
|
||||
HostAndPort::class.java -> HostAndPort.fromString(getString(metadata.name)) as T
|
||||
Path::class.java -> Paths.get(getString(metadata.name)) as T
|
||||
URL::class.java -> URL(getString(metadata.name)) as T
|
||||
Properties::class.java -> getProperties(metadata.name) as T
|
||||
else -> throw IllegalArgumentException("Unsupported type ${metadata.returnType}")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class for optional configurations
|
||||
*/
|
||||
class OptionalConfig<out T>(val conf: Config, val lambda: () -> T) {
|
||||
operator fun getValue(receiver: Any, metadata: KProperty<*>): T {
|
||||
return if (conf.hasPath(metadata.name)) conf.getValue(receiver, metadata) else lambda()
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> Config.getOrElse(lambda: () -> T): OptionalConfig<T> = OptionalConfig(this, lambda)
|
||||
|
||||
fun Config.getProperties(path: String): Properties {
|
||||
val obj = this.getObject(path)
|
||||
val props = Properties()
|
||||
for ((property, objectValue) in obj.entries) {
|
||||
props.setProperty(property, objectValue.unwrapped().toString())
|
||||
}
|
||||
return props
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
inline fun <reified T : Any> Config.getListOrElse(path: String, default: Config.() -> List<T>): List<T> {
|
||||
return if (hasPath(path)) {
|
||||
(if (T::class == String::class) getStringList(path) else getConfigList(path)) as List<T>
|
||||
} else {
|
||||
this.default()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Strictly for dev only automatically construct a server certificate/private key signed from
|
||||
* the CA certs in Node resources. Then provision KeyStores into certificates folder under node path.
|
||||
|
@ -2,6 +2,10 @@ package net.corda.node.services.config
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.typesafe.config.Config
|
||||
import net.corda.config.SSLConfiguration
|
||||
import net.corda.config.getListOrElse
|
||||
import net.corda.config.getOrElse
|
||||
import net.corda.config.getValue
|
||||
import net.corda.core.div
|
||||
import net.corda.core.node.NodeVersionInfo
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
@ -15,13 +19,6 @@ import java.net.URL
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
|
||||
interface SSLConfiguration {
|
||||
val keyStorePassword: String
|
||||
val trustStorePassword: String
|
||||
val certificatesDirectory: Path
|
||||
val keyStoreFile: Path get() = certificatesDirectory / "sslkeystore.jks"
|
||||
val trustStoreFile: Path get() = certificatesDirectory / "truststore.jks"
|
||||
}
|
||||
|
||||
interface NodeConfiguration : SSLConfiguration {
|
||||
val baseDirectory: Path
|
||||
|
@ -2,6 +2,7 @@ package net.corda.node.services.messaging
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.config.SSLConfiguration
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.messaging.MessageRecipientGroup
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
@ -9,14 +10,6 @@ import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.read
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.node.services.config.SSLConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Inbound
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
import java.nio.file.FileSystems
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyStore
|
||||
|
||||
/**
|
||||
@ -43,8 +36,6 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
|
||||
const val NETWORK_MAP_QUEUE = "${INTERNAL_PREFIX}networkmap"
|
||||
|
||||
const val VERIFY_PEER_COMMON_NAME = "corda.verifyPeerCommonName"
|
||||
|
||||
/**
|
||||
* Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should
|
||||
* only be used in unit tests and the internals of the messaging services to keep addressing opaque for the future.
|
||||
@ -111,19 +102,6 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
|
||||
abstract val config: SSLConfiguration?
|
||||
|
||||
// Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher.
|
||||
// Our self-generated certificates all use ECDSA for handshakes, but we allow classical RSA certificates to work
|
||||
// in case we need to use keytool certificates in some demos
|
||||
private val CIPHER_SUITES = listOf(
|
||||
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_RSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_DHE_RSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_DHE_DSS_WITH_AES_128_GCM_SHA256"
|
||||
)
|
||||
|
||||
/**
|
||||
* Returns nothing if the keystore was opened OK or throws if not. Useful to check the password, as
|
||||
* unfortunately Artemis tends to bury the exception when the password is wrong.
|
||||
@ -137,57 +115,4 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
KeyStore.getInstance("JKS").load(it, config.trustStorePassword.toCharArray())
|
||||
}
|
||||
}
|
||||
|
||||
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true): TransportConfiguration {
|
||||
// Will throw exception if enableSSL = true but config is missing
|
||||
require(config != null || !enableSSL) { "SSL configuration cannot be null when SSL is enabled." }
|
||||
|
||||
val config = config
|
||||
val options = mutableMapOf<String, Any?>(
|
||||
// Basic TCP target details
|
||||
TransportConstants.HOST_PROP_NAME to host,
|
||||
TransportConstants.PORT_PROP_NAME to port,
|
||||
|
||||
// Turn on AMQP support, which needs the protocol jar on the classpath.
|
||||
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop
|
||||
// It does not use AMQP messages for its own messages e.g. topology and heartbeats
|
||||
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications
|
||||
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP"
|
||||
)
|
||||
|
||||
if (config != null && enableSSL) {
|
||||
config.keyStoreFile.expectedOnDefaultFileSystem()
|
||||
config.trustStoreFile.expectedOnDefaultFileSystem()
|
||||
val tlsOptions = mapOf<String, Any?>(
|
||||
// Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake
|
||||
// and AES encryption
|
||||
TransportConstants.SSL_ENABLED_PROP_NAME to true,
|
||||
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
|
||||
TransportConstants.KEYSTORE_PATH_PROP_NAME to config.keyStoreFile,
|
||||
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password
|
||||
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
|
||||
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to config.trustStoreFile,
|
||||
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
|
||||
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
|
||||
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
|
||||
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true,
|
||||
VERIFY_PEER_COMMON_NAME to (direction as? Outbound)?.expectedCommonName
|
||||
)
|
||||
options.putAll(tlsOptions)
|
||||
}
|
||||
val factoryName = when (direction) {
|
||||
is Inbound -> NettyAcceptorFactory::class.java.name
|
||||
is Outbound -> VerifyingNettyConnectorFactory::class.java.name
|
||||
}
|
||||
return TransportConfiguration(factoryName, options)
|
||||
}
|
||||
|
||||
protected fun Path.expectedOnDefaultFileSystem() {
|
||||
require(fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
|
||||
}
|
||||
|
||||
protected sealed class ConnectionDirection {
|
||||
object Inbound : ConnectionDirection()
|
||||
class Outbound(val expectedCommonName: String? = null) : ConnectionDirection()
|
||||
}
|
||||
}
|
||||
|
@ -16,14 +16,15 @@ import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.ArtemisTcpTransport
|
||||
import net.corda.node.ConnectionDirection
|
||||
import net.corda.node.expectedOnDefaultFileSystem
|
||||
import net.corda.node.printBasicNodeInfo
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Inbound
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE
|
||||
@ -151,9 +152,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
bindingsDirectory = (artemisDir / "bindings").toString()
|
||||
journalDirectory = (artemisDir / "journal").toString()
|
||||
largeMessagesDirectory = (artemisDir / "large-messages").toString()
|
||||
val acceptors = mutableSetOf(tcpTransport(Inbound, "0.0.0.0", p2pHostPort.port))
|
||||
val acceptors = mutableSetOf(verifyingTcpTransport(ConnectionDirection.Inbound, "0.0.0.0", p2pHostPort.port))
|
||||
if (rpcHostPort != null) {
|
||||
acceptors.add(tcpTransport(Inbound, "0.0.0.0", rpcHostPort.port, enableSSL = false))
|
||||
acceptors.add(verifyingTcpTransport(ConnectionDirection.Inbound, "0.0.0.0", rpcHostPort.port, enableSSL = false))
|
||||
}
|
||||
acceptorConfigurations = acceptors
|
||||
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
||||
@ -191,8 +192,8 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticated clients connecting to us fall in one of three groups:
|
||||
* 1. The node hosting us and any of its logically connected components. These are given full access to all valid queues.
|
||||
* Authenticated clients connecting to us fall in one of the following groups:
|
||||
* 1. The node itself. It is given full access to all valid queues.
|
||||
* 2. Peers on the same network as us. These are only given permission to send to our P2P inbound queue.
|
||||
* 3. RPC users. These are only given sufficient access to perform RPC with us.
|
||||
*/
|
||||
@ -327,6 +328,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
deployBridge(address.queueName, address.hostAndPort, legalName)
|
||||
}
|
||||
|
||||
private fun verifyingTcpTransport(direction: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true) =
|
||||
ArtemisTcpTransport.tcpTransport(direction, HostAndPort.fromParts(host, port), config,
|
||||
enableSSL = enableSSL,
|
||||
connectorFactoryClassName = VerifyingNettyConnectorFactory::class.java.name
|
||||
)
|
||||
|
||||
/**
|
||||
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
|
||||
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
|
||||
@ -334,7 +341,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
* P2P address.
|
||||
*/
|
||||
private fun deployBridge(queueName: String, target: HostAndPort, legalName: String) {
|
||||
val tcpTransport = tcpTransport(Outbound(expectedCommonName = legalName), target.hostText, target.port)
|
||||
val tcpTransport = verifyingTcpTransport(ConnectionDirection.Outbound(expectedCommonName = legalName), target.hostText, target.port)
|
||||
tcpTransport.params[ArtemisMessagingServer::class.java.name] = this
|
||||
// We intentionally overwrite any previous connector config in case the peer legal name changed
|
||||
activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport)
|
||||
@ -407,7 +414,7 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>?,
|
||||
protocolManager: ClientProtocolManager?) :
|
||||
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
|
||||
private val server = configuration?.get(ArtemisMessagingServer::class.java.name) as? ArtemisMessagingServer
|
||||
private val expectedCommonName = configuration?.get(ArtemisMessagingComponent.VERIFY_PEER_COMMON_NAME) as? String
|
||||
private val expectedCommonName = configuration?.get(ArtemisTcpTransport.VERIFY_PEER_COMMON_NAME) as? String
|
||||
|
||||
override fun createConnection(): Connection? {
|
||||
val connection = super.createConnection() as NettyConnection?
|
||||
|
@ -1,14 +1,15 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.config.SSLConfiguration
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.logElapsedTime
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.minutes
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.SSLConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
|
||||
import net.corda.node.ArtemisTcpTransport.Companion.tcpTransport
|
||||
import net.corda.node.ConnectionDirection
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
@ -52,7 +53,7 @@ class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguratio
|
||||
check(!running)
|
||||
log.logElapsedTime("Startup") {
|
||||
checkStorePasswords()
|
||||
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port, enableSSL = config != null)).apply {
|
||||
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.Outbound(), host, config, enableSSL = config != null)).apply {
|
||||
// TODO: Put these in config file or make it user configurable?
|
||||
threadPoolMaxSize = 1
|
||||
confirmationWindowSize = 100000 // a guess
|
||||
|
@ -12,10 +12,11 @@ import net.corda.core.serialization.opaque
|
||||
import net.corda.core.success
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.ArtemisTcpTransport
|
||||
import net.corda.node.ConnectionDirection
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.api.MessagingServiceInternal
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.utilities.*
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
@ -129,7 +130,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
|
||||
log.info("Connecting to server: $serverHostPort")
|
||||
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
||||
val tcpTransport = tcpTransport(Outbound(), serverHostPort.hostText, serverHostPort.port)
|
||||
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverHostPort, config)
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
|
||||
clientFactory = locator.createSessionFactory()
|
||||
|
||||
|
@ -20,7 +20,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.SSLConfiguration
|
||||
import net.corda.config.SSLConfiguration
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.CompletableFuture
|
||||
|
Reference in New Issue
Block a user