CORDA-3055 - Parallel node info download (#5097)

* parallelize download of nodeInfos

* actually call new list based addNodes method

* address review comments
fix NetworkMapUpdaterTest

* ensure threadpools are shutdown after network-map download is completed

* use NamedThreadFactory instead of re-implementing it.

* fix imports after rebase

* address review comments

* remove extra whitespace
This commit is contained in:
Stefano Franz 2019-08-02 12:54:18 +00:00 committed by Rick Parker
parent 7f3eca44dd
commit fa75711647
4 changed files with 191 additions and 142 deletions

View File

@ -43,6 +43,8 @@ interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBase {
/** Adds a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo)
fun addNodes(nodes: List<NodeInfo>)
/** Removes a node from the local cache. */
fun removeNode(node: NodeInfo)
}

View File

@ -9,9 +9,11 @@ import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.AutoAcceptable
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NetworkParameterAcceptanceSettings
@ -21,15 +23,21 @@ import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.*
import rx.Subscription
import rx.subjects.PublishSubject
import java.lang.Integer.max
import java.lang.Integer.min
import java.lang.reflect.Method
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer
import java.util.function.Supplier
import kotlin.reflect.KProperty1
import kotlin.reflect.full.declaredMemberProperties
import kotlin.reflect.full.findAnnotation
@ -154,10 +162,8 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
if (networkMapClient == null) {
throw CordaRuntimeException("Network map cache can be updated only if network map/compatibility zone URL is specified")
}
val (globalNetworkMap, cacheTimeout) = networkMapClient.getNetworkMap()
globalNetworkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
val additionalHashes = extraNetworkMapKeys.flatMap {
try {
networkMapClient.getNetworkMap(it).payload.nodeInfoHashes
@ -167,38 +173,54 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
emptyList<SecureHash>()
}
}
val allHashesFromNetworkMap = (globalNetworkMap.nodeInfoHashes + additionalHashes).toSet()
if (currentParametersHash != globalNetworkMap.networkParameterHash) {
exitOnParametersMismatch(globalNetworkMap)
}
val currentNodeHashes = networkMapCache.allNodeHashes
// Remove node info from network map.
(currentNodeHashes - allHashesFromNetworkMap - nodeInfoWatcher.processedNodeInfoHashes)
.mapNotNull { if (it != ourNodeInfoHash) networkMapCache.getNodeByHash(it) else null }
.forEach(networkMapCache::removeNode)
(allHashesFromNetworkMap - currentNodeHashes).mapNotNull {
// Download new node info from network map
try {
networkMapClient.getNodeInfo(it)
} catch (e: Exception) {
// Failure to retrieve one node info shouldn't stop the whole update, log and return null instead.
logger.warn("Error encountered when downloading node info '$it', skipping...", e)
null
}
}.forEach {
// Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes.
networkMapCache.addNode(it)
//at the moment we use a blocking HTTP library - but under the covers, the OS will interleave threads waiting for IO
//as HTTP GET is mostly IO bound, use more threads than CPU's
//maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests
val threadsToUseForNetworkMapDownload = min(Runtime.getRuntime().availableProcessors() * 4, 24)
val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(threadsToUseForNetworkMapDownload, NamedThreadFactory("NetworkMapUpdaterNodeInfoDownloadThread"))
//DB insert is single threaded - use a single threaded executor for it.
val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsertThread"))
val hashesToFetch = (allHashesFromNetworkMap - currentNodeHashes)
val networkMapDownloadStartTime = System.currentTimeMillis()
if (hashesToFetch.isNotEmpty()) {
val networkMapDownloadFutures = hashesToFetch.chunked(max(hashesToFetch.size / threadsToUseForNetworkMapDownload, 1))
.map { nodeInfosToGet ->
//for a set of chunked hashes, get the nodeInfo for each hash
CompletableFuture.supplyAsync(Supplier<List<NodeInfo>> {
nodeInfosToGet.mapNotNull { nodeInfo ->
try {
networkMapClient.getNodeInfo(nodeInfo)
} catch (e: Exception) {
// Failure to retrieve one node info shouldn't stop the whole update, log and return null instead.
logger.warn("Error encountered when downloading node info '$nodeInfo', skipping...", e)
null
}
}
}, executorToUseForDownloadingNodeInfos).thenAcceptAsync(Consumer { retrievedNodeInfos ->
// Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes.
networkMapCache.addNodes(retrievedNodeInfos)
}, executorToUseForInsertionIntoDB)
}.toTypedArray()
//wait for all the futures to complete
val waitForAllHashes = CompletableFuture.allOf(*networkMapDownloadFutures)
waitForAllHashes.thenRunAsync {
logger.info("Fetched: ${hashesToFetch.size} using $threadsToUseForNetworkMapDownload Threads in ${System.currentTimeMillis() - networkMapDownloadStartTime}ms")
executorToUseForDownloadingNodeInfos.shutdown()
executorToUseForInsertionIntoDB.shutdown()
}.getOrThrow()
}
// Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that
// it's empty
networkMapCache.nodeReady.set(null)
return cacheTimeout
}
@ -247,7 +269,8 @@ The node will shutdown now.""")
}
fun acceptNewNetworkParameters(parametersHash: SecureHash, sign: (SecureHash) -> SignedData<SecureHash>) {
networkMapClient ?: throw IllegalStateException("Network parameters updates are not supported without compatibility zone configured")
networkMapClient
?: throw IllegalStateException("Network parameters updates are not supported without compatibility zone configured")
// TODO This scenario will happen if node was restarted and didn't download parameters yet, but we accepted them.
// Add persisting of newest parameters from update.
val (update, signedNewNetParams) = requireNotNull(newNetworkParameters) { "Couldn't find parameters update for the hash: $parametersHash" }

View File

@ -40,6 +40,8 @@ import javax.annotation.concurrent.ThreadSafe
open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
private val database: CordaPersistence,
private val identityService: IdentityService) : NetworkMapCacheInternal, SingletonSerializeAsToken() {
companion object {
private val logger = contextLogger()
}
@ -157,32 +159,52 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
}
}
override fun addNode(node: NodeInfo) {
logger.info("Adding node with info: $node")
override fun addNodes(nodes: List<NodeInfo>) {
synchronized(_changed) {
val previousNode = getNodesByLegalIdentityKey(node.legalIdentities.first().owningKey).firstOrNull()
if (previousNode == null) {
logger.info("No previous node found")
if (!verifyAndRegisterIdentities(node)) return
database.transaction {
updateInfoDB(node, session)
changePublisher.onNext(MapChange.Added(node))
}
} else if (previousNode.serial > node.serial) {
logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}")
return
} else if (previousNode != node) {
logger.info("Previous node was found as: $previousNode")
// TODO We should be adding any new identities as well
if (!verifyIdentities(node)) return
database.transaction {
val newNodes = mutableListOf<NodeInfo>()
val updatedNodes = mutableListOf<Pair<NodeInfo, NodeInfo>>()
nodes.map { it to getNodesByLegalIdentityKey(it.legalIdentities.first().owningKey).firstOrNull() }
.forEach { (node, previousNode) ->
when {
previousNode == null -> {
logger.info("No previous node found for ${node.legalIdentities.first().name}")
if (verifyAndRegisterIdentities(node)) {
newNodes.add(node)
}
}
previousNode.serial > node.serial -> {
logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}")
}
previousNode != node -> {
logger.info("Previous node was found for ${node.legalIdentities.first().name} as: $previousNode")
// TODO We should be adding any new identities as well
if (verifyIdentities(node)) {
updatedNodes.add(node to previousNode)
}
}
else -> logger.info("Previous node was identical to incoming one - doing nothing")
}
}
database.transaction {
updatedNodes.forEach { (node, previousNode) ->
//updated
updateInfoDB(node, session)
changePublisher.onNext(MapChange.Modified(node, previousNode))
}
} else {
logger.info("Previous node was identical to incoming one - doing nothing")
newNodes.forEach { node ->
//new
updateInfoDB(node, session)
changePublisher.onNext(MapChange.Added(node))
}
}
}
}
override fun addNode(node: NodeInfo) {
logger.info("Adding node with info: $node")
addNodes(listOf(node))
logger.debug { "Done adding node with info: $node" }
}
@ -218,11 +240,12 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
logger.debug { "Done removing node with info: $node" }
}
override val allNodes: List<NodeInfo> get() {
return database.transaction {
getAllNodeInfos(session).map { it.toNodeInfo() }
override val allNodes: List<NodeInfo>
get() {
return database.transaction {
getAllNodeInfos(session).map { it.toNodeInfo() }
}
}
}
private fun getAllNodeInfos(session: Session): List<NodeInfoSchemaV1.PersistentNodeInfo> {
val criteria = session.criteriaBuilder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java)

View File

@ -27,7 +27,6 @@ import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert
import net.corda.testing.common.internal.eventually
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.*
import net.corda.testing.internal.DEV_ROOT_CA
@ -38,16 +37,12 @@ import net.corda.testing.node.internal.network.NetworkMapServer
import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Assert.fail
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.hamcrest.collection.IsIterableContainingInAnyOrder
import org.junit.*
import rx.schedulers.TestScheduler
import java.io.IOException
import java.net.URL
import java.security.KeyPair
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
@ -104,11 +99,11 @@ class NetworkMapUpdaterTest {
autoAcceptNetworkParameters: Boolean = true,
excludedAutoAcceptNetworkParameters: Set<String> = emptySet()) {
updater!!.start(DEV_ROOT_CA.certificate,
server.networkParameters.serialize().hash,
ourNodeInfo,
networkParameters,
MockKeyManagementService(makeTestIdentityService(), ourKeyPair),
NetworkParameterAcceptanceSettings(autoAcceptNetworkParameters, excludedAutoAcceptNetworkParameters))
server.networkParameters.serialize().hash,
ourNodeInfo,
networkParameters,
MockKeyManagementService(makeTestIdentityService(), ourKeyPair),
NetworkParameterAcceptanceSettings(autoAcceptNetworkParameters, excludedAutoAcceptNetworkParameters))
}
@Test
@ -120,31 +115,37 @@ class NetworkMapUpdaterTest {
val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4")
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
// Test adding new node.
//Test adding new node.
networkMapClient.publish(signedNodeInfo1)
// Not subscribed yet.
//Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any())
startUpdater()
networkMapClient.publish(signedNodeInfo2)
assertThat(nodeReadyFuture).isNotDone()
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
Assert.assertThat(networkMapCache.allNodeHashes, IsIterableContainingInAnyOrder.containsInAnyOrder(signedNodeInfo1.raw.hash, signedNodeInfo2.raw.hash))
eventually { verify(networkMapCache, times(2)).addNode(any()) }
verify(networkMapCache, times(1)).addNode(nodeInfo1)
verify(networkMapCache, times(1)).addNode(nodeInfo2)
assertThat(nodeReadyFuture).isDone()
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
networkMapClient.publish(signedNodeInfo3)
networkMapClient.publish(signedNodeInfo4)
advanceTime()
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
//4 node info from network map, and 1 from file.
// 4 node info from network map, and 1 from file.
eventually { verify(networkMapCache, times(5)).addNode(any()) }
verify(networkMapCache, times(1)).addNode(nodeInfo3)
verify(networkMapCache, times(1)).addNode(nodeInfo4)
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
Assert.assertThat(networkMapCache.allNodeHashes, IsIterableContainingInAnyOrder.containsInAnyOrder(
signedNodeInfo1.raw.hash,
signedNodeInfo2.raw.hash,
signedNodeInfo3.raw.hash,
signedNodeInfo4.raw.hash,
fileNodeInfoAndSigned.signed.raw.hash
))
}
@Test
@ -156,7 +157,7 @@ class NetworkMapUpdaterTest {
val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4")
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
// Add all nodes.
//Add all nodes.
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
networkMapClient.publish(signedNodeInfo1)
networkMapClient.publish(signedNodeInfo2)
@ -165,23 +166,31 @@ class NetworkMapUpdaterTest {
startUpdater()
advanceTime()
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
// 4 node info from network map, and 1 from file.
eventually { verify(networkMapCache, times(5)).addNode(any()) }
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
// Test remove node.
Assert.assertThat(networkMapCache.allNodeHashes, IsIterableContainingInAnyOrder.containsInAnyOrder(
signedNodeInfo1.raw.hash,
signedNodeInfo2.raw.hash,
signedNodeInfo3.raw.hash,
signedNodeInfo4.raw.hash,
fileNodeInfoAndSigned.signed.raw.hash
))
//Test remove node.
listOf(nodeInfo1, nodeInfo2, nodeInfo3, nodeInfo4).forEach {
server.removeNodeInfo(it)
}
eventually { verify(networkMapCache, times(4)).removeNode(any()) }
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, times(4)).removeNode(any())
verify(networkMapCache, times(1)).removeNode(nodeInfo1)
verify(networkMapCache, times(1)).removeNode(nodeInfo2)
verify(networkMapCache, times(1)).removeNode(nodeInfo3)
verify(networkMapCache, times(1)).removeNode(nodeInfo4)
// Node info from file should not be deleted
//Node info from file should not be deleted
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
}
@ -190,7 +199,7 @@ class NetworkMapUpdaterTest {
setUpdater(netMapClient = null)
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
// Not subscribed yet.
//Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any())
startUpdater()
@ -235,12 +244,13 @@ class NetworkMapUpdaterTest {
val newParameters = testNetworkParameters(epoch = 314, maxMessageSize = 10485761)
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
startUpdater()
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
val newHash = newParameters.serialize().hash
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
assertNever("network parameters should not be auto accepted") { updateFile.exists() }
assert(!updateFile.exists()) { "network parameters should not be auto accepted" }
updater!!.acceptNewNetworkParameters(newHash) { it.serialize().sign(ourKeyPair) }
eventually { verify(networkParametersStorage, times(1)).saveParameters(any()) }
verify(networkParametersStorage, times(1)).saveParameters(any())
val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
val paramsFromFile = signedNetworkParams.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate)
assertEquals(newParameters, paramsFromFile)
@ -255,15 +265,14 @@ class NetworkMapUpdaterTest {
whitelistedContractImplementations = mapOf("key" to listOf(SecureHash.randomSHA256())))
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
startUpdater()
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
val newHash = newParameters.serialize().hash
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
eventually {
assertTrue(updateFile.exists(), "Update file should be created")
val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
val paramsFromFile = signedNetworkParams.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate)
assertEquals(newParameters, paramsFromFile)
assertEquals(newHash, server.latestParametersAccepted(ourKeyPair.public))
}
val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
val paramsFromFile = signedNetworkParams.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate)
assertEquals(newParameters, paramsFromFile)
assertEquals(newHash, server.latestParametersAccepted(ourKeyPair.public))
}
@Test
@ -274,9 +283,10 @@ class NetworkMapUpdaterTest {
whitelistedContractImplementations = mapOf("key" to listOf(SecureHash.randomSHA256())))
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
startUpdater(excludedAutoAcceptNetworkParameters = setOf("whitelistedContractImplementations"))
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
assertNever("network parameters should not be auto accepted") { updateFile.exists() }
assert(!updateFile.exists()) { "network parameters should not be auto accepted" }
}
@Test
@ -287,9 +297,10 @@ class NetworkMapUpdaterTest {
whitelistedContractImplementations = mapOf("key" to listOf(SecureHash.randomSHA256())))
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
startUpdater(autoAcceptNetworkParameters = false)
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
assertNever("network parameters should not be auto accepted") { updateFile.exists() }
assert(!updateFile.exists()) { "network parameters should not be auto accepted" }
}
@Test
@ -299,9 +310,9 @@ class NetworkMapUpdaterTest {
assertThatThrownBy { networkMapClient.getNetworkMap(privateNetUUID).payload.nodeInfoHashes }
.isInstanceOf(IOException::class.java)
.hasMessageContaining("Response Code 404")
val (aliceInfo, signedAliceInfo) = createNodeInfoAndSigned(ALICE_NAME) // Goes to private network map
val (aliceInfo, signedAliceInfo) = createNodeInfoAndSigned(ALICE_NAME) //Goes to private network map
val aliceHash = aliceInfo.serialize().hash
val (bobInfo, signedBobInfo) = createNodeInfoAndSigned(BOB_NAME) // Goes to global network map
val (bobInfo, signedBobInfo) = createNodeInfoAndSigned(BOB_NAME) //Goes to global network map
networkMapClient.publish(signedAliceInfo)
networkMapClient.publish(signedBobInfo)
assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(bobInfo.serialize().hash)
@ -323,7 +334,7 @@ class NetworkMapUpdaterTest {
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned1.nodeInfo)
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned2.nodeInfo)
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(fileNodeInfoAndSigned1.signed.raw.hash, fileNodeInfoAndSigned2.signed.raw.hash)
// Remove one of the nodes
//Remove one of the nodes
val fileName1 = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${fileNodeInfoAndSigned1.nodeInfo.legalIdentities[0].name.serialize().hash}"
(nodeInfoDir / fileName1).delete()
advanceTime()
@ -338,48 +349,44 @@ class NetworkMapUpdaterTest {
val nodeInfoBuilder = TestNodeInfoBuilder()
val (_, key) = nodeInfoBuilder.addLegalIdentity(CordaX500Name("Info", "London", "GB"))
val (serverNodeInfo, serverSignedNodeInfo) = nodeInfoBuilder.buildWithSigned(1, 1)
// Construct node for exactly same identity, but different serial. This one will go to additional-node-infos only.
//Construct node for exactly same identity, but different serial. This one will go to additional-node-infos only.
val localNodeInfo = serverNodeInfo.copy(serial = 17)
val localSignedNodeInfo = NodeInfoAndSigned(localNodeInfo) { _, serialised ->
key.sign(serialised.bytes)
}
// The one with higher serial goes to additional-node-infos.
//The one with higher serial goes to additional-node-infos.
NodeInfoWatcher.saveToFile(nodeInfoDir, localSignedNodeInfo)
// Publish to network map the one with lower serial.
//Publish to network map the one with lower serial.
networkMapClient.publish(serverSignedNodeInfo)
startUpdater()
advanceTime()
verify(networkMapCache, times(1)).addNode(localNodeInfo)
// Node from file has higher serial than the one from NetworkMapServer
eventually { assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash) }
Thread.sleep(2L * cacheExpiryMs)
//Node from file has higher serial than the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash)
val fileName = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${localNodeInfo.legalIdentities[0].name.serialize().hash}"
(nodeInfoDir / fileName).delete()
advanceTime()
verify(networkMapCache, times(1)).removeNode(any())
verify(networkMapCache).removeNode(localNodeInfo)
eventually {
// Instead of node from file we should have now the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(serverSignedNodeInfo.raw.hash)
}
Thread.sleep(2L * cacheExpiryMs)
//Instead of node from file we should have now the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(serverSignedNodeInfo.raw.hash)
}
// Test fix for ENT-1882
// This scenario can happen when signing of network map server is performed much longer after the node joined the network.
// Network map will advertise hashes without that node.
//Test fix for ENT-1882
//This scenario can happen when signing of network map server is performed much longer after the node joined the network.
//Network map will advertise hashes without that node.
@Test
fun `not remove own node info when it is not in network map yet`() {
val (myInfo, signedMyInfo) = createNodeInfoAndSigned("My node info")
val (_, signedOtherInfo) = createNodeInfoAndSigned("Other info")
setUpdater()
networkMapCache.addNode(myInfo) // Simulate behaviour on node startup when our node info is added to cache
networkMapCache.addNode(myInfo) //Simulate behaviour on node startup when our node info is added to cache
networkMapClient.publish(signedOtherInfo)
startUpdater(ourNodeInfo = signedMyInfo)
assertAlways("Node must never be removed") {
verify(networkMapCache, never()).removeNode(myInfo)
}
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, never()).removeNode(myInfo)
assertThat(server.networkMapHashes()).containsOnly(signedOtherInfo.raw.hash)
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(signedMyInfo.raw.hash, signedOtherInfo.raw.hash)
}
@ -395,24 +402,22 @@ class NetworkMapUpdaterTest {
val signedNodeInfo1 = builder.buildWithSigned(1).signed
val signedNodeInfo2 = builder.buildWithSigned(2).signed
// Test adding new node.
//Test adding new node.
networkMapClient.publish(signedNodeInfo1)
// Not subscribed yet.
//Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any())
startUpdater()
eventually { verify(networkMapCache, times(1)).addNode(signedNodeInfo1.verified()) }
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
assert(networkMapCache.allNodeHashes.size == 1)
networkMapClient.publish(signedNodeInfo2)
Thread.sleep(2L * cacheExpiryMs)
advanceTime()
eventually {
verify(networkMapCache, times(1)).addNode(signedNodeInfo2.verified())
verify(networkMapCache, times(1)).removeNode(signedNodeInfo1.verified())
}
assertEquals(1, networkMapCache.allNodeHashes.size)
verify(networkMapCache, times(1)).removeNode(signedNodeInfo1.verified())
assert(networkMapCache.allNodeHashes.size == 1)
}
@Test
@ -442,6 +447,14 @@ class NetworkMapUpdaterTest {
}
private fun createMockNetworkMapCache(): NetworkMapCacheInternal {
fun addNodeToMockCache(nodeInfo: NodeInfo, data: ConcurrentHashMap<Party, NodeInfo>) {
val party = nodeInfo.legalIdentities[0]
data.compute(party) { _, current ->
if (current == null || current.serial < nodeInfo.serial) nodeInfo else current
}
}
return mock {
on { nodeReady }.thenReturn(nodeReadyFuture)
val data = ConcurrentHashMap<Party, NodeInfo>()
@ -452,6 +465,14 @@ class NetworkMapUpdaterTest {
if (current == null || current.serial < nodeInfo.serial) nodeInfo else current
}
}
on { addNodes(any<List<NodeInfo>>()) }.then {
val nodeInfos = it.arguments[0] as List<NodeInfo>
nodeInfos.forEach { nodeInfo ->
addNodeToMockCache(nodeInfo, data)
}
}
on { removeNode(any()) }.then { data.remove((it.arguments[0] as NodeInfo).legalIdentities[0]) }
on { getNodeByLegalIdentity(any()) }.then { data[it.arguments[0]] }
on { allNodeHashes }.then { data.values.map { it.serialize().hash } }
@ -470,24 +491,4 @@ class NetworkMapUpdaterTest {
private fun advanceTime() {
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
}
private fun assertNever(condition: String, check: () -> Boolean) {
val timeoutMillis = 2L * cacheExpiryMs
val start = Instant.now()
while (Duration.between(start, Instant.now()).toMillis() < timeoutMillis) {
Thread.sleep(100)
if (check()) fail(condition)
}
}
private fun assertAlways(condition: String, check: () -> Unit) {
assertNever(condition) {
try {
check()
false
} catch (e: Exception) {
true
}
}
}
}