mirror of
https://github.com/corda/corda.git
synced 2025-06-17 06:38:21 +00:00
ENT-1584: Subscribe to private network maps using UUIDs (#2922)
Client private network map implementation Add private network maps UUIDs to config as extraNetworkMapKeys. Adjust NetworkMapServer implementation accordingly. Change NetworkMapUpdaterTest to use NetworkMapServer instead of mock
This commit is contained in:
committed by
GitHub
parent
02913b284e
commit
91c52af5ac
@ -278,7 +278,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
|
||||
networkMapClient,
|
||||
networkParameters.serialize().hash,
|
||||
configuration.baseDirectory)
|
||||
configuration.baseDirectory,
|
||||
configuration.extraNetworkMapKeys)
|
||||
runOnStop += networkMapUpdater::close
|
||||
|
||||
networkMapUpdater.subscribeToNetworkMap()
|
||||
|
@ -53,6 +53,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
|
||||
// do not change this value without syncing it with ScheduledFlowsDrainingModeTest
|
||||
val drainingModePollPeriod: Duration get() = Duration.ofSeconds(5)
|
||||
val extraNetworkMapKeys: List<UUID>
|
||||
|
||||
fun validate(): List<String>
|
||||
|
||||
@ -144,6 +145,7 @@ data class NodeConfigurationImpl(
|
||||
private val transactionCacheSizeMegaBytes: Int? = null,
|
||||
private val attachmentContentCacheSizeMegaBytes: Int? = null,
|
||||
override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound,
|
||||
override val extraNetworkMapKeys: List<UUID> = emptyList(),
|
||||
// do not use or remove (breaks DemoBench together with rejection of unknown configuration keys during parsing)
|
||||
private val h2port: Int = 0,
|
||||
// do not use or remove (used by Capsule)
|
||||
|
@ -21,6 +21,7 @@ import java.io.BufferedReader
|
||||
import java.net.URL
|
||||
import java.security.cert.X509Certificate
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
|
||||
class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certificate) {
|
||||
companion object {
|
||||
@ -43,13 +44,14 @@ class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certifica
|
||||
logger.trace { "Sent network parameters approval to $ackURL successfully." }
|
||||
}
|
||||
|
||||
fun getNetworkMap(): NetworkMapResponse {
|
||||
logger.trace { "Fetching network map update from $networkMapUrl." }
|
||||
val connection = networkMapUrl.openHttpConnection()
|
||||
fun getNetworkMap(networkMapKey: UUID? = null): NetworkMapResponse {
|
||||
val url = networkMapKey?.let { URL("$networkMapUrl/$networkMapKey") } ?: networkMapUrl
|
||||
logger.trace { "Fetching network map update from $url." }
|
||||
val connection = url.openHttpConnection()
|
||||
val signedNetworkMap = connection.responseAs<SignedNetworkMap>()
|
||||
val networkMap = signedNetworkMap.verifiedNetworkMapCert(trustedRoot)
|
||||
val timeout = connection.cacheControl.maxAgeSeconds().seconds
|
||||
logger.trace { "Fetched network map update from $networkMapUrl successfully: $networkMap" }
|
||||
logger.trace { "Fetched network map update from $url successfully: $networkMap" }
|
||||
return NetworkMapResponse(networkMap, timeout)
|
||||
}
|
||||
|
||||
|
@ -18,9 +18,11 @@ import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import rx.Subscription
|
||||
import rx.subjects.PublishSubject
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.system.exitProcess
|
||||
@ -29,7 +31,8 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
private val fileWatcher: NodeInfoWatcher,
|
||||
private val networkMapClient: NetworkMapClient?,
|
||||
private val currentParametersHash: SecureHash,
|
||||
private val baseDirectory: Path
|
||||
private val baseDirectory: Path,
|
||||
private val extraNetworkMapKeys: List<UUID>
|
||||
) : AutoCloseable {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
@ -76,16 +79,25 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
}
|
||||
|
||||
private fun updateNetworkMapCache(networkMapClient: NetworkMapClient): Duration {
|
||||
val (networkMap, cacheTimeout) = networkMapClient.getNetworkMap()
|
||||
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
|
||||
val (globalNetworkMap, cacheTimeout) = networkMapClient.getNetworkMap()
|
||||
globalNetworkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
|
||||
val additionalHashes = extraNetworkMapKeys.flatMap {
|
||||
try {
|
||||
networkMapClient.getNetworkMap(it).payload.nodeInfoHashes
|
||||
} catch (e: Exception) {
|
||||
// Failure to retrieve one network map using UUID shouldn't stop the whole update.
|
||||
logger.warn("Error encountered when downloading network map with uuid '$it', skipping...", e)
|
||||
emptyList<SecureHash>()
|
||||
}
|
||||
}
|
||||
val allHashesFromNetworkMap = (globalNetworkMap.nodeInfoHashes + additionalHashes).toSet()
|
||||
|
||||
if (currentParametersHash != networkMap.networkParameterHash) {
|
||||
exitOnParametersMismatch(networkMap)
|
||||
if (currentParametersHash != globalNetworkMap.networkParameterHash) {
|
||||
exitOnParametersMismatch(globalNetworkMap)
|
||||
}
|
||||
|
||||
val currentNodeHashes = networkMapCache.allNodeHashes
|
||||
val hashesFromNetworkMap = networkMap.nodeInfoHashes
|
||||
(hashesFromNetworkMap - currentNodeHashes).mapNotNull {
|
||||
(allHashesFromNetworkMap - currentNodeHashes).mapNotNull {
|
||||
// Download new node info from network map
|
||||
try {
|
||||
networkMapClient.getNodeInfo(it)
|
||||
@ -100,7 +112,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
}
|
||||
|
||||
// Remove node info from network map.
|
||||
(currentNodeHashes - hashesFromNetworkMap - fileWatcher.processedNodeInfoHashes)
|
||||
(currentNodeHashes - allHashesFromNetworkMap - fileWatcher.processedNodeInfoHashes)
|
||||
.mapNotNull(networkMapCache::getNodeByHash)
|
||||
.forEach(networkMapCache::removeNode)
|
||||
|
||||
|
@ -25,6 +25,7 @@ import java.io.IOException
|
||||
import java.net.URL
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NetworkMapClientTest {
|
||||
@ -84,11 +85,11 @@ class NetworkMapClientTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `download NetworkParameter correctly`() {
|
||||
fun `download NetworkParameters correctly`() {
|
||||
// The test server returns same network parameter for any hash.
|
||||
val parametersHash = server.networkParameters.serialize().hash
|
||||
val networkParameter = networkMapClient.getNetworkParameters(parametersHash).verified()
|
||||
assertEquals(server.networkParameters, networkParameter)
|
||||
val networkParameters = networkMapClient.getNetworkParameters(parametersHash).verified()
|
||||
assertEquals(server.networkParameters, networkParameters)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2,41 +2,37 @@ package net.corda.node.services.network
|
||||
|
||||
import com.google.common.jimfs.Configuration.unix
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import com.nhaarman.mockito_kotlin.any
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import com.nhaarman.mockito_kotlin.times
|
||||
import com.nhaarman.mockito_kotlin.verify
|
||||
import com.nhaarman.mockito_kotlin.*
|
||||
import net.corda.cordform.CordformNode.NODE_INFO_DIRECTORY
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.messaging.ParametersUpdateInfo
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.createDevNetworkMapCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.expect
|
||||
import net.corda.testing.core.expectEvents
|
||||
import net.corda.testing.core.sequence
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.internal.DEV_ROOT_CA
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
import net.corda.testing.node.internal.network.NetworkMapServer
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import rx.schedulers.TestScheduler
|
||||
import java.io.IOException
|
||||
import java.net.URL
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
@ -46,24 +42,30 @@ class NetworkMapUpdaterTest {
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
|
||||
private val cacheExpiryMs = 1000
|
||||
private val privateNetUUID = UUID.randomUUID()
|
||||
private val fs = Jimfs.newFileSystem(unix())
|
||||
private val baseDir = fs.getPath("/node")
|
||||
private val networkMapCache = createMockNetworkMapCache()
|
||||
private val nodeInfoMap = ConcurrentHashMap<SecureHash, SignedNodeInfo>()
|
||||
private val networkParamsMap = HashMap<SecureHash, NetworkParameters>()
|
||||
private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa()
|
||||
private val cacheExpiryMs = 100
|
||||
private val networkMapClient = createMockNetworkMapClient()
|
||||
private val scheduler = TestScheduler()
|
||||
private val networkParametersHash = SecureHash.randomSHA256()
|
||||
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir)
|
||||
private var parametersUpdate: ParametersUpdate? = null
|
||||
private val networkMapCache = createMockNetworkMapCache()
|
||||
private lateinit var server: NetworkMapServer
|
||||
private lateinit var networkMapClient: NetworkMapClient
|
||||
private lateinit var updater: NetworkMapUpdater
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
server = NetworkMapServer(cacheExpiryMs.millis, PortAllocation.Incremental(10000).nextHostAndPort())
|
||||
val hostAndPort = server.start()
|
||||
networkMapClient = NetworkMapClient(URL("http://${hostAndPort.host}:${hostAndPort.port}"), DEV_ROOT_CA.certificate)
|
||||
updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, server.networkParameters.serialize().hash, baseDir, listOf(privateNetUUID))
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
updater.close()
|
||||
fs.close()
|
||||
server.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -91,11 +93,9 @@ class NetworkMapUpdaterTest {
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
|
||||
networkMapClient.publish(signedNodeInfo3)
|
||||
networkMapClient.publish(signedNodeInfo4)
|
||||
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
|
||||
// 4 node info from network map, and 1 from file.
|
||||
verify(networkMapCache, times(5)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo3)
|
||||
@ -124,12 +124,13 @@ class NetworkMapUpdaterTest {
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
|
||||
// 4 node info from network map, and 1 from file.
|
||||
assertThat(nodeInfoMap).hasSize(4)
|
||||
verify(networkMapCache, times(5)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
|
||||
|
||||
// Test remove node.
|
||||
nodeInfoMap.clear()
|
||||
listOf(nodeInfo1, nodeInfo2, nodeInfo3, nodeInfo4).forEach {
|
||||
server.removeNodeInfo(it)
|
||||
}
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
verify(networkMapCache, times(4)).removeNode(any())
|
||||
@ -168,7 +169,7 @@ class NetworkMapUpdaterTest {
|
||||
assertEquals(null, snapshot)
|
||||
val newParameters = testNetworkParameters(epoch = 2)
|
||||
val updateDeadline = Instant.now().plus(1, ChronoUnit.DAYS)
|
||||
scheduleParametersUpdate(newParameters, "Test update", updateDeadline)
|
||||
server.scheduleParametersUpdate(newParameters, "Test update", updateDeadline)
|
||||
updater.subscribeToNetworkMap()
|
||||
updates.expectEvents(isStrict = false) {
|
||||
sequence(
|
||||
@ -185,49 +186,33 @@ class NetworkMapUpdaterTest {
|
||||
@Test
|
||||
fun `ack network parameters update`() {
|
||||
val newParameters = testNetworkParameters(epoch = 314)
|
||||
scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
|
||||
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
|
||||
updater.subscribeToNetworkMap()
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
val newHash = newParameters.serialize().hash
|
||||
val keyPair = Crypto.generateKeyPair()
|
||||
updater.acceptNewNetworkParameters(newHash, { hash -> hash.serialize().sign(keyPair)})
|
||||
verify(networkMapClient).ackNetworkParametersUpdate(any())
|
||||
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
|
||||
val paramsFromFile = signedNetworkParams.verifiedNetworkMapCert(DEV_ROOT_CA.certificate)
|
||||
assertEquals(newParameters, paramsFromFile)
|
||||
}
|
||||
|
||||
private fun scheduleParametersUpdate(nextParameters: NetworkParameters, description: String, updateDeadline: Instant) {
|
||||
val nextParamsHash = nextParameters.serialize().hash
|
||||
networkParamsMap[nextParamsHash] = nextParameters
|
||||
parametersUpdate = ParametersUpdate(nextParamsHash, description, updateDeadline)
|
||||
}
|
||||
|
||||
private fun createMockNetworkMapClient(): NetworkMapClient {
|
||||
return mock {
|
||||
on { trustedRoot }.then {
|
||||
DEV_ROOT_CA.certificate
|
||||
}
|
||||
on { publish(any()) }.then {
|
||||
val signedNodeInfo: SignedNodeInfo = uncheckedCast(it.arguments[0])
|
||||
nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo)
|
||||
}
|
||||
on { getNetworkMap() }.then {
|
||||
NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), networkParametersHash, parametersUpdate), cacheExpiryMs.millis)
|
||||
}
|
||||
on { getNodeInfo(any()) }.then {
|
||||
nodeInfoMap[it.arguments[0]]?.verified()
|
||||
}
|
||||
on { getNetworkParameters(any()) }.then {
|
||||
val paramsHash: SecureHash = uncheckedCast(it.arguments[0])
|
||||
networkParamsMap[paramsHash]?.let { networkMapCertAndKeyPair.sign(it) }
|
||||
}
|
||||
on { ackNetworkParametersUpdate(any()) }.then {
|
||||
Unit
|
||||
}
|
||||
}
|
||||
@Test
|
||||
fun `fetch nodes from private network`() {
|
||||
server.addNodesToPrivateNetwork(privateNetUUID, listOf(ALICE_NAME))
|
||||
Assertions.assertThatThrownBy { networkMapClient.getNetworkMap(privateNetUUID).payload.nodeInfoHashes }
|
||||
.isInstanceOf(IOException::class.java)
|
||||
.hasMessageContaining("Response Code 404")
|
||||
val (aliceInfo, signedAliceInfo) = createNodeInfoAndSigned(ALICE_NAME) // Goes to private network map
|
||||
val aliceHash = aliceInfo.serialize().hash
|
||||
val (bobInfo, signedBobInfo) = createNodeInfoAndSigned(BOB_NAME) // Goes to global network map
|
||||
networkMapClient.publish(signedAliceInfo)
|
||||
networkMapClient.publish(signedBobInfo)
|
||||
assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(bobInfo.serialize().hash)
|
||||
assertThat(networkMapClient.getNetworkMap(privateNetUUID).payload.nodeInfoHashes).containsExactly(aliceHash)
|
||||
assertEquals(aliceInfo, networkMapClient.getNodeInfo(aliceHash))
|
||||
}
|
||||
|
||||
private fun createMockNetworkMapCache(): NetworkMapCacheInternal {
|
||||
|
Reference in New Issue
Block a user