CORDA-3844: bulk node infos request (#6411)

* CORDA-3844: Add new functions to network map client

* CORDA-3844: Apply new fetch logic to nm updater

* CORDA-3844: Fix base url and warnings

* CORDA-3844: Change response object and response validation

In order to make sure that the returned node infos are not maliciously modified, either a signed list response
or a signed reference object would need to be provided. As providing a signed list requires a lot of effort from NM and Signer services,
the signed network map is provided instead, allowing nodes to validate that the list provided conforms to the entries of the signed network map.

* CORDA-3844: Add clarifications and comments

* CORDA-3844: Add error handling for bulk request

* CORDA-3844: Enhance testing

* CORDA-3844: Fix detekt issues

* EG-3844: Apply pr suggestions
This commit is contained in:
Dimitris Gounaris 2020-07-28 17:02:53 +03:00 committed by GitHub
parent f2336f397d
commit 1e6be340eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 189 additions and 20 deletions

View File

@ -2,6 +2,7 @@ package net.corda.node.services.network
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.sha256
import net.corda.core.internal.openHttpConnection
import net.corda.core.internal.post
import net.corda.core.internal.responseAs
@ -13,6 +14,7 @@ import net.corda.core.utilities.seconds
import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
import net.corda.node.utilities.registration.cacheControl
import net.corda.node.utilities.registration.cordaServerVersion
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NetworkMap
import net.corda.nodeapi.internal.network.SignedNetworkMap
@ -61,8 +63,9 @@ class NetworkMapClient(compatibilityZoneURL: URL, private val versionInfo: Versi
val signedNetworkMap = connection.responseAs<SignedNetworkMap>()
val networkMap = signedNetworkMap.verifiedNetworkMapCert(trustRoot)
val timeout = connection.cacheControl.maxAgeSeconds().seconds
val version = connection.cordaServerVersion
logger.trace { "Fetched network map update from $url successfully: $networkMap" }
return NetworkMapResponse(networkMap, timeout)
return NetworkMapResponse(networkMap, timeout, version)
}
fun getNodeInfo(nodeInfoHash: SecureHash): NodeInfo {
@ -81,6 +84,23 @@ class NetworkMapClient(compatibilityZoneURL: URL, private val versionInfo: Versi
return networkParameter
}
fun getNodeInfos(): List<NodeInfo> {
val url = URL("$networkMapUrl/node-infos")
logger.trace { "Fetching node infos from $url." }
val verifiedNodeInfo = url.openHttpConnection().responseAs<Pair<SignedNetworkMap, List<SignedNodeInfo>>>()
.also {
val verifiedNodeInfoHashes = it.first.verifiedNetworkMapCert(trustRoot).nodeInfoHashes
val nodeInfoHashes = it.second.map { signedNodeInfo -> signedNodeInfo.verified().serialize().sha256() }
require(
verifiedNodeInfoHashes.containsAll(nodeInfoHashes) &&
verifiedNodeInfoHashes.size == nodeInfoHashes.size
)
}
.second.map { it.verified() }
logger.trace { "Fetched node infos successfully. Node Infos size: ${verifiedNodeInfo.size}" }
return verifiedNodeInfo
}
fun myPublicHostname(): String {
val url = URL("$networkMapUrl/my-hostname")
logger.trace { "Resolving public hostname from '$url'." }
@ -90,4 +110,4 @@ class NetworkMapClient(compatibilityZoneURL: URL, private val versionInfo: Versi
}
}
data class NetworkMapResponse(val payload: NetworkMap, val cacheMaxAge: Duration)
data class NetworkMapResponse(val payload: NetworkMap, val cacheMaxAge: Duration, val serverVersion: String)

View File

@ -4,6 +4,7 @@ import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.CordaRuntimeException
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.sha256
import net.corda.core.internal.NetworkParametersStorage
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.copyTo
@ -65,6 +66,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
companion object {
private val logger = contextLogger()
private val defaultRetryInterval = 1.minutes
private const val bulkNodeInfoFetchThreshold = 50
}
private val parametersUpdatesTrack = PublishSubject.create<ParametersUpdateInfo>()
@ -173,17 +175,9 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
if (networkMapClient == null) {
throw CordaRuntimeException("Network map cache can be updated only if network map/compatibility zone URL is specified")
}
val (globalNetworkMap, cacheTimeout) = networkMapClient.getNetworkMap()
val (globalNetworkMap, cacheTimeout, version) = networkMapClient.getNetworkMap()
globalNetworkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
val additionalHashes = extraNetworkMapKeys.flatMap {
try {
networkMapClient.getNetworkMap(it).payload.nodeInfoHashes
} catch (e: Exception) {
// Failure to retrieve one network map using UUID shouldn't stop the whole update.
logger.warn("Error encountered when downloading network map with uuid '$it', skipping...", e)
emptyList<SecureHash>()
}
}
val additionalHashes = getPrivateNetworkNodeHashes(version)
val allHashesFromNetworkMap = (globalNetworkMap.nodeInfoHashes + additionalHashes).toSet()
if (currentParametersHash != globalNetworkMap.networkParameterHash) {
exitOnParametersMismatch(globalNetworkMap)
@ -194,6 +188,37 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
val allNodeHashes = networkMapCache.allNodeHashes
val nodeHashesToBeDeleted = (allNodeHashes - allHashesFromNetworkMap - nodeInfoWatcher.processedNodeInfoHashes)
.filter { it != ourNodeInfoHash }
// enforce bulk fetch when no other nodes are known or unknown nodes count is less than threshold
if (version == "1" || (allNodeHashes.size > 1 && (allHashesFromNetworkMap - allNodeHashes).size < bulkNodeInfoFetchThreshold))
updateNodeInfosV1(allHashesFromNetworkMap, allNodeHashes, networkMapClient)
else
updateNodeInfos(allHashesFromNetworkMap)
// NOTE: We remove nodes after any new/updates because updated nodes will have a new hash and, therefore, any
// nodes that we can actually pull out of the cache (with the old hashes) should be a truly removed node.
nodeHashesToBeDeleted.mapNotNull { networkMapCache.getNodeByHash(it) }.forEach(networkMapCache::removeNode)
// Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that
// it's empty
networkMapCache.nodeReady.set(null)
return cacheTimeout
}
private fun updateNodeInfos(allHashesFromNetworkMap: Set<SecureHash>) {
val networkMapDownloadStartTime = System.currentTimeMillis()
val nodeInfos = try {
networkMapClient!!.getNodeInfos()
} catch (e: Exception) {
logger.warn("Error encountered when downloading node infos", e)
emptyList<NodeInfo>()
}
(allHashesFromNetworkMap - nodeInfos.map { it.serialize().sha256() }).forEach {
logger.warn("Error encountered when downloading node info '$it', skipping...")
}
networkMapCache.addOrUpdateNodes(nodeInfos)
logger.info("Fetched: ${nodeInfos.size} using 1 bulk request in ${System.currentTimeMillis() - networkMapDownloadStartTime}ms")
}
private fun updateNodeInfosV1(allHashesFromNetworkMap: Set<SecureHash>, allNodeHashes: List<SecureHash>, networkMapClient: NetworkMapClient) {
//at the moment we use a blocking HTTP library - but under the covers, the OS will interleave threads waiting for IO
//as HTTP GET is mostly IO bound, use more threads than CPU's
//maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests
@ -230,14 +255,25 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
executorToUseForInsertionIntoDB.shutdown()
}.getOrThrow()
}
// NOTE: We remove nodes after any new/updates because updated nodes will have a new hash and, therefore, any
// nodes that we can actually pull out of the cache (with the old hashes) should be a truly removed node.
nodeHashesToBeDeleted.mapNotNull { networkMapCache.getNodeByHash(it) }.forEach(networkMapCache::removeNode)
}
// Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that
// it's empty
networkMapCache.nodeReady.set(null)
return cacheTimeout
private fun getPrivateNetworkNodeHashes(version: String): List<SecureHash> {
// private networks are not supported by latest versions of Network Map
// for compatibility reasons, this call is still present for new nodes that communicate with old Network Map service versions
// but can be omitted if we know that the version of the Network Map is recent enough
return if (version == "1") {
extraNetworkMapKeys.flatMap {
try {
networkMapClient!!.getNetworkMap(it).payload.nodeInfoHashes
} catch (e: Exception) {
// Failure to retrieve one network map using UUID shouldn't stop the whole update.
logger.warn("Error encountered when downloading network map with uuid '$it', skipping...", e)
emptyList<SecureHash>()
}
}
} else {
emptyList()
}
}
private fun exitOnParametersMismatch(networkMap: NetworkMap) {

View File

@ -69,3 +69,8 @@ val HttpURLConnection.cacheControl: CacheControl
get() {
return CacheControl.parse(Headers.of(headerFields.filterKeys { it != null }.mapValues { it.value[0] }))
}
val HttpURLConnection.cordaServerVersion: String
get() {
return headerFields["X-Corda-Server-Version"]?.singleOrNull() ?: "1"
}

View File

@ -72,6 +72,29 @@ class NetworkMapClientTest {
assertEquals(nodeInfo2, networkMapClient.getNodeInfo(nodeInfoHash2))
}
@Test(timeout=300_000)
fun `registered node is added to the network map v2`() {
server.version = "2"
val (nodeInfo, signedNodeInfo) = createNodeInfoAndSigned(ALICE_NAME)
networkMapClient.publish(signedNodeInfo)
val nodeInfoHash = nodeInfo.serialize().sha256()
assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(nodeInfoHash)
assertEquals(nodeInfo, networkMapClient.getNodeInfos().single())
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned(BOB_NAME)
networkMapClient.publish(signedNodeInfo2)
val nodeInfoHash2 = nodeInfo2.serialize().sha256()
assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(nodeInfoHash, nodeInfoHash2)
assertEquals(cacheTimeout, networkMapClient.getNetworkMap().cacheMaxAge)
assertEquals("2", networkMapClient.getNetworkMap().serverVersion)
assertThat(networkMapClient.getNodeInfos()).containsExactlyInAnyOrder(nodeInfo, nodeInfo2)
}
@Test(timeout=300_000)
fun `negative test - registered invalid node is added to the network map`() {
val invalidLongNodeName = CordaX500Name(

View File

@ -3,6 +3,7 @@ package net.corda.node.services.network
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.atLeast
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.never
import com.nhaarman.mockito_kotlin.times
@ -10,6 +11,7 @@ import com.nhaarman.mockito_kotlin.verify
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.sha256
import net.corda.core.crypto.sign
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
@ -383,6 +385,75 @@ class NetworkMapUpdaterTest {
assertEquals(aliceInfo, networkMapClient.getNodeInfo(aliceHash))
}
@Test(timeout=300_000)
fun `update nodes is successful for network map supporting bulk operations but with only a few nodes requested`() {
server.version = "2"
setUpdater()
// on first update, bulk request is used
val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("info1")
val nodeInfoHash1 = nodeInfo1.serialize().sha256()
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned("info2")
val nodeInfoHash2 = nodeInfo2.serialize().sha256()
networkMapClient.publish(signedNodeInfo1)
networkMapClient.publish(signedNodeInfo2)
startUpdater()
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo1, nodeInfo2))
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(nodeInfoHash1, nodeInfoHash2)
// on subsequent updates, single requests are used
val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("info3")
val nodeInfoHash3 = nodeInfo3.serialize().sha256()
val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("info4")
val nodeInfoHash4 = nodeInfo4.serialize().sha256()
networkMapClient.publish(signedNodeInfo3)
networkMapClient.publish(signedNodeInfo4)
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo3))
verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo4))
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(nodeInfoHash1, nodeInfoHash2, nodeInfoHash3, nodeInfoHash4)
}
@Test(timeout=300_000)
@SuppressWarnings("SpreadOperator")
fun `update nodes is successful for network map supporting bulk operations when high number of nodes is requested`() {
server.version = "2"
setUpdater()
val nodeInfos = (1..51).map { createNodeInfoAndSigned("info$it")
.also { nodeInfoAndSigned -> networkMapClient.publish(nodeInfoAndSigned.signed) }
.nodeInfo
}
val nodeInfoHashes = nodeInfos.map { it.serialize().sha256() }
startUpdater()
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, times(1)).addOrUpdateNodes(nodeInfos)
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(*(nodeInfoHashes.toTypedArray()))
}
@Test(timeout=300_000)
@SuppressWarnings("SpreadOperator")
fun `update nodes is successful for network map not supporting bulk operations`() {
setUpdater()
val nodeInfos = (1..51).map { createNodeInfoAndSigned("info$it")
.also { nodeInfoAndSigned -> networkMapClient.publish(nodeInfoAndSigned.signed) }
.nodeInfo
}
val nodeInfoHashes = nodeInfos.map { it.serialize().sha256() }
startUpdater()
Thread.sleep(2L * cacheExpiryMs)
// we can't be sure about the number of requests (and updates), as it depends on the machine and the threads created
// but if they are more than 1 it's enough to deduct that the parallel way was favored
verify(networkMapCache, atLeast(2)).addOrUpdateNodes(any())
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(*(nodeInfoHashes.toTypedArray()))
}
@Test(timeout=300_000)
fun `remove node from filesystem deletes it from network map cache`() {
setUpdater(netMapClient = null)

View File

@ -49,6 +49,8 @@ class NetworkMapServer(private val pollInterval: Duration,
private val service = InMemoryNetworkMapService()
private var parametersUpdate: ParametersUpdate? = null
private var nextNetworkParameters: NetworkParameters? = null
// version toggle allowing to easily test behaviour of different version without spinning up a whole new server
var version: String = "1"
init {
server = Server(InetSocketAddress(hostAndPort.host, hostAndPort.port)).apply {
@ -171,7 +173,10 @@ class NetworkMapServer(private val pollInterval: Duration,
private fun networkMapResponse(nodeInfoHashes: List<SecureHash>): Response {
val networkMap = NetworkMap(nodeInfoHashes, signedNetParams.raw.hash, parametersUpdate)
val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap)
return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${pollInterval.seconds}").build()
return Response.ok(signedNetworkMap.serialize().bytes)
.header("Cache-Control", "max-age=${pollInterval.seconds}")
.apply { if (version != "1") this.header("X-Corda-Server-Version", version)}
.build()
}
// Remove nodeInfo for testing.
@ -205,6 +210,15 @@ class NetworkMapServer(private val pollInterval: Duration,
}.build()
}
@GET
@Path("node-infos")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
fun getNodeInfos(): Response {
val networkMap = NetworkMap(nodeInfoMap.keys.toList(), signedNetParams.raw.hash, parametersUpdate)
val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap)
return Response.ok(Pair(signedNetworkMap, nodeInfoMap.values.toList()).serialize().bytes).build()
}
@GET
@Path("network-parameters/{var}")
@Produces(MediaType.APPLICATION_OCTET_STREAM)