From 08e391579ccecf7822fe72516ba43926ffed9fec Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Thu, 22 Dec 2016 14:48:27 +0000 Subject: [PATCH] Node verifies the peer it connects to by checking its TLS common name --- config/dev/generalnodea.conf | 5 +- config/dev/generalnodeb.conf | 5 +- .../core/crypto/WhitelistTrustManager.kt | 185 ------ .../corda/core/node/services/ServiceType.kt | 2 +- .../core/crypto/WhitelistTrustManagerTest.kt | 205 ------ docs/source/corda-configuration-file.rst | 87 ++- docs/source/creating-a-cordapp.rst | 2 +- .../src/main/resources/example-node.conf | 5 +- docs/source/messaging.rst | 3 + docs/source/setting-up-a-corda-network.rst | 4 +- docs/source/tutorial-cordapp.rst | 9 +- .../groovy/net/corda/plugins/Cordform.groovy | 8 +- .../main/groovy/net/corda/plugins/Node.groovy | 11 +- .../node/services/RaftNotaryServiceTests.kt | 16 +- .../messaging/MQSecurityAsNodeTest.kt | 50 ++ ...SecurityTest.kt => MQSecurityAsRPCTest.kt} | 2 +- .../services/messaging/MQSecurityTest.kt | 51 +- .../services/messaging/P2PMessagingTest.kt | 16 +- .../services/messaging/P2PSecurityTest.kt | 92 +-- node/src/main/kotlin/net/corda/node/Main.kt | 20 +- .../kotlin/net/corda/node/driver/Driver.kt | 16 +- .../net/corda/node/internal/AbstractNode.kt | 63 +- .../kotlin/net/corda/node/internal/Node.kt | 106 ++-- .../net/corda/node/services/RPCUserService.kt | 4 +- .../node/services/config/ConfigUtilities.kt | 5 +- .../node/services/config/NodeConfiguration.kt | 53 +- .../messaging/ArtemisMessagingComponent.kt | 39 +- .../messaging/ArtemisMessagingServer.kt | 600 ++++++++++-------- .../node/services/messaging/CordaRPCClient.kt | 3 +- .../services/messaging/NodeMessagingClient.kt | 19 +- .../node/services/messaging/RPCStructures.kt | 15 +- .../statemachine/StateMachineManager.kt | 1 + .../node/services/ArtemisMessagingTests.kt | 17 +- .../certsigning/CertificateSignerTest.kt | 22 +- publish.properties | 2 +- .../kotlin/net/corda/simulation/Simulation.kt | 107 +--- .../kotlin/net/corda/testing/CoreTestUtils.kt | 25 + .../corda/testing/messaging/SimpleMQClient.kt | 8 +- .../kotlin/net/corda/testing/node/MockNode.kt | 32 +- .../net/corda/testing/node/NodeBasedTest.kt | 26 +- .../net/corda/testing/node/SimpleNode.kt | 61 ++ 41 files changed, 939 insertions(+), 1063 deletions(-) delete mode 100644 core/src/main/kotlin/net/corda/core/crypto/WhitelistTrustManager.kt delete mode 100644 core/src/test/kotlin/net/corda/core/crypto/WhitelistTrustManagerTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt rename node/src/integration-test/kotlin/net/corda/services/messaging/{RPCSecurityTest.kt => MQSecurityAsRPCTest.kt} (89%) create mode 100644 test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt diff --git a/config/dev/generalnodea.conf b/config/dev/generalnodea.conf index cf34c2b870..009edc95cd 100644 --- a/config/dev/generalnodea.conf +++ b/config/dev/generalnodea.conf @@ -6,5 +6,8 @@ trustStorePassword : "trustpass" artemisAddress : "localhost:31337" webAddress : "localhost:31339" extraAdvertisedServiceIds: "corda.interest_rates" -networkMapAddress : "localhost:12345" +networkMapService : { + address : "localhost:12345" + legalName : "Network Map Service" +} useHTTPS : false diff --git a/config/dev/generalnodeb.conf b/config/dev/generalnodeb.conf index 56815ad1e8..c8462cc2e8 100644 --- a/config/dev/generalnodeb.conf +++ b/config/dev/generalnodeb.conf @@ -6,5 +6,8 @@ trustStorePassword : "trustpass" artemisAddress : "localhost:31338" webAddress : "localhost:31340" extraAdvertisedServiceIds: "corda.interest_rates" -networkMapAddress : "localhost:12345" +networkMapService : { + address : "localhost:12345" + legalName : "Network Map Service" +} useHTTPS : false diff --git a/core/src/main/kotlin/net/corda/core/crypto/WhitelistTrustManager.kt b/core/src/main/kotlin/net/corda/core/crypto/WhitelistTrustManager.kt deleted file mode 100644 index 678c6b551b..0000000000 --- a/core/src/main/kotlin/net/corda/core/crypto/WhitelistTrustManager.kt +++ /dev/null @@ -1,185 +0,0 @@ -package net.corda.core.crypto - -import sun.security.util.HostnameChecker -import java.net.InetAddress -import java.net.Socket -import java.net.UnknownHostException -import java.security.KeyStore -import java.security.Provider -import java.security.Security -import java.security.cert.CertificateException -import java.security.cert.X509Certificate -import java.util.concurrent.ConcurrentHashMap -import javax.net.ssl.* - -/** - * Call this to change the default verification algorithm and this use the WhitelistTrustManager - * implementation. This is a work around to the fact that ArtemisMQ and probably many other libraries - * don't correctly configure the SSLParameters with setEndpointIdentificationAlgorithm and thus don't check - * that the certificate matches with the DNS entry requested. This exposes us to man in the middle attacks. - * The issue has been raised with ArtemisMQ: https://issues.apache.org/jira/browse/ARTEMIS-656 - */ -fun registerWhitelistTrustManager() { - if (Security.getProvider("WhitelistTrustManager") == null) { - WhitelistTrustManagerProvider.register() - } - - // Forcibly change the TrustManagerFactory defaultAlgorithm to be us - // This will apply to all code using TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) - // Which includes the standard HTTPS implementation and most other SSL code - // TrustManagerFactory.getInstance(WhitelistTrustManagerProvider.originalTrustProviderAlgorithm)) will - // allow access to the original implementation which is normally "PKIX" - Security.setProperty("ssl.TrustManagerFactory.algorithm", "whitelistTrustManager") -} - -/** - * Custom Security Provider that forces the TrustManagerFactory to be our custom one. - * Also holds the identity of the original TrustManager algorithm so - * that we can delegate most of the checking to the proper Java code. We simply add some more checks. - * - * The whitelist automatically includes the local server DNS name and IP address - * - */ -object WhitelistTrustManagerProvider : Provider("WhitelistTrustManager", - 1.0, - "Provider for custom trust manager that always validates certificate names") { - - val originalTrustProviderAlgorithm = Security.getProperty("ssl.TrustManagerFactory.algorithm") - - private val _whitelist = ConcurrentHashMap.newKeySet() - val whitelist: Set get() = _whitelist.toSet() // The acceptable IP and DNS names for clients and servers. - - init { - // Add ourselves to whitelist as currently we have to connect to a local ArtemisMQ broker - val host = InetAddress.getLocalHost() - addWhitelistEntry(host.hostName) - } - - /** - * Security provider registration function for WhitelistTrustManagerProvider - */ - fun register() { - Security.addProvider(WhitelistTrustManagerProvider) - - // Register our custom TrustManagerFactorySpi - put("TrustManagerFactory.whitelistTrustManager", "net.corda.core.crypto.WhitelistTrustManagerSpi") - } - - /** - * Adds an extra name to the whitelist if not already present - * If this is a new entry it will internally request a DNS lookup which may block the calling thread. - */ - fun addWhitelistEntry(serverName: String) { - if (!_whitelist.contains(serverName)) { // Safe as we never delete from the set - addWhitelistEntries(listOf(serverName)) - } - } - - /** - * Adds a list of servers to the whitelist and also adds their fully resolved name/ip address after DNS lookup - * If the server name is not an actual DNS name this is silently ignored. - * The DNS request may block the calling thread. - */ - fun addWhitelistEntries(serverNames: List) { - _whitelist.addAll(serverNames) - for (name in serverNames) { - try { - val addresses = InetAddress.getAllByName(name).toList() - _whitelist.addAll(addresses.map { y -> y.canonicalHostName }) - _whitelist.addAll(addresses.map { y -> y.hostAddress }) - } catch (ex: UnknownHostException) { - // Ignore if the server name is not resolvable e.g. for wildcard addresses, or addresses that can only be resolved externally - } - } - } -} - -/** - * Registered TrustManagerFactorySpi - */ -class WhitelistTrustManagerSpi : TrustManagerFactorySpi() { - // Get the original implementation to delegate to (can't use Kotlin delegation on abstract classes unfortunately). - val originalProvider = TrustManagerFactory.getInstance(WhitelistTrustManagerProvider.originalTrustProviderAlgorithm) - - override fun engineInit(keyStore: KeyStore?) { - originalProvider.init(keyStore) - } - - override fun engineInit(managerFactoryParameters: ManagerFactoryParameters?) { - originalProvider.init(managerFactoryParameters) - } - - override fun engineGetTrustManagers(): Array { - val parent = originalProvider.trustManagers.first() as X509ExtendedTrustManager - // Wrap original provider in ours and return - return arrayOf(WhitelistTrustManager(parent)) - } -} - -/** - * Our TrustManager extension takes the standard certificate checker and first delegates all the - * chain checking to that. If everything is well formed we then simply add a check against our whitelist - */ -class WhitelistTrustManager(val originalProvider: X509ExtendedTrustManager) : X509ExtendedTrustManager() { - // Use same Helper class as standard HTTPS library validator - val checker = HostnameChecker.getInstance(HostnameChecker.TYPE_TLS) - - private fun checkIdentity(hostname: String?, cert: X509Certificate) { - // Based on standard code in sun.security.ssl.X509TrustManagerImpl.checkIdentity - // if IPv6 strip off the "[]" - if ((hostname != null) && hostname.startsWith("[") && hostname.endsWith("]")) { - checker.match(hostname.substring(1, hostname.length - 1), cert) - } else { - checker.match(hostname, cert) - } - } - - /** - * scan whitelist and confirm the certificate matches at least one entry - */ - private fun checkWhitelist(cert: X509Certificate) { - for (whiteListEntry in WhitelistTrustManagerProvider.whitelist) { - try { - checkIdentity(whiteListEntry, cert) - return // if we get here without throwing we had a match - } catch(ex: CertificateException) { - // Ignore and check the next entry until we find a match, or exhaust the whitelist - } - } - throw CertificateException("Certificate not on whitelist ${cert.subjectDN}") - } - - override fun checkClientTrusted(chain: Array, authType: String, socket: Socket?) { - originalProvider.checkClientTrusted(chain, authType, socket) - checkWhitelist(chain[0]) - } - - override fun checkClientTrusted(chain: Array, authType: String, engine: SSLEngine?) { - originalProvider.checkClientTrusted(chain, authType, engine) - checkWhitelist(chain[0]) - } - - override fun checkClientTrusted(chain: Array, authType: String) { - originalProvider.checkClientTrusted(chain, authType) - checkWhitelist(chain[0]) - } - - override fun checkServerTrusted(chain: Array, authType: String, socket: Socket?) { - originalProvider.checkServerTrusted(chain, authType, socket) - checkWhitelist(chain[0]) - } - - override fun checkServerTrusted(chain: Array, authType: String, engine: SSLEngine?) { - originalProvider.checkServerTrusted(chain, authType, engine) - checkWhitelist(chain[0]) - } - - override fun checkServerTrusted(chain: Array, authType: String) { - originalProvider.checkServerTrusted(chain, authType) - checkWhitelist(chain[0]) - } - - override fun getAcceptedIssuers(): Array { - return originalProvider.acceptedIssuers - } -} diff --git a/core/src/main/kotlin/net/corda/core/node/services/ServiceType.kt b/core/src/main/kotlin/net/corda/core/node/services/ServiceType.kt index 9cc7ac1178..66f58548d4 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/ServiceType.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/ServiceType.kt @@ -11,7 +11,7 @@ sealed class ServiceType(val id: String) { // // * IDs must start with a lower case letter // * IDs can only contain alphanumeric, full stop and underscore ASCII characters - require(id.matches(Regex("[a-z][a-zA-Z0-9._]+"))) + require(id.matches(Regex("[a-z][a-zA-Z0-9._]+"))) { id } } private class ServiceTypeImpl(baseId: String, subTypeId: String) : ServiceType("$baseId.$subTypeId") diff --git a/core/src/test/kotlin/net/corda/core/crypto/WhitelistTrustManagerTest.kt b/core/src/test/kotlin/net/corda/core/crypto/WhitelistTrustManagerTest.kt deleted file mode 100644 index 9fec5d0303..0000000000 --- a/core/src/test/kotlin/net/corda/core/crypto/WhitelistTrustManagerTest.kt +++ /dev/null @@ -1,205 +0,0 @@ -package net.corda.core.crypto - -import org.junit.BeforeClass -import org.junit.Test -import java.net.Socket -import java.security.KeyStore -import java.security.cert.CertificateException -import java.security.cert.X509Certificate -import javax.net.ssl.SSLEngine -import javax.net.ssl.TrustManagerFactory -import javax.net.ssl.X509ExtendedTrustManager -import kotlin.test.assertEquals -import kotlin.test.assertFailsWith -import kotlin.test.assertTrue - -// TODO: This suppress is needed due to KT-260, fixed in Kotlin 1.0.4 so remove after upgrading. -@Suppress("CAST_NEVER_SUCCEEDS") -class WhitelistTrustManagerTest { - companion object { - @BeforeClass - @JvmStatic - fun registerTrustManager() { - // Validate original factory - assertEquals("PKIX", TrustManagerFactory.getDefaultAlgorithm()) - - //register for all tests - registerWhitelistTrustManager() - } - } - - private fun getTrustmanagerAndCert(whitelist: String, certificateName: String): Pair { - WhitelistTrustManagerProvider.addWhitelistEntry(whitelist) - - val caCertAndKey = X509Utilities.createSelfSignedCACert(certificateName) - - val keyStore = KeyStore.getInstance(KeyStore.getDefaultType()) - keyStore.load(null, null) - keyStore.setCertificateEntry("cacert", caCertAndKey.certificate) - - val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) - trustManagerFactory.init(keyStore) - - return Pair(trustManagerFactory.trustManagers.first() as X509ExtendedTrustManager, caCertAndKey.certificate) - } - - private fun getTrustmanagerAndUntrustedChainCert(): Pair { - WhitelistTrustManagerProvider.addWhitelistEntry("test.r3corda.com") - - val otherCaCertAndKey = X509Utilities.createSelfSignedCACert("bad root") - - val caCertAndKey = X509Utilities.createSelfSignedCACert("good root") - - val subject = X509Utilities.getDevX509Name("test.r3corda.com") - val serverKey = X509Utilities.generateECDSAKeyPairForSSL() - val serverCert = X509Utilities.createServerCert(subject, - serverKey.public, - otherCaCertAndKey, - listOf(), - listOf()) - - val keyStore = KeyStore.getInstance(KeyStore.getDefaultType()) - keyStore.load(null, null) - keyStore.setCertificateEntry("cacert", caCertAndKey.certificate) - - val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) - trustManagerFactory.init(keyStore) - - return Pair(trustManagerFactory.trustManagers.first() as X509ExtendedTrustManager, serverCert) - } - - - @Test - fun `getDefaultAlgorithm TrustManager is WhitelistTrustManager`() { - registerWhitelistTrustManager() // Check double register is safe - - assertEquals("whitelistTrustManager", TrustManagerFactory.getDefaultAlgorithm()) - - val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) - - trustManagerFactory.init(null as KeyStore?) - - val trustManagers = trustManagerFactory.trustManagers - - assertTrue { trustManagers.all { it is WhitelistTrustManager } } - } - - @Test - fun `check certificate works for whitelisted certificate and specific domain`() { - val (trustManager, cert) = getTrustmanagerAndCert("test.r3corda.com", "test.r3corda.com") - - trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) - - trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) - - trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) - - trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) - - trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) - - trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) - } - - @Test - fun `check certificate works for specific certificate and wildcard permitted domain`() { - val (trustManager, cert) = getTrustmanagerAndCert("*.r3corda.com", "test.r3corda.com") - - trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) - - trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) - - trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) - - trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) - - trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) - - trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) - } - - @Test - fun `check certificate works for wildcard certificate and non wildcard domain`() { - val (trustManager, cert) = getTrustmanagerAndCert("*.r3corda.com", "test.r3corda.com") - - trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) - - trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) - - trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) - - trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) - - trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) - - trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) - } - - @Test - fun `check unknown certificate rejected`() { - val (trustManager, cert) = getTrustmanagerAndCert("test.r3corda.com", "test.notr3.com") - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) } - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) } - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) } - } - - @Test - fun `check unknown wildcard certificate rejected`() { - val (trustManager, cert) = getTrustmanagerAndCert("test.r3corda.com", "*.notr3.com") - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) } - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) } - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) } - } - - @Test - fun `check unknown certificate rejected against mismatched wildcard`() { - val (trustManager, cert) = getTrustmanagerAndCert("*.r3corda.com", "test.notr3.com") - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) } - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) } - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) } - } - - @Test - fun `check certificate signed by untrusted root is still rejected, despite matched name`() { - val (trustManager, cert) = getTrustmanagerAndUntrustedChainCert() - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) } - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) } - - assertFailsWith { trustManager.checkServerTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as Socket?) } - - assertFailsWith { trustManager.checkClientTrusted(arrayOf(cert), X509Utilities.SIGNATURE_ALGORITHM, null as SSLEngine?) } - } -} diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 4da238dfc5..04a74d4dff 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -15,8 +15,9 @@ and for rarely changed properties this defaulting allows the property to be excl Format ------ -Corda uses the Typesafe configuration library to parse the configuration see the `typesafe config on Github `_ the format of the configuration files can be simple JSON, but for the more powerful substitution features -uses HOCON format see `HOCON documents `_ +The Corda configuration file uses the HOCON format which is superset of JSON. It has several features which makes it +very useful as a configuration format. Please visit their `page `_ +for further details. Examples -------- @@ -46,50 +47,73 @@ NetworkMapService plus Simple Notary configuration file. Fields ------ -:basedir: This specifies the node workspace folder either as an absolute path, or relative to the current working directory. It can be overidden by the ``--base-directory`` command line option, in which case the the value in the file is ignored and a ``node.conf`` file is expected in that workspace directory as the configuration source. +:basedir: This specifies the node workspace folder either as an absolute path, or relative to the current working directory. + It can be overidden by the ``--base-directory`` command line option, in which case the the value in the file is ignored + and a ``node.conf`` file is expected in that workspace directory as the configuration source. -:myLegalName: The legal identity of the node acts as a human readable alias to the node's public key and several demos use this to lookup the NodeInfo. +:myLegalName: The legal identity of the node acts as a human readable alias to the node's public key and several demos use + this to lookup the NodeInfo. -:nearestCity: The location of the node as used to locate coordinates on the world map when running the network simulator demo. See :doc:`network-simulator`. +:nearestCity: The location of the node as used to locate coordinates on the world map when running the network simulator + demo. See :doc:`network-simulator`. -:keyStorePassword: - The password to unlock the KeyStore file (``/certificates/sslkeystore.jks``) containing the node certificate and private key. +:keyStorePassword: The password to unlock the KeyStore file (``/certificates/sslkeystore.jks``) containing the + node certificate and private key. - note:: This is the non-secret value for the development certificates automatically generated during the first node run. Longer term these keys will be managed in secure hardware devices. + .. note:: This is the non-secret value for the development certificates automatically generated during the first node run. + Longer term these keys will be managed in secure hardware devices. -:trustStorePassword: - The password to unlock the Trust store file (``/certificates/truststore.jks``) containing the R3 Corda root certificate. This is the non-secret value for the development certificates automatically generated during the first node run. +:trustStorePassword: The password to unlock the Trust store file (``/certificates/truststore.jks``) containing + the Corda network root certificate. This is the non-secret value for the development certificates automatically + generated during the first node run. .. note:: Longer term these keys will be managed in secure hardware devices. -:dataSourceProperties: - This section is used to configure the jdbc connection and database driver used for the nodes persistence. Currently the defaults in ``/node/src/main/resources/reference.conf`` are as shown in the first example. This is currently the only configuration that has been tested, although in the future full support for other storage layers will be validated. +:dataSourceProperties: This section is used to configure the jdbc connection and database driver used for the nodes persistence. + Currently the defaults in ``/node/src/main/resources/reference.conf`` are as shown in the first example. This is currently + the only configuration that has been tested, although in the future full support for other storage layers will be validated. -:artemisAddress: - The host and port on which the node is available for protocol operations over ArtemisMQ. +:artemisAddress: The host and port on which the node is available for protocol operations over ArtemisMQ. - .. note:: In practice the ArtemisMQ messaging services bind to all local addresses on the specified port. However, note that the host is the included as the advertised entry in the NetworkMapService. As a result the value listed here must be externally accessible when running nodes across a cluster of machines. + .. note:: In practice the ArtemisMQ messaging services bind to all local addresses on the specified port. However, + note that the host is the included as the advertised entry in the NetworkMapService. As a result the value listed + here must be externally accessible when running nodes across a cluster of machines. -:messagingServerAddress: - The address of the ArtemisMQ broker instance. If not provided the node will run one locally. +:messagingServerAddress: The address of the ArtemisMQ broker instance. If not provided the node will run one locally. -:webAddress: - The host and port on which the node is available for web operations. +:webAddress: The host and port on which the node is available for web operations. - .. note:: If HTTPS is enabled then the browser security checks will require that the accessing url host name is one of either the machine name, fully qualified machine name, or server IP address to line up with the Subject Alternative Names contained within the development certificates. This is addition to requiring the ``/config/dev/corda_dev_ca.cer`` root certificate be installed as a Trusted CA. + .. note:: If HTTPS is enabled then the browser security checks will require that the accessing url host name is one + of either the machine name, fully qualified machine name, or server IP address to line up with the Subject Alternative + Names contained within the development certificates. This is addition to requiring the ``/config/dev/corda_dev_ca.cer`` + root certificate be installed as a Trusted CA. -:extraAdvertisedServiceIds: A list of ServiceType id strings to be advertised to the NetworkMapService and thus be available when other nodes query the NetworkMapCache for supporting nodes. This can also include plugin services loaded from .jar files in the plugins folder. Optionally, a custom advertised service name can be provided by appending it to the service type id: ``"corda.notary.validating|Notary A"`` +:extraAdvertisedServiceIds: A list of ServiceType id strings to be advertised to the NetworkMapService and thus be available + when other nodes query the NetworkMapCache for supporting nodes. This can also include plugin services loaded from .jar + files in the plugins folder. Optionally, a custom advertised service name can be provided by appending it to the service + type id: ``"corda.notary.validating|Notary A"`` -:notaryNodeAddress: The host and port to which to bind the embedded Raft server. Required only when running a distributed notary service. A group of Corda nodes can run a distributed notary service by each running an embedded Raft server and joining them to the same cluster to replicate the committed state log. Note that the Raft cluster uses a separate transport layer for communication that does not integrate with ArtemisMQ messaging services. +:notaryNodeAddress: The host and port to which to bind the embedded Raft server. Required only when running a distributed + notary service. A group of Corda nodes can run a distributed notary service by each running an embedded Raft server and + joining them to the same cluster to replicate the committed state log. Note that the Raft cluster uses a separate transport + layer for communication that does not integrate with ArtemisMQ messaging services. -:notaryClusterAddresses: List of Raft cluster member addresses used to joining the cluster. At least one of the specified members must be active and be able to communicate with the cluster leader for joining. If empty, a new cluster will be bootstrapped. Required only when running a distributed notary service. +:notaryClusterAddresses: List of Raft cluster member addresses used to joining the cluster. At least one of the specified + members must be active and be able to communicate with the cluster leader for joining. If empty, a new cluster will be + bootstrapped. Required only when running a distributed notary service. -:networkMapAddress: If `null`, or missing the node is declaring itself as the NetworkMapService host. Otherwise the configuration value is the remote HostAndPort string for the ArtemisMQ service on the hosting node. +:networkMapService: If `null`, or missing the node is declaring itself as the NetworkMapService host. Otherwise this is + a config object with the details of the network map service: -:useHTTPS: If false the node's web server will be plain HTTP. If true the node will use the same certificate and private key from the ``/certificates/sslkeystore.jks`` file as the ArtemisMQ port for HTTPS. If HTTPS is enabled then unencrypted HTTP traffic to the node's **webAddress** port is not supported. + :address: Host and port string of the ArtemisMQ broker hosting the network map node + :legalName: Legal name of the node. This is required as part of the TLS host verification process. The node will + reject the connection to the network map service if it provides a TLS common name which doesn't match with this value. -:rpcUsers: - A list of users who are authorised to access the RPC system. Each user in the list is a config object with the +:useHTTPS: If false the node's web server will be plain HTTP. If true the node will use the same certificate and private + key from the ``/certificates/sslkeystore.jks`` file as the ArtemisMQ port for HTTPS. If HTTPS is enabled + then unencrypted HTTP traffic to the node's **webAddress** port is not supported. + +:rpcUsers: A list of users who are authorised to access the RPC system. Each user in the list is a config object with the following fields: :user: Username consisting only of word characters (a-z, A-Z, 0-9 and _) @@ -98,8 +122,9 @@ Fields If this field is absent or an empty list then RPC is effectively locked down. -:devMode: - This flag indicate if the node is running in development mode. On startup, if the keystore ``/certificates/sslkeystore.jks`` does not exist, a developer keystore will be used if ``devMode`` is true. The node will exit if ``devMode`` is false and keystore does not exist. +:devMode: This flag indicate if the node is running in development mode. On startup, if the keystore ``/certificates/sslkeystore.jks`` + does not exist, a developer keystore will be used if ``devMode`` is true. The node will exit if ``devMode`` is false + and keystore does not exist. -:certificateSigningService: - Certificate Signing Server address. It is used by the certificate signing request utility to obtain SSL certificate. (See :doc:`permissioning` for more information.) +:certificateSigningService: Certificate Signing Server address. It is used by the certificate signing request utility to + obtain SSL certificate. (See :doc:`permissioning` for more information.) diff --git a/docs/source/creating-a-cordapp.rst b/docs/source/creating-a-cordapp.rst index 742e3bc988..ec9821e37d 100644 --- a/docs/source/creating-a-cordapp.rst +++ b/docs/source/creating-a-cordapp.rst @@ -209,7 +209,7 @@ is a three node example; task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['build']) { directory "./build/nodes" // The output directory - networkMap "Controller" // The artemis address of the node named here will be used as the networkMapAddress on all other nodes. + networkMap "Controller" // The artemis address of the node named here will be used as the networkMapService.address on all other nodes. node { name "Controller" dirName "controller" diff --git a/docs/source/example-code/src/main/resources/example-node.conf b/docs/source/example-code/src/main/resources/example-node.conf index 9a9cc830b3..6c2f68cbd1 100644 --- a/docs/source/example-code/src/main/resources/example-node.conf +++ b/docs/source/example-code/src/main/resources/example-node.conf @@ -12,7 +12,10 @@ dataSourceProperties : { artemisAddress : "my-corda-node:10002" webAddress : "localhost:10003" extraAdvertisedServiceIds: "corda.interest_rates" -networkMapAddress : "my-network-map:10000" +networkMapService : { + address : "my-network-map:10000" + legalName : "Network Map Service" +} useHTTPS : false rpcUsers : [ { user=user1, password=letmein, permissions=[ StartProtocol.net.corda.protocols.CashProtocol ] } diff --git a/docs/source/messaging.rst b/docs/source/messaging.rst index a2139589ac..781777405b 100644 --- a/docs/source/messaging.rst +++ b/docs/source/messaging.rst @@ -107,6 +107,9 @@ the validated user is the username itself and the RPC framework uses this to det .. note:: ``Party`` lookup is currently done by the legal name which isn't guaranteed to be unique. A future version will use the full X.500 name as it can provide additional structures for uniqueness. +The broker also does host verification when connecting to another peer. It checks that the TLS certificate common name +matches with the advertised legal name from the network map service. + Messaging types --------------- diff --git a/docs/source/setting-up-a-corda-network.rst b/docs/source/setting-up-a-corda-network.rst index e6b3ae3180..25b0c0ba24 100644 --- a/docs/source/setting-up-a-corda-network.rst +++ b/docs/source/setting-up-a-corda-network.rst @@ -46,8 +46,8 @@ The most important fields regarding network configuration are: but rather ``::`` (all addresses on all interfaces). The hostname specified is the hostname *that must be externally resolvable by other nodes in the network*. In the above configuration this is the resolvable name of a machine in a vpn. * ``webAddress``: The address the webserver should bind. Note that the port should be distinct from that of ``artemisAddress``. -* ``networkMapAddress``: The resolvable name and artemis port of the network map node. Note that if this node itself - is to be the network map this field should not be specified. +* ``networkMapService``: Details of the node running the network map service. If it's this node that's running the service + then this field must not be specified. Starting the nodes ------------------ diff --git a/docs/source/tutorial-cordapp.rst b/docs/source/tutorial-cordapp.rst index 2c82adf187..43ee26ff22 100644 --- a/docs/source/tutorial-cordapp.rst +++ b/docs/source/tutorial-cordapp.rst @@ -392,8 +392,7 @@ Corda nodes can be run on separate machines with little additional configuration When you have successfully run the ``deployNodes`` gradle task, choose which nodes you would like to run on separate machines. Copy the folders for those nodes from ``kotlin/build/nodes`` to the other machines. Make sure that you set the -``networkMapAddress`` property in ``node.conf`` to the correct hostname:port where the network map service node is -hosted. +``networkMapService`` config in ``node.conf`` to the correct hostname:port and legal name of the network map service node. The nodes can be run on each machine with ``java -jar corda.jar`` from the node's directory. @@ -891,9 +890,9 @@ the following changes: * Change the artemis address to the machine's ip address (e.g. `artemisAddress="10.18.0.166:10006"`) -* Change the network map address to the ip address of the machine where the -controller node is running (e.g. `networkMapAddress="10.18.0.166:10002"`) -(please note that the controller will not have a network map address) +* Change the network map service details to the ip address of the machine where the +controller node is running and to its legal name (e.g. `networkMapService.address="10.18.0.166:10002"` and +`networkMapService.legalName=controller`) (please note that the controller will not have the `networkMapService` config) Each machine should now run its nodes using `runnodes` or `runnodes.bat` files. Once they are up and running, the nodes should be able to place diff --git a/gradle-plugins/cordformation/src/main/groovy/net/corda/plugins/Cordform.groovy b/gradle-plugins/cordformation/src/main/groovy/net/corda/plugins/Cordform.groovy index 5a2b24ae2f..3cdbf8d69f 100644 --- a/gradle-plugins/cordformation/src/main/groovy/net/corda/plugins/Cordform.groovy +++ b/gradle-plugins/cordformation/src/main/groovy/net/corda/plugins/Cordform.groovy @@ -22,7 +22,7 @@ class Cordform extends DefaultTask { * @param directory The directory the nodes will be installed into. * @return */ - public void directory(String directory) { + void directory(String directory) { this.directory = Paths.get(directory) } @@ -32,7 +32,7 @@ class Cordform extends DefaultTask { * @warning Ensure the node name is one of the configured nodes. * @param nodeName The name of the node that will host the network map. */ - public void networkMap(String nodeName) { + void networkMap(String nodeName) { networkMapNodeName = nodeName } @@ -41,7 +41,7 @@ class Cordform extends DefaultTask { * * @param configureClosure A node configuration that will be deployed. */ - public void node(Closure configureClosure) { + void node(Closure configureClosure) { nodes << project.configure(new Node(project), configureClosure) } @@ -85,7 +85,7 @@ class Cordform extends DefaultTask { Node networkMapNode = getNodeByName(networkMapNodeName) nodes.each { if(it != networkMapNode) { - it.networkMapAddress(networkMapNode.getArtemisAddress()) + it.networkMapAddress(networkMapNode.getArtemisAddress(), networkMapNodeName) } it.build(directory.toFile()) } diff --git a/gradle-plugins/cordformation/src/main/groovy/net/corda/plugins/Node.groovy b/gradle-plugins/cordformation/src/main/groovy/net/corda/plugins/Node.groovy index cdb2cd6362..e50cdc02ff 100644 --- a/gradle-plugins/cordformation/src/main/groovy/net/corda/plugins/Node.groovy +++ b/gradle-plugins/cordformation/src/main/groovy/net/corda/plugins/Node.groovy @@ -122,11 +122,14 @@ class Node { * * @warning This should not be directly set unless you know what you are doing. Use the networkMapName in the * Cordform task instead. - * @param networkMapAddress Network map address. + * @param networkMapAddress Network map node address. + * @param networkMapLegalName Network map node legal name. */ - void networkMapAddress(String networkMapAddress) { - config = config.withValue("networkMapAddress", - ConfigValueFactory.fromAnyRef(networkMapAddress)) + void networkMapAddress(String networkMapAddress, String networkMapLegalName) { + def networkMapService = new HashMap() + networkMapService.put("address", networkMapAddress) + networkMapService.put("legalName", networkMapLegalName) + config = config.withValue("networkMapService", ConfigValueFactory.fromMap(networkMapService)) } Node(Project project) { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt index fd01b0b733..ed8f85c9df 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt @@ -1,11 +1,14 @@ package net.corda.node.services +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture import net.corda.core.contracts.DummyContract import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef import net.corda.core.contracts.TransactionType import net.corda.core.crypto.Party import net.corda.core.div +import net.corda.core.flatMap import net.corda.core.getOrThrow import net.corda.core.node.services.ServiceInfo import net.corda.flows.NotaryError @@ -30,8 +33,7 @@ class RaftNotaryServiceTests : NodeBasedTest() { @Test fun `detect double spend`() { - val masterNode = createNotaryCluster() - val alice = startNode("Alice") + val (masterNode, alice) = Futures.allAsList(createNotaryCluster(), startNode("Alice")).getOrThrow() val notaryParty = alice.netMapCache.getNotary(notaryName)!! @@ -60,7 +62,7 @@ class RaftNotaryServiceTests : NodeBasedTest() { assertEquals(error.tx, stx.tx) } - private fun createNotaryCluster(): Node { + private fun createNotaryCluster(): ListenableFuture { val notaryService = ServiceInfo(RaftValidatingNotaryService.type, notaryName) val notaryAddresses = getFreeLocalPorts("localhost", clusterSize).map { it.toString() } ServiceIdentityGenerator.generateToDisk( @@ -73,16 +75,16 @@ class RaftNotaryServiceTests : NodeBasedTest() { advertisedServices = setOf(notaryService), configOverrides = mapOf("notaryNodeAddress" to notaryAddresses[0])) - for (i in 1 until clusterSize) { + val remainingNodes = (1 until clusterSize).map { startNode( - "Notary$i", + "Notary$it", advertisedServices = setOf(notaryService), configOverrides = mapOf( - "notaryNodeAddress" to notaryAddresses[i], + "notaryNodeAddress" to notaryAddresses[it], "notaryClusterAddresses" to listOf(notaryAddresses[0]))) } - return masterNode + return Futures.allAsList(remainingNodes).flatMap { masterNode } } private fun issueState(node: AbstractNode, notary: Party, notaryKey: KeyPair): StateAndRef<*> { diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt new file mode 100644 index 0000000000..2a871e0315 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt @@ -0,0 +1,50 @@ +package net.corda.services.messaging + +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE +import net.corda.testing.messaging.SimpleMQClient +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration +import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException +import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.junit.Test + +/** + * Runs the security tests with the attacker pretending to be a node on the network. + */ +class MQSecurityAsNodeTest : MQSecurityTest() { + + override fun startAttacker(attacker: SimpleMQClient) { + attacker.start(PEER_USER, PEER_USER) // Login as a peer + } + + @Test + fun `send message to RPC requests address`() { + assertSendAttackFails(RPC_REQUESTS_QUEUE) + } + + @Test + fun `only the node running the broker can login using the special node user`() { + val attacker = clientTo(alice.configuration.artemisAddress) + assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { + attacker.start(NODE_USER, NODE_USER) + } + } + + @Test + fun `login as the default cluster user`() { + val attacker = clientTo(alice.configuration.artemisAddress) + assertThatExceptionOfType(ActiveMQClusterSecurityException::class.java).isThrownBy { + attacker.start(ActiveMQDefaultConfiguration.getDefaultClusterUser(), ActiveMQDefaultConfiguration.getDefaultClusterPassword()) + } + } + + @Test + fun `login without a username and password`() { + val attacker = clientTo(alice.configuration.artemisAddress) + assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { + attacker.start() + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/RPCSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsRPCTest.kt similarity index 89% rename from node/src/integration-test/kotlin/net/corda/services/messaging/RPCSecurityTest.kt rename to node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsRPCTest.kt index 0e21dceccc..f2b2bbc5db 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/RPCSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsRPCTest.kt @@ -6,7 +6,7 @@ import net.corda.testing.messaging.SimpleMQClient /** * Runs the security tests with the attacker being a valid RPC user of Alice. */ -class RPCSecurityTest : MQSecurityTest() { +class MQSecurityAsRPCTest : MQSecurityTest() { override val extraRPCUsers = listOf(User("evil", "pass", permissions = emptySet())) override fun startAttacker(attacker: SimpleMQClient) { diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index 576c9f9329..f2c8614f0d 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -12,8 +12,11 @@ import net.corda.core.random63BitValue import net.corda.core.seconds import net.corda.node.internal.Node import net.corda.node.services.User +import net.corda.node.services.config.NodeSSLConfiguration +import net.corda.node.services.config.configureTestSSL import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX -import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_ADDRESS +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX @@ -44,8 +47,8 @@ abstract class MQSecurityTest : NodeBasedTest() { @Before fun start() { - alice = startNode("Alice", rpcUsers = extraRPCUsers + rpcUser) - attacker = SimpleMQClient(alice.configuration.artemisAddress) + alice = startNode("Alice", rpcUsers = extraRPCUsers + rpcUser).getOrThrow() + attacker = clientTo(alice.configuration.artemisAddress) startAttacker(attacker) } @@ -70,27 +73,31 @@ abstract class MQSecurityTest : NodeBasedTest() { } @Test - fun `send message to peer address`() { + fun `send message to address of peer which has been communicated with`() { val bobParty = startBobAndCommunicateWithAlice() assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}") } + @Test + fun `create queue for peer which has not been communciated with`() { + val bob = startNode("Bob").getOrThrow() + assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.legalIdentity.owningKey.toBase58String()}") + } + @Test fun `create queue for unknown peer`() { val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.composite.toBase58String()}" - assertNonTempQueueCreationAttackFails(invalidPeerQueue, durable = true) - assertNonTempQueueCreationAttackFails(invalidPeerQueue, durable = false) - assertTempQueueCreationAttackFails(invalidPeerQueue) + assertAllQueueCreationAttacksFail(invalidPeerQueue) } @Test fun `consume message from network map queue`() { - assertConsumeAttackFails(NETWORK_MAP_ADDRESS.toString()) + assertConsumeAttackFails(NETWORK_MAP_QUEUE) } @Test fun `send message to network map address`() { - assertSendAttackFails(NETWORK_MAP_ADDRESS.toString()) + assertSendAttackFails(NETWORK_MAP_QUEUE) } @Test @@ -133,15 +140,19 @@ abstract class MQSecurityTest : NodeBasedTest() { } @Test - fun `create random queue`() { - val randomQueue = random63BitValue().toString() - assertNonTempQueueCreationAttackFails(randomQueue, durable = false) - assertNonTempQueueCreationAttackFails(randomQueue, durable = true) - assertTempQueueCreationAttackFails(randomQueue) + fun `create random internal queue`() { + val randomQueue = "$INTERNAL_PREFIX${random63BitValue()}" + assertAllQueueCreationAttacksFail(randomQueue) } - fun clientTo(target: HostAndPort): SimpleMQClient { - val client = SimpleMQClient(target) + @Test + fun `create random queue`() { + val randomQueue = random63BitValue().toString() + assertAllQueueCreationAttacksFail(randomQueue) + } + + fun clientTo(target: HostAndPort, config: NodeSSLConfiguration = configureTestSSL()): SimpleMQClient { + val client = SimpleMQClient(target, config) clients += client return client } @@ -164,6 +175,12 @@ abstract class MQSecurityTest : NodeBasedTest() { return rpcClient.session.addressQuery(clientQueueQuery).queueNames.single().toString() } + fun assertAllQueueCreationAttacksFail(queue: String) { + assertNonTempQueueCreationAttackFails(queue, durable = true) + assertNonTempQueueCreationAttackFails(queue, durable = false) + assertTempQueueCreationAttackFails(queue) + } + fun assertTempQueueCreationAttackFails(queue: String) { assertAttackFails(queue, "CREATE_NON_DURABLE_QUEUE") { attacker.session.createTemporaryQueue(queue, queue) @@ -210,7 +227,7 @@ abstract class MQSecurityTest : NodeBasedTest() { } private fun startBobAndCommunicateWithAlice(): Party { - val bob = startNode("Bob") + val bob = startNode("Bob").getOrThrow() bob.services.registerFlowInitiator(SendFlow::class, ::ReceiveFlow) val bobParty = bob.info.legalIdentity // Perform a protocol exchange to force the peer queue to be created diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index 70824fcec5..e4cf731da6 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -1,6 +1,7 @@ package net.corda.services.messaging import co.paralleluniverse.fibers.Suspendable +import com.google.common.util.concurrent.Futures import net.corda.core.crypto.Party import net.corda.core.div import net.corda.core.flows.FlowLogic @@ -21,9 +22,7 @@ class P2PMessagingTest : NodeBasedTest() { @Test fun `network map will work after restart`() { fun startNodes() { - startNode("NodeA") - startNode("NodeB") - startNode("Notary") + Futures.allAsList(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).getOrThrow() } startNodes() @@ -41,7 +40,8 @@ class P2PMessagingTest : NodeBasedTest() { startNetworkMapNode(advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))) networkMapNode.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(it, "Hello") } val serviceParty = networkMapNode.services.networkMapCache.getAnyNotary()!! - val received = startNode("Alice").services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds) + val alice = startNode("Alice").getOrThrow() + val received = alice.services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds) assertThat(received).isEqualTo("Hello") } @@ -63,13 +63,15 @@ class P2PMessagingTest : NodeBasedTest() { "NetworkMap", advertisedServices = setOf(distributedService), configOverrides = mapOf("notaryNodeAddress" to notaryClusterAddress.toString())) - val alice = startNode( + val (alice, bob) = Futures.allAsList( + startNode( "Alice", advertisedServices = setOf(distributedService), configOverrides = mapOf( "notaryNodeAddress" to freeLocalHostAndPort().toString(), - "notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))) - val bob = startNode("Bob") + "notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))), + startNode("Bob") + ).getOrThrow() // Setup each node in the distributed service to return back it's Party so that we can know which node is being used val serviceNodes = listOf(networkMapNode, alice) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt index 2e10d079dd..e6c7faaab7 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt @@ -1,53 +1,71 @@ package net.corda.services.messaging -import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER -import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER -import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE -import net.corda.testing.messaging.SimpleMQClient -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration -import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException -import org.apache.activemq.artemis.api.core.ActiveMQSecurityException +import com.google.common.util.concurrent.ListenableFuture +import kotlinx.support.jdk7.use +import net.corda.core.crypto.Party +import net.corda.core.div +import net.corda.core.getOrThrow +import net.corda.core.node.NodeInfo +import net.corda.core.random63BitValue +import net.corda.core.seconds +import net.corda.flows.sendRequest +import net.corda.node.internal.NetworkMapInfo +import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC +import net.corda.node.services.network.NetworkMapService.RegistrationRequest +import net.corda.node.services.network.NodeRegistration +import net.corda.node.utilities.AddOrRemove +import net.corda.testing.TestNodeConfiguration +import net.corda.testing.node.NodeBasedTest +import net.corda.testing.node.SimpleNode import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.Test +import java.time.Instant +import java.util.concurrent.TimeoutException -/** - * Runs the security tests with the attacker pretending to be a node on the network. - */ -class P2PSecurityTest : MQSecurityTest() { +class P2PSecurityTest : NodeBasedTest() { - override fun startAttacker(attacker: SimpleMQClient) { - attacker.start(PEER_USER, PEER_USER) // Login as a peer + @Test + fun `incorrect legal name for the network map service config`() { + val incorrectNetworkMapName = random63BitValue().toString() + val node = startNode("Bob", configOverrides = mapOf( + "networkMapService" to mapOf( + "address" to networkMapNode.configuration.artemisAddress.toString(), + "legalName" to incorrectNetworkMapName + ) + )) + // The connection will be rejected as the legal name doesn't match + assertThatThrownBy { node.getOrThrow() }.hasMessageContaining(incorrectNetworkMapName) } @Test - fun `send message to RPC requests address`() { - assertSendAttackFails(RPC_REQUESTS_QUEUE) - } - - @Test - fun `only the node running the broker can login using the special node user`() { - val attacker = SimpleMQClient(alice.configuration.artemisAddress) - assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { - attacker.start(NODE_USER, NODE_USER) + fun `register with the network map service using a legal name different from the TLS CN`() { + startSimpleNode("Attacker").use { + // Register with the network map using a different legal name + val response = it.registerWithNetworkMap("Legit Business") + // We don't expect a response because the network map's host verification will prevent a connection back + // to the attacker as the TLS CN will not match the legal name it has just provided + assertThatExceptionOfType(TimeoutException::class.java).isThrownBy { + response.getOrThrow(2.seconds) + } } - attacker.stop() } - @Test - fun `login as the default cluster user`() { - val attacker = SimpleMQClient(alice.configuration.artemisAddress) - assertThatExceptionOfType(ActiveMQClusterSecurityException::class.java).isThrownBy { - attacker.start(ActiveMQDefaultConfiguration.getDefaultClusterUser(), ActiveMQDefaultConfiguration.getDefaultClusterPassword()) - } - attacker.stop() + private fun startSimpleNode(legalName: String): SimpleNode { + val config = TestNodeConfiguration( + basedir = tempFolder.root.toPath() / legalName, + myLegalName = legalName, + networkMapService = NetworkMapInfo(networkMapNode.configuration.artemisAddress, networkMapNode.info.legalIdentity.name)) + config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name + return SimpleNode(config).apply { start() } } - @Test - fun `login without a username and password`() { - val attacker = SimpleMQClient(alice.configuration.artemisAddress) - assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { - attacker.start() - } - attacker.stop() + private fun SimpleNode.registerWithNetworkMap(registrationName: String): ListenableFuture { + val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public)) + val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX) + val request = RegistrationRequest(registration.toWire(identity.private), net.myAddress) + return net.sendRequest(REGISTER_FLOW_TOPIC, request, networkMapNode.net.myAddress) } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/Main.kt b/node/src/main/kotlin/net/corda/node/Main.kt index f5a8fe15dc..9c7b5a169f 100644 --- a/node/src/main/kotlin/net/corda/node/Main.kt +++ b/node/src/main/kotlin/net/corda/node/Main.kt @@ -2,10 +2,7 @@ package net.corda.node import com.typesafe.config.ConfigException import joptsimple.OptionParser -import net.corda.core.div -import net.corda.core.randomOrNull -import net.corda.core.rootCause -import net.corda.core.then +import net.corda.core.* import net.corda.core.utilities.Emoji import net.corda.node.internal.Node import net.corda.node.services.config.ConfigHelper @@ -82,32 +79,33 @@ fun main(args: Array) { } val dir = conf.basedir.toAbsolutePath().normalize() - log.info("Main class: ${FullNodeConfiguration::class.java.protectionDomain.codeSource.location.toURI().getPath()}") + log.info("Main class: ${FullNodeConfiguration::class.java.protectionDomain.codeSource.location.toURI().path}") val info = ManagementFactory.getRuntimeMXBean() - log.info("CommandLine Args: ${info.getInputArguments().joinToString(" ")}") + log.info("CommandLine Args: ${info.inputArguments.joinToString(" ")}") log.info("Application Args: ${args.joinToString(" ")}") log.info("bootclasspath: ${info.bootClassPath}") log.info("classpath: ${info.classPath}") log.info("VM ${info.vmName} ${info.vmVendor} ${info.vmVersion}") log.info("Machine: ${InetAddress.getLocalHost().hostName}") - log.info("Working Directory: ${dir}") + log.info("Working Directory: $dir") try { - val dirFile = dir.toFile() - if (!dirFile.exists()) - dirFile.mkdirs() + dir.createDirectories() val node = conf.createNode() node.start() printPluginsAndServices(node) - node.networkMapRegistrationFuture.then { + node.networkMapRegistrationFuture.success { val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0 printBasicNodeInfo("Node started up and registered in $elapsed sec") if (renderBasicInfoToConsole) ANSIProgressObserver(node.smm) + } failure { + log.error("Error during network map registration", it) + exitProcess(1) } node.run() } catch (e: Exception) { diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index 03617274f6..c8fab91057 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -19,7 +19,6 @@ import net.corda.core.utilities.loggerFor import net.corda.node.services.User import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.FullNodeConfiguration -import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.RaftValidatingNotaryService @@ -260,7 +259,7 @@ open class DriverDSL( val isDebug: Boolean ) : DriverDSLInternalInterface { private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2) - private val networkMapName = "NetworkMapService" + private val networkMapLegalName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() class State { @@ -291,9 +290,7 @@ open class DriverDSL( override fun shutdown() { state.locked { - clients.forEach { - it.stop() - } + clients.forEach(NodeMessagingClient::stop) registeredProcesses.forEach { it.get().destroy() } @@ -353,7 +350,10 @@ open class DriverDSL( "artemisAddress" to messagingAddress.toString(), "webAddress" to apiAddress.toString(), "extraAdvertisedServiceIds" to advertisedServices.joinToString(","), - "networkMapAddress" to networkMapAddress.toString(), + "networkMapService" to mapOf( + "address" to networkMapAddress.toString(), + "legalName" to networkMapLegalName + ), "useTestClock" to useTestClock, "rpcUsers" to rpcUsers.map { mapOf( @@ -416,12 +416,12 @@ open class DriverDSL( val apiAddress = portAllocation.nextHostAndPort() val debugPort = if (isDebug) debugPortAllocation.nextPort() else null - val baseDirectory = driverDirectory / networkMapName + val baseDirectory = driverDirectory / networkMapLegalName val config = ConfigHelper.loadConfig( baseDirectoryPath = baseDirectory, allowMissingConfig = true, configOverrides = mapOf( - "myLegalName" to networkMapName, + "myLegalName" to networkMapLegalName, "basedir" to baseDirectory.normalize().toString(), "artemisAddress" to networkMapAddress.toString(), "webAddress" to apiAddress.toString(), diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 901045648c..f678dff1a8 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -45,7 +45,7 @@ import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.* import net.corda.node.services.vault.CashBalanceAsMetricsObserver import net.corda.node.services.vault.NodeVaultService -import net.corda.node.utilities.AddOrRemove +import net.corda.node.utilities.AddOrRemove.ADD import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.databaseTransaction @@ -72,8 +72,9 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair // TODO: Where this node is the initial network map service, currently no networkMapService is provided. // In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the // AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in. -abstract class AbstractNode(open val configuration: NodeConfiguration, val networkMapService: SingleMessageRecipient?, - val advertisedServices: Set, val platformClock: Clock) : SingletonSerializeAsToken() { +abstract class AbstractNode(open val configuration: NodeConfiguration, + val advertisedServices: Set, + val platformClock: Clock) : SingletonSerializeAsToken() { companion object { val PRIVATE_KEY_FILE_NAME = "identity-private-key" val PUBLIC_IDENTITY_FILE_NAME = "identity-public" @@ -95,6 +96,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo var networkMapSeq: Long = 1 protected abstract val log: Logger + protected abstract val networkMapAddress: SingleMessageRecipient? // We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the // low-performance prototyping period. @@ -174,8 +176,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo var isPreviousCheckpointsPresent = false private set + protected val _networkMapRegistrationFuture: SettableFuture = SettableFuture.create() /** Completes once the node has successfully registered with the network map service */ - private val _networkMapRegistrationFuture: SettableFuture = SettableFuture.create() val networkMapRegistrationFuture: ListenableFuture get() = _networkMapRegistrationFuture @@ -259,7 +261,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo } startMessagingService(CordaRPCOpsImpl(services, smm, database)) runOnStop += Runnable { net.stop() } - _networkMapRegistrationFuture.setFuture(registerWithNetworkMap()) + _networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured()) smm.start() // Shut down the SMM so no Fibers are scheduled. runOnStop += Runnable { smm.stop(acceptableLiveFiberCountOnStop()) } @@ -355,7 +357,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo return serviceList } - /** * Run any tasks that are needed to ensure the node is in a correct state before running start(). */ @@ -374,27 +375,43 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo } } - /** - * Register this node with the network map cache, and load network map from a remote service (and register for - * updates) if one has been supplied. - */ - private fun registerWithNetworkMap(): ListenableFuture { - require(networkMapService != null || NetworkMapService.type in advertisedServices.map { it.type }) { + private fun registerWithNetworkMapIfConfigured(): ListenableFuture { + require(networkMapAddress != null || NetworkMapService.type in advertisedServices.map { it.type }) { "Initial network map address must indicate a node that provides a network map service" } services.networkMapCache.addNode(info) // In the unit test environment, we may run without any network map service sometimes. - if (networkMapService == null && inNodeNetworkMapService == null) { + return if (networkMapAddress == null && inNodeNetworkMapService == null) { services.networkMapCache.runWithoutMapService() - return noNetworkMapConfigured() + noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache + + } else { + registerWithNetworkMap() } - return registerWithNetworkMap(networkMapService ?: info.address) } - private fun registerWithNetworkMap(networkMapServiceAddress: SingleMessageRecipient): ListenableFuture { + /** + * Register this node with the network map cache, and load network map from a remote service (and register for + * updates) if one has been supplied. + */ + protected open fun registerWithNetworkMap(): ListenableFuture { + val address = networkMapAddress ?: info.address // Register for updates, even if we're the one running the network map. - updateRegistration(networkMapServiceAddress, AddOrRemove.ADD) - return services.networkMapCache.addMapService(net, networkMapServiceAddress, true, null) + return sendNetworkMapRegistration(address).flatMap { response -> + check(response.success) { "The network map service rejected our registration request" } + // This Future will complete on the same executor as sendNetworkMapRegistration, namely the one used by net + services.networkMapCache.addMapService(net, address, true, null) + } + } + + private fun sendNetworkMapRegistration(networkMapAddress: SingleMessageRecipient): ListenableFuture { + // Register this node against the network + val instant = platformClock.instant() + val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD + val reg = NodeRegistration(info, instant.toEpochMilli(), ADD, expires) + val legalIdentityKey = obtainLegalIdentityKey() + val request = NetworkMapService.RegistrationRequest(reg.toWire(legalIdentityKey.private), net.myAddress) + return net.sendRequest(REGISTER_FLOW_TOPIC, request, networkMapAddress) } /** This is overriden by the mock node implementation to enable operation without any network map service */ @@ -404,16 +421,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo "has any other map node been configured.") } - private fun updateRegistration(networkMapAddr: SingleMessageRecipient, type: AddOrRemove): ListenableFuture { - // Register this node against the network - val instant = platformClock.instant() - val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD - val reg = NodeRegistration(info, instant.toEpochMilli(), type, expires) - val legalIdentityKey = obtainLegalIdentityKey() - val request = NetworkMapService.RegistrationRequest(reg.toWire(legalIdentityKey.private), net.myAddress) - return net.sendRequest(REGISTER_FLOW_TOPIC, request, networkMapAddr) - } - protected open fun makeKeyManagementService(): KeyManagementService = PersistentKeyManagementService(partyKeys) open protected fun makeNetworkMapService() { diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 7e6a466d03..0d22fa7523 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -1,15 +1,18 @@ package net.corda.node.internal import com.codahale.metrics.JmxReporter +import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture import net.corda.core.div -import net.corda.core.getOrThrow +import net.corda.core.flatMap import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.RPCOps -import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.ServiceHub import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType import net.corda.core.node.services.UniquenessProvider +import net.corda.core.success import net.corda.core.utilities.loggerFor import net.corda.node.printBasicNodeInfo import net.corda.node.serialization.NodeClock @@ -17,11 +20,11 @@ import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.config.FullNodeConfiguration +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.CordaRPCClient import net.corda.node.services.messaging.NodeMessagingClient -import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.RaftUniquenessProvider import net.corda.node.services.transactions.RaftValidatingNotaryService @@ -53,24 +56,21 @@ import java.util.* import javax.management.ObjectName import javax.servlet.* import kotlin.concurrent.thread -import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER - -class ConfigurationException(message: String) : Exception(message) /** * A Node manages a standalone server that takes part in the P2P network. It creates the services found in [ServiceHub], * loads important data off disk and starts listening for connections. * * @param configuration This is typically loaded from a TypeSafe HOCON configuration file. - * @param networkMapAddress An external network map service to use. Should only ever be null when creating the first - * network map service, while bootstrapping a network. * @param advertisedServices The services this node advertises. This must be a subset of the services it runs, * but nodes are not required to advertise services they run (hence subset). * @param clock The clock used within the node and by all flows etc. */ -class Node(override val configuration: FullNodeConfiguration, networkMapAddress: SingleMessageRecipient?, - advertisedServices: Set, clock: Clock = NodeClock()) : AbstractNode(configuration, networkMapAddress, advertisedServices, clock) { +class Node(override val configuration: FullNodeConfiguration, + advertisedServices: Set, + clock: Clock = NodeClock()) : AbstractNode(configuration, advertisedServices, clock) { override val log = loggerFor() + override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress) // DISCUSSION // @@ -125,25 +125,22 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: override fun makeMessagingService(): MessagingServiceInternal { userService = RPCUserServiceImpl(configuration) - val serverAddr = with(configuration) { + val serverAddress = with(configuration) { messagingServerAddress ?: { messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService) artemisAddress }() } - val legalIdentity = obtainLegalIdentity() - val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null - return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture) + val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null + return NodeMessagingClient(configuration, serverAddress, myIdentityOrNullIfNetworkMapService, serverThread, database, + networkMapRegistrationFuture) } override fun startMessagingService(rpcOps: RPCOps) { // Start up the embedded MQ server messageBroker?.apply { - runOnStop += Runnable { messageBroker?.stop() } + runOnStop += Runnable { stop() } start() - if (networkMapService is NetworkMapAddress) { - deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort) - } } // Start up the MQ client. @@ -151,6 +148,15 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: net.start(rpcOps, userService) } + /** + * Insert an initial step in the registration process which will throw an exception if a non-recoverable error is + * encountered when trying to connect to the network map node. + */ + override fun registerWithNetworkMap(): ListenableFuture { + val networkMapConnection = messageBroker?.networkMapConnectionFuture ?: Futures.immediateFuture(Unit) + return networkMapConnection.flatMap { super.registerWithNetworkMap() } + } + // TODO: add flag to enable/disable webserver private fun initWebServer(localRpc: CordaRPCOps): Server { // Note that the web server handlers will all run concurrently, and not on the node thread. @@ -308,32 +314,36 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: override fun start(): Node { alreadyRunningNodeCheck() super.start() - // Only start the service API requests once the network map registration is complete - thread(name = "WebServer") { - networkMapRegistrationFuture.getOrThrow() - try { - webServer = initWebServer(connectLocalRpcAsNodeUser()) - } catch(ex: Exception) { - // TODO: We need to decide if this is a fatal error, given the API is unavailable, or whether the API - // is not critical and we continue anyway. - log.error("Web server startup failed", ex) + + // Only start the service API requests once the network map registration is successfully complete + networkMapRegistrationFuture.success { + // This needs to be in a seperate thread so that we can reply to our own request to become RPC clients + thread(name = "WebServer") { + try { + webServer = initWebServer(connectLocalRpcAsNodeUser()) + } catch(ex: Exception) { + // TODO: We need to decide if this is a fatal error, given the API is unavailable, or whether the API + // is not critical and we continue anyway. + log.error("Web server startup failed", ex) + } + // Begin exporting our own metrics via JMX. + JmxReporter. + forRegistry(services.monitoringService.metrics). + inDomain("net.corda"). + createsObjectNamesWith { type, domain, name -> + // Make the JMX hierarchy a bit better organised. + val category = name.substringBefore('.') + val subName = name.substringAfter('.', "") + if (subName == "") + ObjectName("$domain:name=$category") + else + ObjectName("$domain:type=$category,name=$subName") + }. + build(). + start() } - // Begin exporting our own metrics via JMX. - JmxReporter. - forRegistry(services.monitoringService.metrics). - inDomain("net.corda"). - createsObjectNamesWith { type, domain, name -> - // Make the JMX hierarchy a bit better organised. - val category = name.substringBefore('.') - val subName = name.substringAfter('.', "") - if (subName == "") - ObjectName("$domain:name=$category") - else - ObjectName("$domain:type=$category,name=$subName") - }. - build(). - start() } + shutdownThread = thread(start = false) { stop() } @@ -405,16 +415,16 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: // Servlet filter to wrap API requests with a database transaction. private class DatabaseTransactionFilter(val database: Database) : Filter { - override fun init(filterConfig: FilterConfig?) { - } - - override fun destroy() { - } - override fun doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain) { databaseTransaction(database) { chain.doFilter(request, response) } } + override fun init(filterConfig: FilterConfig?) {} + override fun destroy() {} } } + +class ConfigurationException(message: String) : Exception(message) + +data class NetworkMapInfo(val address: HostAndPort, val legalName: String) diff --git a/node/src/main/kotlin/net/corda/node/services/RPCUserService.kt b/node/src/main/kotlin/net/corda/node/services/RPCUserService.kt index 760f3fe8d3..f9b242597a 100644 --- a/node/src/main/kotlin/net/corda/node/services/RPCUserService.kt +++ b/node/src/main/kotlin/net/corda/node/services/RPCUserService.kt @@ -1,7 +1,7 @@ package net.corda.node.services import net.corda.core.flows.FlowLogic -import net.corda.node.services.config.FullNodeConfiguration +import net.corda.node.services.config.NodeConfiguration /** * Service for retrieving [User] objects representing RPC users who are authorised to use the RPC system. A [User] @@ -15,7 +15,7 @@ interface RPCUserService { // TODO Store passwords as salted hashes // TODO Or ditch this and consider something like Apache Shiro -class RPCUserServiceImpl(config: FullNodeConfiguration) : RPCUserService { +class RPCUserServiceImpl(config: NodeConfiguration) : RPCUserService { private val _users = config.rpcUsers.associateBy(User::username) diff --git a/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt b/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt index 47e5ed0eec..e9b4c3ed81 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt @@ -119,12 +119,13 @@ private fun NodeSSLConfiguration.configureDevKeyAndTrustStores(myLegalName: Stri } // TODO Move this to CoreTestUtils.kt once we can pry this from the explorer -fun configureTestSSL(): NodeSSLConfiguration = object : NodeSSLConfiguration { +@JvmOverloads +fun configureTestSSL(legalName: String = "Mega Corp."): NodeSSLConfiguration = object : NodeSSLConfiguration { override val certificatesPath = Files.createTempDirectory("certs") override val keyStorePassword: String get() = "cordacadevpass" override val trustStorePassword: String get() = "trustpass" init { - configureDevKeyAndTrustStores("Mega Corp.") + configureDevKeyAndTrustStores(legalName) } } diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 31dea8a679..463c485a26 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -3,12 +3,11 @@ package net.corda.node.services.config import com.google.common.net.HostAndPort import com.typesafe.config.Config import net.corda.core.div -import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.services.ServiceInfo +import net.corda.node.internal.NetworkMapInfo import net.corda.node.internal.Node import net.corda.node.serialization.NodeClock import net.corda.node.services.User -import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.network.NetworkMapService import net.corda.node.utilities.TestClock import java.nio.file.Path @@ -26,10 +25,12 @@ interface NodeConfiguration : NodeSSLConfiguration { val basedir: Path override val certificatesPath: Path get() = basedir / "certificates" val myLegalName: String + val networkMapService: NetworkMapInfo? val nearestCity: String val emailAddress: String val exportJMXto: String val dataSourceProperties: Properties get() = Properties() + val rpcUsers: List get() = emptyList() val devMode: Boolean } @@ -38,12 +39,25 @@ class FullNodeConfiguration(val config: Config) : NodeConfiguration { override val myLegalName: String by config override val nearestCity: String by config override val emailAddress: String by config - override val exportJMXto: String = "http" + override val exportJMXto: String get() = "http" override val keyStorePassword: String by config override val trustStorePassword: String by config override val dataSourceProperties: Properties by config override val devMode: Boolean by config.getOrElse { false } - val networkMapAddress: HostAndPort? by config.getOrElse { null } + override val networkMapService: NetworkMapInfo? = config.getOptionalConfig("networkMapService")?.run { + NetworkMapInfo( + HostAndPort.fromString(getString("address")), + getString("legalName")) + } + override val rpcUsers: List = config + .getListOrElse("rpcUsers") { emptyList() } + .map { + val username = it.getString("user") + require(username.matches("\\w+".toRegex())) { "Username $username contains invalid characters" } + val password = it.getString("password") + val permissions = it.getListOrElse("permissions") { emptyList() }.toSet() + User(username, password, permissions) + } val useHTTPS: Boolean by config val artemisAddress: HostAndPort by config val webAddress: HostAndPort by config @@ -51,30 +65,23 @@ class FullNodeConfiguration(val config: Config) : NodeConfiguration { val extraAdvertisedServiceIds: String by config val useTestClock: Boolean by config.getOrElse { false } val notaryNodeAddress: HostAndPort? by config.getOrElse { null } - val notaryClusterAddresses: List = config.getListOrElse("notaryClusterAddresses") { emptyList() }.map { HostAndPort.fromString(it) } - val rpcUsers: List = - config.getListOrElse("rpcUsers") { emptyList() } - .map { - val username = it.getString("user") - require(username.matches("\\w+".toRegex())) { "Username $username contains invalid characters" } - val password = it.getString("password") - val permissions = it.getListOrElse("permissions") { emptyList() }.toSet() - User(username, password, permissions) - } + val notaryClusterAddresses: List = config + .getListOrElse("notaryClusterAddresses") { emptyList() } + .map { HostAndPort.fromString(it) } fun createNode(): Node { // This is a sanity feature do not remove. require(!useTestClock || devMode) { "Cannot use test clock outside of dev mode" } - val advertisedServices = mutableSetOf() - if (!extraAdvertisedServiceIds.isNullOrEmpty()) { - for (serviceId in extraAdvertisedServiceIds.split(",")) { - advertisedServices.add(ServiceInfo.parse(serviceId)) - } - } - if (networkMapAddress == null) advertisedServices.add(ServiceInfo(NetworkMapService.type)) - val networkMapMessageAddress: SingleMessageRecipient? = if (networkMapAddress == null) null else NodeMessagingClient.makeNetworkMapAddress(networkMapAddress!!) - return Node(this, networkMapMessageAddress, advertisedServices, if (useTestClock == true) TestClock() else NodeClock()) + val advertisedServices = extraAdvertisedServiceIds + .split(",") + .filter(String::isNotBlank) + .map { ServiceInfo.parse(it) } + .toMutableSet() + if (networkMapService == null) advertisedServices.add(ServiceInfo(NetworkMapService.type)) + + return Node(this, advertisedServices, if (useTestClock) TestClock() else NodeClock()) } } +private fun Config.getOptionalConfig(path: String): Config? = if (hasPath(path)) getConfig(path) else null diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt index 275e89802f..11cf9d2c83 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt @@ -9,10 +9,10 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.read import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.node.services.config.NodeSSLConfiguration -import org.apache.activemq.artemis.api.core.SimpleString +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Inbound +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import java.nio.file.FileSystems import java.nio.file.Path @@ -41,9 +41,9 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { const val RPC_REQUESTS_QUEUE = "rpc.requests" const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals" const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications" + const val NETWORK_MAP_QUEUE = "${INTERNAL_PREFIX}networkmap" - @JvmStatic - val NETWORK_MAP_ADDRESS = "${INTERNAL_PREFIX}networkmap" + const val VERIFY_PEER_COMMON_NAME = "corda.verifyPeerCommonName" /** * Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should @@ -59,7 +59,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { } interface ArtemisAddress : MessageRecipients { - val queueName: SimpleString + val queueName: String } interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient { @@ -67,7 +67,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { } data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisPeerAddress { - override val queueName = SimpleString(NETWORK_MAP_ADDRESS) + override val queueName: String get() = NETWORK_MAP_QUEUE } /** @@ -75,22 +75,21 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { * may change or evolve and code that relies upon it being a simple host/port may not function correctly. * For instance it may contain onion routing data. * - * [NodeAddress] identifies a specific peer node and an associated queue. The queue may be the peer's p2p queue or + * [NodeAddress] identifies a specific peer node and an associated queue. The queue may be the peer's own queue or * an advertised service's queue. * * @param queueName The name of the queue this address is associated with. * @param hostAndPort The address of the node. */ - data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisPeerAddress { + data class NodeAddress(override val queueName: String, override val hostAndPort: HostAndPort) : ArtemisPeerAddress { companion object { fun asPeer(peerIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress { - return NodeAddress(SimpleString("$PEERS_PREFIX${peerIdentity.toBase58String()}"), hostAndPort) + return NodeAddress("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort) } fun asService(serviceIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress { - return NodeAddress(SimpleString("$SERVICES_PREFIX${serviceIdentity.toBase58String()}"), hostAndPort) + return NodeAddress("$SERVICES_PREFIX${serviceIdentity.toBase58String()}", hostAndPort) } } - override fun toString(): String = "${javaClass.simpleName}(queue = $queueName, $hostAndPort)" } /** @@ -103,14 +102,12 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { * @param identity The service identity's owning key. */ data class ServiceAddress(val identity: CompositeKey) : ArtemisAddress, MessageRecipientGroup { - override val queueName: SimpleString = SimpleString("$SERVICES_PREFIX${identity.toBase58String()}") + override val queueName: String = "$SERVICES_PREFIX${identity.toBase58String()}" } /** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */ abstract val config: NodeSSLConfiguration - protected enum class ConnectionDirection { INBOUND, OUTBOUND } - // Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher. // Our self-generated certificates all use ECDSA for handshakes, but we allow classical RSA certificates to work // in case we need to use keytool certificates in some demos @@ -142,8 +139,8 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { config.trustStorePath.expectedOnDefaultFileSystem() return TransportConfiguration( when (direction) { - ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name - ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name + is Inbound -> NettyAcceptorFactory::class.java.name + is Outbound -> VerifyingNettyConnectorFactory::class.java.name }, mapOf( // Basic TCP target details @@ -167,9 +164,8 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword, TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","), TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2", - TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true - - // TODO: Set up the connector's host name verifier logic to ensure we connect to the expected node even in case of MITM or BGP hijacks + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true, + VERIFY_PEER_COMMON_NAME to (direction as? Outbound)?.expectedCommonName ) ) } @@ -177,4 +173,9 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { protected fun Path.expectedOnDefaultFileSystem() { require(fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" } } + + protected sealed class ConnectionDirection { + object Inbound : ConnectionDirection() + class Outbound(val expectedCommonName: String? = null) : ConnectionDirection() + } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index fc3cced9b0..490ac67aa5 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -1,46 +1,58 @@ package net.corda.node.services.messaging import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture +import io.netty.handler.ssl.SslHandler import net.corda.core.ThreadBox -import net.corda.core.crypto.AddressFormatException -import net.corda.core.crypto.CompositeKey -import net.corda.core.crypto.X509Utilities +import net.corda.core.crypto.* import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_CA import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA -import net.corda.core.crypto.newSecureRandom import net.corda.core.div +import net.corda.core.minutes import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache.MapChange +import net.corda.core.seconds import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.node.printBasicNodeInfo import net.corda.node.services.RPCUserService import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.INBOUND -import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.OUTBOUND -import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_ROLE -import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.PEER_ROLE -import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.RPC_ROLE +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Inbound +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound +import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE +import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE +import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.config.Configuration import org.apache.activemq.artemis.core.config.CoreQueueConfiguration import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory import org.apache.activemq.artemis.core.security.Role import org.apache.activemq.artemis.core.server.ActiveMQServer import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl +import org.apache.activemq.artemis.spi.core.remoting.* import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal +import org.bouncycastle.asn1.x500.X500Name import rx.Subscription import java.io.IOException import java.math.BigInteger import java.security.Principal import java.security.PublicKey import java.util.* +import java.util.concurrent.Executor +import java.util.concurrent.ScheduledExecutorService import javax.annotation.concurrent.ThreadSafe import javax.security.auth.Subject import javax.security.auth.callback.CallbackHandler @@ -52,6 +64,7 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE import javax.security.auth.login.FailedLoginException import javax.security.auth.login.LoginException import javax.security.auth.spi.LoginModule +import javax.security.cert.X509Certificate // TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman. // TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later) @@ -81,6 +94,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, private val mutex = ThreadBox(InnerState()) private lateinit var activeMQServer: ActiveMQServer + private val _networkMapConnectionFuture = config.networkMapService?.let { SettableFuture.create() } + /** + * A [ListenableFuture] which completes when the server successfully connects to the network map node. If a + * non-recoverable error is encountered then the Future will complete with an exception. + */ + val networkMapConnectionFuture: SettableFuture? get() = _networkMapConnectionFuture private var networkChangeHandle: Subscription? = null init { @@ -88,13 +107,15 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, } /** - * The server will make sure the bridge exists on network map changes, see method [destroyOrCreateBridge] + * The server will make sure the bridge exists on network map changes, see method [updateBridgesOnNetworkChange] * We assume network map will be updated accordingly when the client node register with the network map server. */ fun start() = mutex.locked { if (!running) { configureAndStartServer() - networkChangeHandle = networkMapCache.changed.subscribe { destroyOrCreateBridges(it) } + // Deploy bridge to the network map service + config.networkMapService?.let { deployBridge(NetworkMapAddress(it.address), it.legalName) } + networkChangeHandle = networkMapCache.changed.subscribe { updateBridgesOnNetworkChange(it) } running = true } } @@ -106,48 +127,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, running = false } - /** - * The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted. - * The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues. - * This results in message queues up and never get send out. (https://github.com/corda/corda/issues/37) - * - * We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup. - * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service. - */ - private fun destroyOrCreateBridges(change: MapChange) { - fun addAddresses(node: NodeInfo, targets: MutableSet) { - // Add the node's address with the p2p queue. - val nodeAddress = node.address as ArtemisPeerAddress - targets.add(nodeAddress) - // Add the node's address with service queues, one per service. - node.advertisedServices.forEach { - targets.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort)) - } - } - - val addressesToCreateBridgesTo = HashSet() - val addressesToRemoveBridgesFrom = HashSet() - when (change) { - is MapChange.Modified -> { - addAddresses(change.node, addressesToCreateBridgesTo) - addAddresses(change.previousNode, addressesToRemoveBridgesFrom) - } - is MapChange.Removed -> { - addAddresses(change.node, addressesToRemoveBridgesFrom) - } - is MapChange.Added -> { - addAddresses(change.node, addressesToCreateBridgesTo) - } - } - - (addressesToRemoveBridgesFrom - addressesToCreateBridgesTo).forEach { - activeMQServer.destroyBridge(getBridgeName(it.queueName, it.hostAndPort)) - } - addressesToCreateBridgesTo.filter { activeMQServer.queueQuery(it.queueName).isExists }.forEach { - deployBridgeIfAbsent(it.queueName, it.hostAndPort) - } - } - private fun configureAndStartServer() { val config = createArtemisConfig() val securityManager = createArtemisSecurityManager() @@ -156,57 +135,19 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, registerActivationFailureListener { exception -> throw exception } // Some types of queue might need special preparation on our side, like dialling back or preparing // a lazily initialised subsystem. - registerPostQueueCreationCallback { deployBridgeFromNewQueue(it) } + registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) } registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } } activeMQServer.start() printBasicNodeInfo("Node listening on address", myHostPort.toString()) } - private fun maybeDeployBridgeForNode(queueName: SimpleString, nodeInfo: NodeInfo) { - val address = nodeInfo.address - if (address is ArtemisPeerAddress) { - log.debug("Deploying bridge for $queueName to $nodeInfo") - deployBridgeIfAbsent(queueName, address.hostAndPort) - } else { - log.error("Don't know how to deal with $address for queue $queueName") - } - } - - private fun deployBridgeFromNewQueue(queueName: SimpleString) { - log.debug { "Queue created: $queueName, deploying bridge(s)" } - when { - queueName.startsWith(PEERS_PREFIX) -> try { - val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length)) - val nodeInfo = networkMapCache.getNodeByLegalIdentityKey(identity) - if (nodeInfo != null) { - maybeDeployBridgeForNode(queueName, nodeInfo) - } else { - log.error("Queue created for a peer that we don't know from the network map: $queueName") - } - } catch (e: AddressFormatException) { - log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName") - } - - queueName.startsWith(SERVICES_PREFIX) -> try { - val identity = CompositeKey.parseFromBase58(queueName.substring(SERVICES_PREFIX.length)) - val nodeInfos = networkMapCache.getNodesByAdvertisedServiceIdentityKey(identity) - // Create a bridge for each node advertising the service. - for (nodeInfo in nodeInfos) { - maybeDeployBridgeForNode(queueName, nodeInfo) - } - } catch (e: AddressFormatException) { - log.error("Flow violation: Could not parse service queue name as Base 58: $queueName") - } - } - } - private fun createArtemisConfig(): Configuration = ConfigurationImpl().apply { val artemisDir = config.basedir / "artemis" bindingsDirectory = (artemisDir / "bindings").toString() journalDirectory = (artemisDir / "journal").toString() largeMessagesDirectory = (artemisDir / "large-messages").toString() - acceptorConfigurations = setOf(tcpTransport(INBOUND, "0.0.0.0", myHostPort.port)) + acceptorConfigurations = setOf(tcpTransport(Inbound, "0.0.0.0", myHostPort.port)) // Enable built in message deduplication. Note we still have to do our own as the delayed commits // and our own definition of commit mean that the built in deduplication cannot remove all duplicates. idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess @@ -215,42 +156,32 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS) // Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster // user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its - // password is changed from the default (as warned in the docs). Since we don't need this feature we turn it off + // password be changed from the default (as warned in the docs). Since we don't need this feature we turn it off // by having its password be an unknown securely random 128-bit value. clusterPassword = BigInteger(128, newSecureRandom()).toString(16) - - queueConfigurations.addAll(listOf( - CoreQueueConfiguration().apply { - address = NETWORK_MAP_ADDRESS - name = NETWORK_MAP_ADDRESS - isDurable = true - }, - CoreQueueConfiguration().apply { - address = P2P_QUEUE - name = P2P_QUEUE - isDurable = true - }, - // Create an RPC queue: this will service locally connected clients only (not via a bridge) and those - // clients must have authenticated. We could use a single consumer for everything and perhaps we should, - // but these queues are not worth persisting. - CoreQueueConfiguration().apply { - name = RPC_REQUESTS_QUEUE - address = RPC_REQUESTS_QUEUE - isDurable = false - }, - // The custom name for the queue is intentional - we may wish other things to subscribe to the - // NOTIFICATIONS_ADDRESS with different filters in future - CoreQueueConfiguration().apply { - name = RPC_QUEUE_REMOVALS_QUEUE - address = NOTIFICATIONS_ADDRESS - isDurable = false - filterString = "_AMQ_NotifType = 1" - } - )) - + queueConfigurations = listOf( + queueConfig(NETWORK_MAP_QUEUE, durable = true), + queueConfig(P2P_QUEUE, durable = true), + // Create an RPC queue: this will service locally connected clients only (not via a bridge) and those + // clients must have authenticated. We could use a single consumer for everything and perhaps we should, + // but these queues are not worth persisting. + queueConfig(RPC_REQUESTS_QUEUE, durable = false), + // The custom name for the queue is intentional - we may wish other things to subscribe to the + // NOTIFICATIONS_ADDRESS with different filters in future + queueConfig(RPC_QUEUE_REMOVALS_QUEUE, address = NOTIFICATIONS_ADDRESS, filter = "_AMQ_NotifType = 1", durable = false) + ) configureAddressSecurity() } + private fun queueConfig(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration { + return CoreQueueConfiguration().apply { + this.name = name + this.address = address + filterString = filter + isDurable = durable + } + } + /** * Authenticated clients connecting to us fall in one of three groups: * 1. The node hosting us and any of its logically connected components. These are given full access to all valid queues. @@ -279,45 +210,114 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, } private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { - val ourRootCAPublicKey = X509Utilities + val rootCAPublicKey = X509Utilities .loadCertificateFromKeyStore(config.trustStorePath, config.trustStorePassword, CORDA_ROOT_CA) .publicKey - val ourPublicKey = X509Utilities + val ourCertificate = X509Utilities .loadCertificateFromKeyStore(config.keyStorePath, config.keyStorePassword, CORDA_CLIENT_CA) - .publicKey + val ourSubjectDN = X500Name(ourCertificate.subjectDN.name) + // This is a sanity check and should not fail unless things have been misconfigured + require(ourSubjectDN.commonName == config.myLegalName) { + "Legal name does not match with our subject CN: $ourSubjectDN" + } val securityConfig = object : SecurityConfiguration() { // Override to make it work with our login module override fun getAppConfigurationEntry(name: String): Array { val options = mapOf( RPCUserService::class.java.name to userService, - CORDA_ROOT_CA to ourRootCAPublicKey, - CORDA_CLIENT_CA to ourPublicKey) + CORDA_ROOT_CA to rootCAPublicKey, + CORDA_CLIENT_CA to ourCertificate.publicKey) return arrayOf(AppConfigurationEntry(name, REQUIRED, options)) } } return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig) } - private fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations + private fun deployBridgesFromNewQueue(queueName: String) { + log.debug { "Queue created: $queueName, deploying bridge(s)" } - private fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration( - hostAndPort.toString(), - tcpTransport(OUTBOUND, hostAndPort.hostText, hostAndPort.port) - ) - - private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name) - - fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) { - if (!connectorExists(hostAndPort)) { - addConnector(hostAndPort) + fun deployBridgeToPeer(nodeInfo: NodeInfo) { + log.debug("Deploying bridge for $queueName to $nodeInfo") + val address = nodeInfo.address + if (address is ArtemisPeerAddress) { + deployBridge(queueName, address.hostAndPort, nodeInfo.legalIdentity.name) + } else { + log.error("Don't know how to deal with $address for queue $queueName") + } } - val bridgeName = getBridgeName(queueName, hostAndPort) - if (!bridgeExists(bridgeName)) { - deployBridge(bridgeName, queueName, hostAndPort) + + when { + queueName.startsWith(PEERS_PREFIX) -> try { + val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length)) + val nodeInfo = networkMapCache.getNodeByLegalIdentityKey(identity) + if (nodeInfo != null) { + deployBridgeToPeer(nodeInfo) + } else { + log.error("Queue created for a peer that we don't know from the network map: $queueName") + } + } catch (e: AddressFormatException) { + log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName") + } + + queueName.startsWith(SERVICES_PREFIX) -> try { + val identity = CompositeKey.parseFromBase58(queueName.substring(SERVICES_PREFIX.length)) + val nodeInfos = networkMapCache.getNodesByAdvertisedServiceIdentityKey(identity) + // Create a bridge for each node advertising the service. + for (nodeInfo in nodeInfos) { + deployBridgeToPeer(nodeInfo) + } + } catch (e: AddressFormatException) { + log.error("Flow violation: Could not parse service queue name as Base 58: $queueName") + } } } - private fun getBridgeName(queueName: SimpleString, hostAndPort: HostAndPort) = "$queueName -> $hostAndPort" + /** + * The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted. + * The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues. + * This results in message queues up and never get send out. (https://github.com/corda/corda/issues/37) + * + * We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup. + * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service. + */ + private fun updateBridgesOnNetworkChange(change: MapChange) { + fun gatherAddresses(node: NodeInfo): Sequence { + val peerAddress = node.address as ArtemisPeerAddress + val addresses = mutableListOf(peerAddress) + node.advertisedServices.mapTo(addresses) { NodeAddress.asService(it.identity.owningKey, peerAddress.hostAndPort) } + return addresses.asSequence() + } + + fun deployBridges(node: NodeInfo) { + gatherAddresses(node) + .filter { queueExists(it.queueName) && !bridgeExists(it.bridgeName) } + .forEach { deployBridge(it, node.legalIdentity.name) } + } + + fun destroyBridges(node: NodeInfo) { + gatherAddresses(node).forEach { + activeMQServer.destroyBridge(it.bridgeName) + } + } + + when (change) { + is MapChange.Added -> { + deployBridges(change.node) + } + is MapChange.Removed -> { + destroyBridges(change.node) + } + is MapChange.Modified -> { + // TODO Figure out what has actually changed and only destroy those bridges that need to be. + destroyBridges(change.previousNode) + deployBridges(change.node) + } + } + } + + private fun deployBridge(address: ArtemisPeerAddress, legalName: String) { + deployBridge(address.queueName, address.hostAndPort, legalName) + } /** * All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving @@ -325,14 +325,25 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, * as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's * P2P address. */ - private fun deployBridge(bridgeName: String, queueName: SimpleString, hostAndPort: HostAndPort) { + private fun deployBridge(queueName: String, target: HostAndPort, legalName: String) { + val tcpTransport = tcpTransport(Outbound(expectedCommonName = legalName), target.hostText, target.port) + tcpTransport.params[ArtemisMessagingServer::class.java.name] = this + // We intentionally overwrite any previous connector config in case the peer legal name changed + activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport) + activeMQServer.deployBridge(BridgeConfiguration().apply { - name = bridgeName - this.queueName = queueName.toString() + name = getBridgeName(queueName, target) + this.queueName = queueName forwardingAddress = P2P_QUEUE - staticConnectors = listOf(hostAndPort.toString()) + staticConnectors = listOf(target.toString()) confirmationWindowSize = 100000 // a guess isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic + // We keep trying until the network map deems the node unreachable and tells us it's been removed at which + // point we destroy the bridge + // TODO Give some thought to the retry settings + retryInterval = 5.seconds.toMillis() + retryIntervalMultiplier = 1.5 // Exponential backoff + maxRetryInterval = 3.minutes.toMillis() // As a peer of the target node we must connect to it using the peer user. Actual authentication is done using // our TLS certificate. user = PEER_USER @@ -340,114 +351,197 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, }) } - /** - * Clients must connect to us with a username and password and must use TLS. If a someone connects with - * [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate - * is the same as our one in our key store. Then they're given full access to all valid queues. If they connect with - * [ArtemisMessagingComponent.PEER_USER] then we confirm they belong on our P2P network by checking their root CA is - * the same as our root CA. If that's the case the only access they're given is the ablility send to our P2P address. - * In both cases the messages these authenticated nodes send to us are tagged with their subject DN and we assume - * the CN within that is their legal name. - * Otherwise if the username is neither of the above we assume it's an RPC user and authenticate against our list of - * valid RPC users. RPC clients are given permission to perform RPC and nothing else. - */ - class NodeLoginModule : LoginModule { + private fun queueExists(queueName: String): Boolean = activeMQServer.queueQuery(SimpleString(queueName)).isExists - companion object { - // Include forbidden username character to prevent name clash with any RPC usernames - const val PEER_ROLE = "SystemRoles/Peer" - const val NODE_ROLE = "SystemRoles/Node" - const val RPC_ROLE = "SystemRoles/RPC" + private fun bridgeExists(bridgeName: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(bridgeName) + + private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort) + + private fun getBridgeName(queueName: String, hostAndPort: HostAndPort): String = "$queueName -> $hostAndPort" + + // This is called on one of Artemis' background threads + internal fun hostVerificationFail(peerLegalName: String, expectedCommonName: String) { + log.error("Peer has wrong CN - expected $expectedCommonName but got $peerLegalName. This is either a fatal " + + "misconfiguration by the remote peer or an SSL man-in-the-middle attack!") + if (expectedCommonName == config.networkMapService?.legalName) { + // If the peer that failed host verification was the network map node then we're in big trouble and need to bail! + _networkMapConnectionFuture!!.setException(IOException("${config.networkMapService} failed host verification check")) } + } - private var loginSucceeded: Boolean = false - private lateinit var subject: Subject - private lateinit var callbackHandler: CallbackHandler - private lateinit var userService: RPCUserService - private lateinit var ourRootCAPublicKey: PublicKey - private lateinit var ourPublicKey: PublicKey - private val principals = ArrayList() - - override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map, options: Map) { - this.subject = subject - this.callbackHandler = callbackHandler - userService = options[RPCUserService::class.java.name] as RPCUserService - ourRootCAPublicKey = options[CORDA_ROOT_CA] as PublicKey - ourPublicKey = options[CORDA_CLIENT_CA] as PublicKey - } - - override fun login(): Boolean { - val nameCallback = NameCallback("Username: ") - val passwordCallback = PasswordCallback("Password: ", false) - val certificateCallback = CertificateCallback() - - try { - callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback)) - } catch (e: IOException) { - throw LoginException(e.message) - } catch (e: UnsupportedCallbackException) { - throw LoginException("${e.message} not available to obtain information from user") - } - - val username = nameCallback.name ?: throw FailedLoginException("Username not provided") - val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided")) - - val validatedUser = if (username == PEER_USER || username == NODE_USER) { - val certificates = certificateCallback.certificates ?: throw FailedLoginException("No TLS?") - val peerCertificate = certificates.first() - val role = if (username == NODE_USER) { - if (peerCertificate.publicKey != ourPublicKey) { - throw FailedLoginException("Only the node can login as $NODE_USER") - } - NODE_ROLE - } else { - val theirRootCAPublicKey = certificates.last().publicKey - if (theirRootCAPublicKey != ourRootCAPublicKey) { - throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey") - } - PEER_ROLE // This enables the peer to send to our P2P address - } - principals += RolePrincipal(role) - peerCertificate.subjectDN.name - } else { - // Otherwise assume they're an RPC user - val rpcUser = userService.getUser(username) ?: throw FailedLoginException("User does not exist") - if (password != rpcUser.password) { - // TODO Switch to hashed passwords - // TODO Retrieve client IP address to include in exception message - throw FailedLoginException("Password for user $username does not match") - } - principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests - principals += RolePrincipal("$CLIENTS_PREFIX$username") // This enables the RPC client to receive responses - username - } - principals += UserPrincipal(validatedUser) - - loginSucceeded = true - return loginSucceeded - } - - override fun commit(): Boolean { - val result = loginSucceeded - if (result) { - subject.principals.addAll(principals) - } - clear() - return result - } - - override fun abort(): Boolean { - clear() - return true - } - - override fun logout(): Boolean { - subject.principals.removeAll(principals) - return true - } - - private fun clear() { - loginSucceeded = false + // This is called on one of Artemis' background threads + internal fun onTcpConnection(peerLegalName: String) { + if (peerLegalName == config.networkMapService?.legalName) { + _networkMapConnectionFuture!!.set(Unit) } } } + +class VerifyingNettyConnectorFactory : NettyConnectorFactory() { + override fun createConnector(configuration: MutableMap?, + handler: BufferHandler?, + listener: ClientConnectionLifeCycleListener?, + closeExecutor: Executor?, + threadPool: Executor?, + scheduledThreadPool: ScheduledExecutorService?, + protocolManager: ClientProtocolManager?): Connector { + return VerifyingNettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, + protocolManager) + } +} + +private class VerifyingNettyConnector(configuration: MutableMap?, + handler: BufferHandler?, + listener: ClientConnectionLifeCycleListener?, + closeExecutor: Executor?, + threadPool: Executor?, + scheduledThreadPool: ScheduledExecutorService?, + protocolManager: ClientProtocolManager?) : + NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) +{ + private val server = configuration?.get(ArtemisMessagingServer::class.java.name) as? ArtemisMessagingServer + private val expectedCommonName = configuration?.get(ArtemisMessagingComponent.VERIFY_PEER_COMMON_NAME) as? String + + override fun createConnection(): Connection? { + val connection = super.createConnection() as NettyConnection? + if (connection != null && expectedCommonName != null) { + val peerLegalName = connection + .channel + .pipeline() + .get(SslHandler::class.java) + .engine() + .session + .peerPrincipal + .name + .let(::X500Name) + .commonName + // TODO Verify on the entire principle (subject) + if (peerLegalName != expectedCommonName) { + connection.close() + server!!.hostVerificationFail(peerLegalName, expectedCommonName) + return null // Artemis will keep trying to reconnect until it's told otherwise + } else { + server!!.onTcpConnection(peerLegalName) + } + } + return connection + } +} + +/** + * Clients must connect to us with a username and password and must use TLS. If a someone connects with + * [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate + * is the same as our one in our key store. Then they're given full access to all valid queues. If they connect with + * [ArtemisMessagingComponent.PEER_USER] then we confirm they belong on our P2P network by checking their root CA is + * the same as our root CA. If that's the case the only access they're given is the ablility send to our P2P address. + * In both cases the messages these authenticated nodes send to us are tagged with their subject DN and we assume + * the CN within that is their legal name. + * Otherwise if the username is neither of the above we assume it's an RPC user and authenticate against our list of + * valid RPC users. RPC clients are given permission to perform RPC and nothing else. + */ +class NodeLoginModule : LoginModule { + companion object { + // Include forbidden username character to prevent name clash with any RPC usernames + const val PEER_ROLE = "SystemRoles/Peer" + const val NODE_ROLE = "SystemRoles/Node" + const val RPC_ROLE = "SystemRoles/RPC" + } + + private var loginSucceeded: Boolean = false + private lateinit var subject: Subject + private lateinit var callbackHandler: CallbackHandler + private lateinit var userService: RPCUserService + private lateinit var ourRootCAPublicKey: PublicKey + private lateinit var ourPublicKey: PublicKey + private val principals = ArrayList() + + override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map, options: Map) { + this.subject = subject + this.callbackHandler = callbackHandler + userService = options[RPCUserService::class.java.name] as RPCUserService + ourRootCAPublicKey = options[CORDA_ROOT_CA] as PublicKey + ourPublicKey = options[CORDA_CLIENT_CA] as PublicKey + } + + override fun login(): Boolean { + val nameCallback = NameCallback("Username: ") + val passwordCallback = PasswordCallback("Password: ", false) + val certificateCallback = CertificateCallback() + + try { + callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback)) + } catch (e: IOException) { + throw LoginException(e.message) + } catch (e: UnsupportedCallbackException) { + throw LoginException("${e.message} not available to obtain information from user") + } + + val username = nameCallback.name ?: throw FailedLoginException("Username not provided") + val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided")) + + val validatedUser = if (username == PEER_USER || username == NODE_USER) { + val certificates = certificateCallback.certificates ?: throw FailedLoginException("No TLS?") + authenticateNode(certificates, username) + } else { + // Otherwise assume they're an RPC user + authenticateRpcUser(password, username) + } + principals += UserPrincipal(validatedUser) + + loginSucceeded = true + return loginSucceeded + } + + private fun authenticateNode(certificates: Array, username: String): String { + val peerCertificate = certificates.first() + val role = if (username == NODE_USER) { + if (peerCertificate.publicKey != ourPublicKey) { + throw FailedLoginException("Only the node can login as $NODE_USER") + } + NODE_ROLE + } else { + val theirRootCAPublicKey = certificates.last().publicKey + if (theirRootCAPublicKey != ourRootCAPublicKey) { + throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey") + } + PEER_ROLE // This enables the peer to send to our P2P address + } + principals += RolePrincipal(role) + return peerCertificate.subjectDN.name + } + + private fun authenticateRpcUser(password: String, username: String): String { + val rpcUser = userService.getUser(username) ?: throw FailedLoginException("User does not exist") + if (password != rpcUser.password) { + // TODO Switch to hashed passwords + // TODO Retrieve client IP address to include in exception message + throw FailedLoginException("Password for user $username does not match") + } + principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests + principals += RolePrincipal("$CLIENTS_PREFIX$username") // This enables the RPC client to receive responses + return username + } + + override fun commit(): Boolean { + val result = loginSucceeded + if (result) { + subject.principals.addAll(principals) + } + clear() + return result + } + + override fun abort(): Boolean { + clear() + return true + } + + override fun logout(): Boolean { + subject.principals.removeAll(principals) + return true + } + + private fun clear() { + loginSucceeded = false + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt index 22ec2703b0..85d2eb85e3 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt @@ -4,6 +4,7 @@ import com.google.common.net.HostAndPort import net.corda.core.ThreadBox import net.corda.core.messaging.CordaRPCOps import net.corda.node.services.config.NodeSSLConfiguration +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ClientSession @@ -35,7 +36,7 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur state.locked { check(!running) checkStorePasswords() - val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.OUTBOUND, host.hostText, host.port)) + val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port)) serverLocator.threadPoolMaxSize = 1 // TODO: Configure session reconnection, confirmation window sizes and other Artemis features. // This will allow reconnection in case of server restart/network outages/IP address changes, etc. diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 2e4c9d9119..7aaab91fef 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -14,6 +14,7 @@ import net.corda.core.utilities.trace import net.corda.node.services.RPCUserService import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound import net.corda.node.utilities.* import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID @@ -66,13 +67,6 @@ class NodeMessagingClient(override val config: NodeConfiguration, // confusion. const val TOPIC_PROPERTY = "platform-topic" const val SESSION_ID_PROPERTY = "session-id" - - /** - * This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node. - * All other addresses come from the NetworkMapCache, or myAddress below. - * The node will populate with their own identity based address when they register with the NetworkMapService. - */ - fun makeNetworkMapAddress(hostAndPort: HostAndPort): SingleMessageRecipient = NetworkMapAddress(hostAndPort) } private class InnerState { @@ -118,7 +112,8 @@ class NodeMessagingClient(override val config: NodeConfiguration, started = true log.info("Connecting to server: $serverHostPort") - val tcpTransport = tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port) + // TODO Add broker CN to config for host verification in case the embedded broker isn't used + val tcpTransport = tcpTransport(Outbound(), serverHostPort.hostText, serverHostPort.port) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport) clientFactory = locator.createSessionFactory() @@ -375,10 +370,10 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } - private fun getMQAddress(target: MessageRecipients): SimpleString { + private fun getMQAddress(target: MessageRecipients): String { return if (target == myAddress) { // If we are sending to ourselves then route the message directly to our P2P queue. - SimpleString(P2P_QUEUE) + P2P_QUEUE } else { // Otherwise we send the message to an internal queue for the target residing on our broker. It's then the // broker's job to route the message to the target's P2P queue. @@ -391,9 +386,9 @@ class NodeMessagingClient(override val config: NodeConfiguration, } /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ - private fun createQueueIfAbsent(queueName: SimpleString) { + private fun createQueueIfAbsent(queueName: String) { state.alreadyLocked { - val queueQuery = session!!.queueQuery(queueName) + val queueQuery = session!!.queueQuery(SimpleString(queueName)) if (!queueQuery.isExists) { log.info("Create fresh queue $queueName bound on same address") session!!.createQueue(queueName, queueName, true) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt index c68105887b..ca2954e5cb 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt @@ -32,6 +32,7 @@ import net.corda.flows.CashFlowResult import net.corda.node.internal.AbstractNode import net.corda.node.services.User import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress import net.i2p.crypto.eddsa.EdDSAPrivateKey import net.i2p.crypto.eddsa.EdDSAPublicKey import org.apache.activemq.artemis.api.core.SimpleString @@ -197,18 +198,8 @@ private class RPCKryo(observableSerializer: Serializer>? = null) register(NetworkMapCache.MapChange.Added::class.java) register(NetworkMapCache.MapChange.Removed::class.java) register(NetworkMapCache.MapChange.Modified::class.java) - register(ArtemisMessagingComponent.NodeAddress::class.java, - read = { kryo, input -> - ArtemisMessagingComponent.NodeAddress( - kryo.readObject(input, SimpleString::class.java), - kryo.readObject(input, HostAndPort::class.java)) - }, - write = { kryo, output, nodeAddress -> - kryo.writeObject(output, nodeAddress.queueName) - kryo.writeObject(output, nodeAddress.hostAndPort) - } - ) - register(NodeMessagingClient.makeNetworkMapAddress(HostAndPort.fromString("localhost:0")).javaClass) + register(ArtemisMessagingComponent.NodeAddress::class.java) + register(NetworkMapAddress::class.java) register(ServiceInfo::class.java) register(ServiceType.getServiceType("ab", "ab").javaClass) register(ServiceType.parse("ab").javaClass) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index ccfcae6953..44e37250fe 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -208,6 +208,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, serviceHub.networkService.addMessageHandler(sessionTopic) { message, reg -> executor.checkOnThread() val sessionMessage = message.data.deserialize() + // TODO Look up the party with the full X.500 name instead of just the legal name val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peer.commonName)?.legalIdentity if (otherParty != null) { when (sessionMessage) { diff --git a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt index a87fb3b453..ae7149e731 100644 --- a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt @@ -23,6 +23,7 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.databaseTransaction +import net.corda.testing.TestNodeConfiguration import net.corda.testing.freeLocalHostAndPort import net.corda.testing.node.makeTestDataSourceProperties import org.assertj.core.api.Assertions.assertThat @@ -35,7 +36,6 @@ import org.junit.Test import org.junit.rules.TemporaryFolder import java.io.Closeable import java.net.ServerSocket -import java.nio.file.Path import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit.MILLISECONDS import kotlin.concurrent.thread @@ -68,17 +68,10 @@ class ArtemisMessagingTests { @Before fun setUp() { userService = RPCUserServiceImpl(FullNodeConfiguration(ConfigFactory.empty())) - // TODO: create a base class that provides a default implementation - config = object : NodeConfiguration { - override val basedir: Path = temporaryFolder.newFolder().toPath() - override val myLegalName: String = "me" - override val nearestCity: String = "London" - override val emailAddress: String = "" - override val devMode: Boolean = true - override val exportJMXto: String = "" - override val keyStorePassword: String = "testpass" - override val trustStorePassword: String = "trustpass" - } + config = TestNodeConfiguration( + basedir = temporaryFolder.newFolder().toPath(), + myLegalName = "me", + networkMapService = null) LogHelper.setLevel(PersistentUniquenessProvider::class) val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties()) dataSource = dataSourceAndDatabase.first diff --git a/node/src/test/kotlin/net/corda/node/utilities/certsigning/CertificateSignerTest.kt b/node/src/test/kotlin/net/corda/node/utilities/certsigning/CertificateSignerTest.kt index cc5e1fda93..6ba11a2fc0 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/certsigning/CertificateSignerTest.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/certsigning/CertificateSignerTest.kt @@ -8,11 +8,10 @@ import net.corda.core.crypto.X509Utilities import net.corda.core.div import net.corda.core.exists import net.corda.core.readLines -import net.corda.node.services.config.NodeConfiguration +import net.corda.testing.TestNodeConfiguration import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder -import java.nio.file.Path import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue @@ -20,11 +19,10 @@ import kotlin.test.assertTrue class CertificateSignerTest { @Rule @JvmField - val tempFolder: TemporaryFolder = TemporaryFolder() + val tempFolder = TemporaryFolder() @Test fun buildKeyStore() { - val id = SecureHash.randomSHA256().toString() val certs = arrayOf(X509Utilities.createSelfSignedCACert("CORDA_CLIENT_CA").certificate, @@ -36,17 +34,10 @@ class CertificateSignerTest { on { retrieveCertificates(eq(id)) }.then { certs } } - - val config = object : NodeConfiguration { - override val basedir: Path = tempFolder.root.toPath() - override val myLegalName: String = "me" - override val nearestCity: String = "London" - override val emailAddress: String = "" - override val devMode: Boolean = true - override val exportJMXto: String = "" - override val keyStorePassword: String = "testpass" - override val trustStorePassword: String = "trustpass" - } + val config = TestNodeConfiguration( + basedir = tempFolder.root.toPath(), + myLegalName = "me", + networkMapService = null) assertFalse(config.keyStorePath.exists()) assertFalse(config.trustStorePath.exists()) @@ -76,5 +67,4 @@ class CertificateSignerTest { assertEquals(id, (config.certificatesPath / "certificate-request-id.txt").readLines { it.findFirst().get() }) } - } diff --git a/publish.properties b/publish.properties index 1c38857662..3975a38351 100644 --- a/publish.properties +++ b/publish.properties @@ -1 +1 @@ -gradlePluginsVersion=0.6.2 \ No newline at end of file +gradlePluginsVersion=0.6.3 \ No newline at end of file diff --git a/samples/irs-demo/src/main/kotlin/net/corda/simulation/Simulation.kt b/samples/irs-demo/src/main/kotlin/net/corda/simulation/Simulation.kt index 3863631146..3c2bc05c64 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/simulation/Simulation.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/simulation/Simulation.kt @@ -18,10 +18,13 @@ import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.SimpleNotaryService import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.databaseTransaction -import net.corda.testing.node.* +import net.corda.testing.TestNodeConfiguration +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.MockNetwork +import net.corda.testing.node.TestClock +import net.corda.testing.node.setTo import rx.Observable import rx.subjects.PublishSubject -import java.nio.file.Path import java.security.KeyPair import java.time.LocalDate import java.time.LocalDateTime @@ -47,7 +50,8 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, // This puts together a mock network of SimulatedNodes. open class SimulatedNode(config: NodeConfiguration, mockNet: MockNetwork, networkMapAddress: SingleMessageRecipient?, - advertisedServices: Set, id: Int, keyPair: KeyPair?) : MockNetwork.MockNode(config, mockNet, networkMapAddress, advertisedServices, id, keyPair) { + advertisedServices: Set, id: Int, keyPair: KeyPair?) + : MockNetwork.MockNode(config, mockNet, networkMapAddress, advertisedServices, id, keyPair) { override fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity] } @@ -59,19 +63,12 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, val letter = 'A' + counter val city = bankLocations[counter++ % bankLocations.size] - // TODO: create a base class that provides a default implementation - val cfg = object : NodeConfiguration { - override val basedir: Path = config.basedir - // TODO: Set this back to "Bank of $city" after video day. - override val myLegalName: String = "Bank $letter" - override val nearestCity: String = city - override val emailAddress: String = "" - override val devMode: Boolean = true - override val exportJMXto: String = "" - override val keyStorePassword: String = "dummy" - override val trustStorePassword: String = "trustpass" - override val dataSourceProperties = makeTestDataSourceProperties() - } + val cfg = TestNodeConfiguration( + basedir = config.basedir, + // TODO: Set this back to "Bank of $city" after video day. + myLegalName = "Bank $letter", + nearestCity = city, + networkMapService = null) return SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair) } @@ -88,20 +85,11 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, advertisedServices: Set, id: Int, keyPair: KeyPair?): MockNetwork.MockNode { require(advertisedServices.containsType(NetworkMapService.type)) - - // TODO: create a base class that provides a default implementation - val cfg = object : NodeConfiguration { - override val basedir: Path = config.basedir - override val myLegalName: String = "Network coordination center" - override val nearestCity: String = "Amsterdam" - override val emailAddress: String = "" - override val devMode: Boolean = true - override val exportJMXto: String = "" - override val keyStorePassword: String = "dummy" - override val trustStorePassword: String = "trustpass" - override val dataSourceProperties = makeTestDataSourceProperties() - } - + val cfg = TestNodeConfiguration( + basedir = config.basedir, + myLegalName = "Network coordination center", + nearestCity = "Amsterdam", + networkMapService = null) return object : SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair) {} } } @@ -110,19 +98,11 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, advertisedServices: Set, id: Int, keyPair: KeyPair?): MockNetwork.MockNode { require(advertisedServices.containsType(SimpleNotaryService.type)) - - // TODO: create a base class that provides a default implementation - val cfg = object : NodeConfiguration { - override val basedir: Path = config.basedir - override val myLegalName: String = "Notary Service" - override val nearestCity: String = "Zurich" - override val emailAddress: String = "" - override val devMode: Boolean = true - override val exportJMXto: String = "" - override val keyStorePassword: String = "dummy" - override val trustStorePassword: String = "trustpass" - override val dataSourceProperties = makeTestDataSourceProperties() - } + val cfg = TestNodeConfiguration( + basedir = config.basedir, + myLegalName = "Notary Service", + nearestCity = "Zurich", + networkMapService = null) return SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair) } } @@ -131,20 +111,11 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, advertisedServices: Set, id: Int, keyPair: KeyPair?): MockNetwork.MockNode { require(advertisedServices.containsType(NodeInterestRates.type)) - - // TODO: create a base class that provides a default implementation - val cfg = object : NodeConfiguration { - override val basedir: Path = config.basedir - override val myLegalName: String = "Rates Service Provider" - override val nearestCity: String = "Madrid" - override val emailAddress: String = "" - override val devMode: Boolean = true - override val exportJMXto: String = "" - override val keyStorePassword: String = "dummy" - override val trustStorePassword: String = "trustpass" - override val dataSourceProperties = makeTestDataSourceProperties() - } - + val cfg = TestNodeConfiguration( + basedir = config.basedir, + myLegalName = "Rates Service Provider", + nearestCity = "Madrid", + networkMapService = null) return object : SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair) { override fun start(): MockNetwork.MockNode { super.start() @@ -162,26 +133,16 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, object RegulatorFactory : MockNetwork.Factory { override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, advertisedServices: Set, id: Int, keyPair: KeyPair?): MockNetwork.MockNode { - - // TODO: create a base class that provides a default implementation - val cfg = object : NodeConfiguration { - override val basedir: Path = config.basedir - override val myLegalName: String = "Regulator A" - override val nearestCity: String = "Paris" - override val emailAddress: String = "" - override val devMode: Boolean = true - override val exportJMXto: String = "" - override val keyStorePassword: String = "dummy" - override val trustStorePassword: String = "trustpass" - override val dataSourceProperties = makeTestDataSourceProperties() - } - - val n = object : SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair) { + val cfg = TestNodeConfiguration( + basedir = config.basedir, + myLegalName = "Regulator A", + nearestCity = "Paris", + networkMapService = null) + return object : SimulatedNode(cfg, network, networkMapAddr, advertisedServices, id, keyPair) { // TODO: Regulatory nodes don't actually exist properly, this is a last minute demo request. // So we just fire a message at a node that doesn't know how to handle it, and it'll ignore it. // But that's fine for visualisation purposes. } - return n } } diff --git a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt index 105a6aa0d3..f39f26db2e 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt @@ -15,14 +15,20 @@ import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.node.internal.AbstractNode +import net.corda.node.internal.NetworkMapInfo +import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.StateMachineManager.Change import net.corda.node.utilities.AddOrRemove.ADD import net.corda.testing.node.MockIdentityService import net.corda.testing.node.MockServices +import net.corda.testing.node.makeTestDataSourceProperties import rx.Subscriber import java.net.ServerSocket +import java.nio.file.Path import java.security.KeyPair +import java.time.Duration +import java.util.* import kotlin.reflect.KClass /** @@ -160,3 +166,22 @@ inline fun > AbstractNode.initiateSingleShotFlow( } fun Config.getHostAndPort(name: String) = HostAndPort.fromString(getString(name)) + +inline fun elapsedTime(block: () -> Unit): Duration { + val start = System.nanoTime() + block() + val end = System.nanoTime() + return Duration.ofNanos(end-start) +} + +data class TestNodeConfiguration( + override val basedir: Path, + override val myLegalName: String, + override val networkMapService: NetworkMapInfo?, + override val keyStorePassword: String = "cordacadevpass", + override val trustStorePassword: String = "trustpass", + override val dataSourceProperties: Properties = makeTestDataSourceProperties(myLegalName), + override val nearestCity: String = "Null Island", + override val emailAddress: String = "", + override val exportJMXto: String = "", + override val devMode: Boolean = true) : NodeConfiguration diff --git a/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt b/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt index d679beb890..08d5f13621 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt @@ -4,20 +4,20 @@ import com.google.common.net.HostAndPort import net.corda.node.services.config.NodeSSLConfiguration import net.corda.node.services.config.configureTestSSL import net.corda.node.services.messaging.ArtemisMessagingComponent -import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.OUTBOUND +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound import org.apache.activemq.artemis.api.core.client.* /** * As the name suggests this is a simple client for connecting to MQ brokers. */ -class SimpleMQClient(val target: HostAndPort) : ArtemisMessagingComponent() { - override val config: NodeSSLConfiguration = configureTestSSL() +class SimpleMQClient(val target: HostAndPort, + override val config: NodeSSLConfiguration = configureTestSSL("SimpleMQClient")) : ArtemisMessagingComponent() { lateinit var sessionFactory: ClientSessionFactory lateinit var session: ClientSession lateinit var producer: ClientProducer fun start(username: String? = null, password: String? = null) { - val tcpTransport = tcpTransport(OUTBOUND, target.hostText, target.port) + val tcpTransport = tcpTransport(Outbound(), target.hostText, target.port) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { isBlockOnNonDurableSend = true threadPoolMaxSize = 1 diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index 83b5e6c413..efd8536f53 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -3,6 +3,7 @@ package net.corda.testing.node import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Jimfs import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture import net.corda.core.* import net.corda.core.crypto.Party import net.corda.core.messaging.RPCOps @@ -24,9 +25,9 @@ import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.node.services.vault.NodeVaultService import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor +import net.corda.testing.TestNodeConfiguration import org.slf4j.Logger import java.nio.file.FileSystem -import java.nio.file.Path import java.security.KeyPair import java.util.* import java.util.concurrent.TimeUnit @@ -47,7 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger */ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, private val threadPerNode: Boolean = false, - private val servicePeerAllocationStrategy: InMemoryMessagingNetwork.ServicePeerAllocationStrategy = + servicePeerAllocationStrategy: InMemoryMessagingNetwork.ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random(), private val defaultFactory: Factory = MockNetwork.DefaultFactory) { private var nextNodeId = 0 @@ -105,8 +106,12 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, } } - open class MockNode(config: NodeConfiguration, val mockNet: MockNetwork, networkMapAddr: SingleMessageRecipient?, - advertisedServices: Set, val id: Int, val keyPair: KeyPair?) : AbstractNode(config, networkMapAddr, advertisedServices, TestClock()) { + open class MockNode(config: NodeConfiguration, + val mockNet: MockNetwork, + override val networkMapAddress: SingleMessageRecipient?, + advertisedServices: Set, + val id: Int, + val keyPair: KeyPair?) : AbstractNode(config, advertisedServices, TestClock()) { override val log: Logger = loggerFor() override val serverThread: AffinityExecutor = if (mockNet.threadPerNode) @@ -140,7 +145,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override fun generateKeyPair(): KeyPair = keyPair ?: super.generateKeyPair() // It's OK to not have a network map service in the mock network. - override fun noNetworkMapConfigured() = Futures.immediateFuture(Unit) + override fun noNetworkMapConfigured(): ListenableFuture = Futures.immediateFuture(Unit) // There is no need to slow down the unit tests by initialising CityDatabase override fun findMyLocation(): PhysicalLocation? = null @@ -193,18 +198,11 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, if (newNode) (path / "attachments").createDirectories() - // TODO: create a base class that provides a default implementation - val config = object : NodeConfiguration { - override val basedir: Path = path - override val myLegalName: String = legalName ?: "Mock Company $id" - override val nearestCity: String = "Atlantis" - override val emailAddress: String = "" - override val devMode: Boolean = true - override val exportJMXto: String = "" - override val keyStorePassword: String = "dummy" - override val trustStorePassword: String = "trustpass" - override val dataSourceProperties: Properties get() = makeTestDataSourceProperties("node_${id}_net_$networkId") - } + val config = TestNodeConfiguration( + basedir = path, + myLegalName = legalName ?: "Mock Company $id", + networkMapService = null, + dataSourceProperties = makeTestDataSourceProperties("node_${id}_net_$networkId")) val node = nodeFactory.create(config, this, networkMapAddress, advertisedServices.toSet(), id, keyPair) if (start) { node.setup().start() diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt index a5bd16f888..40bddbeb63 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt @@ -1,8 +1,10 @@ package net.corda.testing.node +import com.google.common.util.concurrent.ListenableFuture import net.corda.core.createDirectories import net.corda.core.div import net.corda.core.getOrThrow +import net.corda.core.map import net.corda.core.node.services.ServiceInfo import net.corda.node.internal.Node import net.corda.node.services.User @@ -17,7 +19,7 @@ import kotlin.concurrent.thread /** * Extend this class if you need to run nodes in a test. You could use the driver DSL but it's extremely slow for testing - * purposes. + * purposes. Use the DSL if you need to run the nodes in separate processes otherwise this class will suffice. */ // TODO Some of the logic here duplicates what's in the driver abstract class NodeBasedTest { @@ -50,7 +52,7 @@ abstract class NodeBasedTest { rpcUsers: List = emptyList(), configOverrides: Map = emptyMap()): Node { check(_networkMapNode == null) - return startNodeInternal(legalName, advertisedServices, rpcUsers, configOverrides).apply { + return startNodeInternal(legalName, advertisedServices, rpcUsers, configOverrides).getOrThrow().apply { _networkMapNode = this } } @@ -58,25 +60,28 @@ abstract class NodeBasedTest { fun startNode(legalName: String, advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), - configOverrides: Map = emptyMap()): Node { + configOverrides: Map = emptyMap()): ListenableFuture { return startNodeInternal( legalName, advertisedServices, rpcUsers, - configOverrides + mapOf( - "networkMapAddress" to networkMapNode.configuration.artemisAddress.toString() - ) + mapOf( + "networkMapService" to mapOf( + "address" to networkMapNode.configuration.artemisAddress.toString(), + "legalName" to networkMapNode.info.legalIdentity.name + ) + ) + configOverrides ) } private fun startNodeInternal(legalName: String, advertisedServices: Set, rpcUsers: List, - configOverrides: Map): Node { + configOverrides: Map): ListenableFuture { val config = ConfigHelper.loadConfig( baseDirectoryPath = (tempFolder.root.toPath() / legalName).createDirectories(), allowMissingConfig = true, - configOverrides = configOverrides + mapOf( + configOverrides = mapOf( "myLegalName" to legalName, "artemisAddress" to freeLocalHostAndPort().toString(), "extraAdvertisedServiceIds" to advertisedServices.joinToString(","), @@ -87,7 +92,7 @@ abstract class NodeBasedTest { "permissions" to it.permissions ) } - ) + ) + configOverrides ) val node = FullNodeConfiguration(config).createNode() @@ -96,7 +101,6 @@ abstract class NodeBasedTest { thread(name = legalName) { node.run() } - node.networkMapRegistrationFuture.getOrThrow() - return node + return node.networkMapRegistrationFuture.map { node } } } \ No newline at end of file diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt new file mode 100644 index 0000000000..3794a3a189 --- /dev/null +++ b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt @@ -0,0 +1,61 @@ +package net.corda.testing.node + +import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.SettableFuture +import net.corda.core.crypto.composite +import net.corda.core.crypto.generateKeyPair +import net.corda.core.messaging.RPCOps +import net.corda.node.services.RPCUserServiceImpl +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.node.services.messaging.NodeMessagingClient +import net.corda.node.services.network.InMemoryNetworkMapCache +import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor +import net.corda.node.utilities.configureDatabase +import net.corda.node.utilities.databaseTransaction +import net.corda.testing.freeLocalHostAndPort +import org.jetbrains.exposed.sql.Database +import java.io.Closeable +import java.security.KeyPair +import kotlin.concurrent.thread + +/** + * This is a bare-bones node which can only send and receive messages. It doesn't register with a network map service or + * any other such task that would make it functionable in a network and thus left to the user to do so manually. + */ +class SimpleNode(val config: NodeConfiguration, val address: HostAndPort = freeLocalHostAndPort()) : AutoCloseable { + + private val databaseWithCloseable: Pair = configureDatabase(config.dataSourceProperties) + val database: Database get() = databaseWithCloseable.second + val userService = RPCUserServiceImpl(config) + val identity: KeyPair = generateKeyPair() + val executor = ServiceAffinityExecutor(config.myLegalName, 1) + val broker = ArtemisMessagingServer(config, address, InMemoryNetworkMapCache(), userService) + val networkMapRegistrationFuture: SettableFuture = SettableFuture.create() + val net = databaseTransaction(database) { + NodeMessagingClient( + config, + address, + identity.public.composite, + executor, + database, + networkMapRegistrationFuture) + } + + fun start() { + broker.start() + net.start( + object : RPCOps { override val protocolVersion = 0 }, + userService) + thread(name = config.myLegalName) { + net.run() + } + } + + override fun close() { + net.stop() + broker.stop() + databaseWithCloseable.first.close() + executor.shutdownNow() + } +} \ No newline at end of file