diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index 2f101b869f..562c3f916d 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -19,6 +19,8 @@ import rx.subjects.UnicastSubject import java.io.* import java.lang.reflect.Field import java.math.BigDecimal +import java.net.HttpURLConnection +import java.net.URL import java.nio.charset.Charset import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.* @@ -303,3 +305,5 @@ fun TransactionBuilder.toLedgerTransaction(services: ServiceHub, serializationCo /** Convenience method to get the package name of a class literal. */ val KClass<*>.packageName get() = java.`package`.name + +fun URL.openHttpConnection(): HttpURLConnection = openConnection() as HttpURLConnection diff --git a/core/src/main/kotlin/net/corda/core/internal/schemas/NodeInfoSchema.kt b/core/src/main/kotlin/net/corda/core/internal/schemas/NodeInfoSchema.kt index 219ee2cbae..22d9ad533e 100644 --- a/core/src/main/kotlin/net/corda/core/internal/schemas/NodeInfoSchema.kt +++ b/core/src/main/kotlin/net/corda/core/internal/schemas/NodeInfoSchema.kt @@ -27,6 +27,9 @@ object NodeInfoSchemaV1 : MappedSchema( @Column(name = "node_info_id") var id: Int, + @Column(name="node_info_hash", length = 64) + val hash: String, + @Column(name = "addresses") @OneToMany(cascade = arrayOf(CascadeType.ALL), orphanRemoval = true) val addresses: List, diff --git a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt index 69ab1a7307..6cfe29dbec 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @@ -2,6 +2,7 @@ package net.corda.core.node.services import net.corda.core.DoNotImplement import net.corda.core.concurrent.CordaFuture +import net.corda.core.crypto.SecureHash import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 9c2e1a4fb5..f395e9b831 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -80,6 +80,9 @@ UNRELEASED * Added an overload of ``TransactionWithSignatures.verifySignaturesExcept`` which takes in a collection of ``PublicKey``s. +* Replaced node configuration parameter ``certificateSigningService`` with ``compatibilityZoneURL``, which is Corda + compatibility zone network management service's address. + .. _changelog_v1: Release 1.0 diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index fbb144d795..4df92c64b4 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -45,8 +45,7 @@ Simple Notary configuration file. } useHTTPS : false devMode : true - // Certificate signing service will be hosted by R3 in the near future. - //certificateSigningService : "https://testnet.certificate.corda.net" + compatibilityZoneURL : "https://cz.corda.net" Fields ------ @@ -141,8 +140,8 @@ path to the node's base directory. attempt to discover its externally visible IP address first by looking for any public addresses on its network interfaces, and then by sending an IP discovery request to the network map service. Set to ``false`` to disable. -:certificateSigningService: Certificate Signing Server address. It is used by the certificate signing request utility to - obtain SSL certificate. (See :doc:`permissioning` for more information.) +:compatibilityZoneURL: The root address of Corda compatibility zone network management services, it is used by the Corda node to register with the network and + obtain Corda node certificate, (See :doc:`permissioning` for more information.) and also used by the node to obtain network map information. :jvmArgs: An optional list of JVM args, as strings, which replace those inherited from the command line when launching via ``corda.jar`` only. e.g. ``jvmArgs = [ "-Xmx220m", "-Xms220m", "-XX:+UseG1GC" ]`` diff --git a/docs/source/permissioning.rst b/docs/source/permissioning.rst index 9ab1782f76..637b78ab50 100644 --- a/docs/source/permissioning.rst +++ b/docs/source/permissioning.rst @@ -22,8 +22,7 @@ The following information from the node configuration file is needed to generate :emailAddress: e.g. "admin@company.com" -:certificateSigningService: Doorman server URL. A doorman server will be hosted by R3 in the near - future. e.g."https://testnet.certificate.corda.net" +:compatibilityZoneURL: Corda compatibility zone network management service root URL. A new pair of private and public keys generated by the Corda node will be used to create the request. diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt index d1bf50d6aa..1a0be3d57f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt @@ -3,14 +3,16 @@ package net.corda.node.services.network import com.google.common.jimfs.Configuration import com.google.common.jimfs.Jimfs import net.corda.cordform.CordformNode +import net.corda.core.crypto.SignedData import net.corda.core.internal.createDirectories import net.corda.core.internal.div import net.corda.core.node.NodeInfo +import net.corda.core.node.services.KeyManagementService +import net.corda.core.serialization.serialize +import net.corda.node.services.identity.InMemoryIdentityService import net.corda.nodeapi.NodeInfoFilesCopier -import net.corda.testing.ALICE -import net.corda.testing.ALICE_KEY -import net.corda.testing.getTestPartyAndCertificate import net.corda.testing.* +import net.corda.testing.node.MockKeyManagementService import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.contentOf import org.junit.Before @@ -38,12 +40,15 @@ class NodeInfoWatcherTest { private lateinit var nodeInfoPath: Path private val scheduler = TestScheduler() private val testSubscriber = TestSubscriber() + private lateinit var keyManagementService: KeyManagementService // Object under test private lateinit var nodeInfoWatcher: NodeInfoWatcher @Before fun start() { + val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT) + keyManagementService = MockKeyManagementService(identityService, ALICE_KEY) nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler = scheduler) nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY } @@ -52,7 +57,8 @@ class NodeInfoWatcherTest { fun `save a NodeInfo`() { assertEquals(0, tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size) - NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), nodeInfo, ALICE_KEY) + val signedNodeInfo = SignedData(nodeInfo.serialize(), keyManagementService.sign(nodeInfo.serialize().bytes, nodeInfo.legalIdentities.first().owningKey)) + NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), signedNodeInfo) val nodeInfoFiles = tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) } assertEquals(1, nodeInfoFiles.size) @@ -67,7 +73,8 @@ class NodeInfoWatcherTest { fun `save a NodeInfo to JimFs`() { val jimFs = Jimfs.newFileSystem(Configuration.unix()) val jimFolder = jimFs.getPath("/nodeInfo") - NodeInfoWatcher.saveToFile(jimFolder, nodeInfo, ALICE_KEY) + val signedNodeInfo = SignedData(nodeInfo.serialize(), keyManagementService.sign(nodeInfo.serialize().bytes, nodeInfo.legalIdentities.first().owningKey)) + NodeInfoWatcher.saveToFile(jimFolder, signedNodeInfo) } @Test @@ -136,6 +143,7 @@ class NodeInfoWatcherTest { // Write a nodeInfo under the right path. private fun createNodeInfoFileInPath(nodeInfo: NodeInfo) { - NodeInfoWatcher.saveToFile(nodeInfoPath, nodeInfo, ALICE_KEY) + val signedNodeInfo = SignedData(nodeInfo.serialize(), keyManagementService.sign(nodeInfo.serialize().bytes, nodeInfo.legalIdentities.first().owningKey)) + NodeInfoWatcher.saveToFile(nodeInfoPath, signedNodeInfo) } } diff --git a/node/src/main/kotlin/net/corda/node/ArgsParser.kt b/node/src/main/kotlin/net/corda/node/ArgsParser.kt index 3de3f30071..6ed6cd4cec 100644 --- a/node/src/main/kotlin/net/corda/node/ArgsParser.kt +++ b/node/src/main/kotlin/net/corda/node/ArgsParser.kt @@ -4,6 +4,7 @@ import joptsimple.OptionParser import joptsimple.util.EnumConverter import net.corda.core.internal.div import net.corda.node.services.config.ConfigHelper +import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.parseAsNodeConfiguration import org.slf4j.event.Level import java.io.PrintStream @@ -69,6 +70,11 @@ data class CmdLineOptions(val baseDirectory: Path, val noLocalShell: Boolean, val sshdServer: Boolean, val justGenerateNodeInfo: Boolean) { - fun loadConfig() = ConfigHelper - .loadConfig(baseDirectory, configFile).parseAsNodeConfiguration() + fun loadConfig(): NodeConfiguration { + val config = ConfigHelper.loadConfig(baseDirectory, configFile).parseAsNodeConfiguration() + if (isRegistration) { + requireNotNull(config.compatibilityZoneURL) { "Compatibility Zone Url must be provided in registration mode." } + } + return config + } } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 2ed2096e35..a595a7204a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -8,6 +8,7 @@ import net.corda.confidential.SwapIdentitiesHandler import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.SignedData +import net.corda.core.crypto.sign import net.corda.core.flows.* import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party @@ -18,10 +19,7 @@ import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.* import net.corda.core.node.* import net.corda.core.node.services.* -import net.corda.core.serialization.SerializationWhitelist -import net.corda.core.serialization.SerializeAsToken -import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.serialization.deserialize +import net.corda.core.serialization.* import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.debug @@ -44,9 +42,7 @@ import net.corda.node.services.events.ScheduledActivityObserver import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.MessagingService -import net.corda.node.services.network.NetworkMapCacheImpl -import net.corda.node.services.network.NodeInfoWatcher -import net.corda.node.services.network.PersistentNetworkMapCache +import net.corda.node.services.network.* import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBTransactionMappingStorage import net.corda.node.services.persistence.DBTransactionStorage @@ -72,6 +68,7 @@ import java.security.cert.CertificateFactory import java.security.cert.X509Certificate import java.sql.Connection import java.time.Clock +import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService @@ -135,6 +132,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected val runOnStop = ArrayList<() -> Any?>() protected lateinit var database: CordaPersistence protected val _nodeReadyFuture = openFuture() + protected val networkMapClient: NetworkMapClient? by lazy { configuration.compatibilityZoneURL?.let(::NetworkMapClient) } + /** Completes once the node has successfully registered with the network map service * or has loaded network map data from local database */ val nodeReadyFuture: CordaFuture @@ -170,7 +169,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration, check(started == null) { "Node has already been started" } log.info("Generating nodeInfo ...") initCertificate() - initNodeInfo() + val keyPairs = initNodeInfo() + val identityKeypair = keyPairs.first { it.public == info.legalIdentities.first().owningKey } + val serialisedNodeInfo = info.serialize() + val signature = identityKeypair.sign(serialisedNodeInfo) + // TODO: Signed data might not be sufficient for multiple identities, as it only contains one signature. + NodeInfoWatcher.saveToFile(configuration.baseDirectory, SignedData(serialisedNodeInfo, signature)) } open fun start(): StartedNode { @@ -184,8 +188,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService) { val transactionStorage = makeTransactionStorage() val stateLoader = StateLoaderImpl(transactionStorage) - val services = makeServices(keyPairs, schemaService, transactionStorage, stateLoader) - val notaryService = makeNotaryService(services) + val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader) + val notaryService = makeNotaryService(nodeServices) smm = makeStateMachineManager() val flowStarter = FlowStarterImpl(serverThread, smm) val schedulerService = NodeSchedulerService( @@ -208,7 +212,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, startMessagingService(rpcOps) installCoreFlows() val cordaServices = installCordaServices(flowStarter) - tokenizableServices = services + cordaServices + schedulerService + tokenizableServices = nodeServices + cordaServices + schedulerService registerCordappFlows() _services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader @@ -216,6 +220,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration, runOnStop += network::stop Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) } + + val networkMapUpdater = NetworkMapUpdater(services.networkMapCache, + NodeInfoWatcher(configuration.baseDirectory, Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), + networkMapClient) + runOnStop += networkMapUpdater::close + + networkMapUpdater.updateNodeInfo(services.myInfo) { + val serialisedNodeInfo = it.serialize() + val signature = services.keyManagementService.sign(serialisedNodeInfo.bytes, it.legalIdentities.first().owningKey) + SignedData(serialisedNodeInfo, signature) + } + networkMapUpdater.subscribeToNetworkMap() + // If we successfully loaded network data from database, we set this future to Unit. services.networkMapCache.addNode(info) _nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit }) @@ -245,16 +262,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration, identity } } - info = NodeInfo( myAddresses(), setOf(identity, myNotaryIdentity).filterNotNull(), versionInfo.platformVersion, platformClock.instant().toEpochMilli() ) - - NodeInfoWatcher.saveToFile(configuration.baseDirectory, info, identityKeyPair) - return keyPairs } @@ -727,7 +740,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, NetworkMapCacheImpl( PersistentNetworkMapCache( this@AbstractNode.database, - this@AbstractNode.configuration, networkParameters.notaries), identityService) } @@ -768,4 +780,4 @@ internal class FlowStarterImpl(private val serverThread: AffinityExecutor, priva /** * Thrown when a node is about to start and its network map cache doesn't contain any node. */ -internal class NetworkMapCacheEmptyException: Exception() \ No newline at end of file +internal class NetworkMapCacheEmptyException : Exception() \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 0761169df7..349ef97121 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -176,15 +176,21 @@ open class Node(configuration: NodeConfiguration, * TODO this code used to rely on the networkmap node, we might want to look at a different solution. */ private fun tryDetectIfNotPublicHost(host: String): String? { - if (!AddressUtils.isPublic(host)) { + return if (!AddressUtils.isPublic(host)) { val foundPublicIP = AddressUtils.tryDetectPublicIP() - - if (foundPublicIP != null) { + if (foundPublicIP == null) { + val retrievedHostName = networkMapClient?.myPublicHostname() + if (retrievedHostName != null) { + log.info("Retrieved public IP from Network Map Service: $this. This will be used instead of the provided \"$host\" as the advertised address.") + } + retrievedHostName + } else { log.info("Detected public IP: ${foundPublicIP.hostAddress}. This will be used instead of the provided \"$host\" as the advertised address.") - return foundPublicIP.hostAddress + foundPublicIP.hostAddress } + } else { + null } - return null } override fun startMessagingService(rpcOps: RPCOps) { diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt index 16de351f63..9e68468962 100644 --- a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt +++ b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt @@ -141,14 +141,15 @@ open class NodeStartup(val args: Array) { } open protected fun maybeRegisterWithNetworkAndExit(cmdlineOptions: CmdLineOptions, conf: NodeConfiguration) { - if (!cmdlineOptions.isRegistration) return + val compatibilityZoneURL = conf.compatibilityZoneURL + if (!cmdlineOptions.isRegistration || compatibilityZoneURL == null) return println() println("******************************************************************") println("* *") println("* Registering as a new participant with Corda network *") println("* *") println("******************************************************************") - NetworkRegistrationHelper(conf, HTTPNetworkRegistrationService(conf.certificateSigningService)).buildKeystore() + NetworkRegistrationHelper(conf, HTTPNetworkRegistrationService(compatibilityZoneURL)).buildKeystore() exitProcess(0) } diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index d54deb153a..85b9dceae1 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -30,6 +30,10 @@ import net.corda.node.utilities.CordaPersistence interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal interface NetworkMapCacheBaseInternal : NetworkMapCacheBase { + val allNodeHashes: List + + fun getNodeByHash(nodeHash: SecureHash): NodeInfo? + /** Adds a node to the local cache (generally only used for adding ourselves). */ fun addNode(node: NodeInfo) diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 29d4895dd5..84b4940436 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -25,7 +25,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val rpcUsers: List val devMode: Boolean val devModeOptions: DevModeOptions? - val certificateSigningService: URL + val compatibilityZoneURL: URL? val certificateChainCheckPolicies: List val verifierType: VerifierType val messageRedeliveryDelaySeconds: Int @@ -89,7 +89,7 @@ data class NodeConfigurationImpl( override val trustStorePassword: String, override val dataSourceProperties: Properties, override val database: Properties?, - override val certificateSigningService: URL, + override val compatibilityZoneURL: URL? = null, override val rpcUsers: List, override val verifierType: VerifierType, // TODO typesafe config supports the notion of durations. Make use of that by mapping it to java.time.Duration. diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapClient.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapClient.kt index 2aba89084d..24a3b92fe0 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapClient.kt @@ -1,70 +1,152 @@ package net.corda.node.services.network import com.fasterxml.jackson.databind.ObjectMapper +import com.google.common.util.concurrent.MoreExecutors import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SignedData +import net.corda.core.internal.openHttpConnection import net.corda.core.node.NodeInfo import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize +import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds +import net.corda.node.services.api.NetworkMapCacheInternal +import net.corda.node.utilities.NamedThreadFactory +import okhttp3.CacheControl +import okhttp3.Headers +import rx.Subscription +import java.io.BufferedReader +import java.io.Closeable import java.net.HttpURLConnection import java.net.URL +import java.time.Duration +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit -interface NetworkMapClient { - /** - * Publish node info to network map service. - */ - fun publish(signedNodeInfo: SignedData) +class NetworkMapClient(compatibilityZoneURL: URL) { + companion object { + val logger = loggerFor() + } - /** - * Retrieve [NetworkMap] from the network map service containing list of node info hashes and network parameter hash. - */ - // TODO: Use NetworkMap object when available. - fun getNetworkMap(): List + private val networkMapUrl = URL("$compatibilityZoneURL/network-map") - /** - * Retrieve [NodeInfo] from network map service using the node info hash. - */ - fun getNodeInfo(nodeInfoHash: SecureHash): NodeInfo? - - // TODO: Implement getNetworkParameter when its available. - //fun getNetworkParameter(networkParameterHash: SecureHash): NetworkParameter -} - -class HTTPNetworkMapClient(private val networkMapUrl: String) : NetworkMapClient { - override fun publish(signedNodeInfo: SignedData) { + fun publish(signedNodeInfo: SignedData) { val publishURL = URL("$networkMapUrl/publish") - val conn = publishURL.openConnection() as HttpURLConnection + val conn = publishURL.openHttpConnection() conn.doOutput = true conn.requestMethod = "POST" conn.setRequestProperty("Content-Type", "application/octet-stream") - conn.outputStream.write(signedNodeInfo.serialize().bytes) - when (conn.responseCode) { - HttpURLConnection.HTTP_OK -> return - HttpURLConnection.HTTP_UNAUTHORIZED -> throw IllegalArgumentException(conn.errorStream.bufferedReader().readLine()) - else -> throw IllegalArgumentException("Unexpected response code ${conn.responseCode}, response error message: '${conn.errorStream.bufferedReader().readLines()}'") + conn.outputStream.use { it.write(signedNodeInfo.serialize().bytes) } + + // This will throw IOException if the response code is not HTTP 200. + // This gives a much better exception then reading the error stream. + conn.inputStream.close() + } + + fun getNetworkMap(): NetworkMapResponse { + val conn = networkMapUrl.openHttpConnection() + val response = conn.inputStream.bufferedReader().use(BufferedReader::readLine) + val networkMap = ObjectMapper().readValue(response, List::class.java).map { SecureHash.parse(it.toString()) } + val timeout = CacheControl.parse(Headers.of(conn.headerFields.filterKeys { it != null }.mapValues { it.value.first() })).maxAgeSeconds().seconds + return NetworkMapResponse(networkMap, timeout) + } + + fun getNodeInfo(nodeInfoHash: SecureHash): NodeInfo? { + val conn = URL("$networkMapUrl/$nodeInfoHash").openHttpConnection() + return if (conn.responseCode == HttpURLConnection.HTTP_NOT_FOUND) { + null + } else { + conn.inputStream.use { it.readBytes() }.deserialize() } } - override fun getNetworkMap(): List { - val conn = URL(networkMapUrl).openConnection() as HttpURLConnection + fun myPublicHostname(): String { + val conn = URL("$networkMapUrl/my-hostname").openHttpConnection() + return conn.inputStream.bufferedReader().use(BufferedReader::readLine) + } +} - return when (conn.responseCode) { - HttpURLConnection.HTTP_OK -> { - val response = conn.inputStream.bufferedReader().use { it.readLine() } - ObjectMapper().readValue(response, List::class.java).map { SecureHash.parse(it.toString()) } +data class NetworkMapResponse(val networkMap: List, val cacheMaxAge: Duration) + +class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, + private val fileWatcher: NodeInfoWatcher, + private val networkMapClient: NetworkMapClient?) : Closeable { + companion object { + private val logger = loggerFor() + private val retryInterval = 1.minutes + } + + private val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater Thread", Executors.defaultThreadFactory())) + private var fileWatcherSubscription: Subscription? = null + + override fun close() { + fileWatcherSubscription?.unsubscribe() + MoreExecutors.shutdownAndAwaitTermination(executor, 50, TimeUnit.SECONDS) + } + + fun updateNodeInfo(newInfo: NodeInfo, signNodeInfo: (NodeInfo) -> SignedData) { + val oldInfo = networkMapCache.getNodeByLegalIdentity(newInfo.legalIdentities.first()) + // Compare node info without timestamp. + if (newInfo.copy(serial = 0L) == oldInfo?.copy(serial = 0L)) return + + // Only publish and write to disk if there are changes to the node info. + val signedNodeInfo = signNodeInfo(newInfo) + fileWatcher.saveToFile(signedNodeInfo) + + if (networkMapClient != null) { + tryPublishNodeInfoAsync(signedNodeInfo, networkMapClient) + } + } + + fun subscribeToNetworkMap() { + require(fileWatcherSubscription == null) { "Should not call this method twice." } + // Subscribe to file based networkMap + fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe(networkMapCache::addNode) + + if (networkMapClient == null) return + // Subscribe to remote network map if configured. + val task = object : Runnable { + override fun run() { + val nextScheduleDelay = try { + val (networkMap, cacheTimeout) = networkMapClient.getNetworkMap() + val currentNodeHashes = networkMapCache.allNodeHashes + (networkMap - currentNodeHashes).mapNotNull { + // Download new node info from network map + networkMapClient.getNodeInfo(it) + }.forEach { + // Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes. + networkMapCache.addNode(it) + } + // Remove node info from network map. + (currentNodeHashes - networkMap - fileWatcher.processedNodeInfoHashes) + .mapNotNull(networkMapCache::getNodeByHash) + .forEach(networkMapCache::removeNode) + + cacheTimeout + } catch (t: Throwable) { + logger.warn("Error encountered while updating network map, will retry in $retryInterval", t) + retryInterval + } + // Schedule the next update. + executor.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS) } - else -> throw IllegalArgumentException("Unexpected response code ${conn.responseCode}, response error message: '${conn.errorStream.bufferedReader().readLines()}'") } + executor.submit(task) // The check may be expensive, so always run it in the background even the first time. } - override fun getNodeInfo(nodeInfoHash: SecureHash): NodeInfo? { - val nodeInfoURL = URL("$networkMapUrl/$nodeInfoHash") - val conn = nodeInfoURL.openConnection() as HttpURLConnection - - return when (conn.responseCode) { - HttpURLConnection.HTTP_OK -> conn.inputStream.readBytes().deserialize() - HttpURLConnection.HTTP_NOT_FOUND -> null - else -> throw IllegalArgumentException("Unexpected response code ${conn.responseCode}, response error message: '${conn.errorStream.bufferedReader().readLines()}'") + private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedData, networkMapClient: NetworkMapClient) { + val task = object : Runnable { + override fun run() { + try { + networkMapClient.publish(signedNodeInfo) + } catch (t: Throwable) { + logger.warn("Error encountered while publishing node info, will retry in $retryInterval.", t) + // TODO: Exponential backoff? + executor.schedule(this, retryInterval.toMillis(), TimeUnit.MILLISECONDS) + } + } } + executor.submit(task) } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt index 32dfecee97..579c91afe4 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt @@ -1,8 +1,8 @@ package net.corda.node.services.network import net.corda.cordform.CordformNode +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SignedData -import net.corda.core.crypto.sign import net.corda.core.internal.* import net.corda.core.node.NodeInfo import net.corda.core.serialization.deserialize @@ -15,7 +15,7 @@ import rx.Scheduler import rx.schedulers.Schedulers import java.io.IOException import java.nio.file.Path -import java.security.KeyPair +import java.time.Duration import java.util.concurrent.TimeUnit import kotlin.streams.toList @@ -24,19 +24,20 @@ import kotlin.streams.toList * - Serialize and de-serialize a [NodeInfo] to disk and reading it back. * - Poll a directory for new serialized [NodeInfo] * - * @param path the base path of a node. - * @param pollFrequencyMsec how often to poll the filesystem in milliseconds. Any value smaller than 5 seconds will - * be treated as 5 seconds. + * @param nodePath the base path of a node. + * @param pollInterval how often to poll the filesystem in milliseconds. Must be longer then 5 seconds. * @param scheduler a [Scheduler] for the rx [Observable] returned by [nodeInfoUpdates], this is mainly useful for * testing. It defaults to the io scheduler which is the appropriate value for production uses. */ +// TODO: Use NIO watch service instead? class NodeInfoWatcher(private val nodePath: Path, - pollFrequencyMsec: Long = 5.seconds.toMillis(), + private val pollInterval: Duration = 5.seconds, private val scheduler: Scheduler = Schedulers.io()) { private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY - private val pollFrequencyMsec: Long = maxOf(pollFrequencyMsec, 5.seconds.toMillis()) - private val successfullyProcessedFiles = mutableSetOf() + private val processedNodeInfoFiles = mutableSetOf() + private val _processedNodeInfoHashes = mutableSetOf() + val processedNodeInfoHashes: Set get() = _processedNodeInfoHashes.toSet() companion object { private val logger = loggerFor() @@ -48,17 +49,14 @@ class NodeInfoWatcher(private val nodePath: Path, * is used so that one can freely copy these files without fearing to overwrite another one. * * @param path the path where to write the file, if non-existent it will be created. - * @param nodeInfo the NodeInfo to serialize. - * @param signingKey used to sign the NodeInfo data. + * @param signedNodeInfo the signed NodeInfo. */ - fun saveToFile(path: Path, nodeInfo: NodeInfo, signingKey: KeyPair) { + fun saveToFile(path: Path, signedNodeInfo: SignedData) { try { path.createDirectories() - val serializedBytes = nodeInfo.serialize() - val regSig = signingKey.sign(serializedBytes.bytes) - val signedData = SignedData(serializedBytes, regSig) - signedData.serialize().open().copyTo( - path / "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${serializedBytes.hash}") + signedNodeInfo.serialize() + .open() + .copyTo(path / "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${signedNodeInfo.raw.hash}") } catch (e: Exception) { logger.warn("Couldn't write node info to file", e) } @@ -66,6 +64,7 @@ class NodeInfoWatcher(private val nodePath: Path, } init { + require(pollInterval >= 5.seconds) { "Poll interval must be 5 seconds or longer." } if (!nodeInfoDirectory.isDirectory()) { try { nodeInfoDirectory.createDirectories() @@ -85,10 +84,12 @@ class NodeInfoWatcher(private val nodePath: Path, * @return an [Observable] returning [NodeInfo]s, at most one [NodeInfo] is returned for each processed file. */ fun nodeInfoUpdates(): Observable { - return Observable.interval(pollFrequencyMsec, TimeUnit.MILLISECONDS, scheduler) + return Observable.interval(pollInterval.toMillis(), TimeUnit.MILLISECONDS, scheduler) .flatMapIterable { loadFromDirectory() } } + fun saveToFile(signedNodeInfo: SignedData) = Companion.saveToFile(nodePath, signedNodeInfo) + /** * Loads all the files contained in a given path and returns the deserialized [NodeInfo]s. * Signatures are checked before returning a value. @@ -100,10 +101,13 @@ class NodeInfoWatcher(private val nodePath: Path, return emptyList() } val result = nodeInfoDirectory.list { paths -> - paths.filter { it !in successfullyProcessedFiles } + paths.filter { it !in processedNodeInfoFiles } .filter { it.isRegularFile() } .map { path -> - processFile(path)?.apply { successfullyProcessedFiles.add(path) } + processFile(path)?.apply { + processedNodeInfoFiles.add(path) + _processedNodeInfoHashes.add(this.serialize().hash) + } } .toList() .filterNotNull() @@ -115,13 +119,13 @@ class NodeInfoWatcher(private val nodePath: Path, } private fun processFile(file: Path): NodeInfo? { - try { + return try { logger.info("Reading NodeInfo from file: $file") val signedData = file.readAll().deserialize>() - return signedData.verified() + signedData.verified() } catch (e: Exception) { logger.warn("Exception parsing NodeInfo from file. $file", e) - return null + null } } } diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index 33f98f7322..b6e9cd7818 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -1,6 +1,7 @@ package net.corda.node.services.network import net.corda.core.concurrent.CordaFuture +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.toStringShort import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name @@ -16,11 +17,11 @@ import net.corda.core.node.services.IdentityService import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.serialize import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor import net.corda.node.services.api.NetworkMapCacheBaseInternal import net.corda.node.services.api.NetworkMapCacheInternal -import net.corda.node.services.config.NodeConfiguration import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.bufferUntilDatabaseCommit import net.corda.node.utilities.wrapWithDatabaseTransaction @@ -63,7 +64,6 @@ class NetworkMapCacheImpl( @ThreadSafe open class PersistentNetworkMapCache( private val database: CordaPersistence, - val configuration: NodeConfiguration, notaries: List ) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal { companion object { @@ -88,17 +88,33 @@ open class PersistentNetworkMapCache( override val notaryIdentities: List = notaries.map { it.identity } private val validatingNotaries = notaries.mapNotNullTo(HashSet()) { if (it.validating) it.identity else null } - private val nodeInfoSerializer = NodeInfoWatcher(configuration.baseDirectory, - configuration.additionalNodeInfoPollingFrequencyMsec) - init { - loadFromFiles() database.transaction { loadFromDB(session) } } - private fun loadFromFiles() { - logger.info("Loading network map from files..") - nodeInfoSerializer.nodeInfoUpdates().subscribe { node -> addNode(node) } + override val allNodeHashes: List + get() { + return database.transaction { + val builder = session.criteriaBuilder + val query = builder.createQuery(String::class.java).run { + from(NodeInfoSchemaV1.PersistentNodeInfo::class.java).run { + select(get(NodeInfoSchemaV1.PersistentNodeInfo::hash.name)) + } + } + session.createQuery(query).resultList.map { SecureHash.sha256(it) } + } + } + + override fun getNodeByHash(nodeHash: SecureHash): NodeInfo? { + return database.transaction { + val builder = session.criteriaBuilder + val query = builder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java).run { + from(NodeInfoSchemaV1.PersistentNodeInfo::class.java).run { + where(builder.equal(get(NodeInfoSchemaV1.PersistentNodeInfo::hash.name), nodeHash.toString())) + } + } + session.createQuery(query).resultList.singleOrNull()?.toNodeInfo() + } } override fun isValidatingNotary(party: Party): Boolean = party in validatingNotaries @@ -277,10 +293,12 @@ open class PersistentNetworkMapCache( else result.map { it.toNodeInfo() }.singleOrNull() ?: throw IllegalStateException("More than one node with the same host and port") } + /** Object Relational Mapping support. */ private fun generateMappedObject(nodeInfo: NodeInfo): NodeInfoSchemaV1.PersistentNodeInfo { return NodeInfoSchemaV1.PersistentNodeInfo( id = 0, + hash = nodeInfo.serialize().hash.toString(), addresses = nodeInfo.addresses.map { NodeInfoSchemaV1.DBHostAndPort.fromHostAndPort(it) }, legalIdentitiesAndCerts = nodeInfo.legalIdentitiesAndCerts.mapIndexed { idx, elem -> NodeInfoSchemaV1.DBPartyAndCertificate(elem, isMain = idx == 0) diff --git a/node/src/main/kotlin/net/corda/node/utilities/registration/HTTPNetworkRegistrationService.kt b/node/src/main/kotlin/net/corda/node/utilities/registration/HTTPNetworkRegistrationService.kt index 6904f91258..8461ff2e19 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/registration/HTTPNetworkRegistrationService.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/registration/HTTPNetworkRegistrationService.kt @@ -1,6 +1,7 @@ package net.corda.node.utilities.registration import com.google.common.net.MediaType +import net.corda.core.internal.openHttpConnection import net.corda.node.utilities.CertificateStream import org.apache.commons.io.IOUtils import org.bouncycastle.pkcs.PKCS10CertificationRequest @@ -12,7 +13,9 @@ import java.security.cert.Certificate import java.util.* import java.util.zip.ZipInputStream -class HTTPNetworkRegistrationService(val server: URL) : NetworkRegistrationService { +class HTTPNetworkRegistrationService(compatibilityZoneURL: URL) : NetworkRegistrationService { + private val registrationURL = URL("$compatibilityZoneURL/certificate") + companion object { // TODO: Propagate version information from gradle val clientVersion = "1.0" @@ -21,7 +24,7 @@ class HTTPNetworkRegistrationService(val server: URL) : NetworkRegistrationServi @Throws(CertificateRequestException::class) override fun retrieveCertificates(requestId: String): Array? { // Poll server to download the signed certificate once request has been approved. - val url = URL("$server/api/certificate/$requestId") + val url = URL("$registrationURL/$requestId") val conn = url.openConnection() as HttpURLConnection conn.requestMethod = "GET" @@ -43,7 +46,7 @@ class HTTPNetworkRegistrationService(val server: URL) : NetworkRegistrationServi override fun submitRequest(request: PKCS10CertificationRequest): String { // Post request to certificate signing server via http. - val conn = URL("$server/api/certificate").openConnection() as HttpURLConnection + val conn = URL("$registrationURL").openHttpConnection() conn.doOutput = true conn.requestMethod = "POST" conn.setRequestProperty("Content-Type", "application/octet-stream") diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 37ff984da6..ef16577ae1 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -14,7 +14,6 @@ database = { initDatabase = true } devMode = true -certificateSigningService = "https://cordaci-netperm.corda.r3cev.com" useHTTPS = false h2port = 0 useTestClock = false diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index 3bcd80be35..bca85ad551 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -41,7 +41,6 @@ class NodeConfigurationImplTest { trustStorePassword = "trustpass", dataSourceProperties = makeTestDataSourceProperties(ALICE.name.organisation), database = makeTestDatabaseProperties(), - certificateSigningService = URL("http://localhost"), rpcUsers = emptyList(), verifierType = VerifierType.InMemory, useHTTPS = false, diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index c53c8b750d..849e56d69c 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -106,7 +106,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { doReturn(configuration).whenever(it).configuration doReturn(MonitoringService(MetricRegistry())).whenever(it).monitoringService doReturn(validatedTransactions).whenever(it).validatedTransactions - doReturn(NetworkMapCacheImpl(MockNetworkMapCache(database, configuration), identityService)).whenever(it).networkMapCache + doReturn(NetworkMapCacheImpl(MockNetworkMapCache(database), identityService)).whenever(it).networkMapCache doCallRealMethod().whenever(it).signInitialTransaction(any(), any()) doReturn(myInfo).whenever(it).myInfo doReturn(kms).whenever(it).keyManagementService diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index b19125f6e6..95c0be2bd7 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -1,6 +1,7 @@ package net.corda.node.services.messaging import com.codahale.metrics.MetricRegistry +import com.nhaarman.mockito_kotlin.mock import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.generateKeyPair import net.corda.core.internal.concurrent.doneFuture @@ -13,6 +14,7 @@ import net.corda.node.services.api.MonitoringService import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.network.NetworkMapCacheImpl +import net.corda.node.services.network.NetworkMapClient import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor @@ -30,6 +32,7 @@ import org.junit.Before import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder +import org.mockito.Mockito.mock import java.net.ServerSocket import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit.MILLISECONDS @@ -79,7 +82,7 @@ class ArtemisMessagingTests { LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), ::makeTestIdentityService) networkMapRegistrationFuture = doneFuture(Unit) - networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, config, emptyList()), rigorousMock()) + networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()), rigorousMock()) } @After diff --git a/node/src/test/kotlin/net/corda/node/services/network/HTTPNetworkMapClientTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapClientTest.kt similarity index 72% rename from node/src/test/kotlin/net/corda/node/services/network/HTTPNetworkMapClientTest.kt rename to node/src/test/kotlin/net/corda/node/services/network/NetworkMapClientTest.kt index 25a4e77855..b973992489 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/HTTPNetworkMapClientTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapClientTest.kt @@ -8,6 +8,8 @@ import net.corda.core.node.NodeInfo import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.seconds +import net.corda.node.services.network.TestNodeInfoFactory.createNodeInfo import net.corda.node.utilities.CertificateType import net.corda.node.utilities.X509Utilities import net.corda.testing.SerializationEnvironmentRule @@ -28,6 +30,7 @@ import org.junit.Test import java.io.ByteArrayInputStream import java.io.InputStream import java.net.InetSocketAddress +import java.net.URL import java.security.cert.CertPath import java.security.cert.Certificate import java.security.cert.CertificateFactory @@ -38,7 +41,7 @@ import javax.ws.rs.core.Response import javax.ws.rs.core.Response.ok import kotlin.test.assertEquals -class HTTPNetworkMapClientTest { +class NetworkMapClientTest { @Rule @JvmField val testSerialization = SerializationEnvironmentRule(true) @@ -61,7 +64,7 @@ class HTTPNetworkMapClientTest { register(MockNetworkMapServer()) } val jerseyServlet = ServletHolder(ServletContainer(resourceConfig)).apply { initOrder = 0 }// Initialise at server start - addServlet(jerseyServlet, "/api/*") + addServlet(jerseyServlet, "/*") }) } } @@ -72,7 +75,7 @@ class HTTPNetworkMapClientTest { } val hostAndPort = server.connectors.mapNotNull { it as? ServerConnector }.first() - networkMapClient = HTTPNetworkMapClient("http://${hostAndPort.host}:${hostAndPort.localPort}/api/network-map") + networkMapClient = NetworkMapClient(URL("http://${hostAndPort.host}:${hostAndPort.localPort}")) } @After @@ -90,7 +93,7 @@ class HTTPNetworkMapClientTest { val nodeInfoHash = nodeInfo.serialize().sha256() - assertThat(networkMapClient.getNetworkMap()).containsExactly(nodeInfoHash) + assertThat(networkMapClient.getNetworkMap().networkMap).containsExactly(nodeInfoHash) assertEquals(nodeInfo, networkMapClient.getNodeInfo(nodeInfoHash)) val signedNodeInfo2 = createNodeInfo("Test2") @@ -98,27 +101,21 @@ class HTTPNetworkMapClientTest { networkMapClient.publish(signedNodeInfo2) val nodeInfoHash2 = nodeInfo2.serialize().sha256() - assertThat(networkMapClient.getNetworkMap()).containsExactly(nodeInfoHash, nodeInfoHash2) + assertThat(networkMapClient.getNetworkMap().networkMap).containsExactly(nodeInfoHash, nodeInfoHash2) + assertEquals(100000.seconds, networkMapClient.getNetworkMap().cacheMaxAge) assertEquals(nodeInfo2, networkMapClient.getNodeInfo(nodeInfoHash2)) } - private fun createNodeInfo(organisation: String): SignedData { - val keyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME) - val clientCert = X509Utilities.createCertificate(CertificateType.CLIENT_CA, intermediateCACert, intermediateCAKey, CordaX500Name(organisation = organisation, locality = "London", country = "GB"), keyPair.public) - val certPath = buildCertPath(clientCert.toX509Certificate(), intermediateCACert.toX509Certificate(), rootCACert.toX509Certificate()) - val nodeInfo = NodeInfo(listOf(NetworkHostAndPort("my.$organisation.com", 1234)), listOf(PartyAndCertificate(certPath)), 1, serial = 1L) - - // Create digital signature. - val digitalSignature = DigitalSignature.WithKey(keyPair.public, Crypto.doSign(keyPair.private, nodeInfo.serialize().bytes)) - - return SignedData(nodeInfo.serialize(), digitalSignature) + @Test + fun `get hostname string from http response correctly`() { + assertEquals("test.host.name", networkMapClient.myPublicHostname()) } } @Path("network-map") // This is a stub implementation of the network map rest API. internal class MockNetworkMapServer { - private val nodeInfos = mutableMapOf() + val nodeInfoMap = mutableMapOf() @POST @Path("publish") @Consumes(MediaType.APPLICATION_OCTET_STREAM) @@ -126,33 +123,31 @@ internal class MockNetworkMapServer { val registrationData = input.readBytes().deserialize>() val nodeInfo = registrationData.verified() val nodeInfoHash = nodeInfo.serialize().sha256() - nodeInfos.put(nodeInfoHash, nodeInfo) + nodeInfoMap.put(nodeInfoHash, nodeInfo) return ok().build() } @GET @Produces(MediaType.APPLICATION_JSON) fun getNetworkMap(): Response { - return Response.ok(ObjectMapper().writeValueAsString(nodeInfos.keys.map { it.toString() })).build() + return Response.ok(ObjectMapper().writeValueAsString(nodeInfoMap.keys.map { it.toString() })).header("Cache-Control", "max-age=100000").build() } @GET @Path("{var}") @Produces(MediaType.APPLICATION_OCTET_STREAM) fun getNodeInfo(@PathParam("var") nodeInfoHash: String): Response { - val nodeInfo = nodeInfos[SecureHash.parse(nodeInfoHash)] + val nodeInfo = nodeInfoMap[SecureHash.parse(nodeInfoHash)] return if (nodeInfo != null) { Response.ok(nodeInfo.serialize().bytes) } else { Response.status(Response.Status.NOT_FOUND) }.build() } -} -private fun buildCertPath(vararg certificates: Certificate): CertPath { - return CertificateFactory.getInstance("X509").generateCertPath(certificates.asList()) + @GET + @Path("my-hostname") + fun getHostName(): Response { + return Response.ok("test.host.name").build() + } } - -private fun X509CertificateHolder.toX509Certificate(): X509Certificate { - return CertificateFactory.getInstance("X509").generateCertificate(ByteArrayInputStream(encoded)) as X509Certificate -} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt new file mode 100644 index 0000000000..3390d32968 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -0,0 +1,232 @@ +package net.corda.node.services.network + +import com.google.common.jimfs.Configuration +import com.google.common.jimfs.Jimfs +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.times +import com.nhaarman.mockito_kotlin.verify +import net.corda.cordform.CordformNode +import net.corda.core.crypto.Crypto +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.SignedData +import net.corda.core.identity.Party +import net.corda.core.internal.div +import net.corda.core.internal.uncheckedCast +import net.corda.core.node.NodeInfo +import net.corda.core.serialization.serialize +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.millis +import net.corda.node.services.api.NetworkMapCacheInternal +import net.corda.testing.SerializationEnvironmentRule +import org.junit.Rule +import org.junit.Test +import rx.schedulers.TestScheduler +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals + +class NetworkMapUpdaterTest { + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule(true) + private val jimFs = Jimfs.newFileSystem(Configuration.unix()) + private val baseDir = jimFs.getPath("/node") + + @Test + fun `publish node info`() { + val keyPair = Crypto.generateKeyPair() + + val nodeInfo1 = TestNodeInfoFactory.createNodeInfo("Info 1").verified() + val signedNodeInfo = TestNodeInfoFactory.sign(keyPair, nodeInfo1) + + val sameNodeInfoDifferentTime = nodeInfo1.copy(serial = System.currentTimeMillis()) + val signedSameNodeInfoDifferentTime = TestNodeInfoFactory.sign(keyPair, sameNodeInfoDifferentTime) + + val differentNodeInfo = nodeInfo1.copy(addresses = listOf(NetworkHostAndPort("my.new.host.com", 1000))) + val signedDifferentNodeInfo = TestNodeInfoFactory.sign(keyPair, differentNodeInfo) + + val networkMapCache = getMockNetworkMapCache() + + val networkMapClient = mock() + + val scheduler = TestScheduler() + val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler) + val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient) + + // Publish node info for the first time. + updater.updateNodeInfo(nodeInfo1) { signedNodeInfo } + // Sleep as publish is asynchronous. + // TODO: Remove sleep in unit test + Thread.sleep(200) + verify(networkMapClient, times(1)).publish(any()) + + networkMapCache.addNode(nodeInfo1) + + // Publish the same node info, but with different serial. + updater.updateNodeInfo(sameNodeInfoDifferentTime) { signedSameNodeInfoDifferentTime } + // TODO: Remove sleep in unit test. + Thread.sleep(200) + + // Same node info should not publish twice + verify(networkMapClient, times(0)).publish(signedSameNodeInfoDifferentTime) + + // Publish different node info. + updater.updateNodeInfo(differentNodeInfo) { signedDifferentNodeInfo } + // TODO: Remove sleep in unit test. + Thread.sleep(200) + verify(networkMapClient, times(1)).publish(signedDifferentNodeInfo) + + updater.close() + } + + @Test + fun `process add node updates from network map, with additional node infos from dir`() { + val nodeInfo1 = TestNodeInfoFactory.createNodeInfo("Info 1") + val nodeInfo2 = TestNodeInfoFactory.createNodeInfo("Info 2") + val nodeInfo3 = TestNodeInfoFactory.createNodeInfo("Info 3") + val nodeInfo4 = TestNodeInfoFactory.createNodeInfo("Info 4") + val fileNodeInfo = TestNodeInfoFactory.createNodeInfo("Info from file") + val networkMapCache = getMockNetworkMapCache() + + val nodeInfoMap = ConcurrentHashMap>() + val networkMapClient = mock { + on { publish(any()) }.then { + val signedNodeInfo: SignedData = uncheckedCast(it.arguments.first()) + nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo) + } + on { getNetworkMap() }.then { NetworkMapResponse(nodeInfoMap.keys.toList(), 100.millis) } + on { getNodeInfo(any()) }.then { nodeInfoMap[it.arguments.first()]?.verified() } + } + + val scheduler = TestScheduler() + val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler) + val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient) + + // Test adding new node. + networkMapClient.publish(nodeInfo1) + // Not subscribed yet. + verify(networkMapCache, times(0)).addNode(any()) + + updater.subscribeToNetworkMap() + networkMapClient.publish(nodeInfo2) + + // TODO: Remove sleep in unit test. + Thread.sleep(200) + verify(networkMapCache, times(2)).addNode(any()) + verify(networkMapCache, times(1)).addNode(nodeInfo1.verified()) + verify(networkMapCache, times(1)).addNode(nodeInfo2.verified()) + + NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo) + networkMapClient.publish(nodeInfo3) + networkMapClient.publish(nodeInfo4) + + scheduler.advanceTimeBy(10, TimeUnit.SECONDS) + // TODO: Remove sleep in unit test. + Thread.sleep(200) + + // 4 node info from network map, and 1 from file. + verify(networkMapCache, times(5)).addNode(any()) + verify(networkMapCache, times(1)).addNode(nodeInfo3.verified()) + verify(networkMapCache, times(1)).addNode(nodeInfo4.verified()) + verify(networkMapCache, times(1)).addNode(fileNodeInfo.verified()) + + updater.close() + } + + @Test + fun `process remove node updates from network map, with additional node infos from dir`() { + val nodeInfo1 = TestNodeInfoFactory.createNodeInfo("Info 1") + val nodeInfo2 = TestNodeInfoFactory.createNodeInfo("Info 2") + val nodeInfo3 = TestNodeInfoFactory.createNodeInfo("Info 3") + val nodeInfo4 = TestNodeInfoFactory.createNodeInfo("Info 4") + val fileNodeInfo = TestNodeInfoFactory.createNodeInfo("Info from file") + val networkMapCache = getMockNetworkMapCache() + + val nodeInfoMap = ConcurrentHashMap>() + val networkMapClient = mock { + on { publish(any()) }.then { + val signedNodeInfo: SignedData = uncheckedCast(it.arguments.first()) + nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo) + } + on { getNetworkMap() }.then { NetworkMapResponse(nodeInfoMap.keys.toList(), 100.millis) } + on { getNodeInfo(any()) }.then { nodeInfoMap[it.arguments.first()]?.verified() } + } + + val scheduler = TestScheduler() + val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler) + val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient) + + // Add all nodes. + NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo) + networkMapClient.publish(nodeInfo1) + networkMapClient.publish(nodeInfo2) + networkMapClient.publish(nodeInfo3) + networkMapClient.publish(nodeInfo4) + + updater.subscribeToNetworkMap() + scheduler.advanceTimeBy(10, TimeUnit.SECONDS) + // TODO: Remove sleep in unit test. + Thread.sleep(200) + + // 4 node info from network map, and 1 from file. + assertEquals(4, nodeInfoMap.size) + verify(networkMapCache, times(5)).addNode(any()) + verify(networkMapCache, times(1)).addNode(fileNodeInfo.verified()) + + // Test remove node. + nodeInfoMap.clear() + // TODO: Remove sleep in unit test. + Thread.sleep(200) + verify(networkMapCache, times(4)).removeNode(any()) + verify(networkMapCache, times(1)).removeNode(nodeInfo1.verified()) + verify(networkMapCache, times(1)).removeNode(nodeInfo2.verified()) + verify(networkMapCache, times(1)).removeNode(nodeInfo3.verified()) + verify(networkMapCache, times(1)).removeNode(nodeInfo4.verified()) + + // Node info from file should not be deleted + assertEquals(1, networkMapCache.allNodeHashes.size) + assertEquals(fileNodeInfo.verified().serialize().hash, networkMapCache.allNodeHashes.first()) + + updater.close() + } + + @Test + fun `receive node infos from directory, without a network map`() { + val fileNodeInfo = TestNodeInfoFactory.createNodeInfo("Info from file") + + val networkMapCache = getMockNetworkMapCache() + + val scheduler = TestScheduler() + val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler) + val updater = NetworkMapUpdater(networkMapCache, fileWatcher, null) + + // Not subscribed yet. + verify(networkMapCache, times(0)).addNode(any()) + + updater.subscribeToNetworkMap() + + NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo) + scheduler.advanceTimeBy(10, TimeUnit.SECONDS) + + verify(networkMapCache, times(1)).addNode(any()) + verify(networkMapCache, times(1)).addNode(fileNodeInfo.verified()) + + assertEquals(1, networkMapCache.allNodeHashes.size) + assertEquals(fileNodeInfo.verified().serialize().hash, networkMapCache.allNodeHashes.first()) + + updater.close() + } + + private fun getMockNetworkMapCache() = mock { + val data = ConcurrentHashMap() + on { addNode(any()) }.then { + val nodeInfo = it.arguments.first() as NodeInfo + data.put(nodeInfo.legalIdentities.first(), nodeInfo) + } + on { removeNode(any()) }.then { data.remove((it.arguments.first() as NodeInfo).legalIdentities.first()) } + on { getNodeByLegalIdentity(any()) }.then { data[it.arguments.first()] } + on { allNodeHashes }.then { data.values.map { it.serialize().hash } } + on { getNodeByHash(any()) }.then { mock -> data.values.single { it.serialize().hash == mock.arguments.first() } } + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/network/TestNodeInfoFactory.kt b/node/src/test/kotlin/net/corda/node/services/network/TestNodeInfoFactory.kt new file mode 100644 index 0000000000..78840967fc --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/network/TestNodeInfoFactory.kt @@ -0,0 +1,50 @@ +package net.corda.node.services.network + +import net.corda.core.crypto.Crypto +import net.corda.core.crypto.DigitalSignature +import net.corda.core.crypto.SignedData +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.PartyAndCertificate +import net.corda.core.node.NodeInfo +import net.corda.core.serialization.serialize +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.utilities.CertificateType +import net.corda.node.utilities.X509Utilities +import org.bouncycastle.asn1.x500.X500Name +import org.bouncycastle.cert.X509CertificateHolder +import java.io.ByteArrayInputStream +import java.security.KeyPair +import java.security.cert.CertPath +import java.security.cert.Certificate +import java.security.cert.CertificateFactory +import java.security.cert.X509Certificate + +object TestNodeInfoFactory { + private val rootCAKey = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME) + private val rootCACert = X509Utilities.createSelfSignedCACertificate(CordaX500Name(commonName = "Corda Node Root CA", organisation = "R3 LTD", locality = "London", country = "GB"), rootCAKey) + private val intermediateCAKey = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME) + private val intermediateCACert = X509Utilities.createCertificate(CertificateType.INTERMEDIATE_CA, rootCACert, rootCAKey, X500Name("CN=Corda Node Intermediate CA,L=London"), intermediateCAKey.public) + + fun createNodeInfo(organisation: String): SignedData { + val keyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME) + val clientCert = X509Utilities.createCertificate(CertificateType.CLIENT_CA, intermediateCACert, intermediateCAKey, CordaX500Name(organisation = organisation, locality = "London", country = "GB"), keyPair.public) + val certPath = buildCertPath(clientCert.toX509Certificate(), intermediateCACert.toX509Certificate(), rootCACert.toX509Certificate()) + val nodeInfo = NodeInfo(listOf(NetworkHostAndPort("my.$organisation.com", 1234)), listOf(PartyAndCertificate(certPath)), 1, serial = 1L) + return sign(keyPair, nodeInfo) + } + + fun sign(keyPair: KeyPair, t: T): SignedData { + // Create digital signature. + val digitalSignature = DigitalSignature.WithKey(keyPair.public, Crypto.doSign(keyPair.private, t.serialize().bytes)) + return SignedData(t.serialize(), digitalSignature) + } + + private fun buildCertPath(vararg certificates: Certificate): CertPath { + return CertificateFactory.getInstance("X509").generateCertPath(certificates.asList()) + } + + private fun X509CertificateHolder.toX509Certificate(): X509Certificate { + return CertificateFactory.getInstance("X509").generateCertificate(ByteArrayInputStream(encoded)) as X509Certificate + } + +} \ No newline at end of file diff --git a/node/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/node/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..ca6ee9cea8 --- /dev/null +++ b/node/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/NodeTestUtils.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/NodeTestUtils.kt index 3a68395b46..0649fbe0fc 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/NodeTestUtils.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/NodeTestUtils.kt @@ -8,6 +8,7 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.core.identity.CordaX500Name import net.corda.core.node.ServiceHub import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.seconds import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.VerifierType @@ -60,11 +61,11 @@ fun testNodeConfiguration( doReturn("").whenever(it).emailAddress doReturn("").whenever(it).exportJMXto doReturn(true).whenever(it).devMode - doReturn(URL("http://localhost")).whenever(it).certificateSigningService + doReturn(null).whenever(it).compatibilityZoneURL doReturn(emptyList()).whenever(it).certificateChainCheckPolicies doReturn(VerifierType.InMemory).whenever(it).verifierType doReturn(5).whenever(it).messageRedeliveryDelaySeconds - doReturn(0L).whenever(it).additionalNodeInfoPollingFrequencyMsec + doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(null).whenever(it).devModeOptions doCallRealMethod().whenever(it).certificatesDirectory doCallRealMethod().whenever(it).trustStoreFile diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt index 6e10455fee..26167cbe11 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt @@ -18,10 +18,7 @@ import java.math.BigInteger /** * Network map cache with no backing map service. */ -class MockNetworkMapCache( - database: CordaPersistence, - configuration: NodeConfiguration -) : PersistentNetworkMapCache(database, configuration, emptyList()) { +class MockNetworkMapCache(database: CordaPersistence) : PersistentNetworkMapCache(database, emptyList()) { private companion object { val BANK_C = getTestPartyAndCertificate(CordaX500Name(organisation = "Bank C", locality = "London", country = "GB"), entropyToKeyPair(BigInteger.valueOf(1000)).public) val BANK_D = getTestPartyAndCertificate(CordaX500Name(organisation = "Bank D", locality = "London", country = "GB"), entropyToKeyPair(BigInteger.valueOf(2000)).public) @@ -35,7 +32,8 @@ class MockNetworkMapCache( init { val mockNodeA = NodeInfo(listOf(BANK_C_ADDR), listOf(BANK_C), 1, serial = 1L) val mockNodeB = NodeInfo(listOf(BANK_D_ADDR), listOf(BANK_D), 1, serial = 1L) - partyNodes.add(mockNodeA) - partyNodes.add(mockNodeB) + addNode(mockNodeA) + addNode(mockNodeB) } } +