From 1e6be340eb3516fa8b477a6071a2cf813065c3ff Mon Sep 17 00:00:00 2001 From: Dimitris Gounaris <17044221+dgounaris@users.noreply.github.com> Date: Tue, 28 Jul 2020 17:02:53 +0300 Subject: [PATCH] CORDA-3844: bulk node infos request (#6411) * CORDA-3844: Add new functions to network map client * CORDA-3844: Apply new fetch logic to nm updater * CORDA-3844: Fix base url and warnings * CORDA-3844: Change response object and response validation In order to make sure that the returned node infos are not maliciously modified, either a signed list response or a signed reference object would need to be provided. As providing a signed list requires a lot of effort from NM and Signer services, the signed network map is provided instead, allowing nodes to validate that the list provided conforms to the entries of the signed network map. * CORDA-3844: Add clarifications and comments * CORDA-3844: Add error handling for bulk request * CORDA-3844: Enhance testing * CORDA-3844: Fix detekt issues * EG-3844: Apply pr suggestions --- .../node/services/network/NetworkMapClient.kt | 24 ++++++- .../services/network/NetworkMapUpdater.kt | 70 +++++++++++++----- .../HTTPNetworkRegistrationService.kt | 5 ++ .../services/network/NetworkMapClientTest.kt | 23 ++++++ .../services/network/NetworkMapUpdaterTest.kt | 71 +++++++++++++++++++ .../node/internal/network/NetworkMapServer.kt | 16 ++++- 6 files changed, 189 insertions(+), 20 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapClient.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapClient.kt index 91a0e159c6..de1cccac8e 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapClient.kt @@ -2,6 +2,7 @@ package net.corda.node.services.network import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SignedData +import net.corda.core.crypto.sha256 import net.corda.core.internal.openHttpConnection import net.corda.core.internal.post import net.corda.core.internal.responseAs @@ -13,6 +14,7 @@ import net.corda.core.utilities.seconds import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.utilities.registration.cacheControl +import net.corda.node.utilities.registration.cordaServerVersion import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.network.NetworkMap import net.corda.nodeapi.internal.network.SignedNetworkMap @@ -61,8 +63,9 @@ class NetworkMapClient(compatibilityZoneURL: URL, private val versionInfo: Versi val signedNetworkMap = connection.responseAs() val networkMap = signedNetworkMap.verifiedNetworkMapCert(trustRoot) val timeout = connection.cacheControl.maxAgeSeconds().seconds + val version = connection.cordaServerVersion logger.trace { "Fetched network map update from $url successfully: $networkMap" } - return NetworkMapResponse(networkMap, timeout) + return NetworkMapResponse(networkMap, timeout, version) } fun getNodeInfo(nodeInfoHash: SecureHash): NodeInfo { @@ -81,6 +84,23 @@ class NetworkMapClient(compatibilityZoneURL: URL, private val versionInfo: Versi return networkParameter } + fun getNodeInfos(): List { + val url = URL("$networkMapUrl/node-infos") + logger.trace { "Fetching node infos from $url." } + val verifiedNodeInfo = url.openHttpConnection().responseAs>>() + .also { + val verifiedNodeInfoHashes = it.first.verifiedNetworkMapCert(trustRoot).nodeInfoHashes + val nodeInfoHashes = it.second.map { signedNodeInfo -> signedNodeInfo.verified().serialize().sha256() } + require( + verifiedNodeInfoHashes.containsAll(nodeInfoHashes) && + verifiedNodeInfoHashes.size == nodeInfoHashes.size + ) + } + .second.map { it.verified() } + logger.trace { "Fetched node infos successfully. Node Infos size: ${verifiedNodeInfo.size}" } + return verifiedNodeInfo + } + fun myPublicHostname(): String { val url = URL("$networkMapUrl/my-hostname") logger.trace { "Resolving public hostname from '$url'." } @@ -90,4 +110,4 @@ class NetworkMapClient(compatibilityZoneURL: URL, private val versionInfo: Versi } } -data class NetworkMapResponse(val payload: NetworkMap, val cacheMaxAge: Duration) +data class NetworkMapResponse(val payload: NetworkMap, val cacheMaxAge: Duration, val serverVersion: String) diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index 7ff18232a2..712efce341 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -4,6 +4,7 @@ import com.google.common.util.concurrent.MoreExecutors import net.corda.core.CordaRuntimeException import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SignedData +import net.corda.core.crypto.sha256 import net.corda.core.internal.NetworkParametersStorage import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.copyTo @@ -65,6 +66,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, companion object { private val logger = contextLogger() private val defaultRetryInterval = 1.minutes + private const val bulkNodeInfoFetchThreshold = 50 } private val parametersUpdatesTrack = PublishSubject.create() @@ -173,17 +175,9 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, if (networkMapClient == null) { throw CordaRuntimeException("Network map cache can be updated only if network map/compatibility zone URL is specified") } - val (globalNetworkMap, cacheTimeout) = networkMapClient.getNetworkMap() + val (globalNetworkMap, cacheTimeout, version) = networkMapClient.getNetworkMap() globalNetworkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) } - val additionalHashes = extraNetworkMapKeys.flatMap { - try { - networkMapClient.getNetworkMap(it).payload.nodeInfoHashes - } catch (e: Exception) { - // Failure to retrieve one network map using UUID shouldn't stop the whole update. - logger.warn("Error encountered when downloading network map with uuid '$it', skipping...", e) - emptyList() - } - } + val additionalHashes = getPrivateNetworkNodeHashes(version) val allHashesFromNetworkMap = (globalNetworkMap.nodeInfoHashes + additionalHashes).toSet() if (currentParametersHash != globalNetworkMap.networkParameterHash) { exitOnParametersMismatch(globalNetworkMap) @@ -194,6 +188,37 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, val allNodeHashes = networkMapCache.allNodeHashes val nodeHashesToBeDeleted = (allNodeHashes - allHashesFromNetworkMap - nodeInfoWatcher.processedNodeInfoHashes) .filter { it != ourNodeInfoHash } + // enforce bulk fetch when no other nodes are known or unknown nodes count is less than threshold + if (version == "1" || (allNodeHashes.size > 1 && (allHashesFromNetworkMap - allNodeHashes).size < bulkNodeInfoFetchThreshold)) + updateNodeInfosV1(allHashesFromNetworkMap, allNodeHashes, networkMapClient) + else + updateNodeInfos(allHashesFromNetworkMap) + // NOTE: We remove nodes after any new/updates because updated nodes will have a new hash and, therefore, any + // nodes that we can actually pull out of the cache (with the old hashes) should be a truly removed node. + nodeHashesToBeDeleted.mapNotNull { networkMapCache.getNodeByHash(it) }.forEach(networkMapCache::removeNode) + + // Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that + // it's empty + networkMapCache.nodeReady.set(null) + return cacheTimeout + } + + private fun updateNodeInfos(allHashesFromNetworkMap: Set) { + val networkMapDownloadStartTime = System.currentTimeMillis() + val nodeInfos = try { + networkMapClient!!.getNodeInfos() + } catch (e: Exception) { + logger.warn("Error encountered when downloading node infos", e) + emptyList() + } + (allHashesFromNetworkMap - nodeInfos.map { it.serialize().sha256() }).forEach { + logger.warn("Error encountered when downloading node info '$it', skipping...") + } + networkMapCache.addOrUpdateNodes(nodeInfos) + logger.info("Fetched: ${nodeInfos.size} using 1 bulk request in ${System.currentTimeMillis() - networkMapDownloadStartTime}ms") + } + + private fun updateNodeInfosV1(allHashesFromNetworkMap: Set, allNodeHashes: List, networkMapClient: NetworkMapClient) { //at the moment we use a blocking HTTP library - but under the covers, the OS will interleave threads waiting for IO //as HTTP GET is mostly IO bound, use more threads than CPU's //maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests @@ -230,14 +255,25 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, executorToUseForInsertionIntoDB.shutdown() }.getOrThrow() } - // NOTE: We remove nodes after any new/updates because updated nodes will have a new hash and, therefore, any - // nodes that we can actually pull out of the cache (with the old hashes) should be a truly removed node. - nodeHashesToBeDeleted.mapNotNull { networkMapCache.getNodeByHash(it) }.forEach(networkMapCache::removeNode) + } - // Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that - // it's empty - networkMapCache.nodeReady.set(null) - return cacheTimeout + private fun getPrivateNetworkNodeHashes(version: String): List { + // private networks are not supported by latest versions of Network Map + // for compatibility reasons, this call is still present for new nodes that communicate with old Network Map service versions + // but can be omitted if we know that the version of the Network Map is recent enough + return if (version == "1") { + extraNetworkMapKeys.flatMap { + try { + networkMapClient!!.getNetworkMap(it).payload.nodeInfoHashes + } catch (e: Exception) { + // Failure to retrieve one network map using UUID shouldn't stop the whole update. + logger.warn("Error encountered when downloading network map with uuid '$it', skipping...", e) + emptyList() + } + } + } else { + emptyList() + } } private fun exitOnParametersMismatch(networkMap: NetworkMap) { diff --git a/node/src/main/kotlin/net/corda/node/utilities/registration/HTTPNetworkRegistrationService.kt b/node/src/main/kotlin/net/corda/node/utilities/registration/HTTPNetworkRegistrationService.kt index 41ea0edd25..653f6fdaf9 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/registration/HTTPNetworkRegistrationService.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/registration/HTTPNetworkRegistrationService.kt @@ -69,3 +69,8 @@ val HttpURLConnection.cacheControl: CacheControl get() { return CacheControl.parse(Headers.of(headerFields.filterKeys { it != null }.mapValues { it.value[0] })) } + +val HttpURLConnection.cordaServerVersion: String + get() { + return headerFields["X-Corda-Server-Version"]?.singleOrNull() ?: "1" + } \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapClientTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapClientTest.kt index 9fa13bf67e..ab42bd19fd 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapClientTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapClientTest.kt @@ -72,6 +72,29 @@ class NetworkMapClientTest { assertEquals(nodeInfo2, networkMapClient.getNodeInfo(nodeInfoHash2)) } + @Test(timeout=300_000) + fun `registered node is added to the network map v2`() { + server.version = "2" + val (nodeInfo, signedNodeInfo) = createNodeInfoAndSigned(ALICE_NAME) + + networkMapClient.publish(signedNodeInfo) + + val nodeInfoHash = nodeInfo.serialize().sha256() + + assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(nodeInfoHash) + assertEquals(nodeInfo, networkMapClient.getNodeInfos().single()) + + val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned(BOB_NAME) + + networkMapClient.publish(signedNodeInfo2) + + val nodeInfoHash2 = nodeInfo2.serialize().sha256() + assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(nodeInfoHash, nodeInfoHash2) + assertEquals(cacheTimeout, networkMapClient.getNetworkMap().cacheMaxAge) + assertEquals("2", networkMapClient.getNetworkMap().serverVersion) + assertThat(networkMapClient.getNodeInfos()).containsExactlyInAnyOrder(nodeInfo, nodeInfo2) + } + @Test(timeout=300_000) fun `negative test - registered invalid node is added to the network map`() { val invalidLongNodeName = CordaX500Name( diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index d2689ce039..a406bd9be6 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -3,6 +3,7 @@ package net.corda.node.services.network import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Jimfs import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.atLeast import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.never import com.nhaarman.mockito_kotlin.times @@ -10,6 +11,7 @@ import com.nhaarman.mockito_kotlin.verify import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash import net.corda.core.crypto.generateKeyPair +import net.corda.core.crypto.sha256 import net.corda.core.crypto.sign import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party @@ -383,6 +385,75 @@ class NetworkMapUpdaterTest { assertEquals(aliceInfo, networkMapClient.getNodeInfo(aliceHash)) } + @Test(timeout=300_000) + fun `update nodes is successful for network map supporting bulk operations but with only a few nodes requested`() { + server.version = "2" + setUpdater() + // on first update, bulk request is used + val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("info1") + val nodeInfoHash1 = nodeInfo1.serialize().sha256() + val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned("info2") + val nodeInfoHash2 = nodeInfo2.serialize().sha256() + networkMapClient.publish(signedNodeInfo1) + networkMapClient.publish(signedNodeInfo2) + + startUpdater() + + Thread.sleep(2L * cacheExpiryMs) + verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo1, nodeInfo2)) + assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(nodeInfoHash1, nodeInfoHash2) + + // on subsequent updates, single requests are used + val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("info3") + val nodeInfoHash3 = nodeInfo3.serialize().sha256() + val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("info4") + val nodeInfoHash4 = nodeInfo4.serialize().sha256() + networkMapClient.publish(signedNodeInfo3) + networkMapClient.publish(signedNodeInfo4) + + Thread.sleep(2L * cacheExpiryMs) + verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo3)) + verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo4)) + assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(nodeInfoHash1, nodeInfoHash2, nodeInfoHash3, nodeInfoHash4) + } + + @Test(timeout=300_000) + @SuppressWarnings("SpreadOperator") + fun `update nodes is successful for network map supporting bulk operations when high number of nodes is requested`() { + server.version = "2" + setUpdater() + val nodeInfos = (1..51).map { createNodeInfoAndSigned("info$it") + .also { nodeInfoAndSigned -> networkMapClient.publish(nodeInfoAndSigned.signed) } + .nodeInfo + } + val nodeInfoHashes = nodeInfos.map { it.serialize().sha256() } + + startUpdater() + Thread.sleep(2L * cacheExpiryMs) + + verify(networkMapCache, times(1)).addOrUpdateNodes(nodeInfos) + assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(*(nodeInfoHashes.toTypedArray())) + } + + @Test(timeout=300_000) + @SuppressWarnings("SpreadOperator") + fun `update nodes is successful for network map not supporting bulk operations`() { + setUpdater() + val nodeInfos = (1..51).map { createNodeInfoAndSigned("info$it") + .also { nodeInfoAndSigned -> networkMapClient.publish(nodeInfoAndSigned.signed) } + .nodeInfo + } + val nodeInfoHashes = nodeInfos.map { it.serialize().sha256() } + + startUpdater() + Thread.sleep(2L * cacheExpiryMs) + + // we can't be sure about the number of requests (and updates), as it depends on the machine and the threads created + // but if they are more than 1 it's enough to deduct that the parallel way was favored + verify(networkMapCache, atLeast(2)).addOrUpdateNodes(any()) + assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(*(nodeInfoHashes.toTypedArray())) + } + @Test(timeout=300_000) fun `remove node from filesystem deletes it from network map cache`() { setUpdater(netMapClient = null) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt index 0aa00f832c..88620bb1d7 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt @@ -49,6 +49,8 @@ class NetworkMapServer(private val pollInterval: Duration, private val service = InMemoryNetworkMapService() private var parametersUpdate: ParametersUpdate? = null private var nextNetworkParameters: NetworkParameters? = null + // version toggle allowing to easily test behaviour of different version without spinning up a whole new server + var version: String = "1" init { server = Server(InetSocketAddress(hostAndPort.host, hostAndPort.port)).apply { @@ -171,7 +173,10 @@ class NetworkMapServer(private val pollInterval: Duration, private fun networkMapResponse(nodeInfoHashes: List): Response { val networkMap = NetworkMap(nodeInfoHashes, signedNetParams.raw.hash, parametersUpdate) val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap) - return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${pollInterval.seconds}").build() + return Response.ok(signedNetworkMap.serialize().bytes) + .header("Cache-Control", "max-age=${pollInterval.seconds}") + .apply { if (version != "1") this.header("X-Corda-Server-Version", version)} + .build() } // Remove nodeInfo for testing. @@ -205,6 +210,15 @@ class NetworkMapServer(private val pollInterval: Duration, }.build() } + @GET + @Path("node-infos") + @Produces(MediaType.APPLICATION_OCTET_STREAM) + fun getNodeInfos(): Response { + val networkMap = NetworkMap(nodeInfoMap.keys.toList(), signedNetParams.raw.hash, parametersUpdate) + val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap) + return Response.ok(Pair(signedNetworkMap, nodeInfoMap.values.toList()).serialize().bytes).build() + } + @GET @Path("network-parameters/{var}") @Produces(MediaType.APPLICATION_OCTET_STREAM)