diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index beeea48ffc..1b0cf330ca 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -43,6 +43,8 @@ interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBase { /** Adds a node to the local cache (generally only used for adding ourselves). */ fun addNode(node: NodeInfo) + fun addNodes(nodes: List) + /** Removes a node from the local cache. */ fun removeNode(node: NodeInfo) } diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index 7d54ac55c0..d39f6fbc8f 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -9,9 +9,11 @@ import net.corda.core.messaging.DataFeed import net.corda.core.messaging.ParametersUpdateInfo import net.corda.core.node.AutoAcceptable import net.corda.core.node.NetworkParameters +import net.corda.core.node.NodeInfo import net.corda.core.node.services.KeyManagementService import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.minutes import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.NetworkParameterAcceptanceSettings @@ -21,15 +23,21 @@ import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.network.* import rx.Subscription import rx.subjects.PublishSubject +import java.lang.Integer.max +import java.lang.Integer.min import java.lang.reflect.Method import java.nio.file.Path import java.nio.file.StandardCopyOption import java.security.cert.X509Certificate import java.time.Duration import java.util.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference +import java.util.function.Consumer +import java.util.function.Supplier import kotlin.reflect.KProperty1 import kotlin.reflect.full.declaredMemberProperties import kotlin.reflect.full.findAnnotation @@ -154,10 +162,8 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, if (networkMapClient == null) { throw CordaRuntimeException("Network map cache can be updated only if network map/compatibility zone URL is specified") } - val (globalNetworkMap, cacheTimeout) = networkMapClient.getNetworkMap() globalNetworkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) } - val additionalHashes = extraNetworkMapKeys.flatMap { try { networkMapClient.getNetworkMap(it).payload.nodeInfoHashes @@ -167,38 +173,54 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, emptyList() } } - val allHashesFromNetworkMap = (globalNetworkMap.nodeInfoHashes + additionalHashes).toSet() - if (currentParametersHash != globalNetworkMap.networkParameterHash) { exitOnParametersMismatch(globalNetworkMap) } - val currentNodeHashes = networkMapCache.allNodeHashes - // Remove node info from network map. (currentNodeHashes - allHashesFromNetworkMap - nodeInfoWatcher.processedNodeInfoHashes) .mapNotNull { if (it != ourNodeInfoHash) networkMapCache.getNodeByHash(it) else null } .forEach(networkMapCache::removeNode) - - (allHashesFromNetworkMap - currentNodeHashes).mapNotNull { - // Download new node info from network map - try { - networkMapClient.getNodeInfo(it) - } catch (e: Exception) { - // Failure to retrieve one node info shouldn't stop the whole update, log and return null instead. - logger.warn("Error encountered when downloading node info '$it', skipping...", e) - null - } - }.forEach { - // Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes. - networkMapCache.addNode(it) + //at the moment we use a blocking HTTP library - but under the covers, the OS will interleave threads waiting for IO + //as HTTP GET is mostly IO bound, use more threads than CPU's + //maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests + val threadsToUseForNetworkMapDownload = min(Runtime.getRuntime().availableProcessors() * 4, 24) + val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(threadsToUseForNetworkMapDownload, NamedThreadFactory("NetworkMapUpdaterNodeInfoDownloadThread")) + //DB insert is single threaded - use a single threaded executor for it. + val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsertThread")) + val hashesToFetch = (allHashesFromNetworkMap - currentNodeHashes) + val networkMapDownloadStartTime = System.currentTimeMillis() + if (hashesToFetch.isNotEmpty()) { + val networkMapDownloadFutures = hashesToFetch.chunked(max(hashesToFetch.size / threadsToUseForNetworkMapDownload, 1)) + .map { nodeInfosToGet -> + //for a set of chunked hashes, get the nodeInfo for each hash + CompletableFuture.supplyAsync(Supplier> { + nodeInfosToGet.mapNotNull { nodeInfo -> + try { + networkMapClient.getNodeInfo(nodeInfo) + } catch (e: Exception) { + // Failure to retrieve one node info shouldn't stop the whole update, log and return null instead. + logger.warn("Error encountered when downloading node info '$nodeInfo', skipping...", e) + null + } + } + }, executorToUseForDownloadingNodeInfos).thenAcceptAsync(Consumer { retrievedNodeInfos -> + // Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes. + networkMapCache.addNodes(retrievedNodeInfos) + }, executorToUseForInsertionIntoDB) + }.toTypedArray() + //wait for all the futures to complete + val waitForAllHashes = CompletableFuture.allOf(*networkMapDownloadFutures) + waitForAllHashes.thenRunAsync { + logger.info("Fetched: ${hashesToFetch.size} using $threadsToUseForNetworkMapDownload Threads in ${System.currentTimeMillis() - networkMapDownloadStartTime}ms") + executorToUseForDownloadingNodeInfos.shutdown() + executorToUseForInsertionIntoDB.shutdown() + }.getOrThrow() } - // Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that // it's empty networkMapCache.nodeReady.set(null) - return cacheTimeout } @@ -247,7 +269,8 @@ The node will shutdown now.""") } fun acceptNewNetworkParameters(parametersHash: SecureHash, sign: (SecureHash) -> SignedData) { - networkMapClient ?: throw IllegalStateException("Network parameters updates are not supported without compatibility zone configured") + networkMapClient + ?: throw IllegalStateException("Network parameters updates are not supported without compatibility zone configured") // TODO This scenario will happen if node was restarted and didn't download parameters yet, but we accepted them. // Add persisting of newest parameters from update. val (update, signedNewNetParams) = requireNotNull(newNetworkParameters) { "Couldn't find parameters update for the hash: $parametersHash" } diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index 4eea7af890..0ba34b2cb3 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -40,6 +40,8 @@ import javax.annotation.concurrent.ThreadSafe open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, private val database: CordaPersistence, private val identityService: IdentityService) : NetworkMapCacheInternal, SingletonSerializeAsToken() { + + companion object { private val logger = contextLogger() } @@ -157,32 +159,52 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, } } - override fun addNode(node: NodeInfo) { - logger.info("Adding node with info: $node") + override fun addNodes(nodes: List) { synchronized(_changed) { - val previousNode = getNodesByLegalIdentityKey(node.legalIdentities.first().owningKey).firstOrNull() - if (previousNode == null) { - logger.info("No previous node found") - if (!verifyAndRegisterIdentities(node)) return - database.transaction { - updateInfoDB(node, session) - changePublisher.onNext(MapChange.Added(node)) - } - } else if (previousNode.serial > node.serial) { - logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}") - return - } else if (previousNode != node) { - logger.info("Previous node was found as: $previousNode") - // TODO We should be adding any new identities as well - if (!verifyIdentities(node)) return - database.transaction { + val newNodes = mutableListOf() + val updatedNodes = mutableListOf>() + nodes.map { it to getNodesByLegalIdentityKey(it.legalIdentities.first().owningKey).firstOrNull() } + .forEach { (node, previousNode) -> + when { + previousNode == null -> { + logger.info("No previous node found for ${node.legalIdentities.first().name}") + if (verifyAndRegisterIdentities(node)) { + newNodes.add(node) + } + } + previousNode.serial > node.serial -> { + logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}") + } + previousNode != node -> { + logger.info("Previous node was found for ${node.legalIdentities.first().name} as: $previousNode") + // TODO We should be adding any new identities as well + if (verifyIdentities(node)) { + updatedNodes.add(node to previousNode) + } + } + else -> logger.info("Previous node was identical to incoming one - doing nothing") + } + } + + database.transaction { + updatedNodes.forEach { (node, previousNode) -> + //updated updateInfoDB(node, session) changePublisher.onNext(MapChange.Modified(node, previousNode)) } - } else { - logger.info("Previous node was identical to incoming one - doing nothing") + newNodes.forEach { node -> + //new + updateInfoDB(node, session) + changePublisher.onNext(MapChange.Added(node)) + } } + } + } + + override fun addNode(node: NodeInfo) { + logger.info("Adding node with info: $node") + addNodes(listOf(node)) logger.debug { "Done adding node with info: $node" } } @@ -218,11 +240,12 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, logger.debug { "Done removing node with info: $node" } } - override val allNodes: List get() { - return database.transaction { - getAllNodeInfos(session).map { it.toNodeInfo() } + override val allNodes: List + get() { + return database.transaction { + getAllNodeInfos(session).map { it.toNodeInfo() } + } } - } private fun getAllNodeInfos(session: Session): List { val criteria = session.criteriaBuilder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java) diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index 46e77f0a5f..fd79d49b61 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -27,7 +27,6 @@ import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME import net.corda.nodeapi.internal.network.NodeInfoFilesCopier import net.corda.nodeapi.internal.network.SignedNetworkParameters import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert -import net.corda.testing.common.internal.eventually import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.core.* import net.corda.testing.internal.DEV_ROOT_CA @@ -38,16 +37,12 @@ import net.corda.testing.node.internal.network.NetworkMapServer import net.corda.testing.node.makeTestIdentityService import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy -import org.junit.After -import org.junit.Assert.fail -import org.junit.Before -import org.junit.Rule -import org.junit.Test +import org.hamcrest.collection.IsIterableContainingInAnyOrder +import org.junit.* import rx.schedulers.TestScheduler import java.io.IOException import java.net.URL import java.security.KeyPair -import java.time.Duration import java.time.Instant import java.time.temporal.ChronoUnit import java.util.* @@ -104,11 +99,11 @@ class NetworkMapUpdaterTest { autoAcceptNetworkParameters: Boolean = true, excludedAutoAcceptNetworkParameters: Set = emptySet()) { updater!!.start(DEV_ROOT_CA.certificate, - server.networkParameters.serialize().hash, - ourNodeInfo, - networkParameters, - MockKeyManagementService(makeTestIdentityService(), ourKeyPair), - NetworkParameterAcceptanceSettings(autoAcceptNetworkParameters, excludedAutoAcceptNetworkParameters)) + server.networkParameters.serialize().hash, + ourNodeInfo, + networkParameters, + MockKeyManagementService(makeTestIdentityService(), ourKeyPair), + NetworkParameterAcceptanceSettings(autoAcceptNetworkParameters, excludedAutoAcceptNetworkParameters)) } @Test @@ -120,31 +115,37 @@ class NetworkMapUpdaterTest { val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4") val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file") - // Test adding new node. + //Test adding new node. networkMapClient.publish(signedNodeInfo1) - // Not subscribed yet. + //Not subscribed yet. verify(networkMapCache, times(0)).addNode(any()) startUpdater() networkMapClient.publish(signedNodeInfo2) assertThat(nodeReadyFuture).isNotDone() + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) + + Assert.assertThat(networkMapCache.allNodeHashes, IsIterableContainingInAnyOrder.containsInAnyOrder(signedNodeInfo1.raw.hash, signedNodeInfo2.raw.hash)) - eventually { verify(networkMapCache, times(2)).addNode(any()) } - verify(networkMapCache, times(1)).addNode(nodeInfo1) - verify(networkMapCache, times(1)).addNode(nodeInfo2) assertThat(nodeReadyFuture).isDone() NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned) networkMapClient.publish(signedNodeInfo3) networkMapClient.publish(signedNodeInfo4) advanceTime() + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) + //4 node info from network map, and 1 from file. - // 4 node info from network map, and 1 from file. - eventually { verify(networkMapCache, times(5)).addNode(any()) } - verify(networkMapCache, times(1)).addNode(nodeInfo3) - verify(networkMapCache, times(1)).addNode(nodeInfo4) - verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo) + Assert.assertThat(networkMapCache.allNodeHashes, IsIterableContainingInAnyOrder.containsInAnyOrder( + signedNodeInfo1.raw.hash, + signedNodeInfo2.raw.hash, + signedNodeInfo3.raw.hash, + signedNodeInfo4.raw.hash, + fileNodeInfoAndSigned.signed.raw.hash + )) } @Test @@ -156,7 +157,7 @@ class NetworkMapUpdaterTest { val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4") val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file") - // Add all nodes. + //Add all nodes. NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned) networkMapClient.publish(signedNodeInfo1) networkMapClient.publish(signedNodeInfo2) @@ -165,23 +166,31 @@ class NetworkMapUpdaterTest { startUpdater() advanceTime() + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) - // 4 node info from network map, and 1 from file. - eventually { verify(networkMapCache, times(5)).addNode(any()) } - verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo) - // Test remove node. + Assert.assertThat(networkMapCache.allNodeHashes, IsIterableContainingInAnyOrder.containsInAnyOrder( + signedNodeInfo1.raw.hash, + signedNodeInfo2.raw.hash, + signedNodeInfo3.raw.hash, + signedNodeInfo4.raw.hash, + fileNodeInfoAndSigned.signed.raw.hash + )) + + //Test remove node. listOf(nodeInfo1, nodeInfo2, nodeInfo3, nodeInfo4).forEach { server.removeNodeInfo(it) } - - eventually { verify(networkMapCache, times(4)).removeNode(any()) } + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) + verify(networkMapCache, times(4)).removeNode(any()) verify(networkMapCache, times(1)).removeNode(nodeInfo1) verify(networkMapCache, times(1)).removeNode(nodeInfo2) verify(networkMapCache, times(1)).removeNode(nodeInfo3) verify(networkMapCache, times(1)).removeNode(nodeInfo4) - // Node info from file should not be deleted + //Node info from file should not be deleted assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash) } @@ -190,7 +199,7 @@ class NetworkMapUpdaterTest { setUpdater(netMapClient = null) val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file") - // Not subscribed yet. + //Not subscribed yet. verify(networkMapCache, times(0)).addNode(any()) startUpdater() @@ -235,12 +244,13 @@ class NetworkMapUpdaterTest { val newParameters = testNetworkParameters(epoch = 314, maxMessageSize = 10485761) server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN) startUpdater() - + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) val newHash = newParameters.serialize().hash val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME - assertNever("network parameters should not be auto accepted") { updateFile.exists() } + assert(!updateFile.exists()) { "network parameters should not be auto accepted" } updater!!.acceptNewNetworkParameters(newHash) { it.serialize().sign(ourKeyPair) } - eventually { verify(networkParametersStorage, times(1)).saveParameters(any()) } + verify(networkParametersStorage, times(1)).saveParameters(any()) val signedNetworkParams = updateFile.readObject() val paramsFromFile = signedNetworkParams.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate) assertEquals(newParameters, paramsFromFile) @@ -255,15 +265,14 @@ class NetworkMapUpdaterTest { whitelistedContractImplementations = mapOf("key" to listOf(SecureHash.randomSHA256()))) server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN) startUpdater() + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) val newHash = newParameters.serialize().hash val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME - eventually { - assertTrue(updateFile.exists(), "Update file should be created") - val signedNetworkParams = updateFile.readObject() - val paramsFromFile = signedNetworkParams.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate) - assertEquals(newParameters, paramsFromFile) - assertEquals(newHash, server.latestParametersAccepted(ourKeyPair.public)) - } + val signedNetworkParams = updateFile.readObject() + val paramsFromFile = signedNetworkParams.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate) + assertEquals(newParameters, paramsFromFile) + assertEquals(newHash, server.latestParametersAccepted(ourKeyPair.public)) } @Test @@ -274,9 +283,10 @@ class NetworkMapUpdaterTest { whitelistedContractImplementations = mapOf("key" to listOf(SecureHash.randomSHA256()))) server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN) startUpdater(excludedAutoAcceptNetworkParameters = setOf("whitelistedContractImplementations")) - + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME - assertNever("network parameters should not be auto accepted") { updateFile.exists() } + assert(!updateFile.exists()) { "network parameters should not be auto accepted" } } @Test @@ -287,9 +297,10 @@ class NetworkMapUpdaterTest { whitelistedContractImplementations = mapOf("key" to listOf(SecureHash.randomSHA256()))) server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN) startUpdater(autoAcceptNetworkParameters = false) - + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME - assertNever("network parameters should not be auto accepted") { updateFile.exists() } + assert(!updateFile.exists()) { "network parameters should not be auto accepted" } } @Test @@ -299,9 +310,9 @@ class NetworkMapUpdaterTest { assertThatThrownBy { networkMapClient.getNetworkMap(privateNetUUID).payload.nodeInfoHashes } .isInstanceOf(IOException::class.java) .hasMessageContaining("Response Code 404") - val (aliceInfo, signedAliceInfo) = createNodeInfoAndSigned(ALICE_NAME) // Goes to private network map + val (aliceInfo, signedAliceInfo) = createNodeInfoAndSigned(ALICE_NAME) //Goes to private network map val aliceHash = aliceInfo.serialize().hash - val (bobInfo, signedBobInfo) = createNodeInfoAndSigned(BOB_NAME) // Goes to global network map + val (bobInfo, signedBobInfo) = createNodeInfoAndSigned(BOB_NAME) //Goes to global network map networkMapClient.publish(signedAliceInfo) networkMapClient.publish(signedBobInfo) assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(bobInfo.serialize().hash) @@ -323,7 +334,7 @@ class NetworkMapUpdaterTest { verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned1.nodeInfo) verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned2.nodeInfo) assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(fileNodeInfoAndSigned1.signed.raw.hash, fileNodeInfoAndSigned2.signed.raw.hash) - // Remove one of the nodes + //Remove one of the nodes val fileName1 = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${fileNodeInfoAndSigned1.nodeInfo.legalIdentities[0].name.serialize().hash}" (nodeInfoDir / fileName1).delete() advanceTime() @@ -338,48 +349,44 @@ class NetworkMapUpdaterTest { val nodeInfoBuilder = TestNodeInfoBuilder() val (_, key) = nodeInfoBuilder.addLegalIdentity(CordaX500Name("Info", "London", "GB")) val (serverNodeInfo, serverSignedNodeInfo) = nodeInfoBuilder.buildWithSigned(1, 1) - // Construct node for exactly same identity, but different serial. This one will go to additional-node-infos only. + //Construct node for exactly same identity, but different serial. This one will go to additional-node-infos only. val localNodeInfo = serverNodeInfo.copy(serial = 17) val localSignedNodeInfo = NodeInfoAndSigned(localNodeInfo) { _, serialised -> key.sign(serialised.bytes) } - // The one with higher serial goes to additional-node-infos. + //The one with higher serial goes to additional-node-infos. NodeInfoWatcher.saveToFile(nodeInfoDir, localSignedNodeInfo) - // Publish to network map the one with lower serial. + //Publish to network map the one with lower serial. networkMapClient.publish(serverSignedNodeInfo) startUpdater() advanceTime() verify(networkMapCache, times(1)).addNode(localNodeInfo) - - // Node from file has higher serial than the one from NetworkMapServer - eventually { assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash) } + Thread.sleep(2L * cacheExpiryMs) + //Node from file has higher serial than the one from NetworkMapServer + assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash) val fileName = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${localNodeInfo.legalIdentities[0].name.serialize().hash}" (nodeInfoDir / fileName).delete() advanceTime() verify(networkMapCache, times(1)).removeNode(any()) verify(networkMapCache).removeNode(localNodeInfo) - - - eventually { - // Instead of node from file we should have now the one from NetworkMapServer - assertThat(networkMapCache.allNodeHashes).containsOnly(serverSignedNodeInfo.raw.hash) - } + Thread.sleep(2L * cacheExpiryMs) + //Instead of node from file we should have now the one from NetworkMapServer + assertThat(networkMapCache.allNodeHashes).containsOnly(serverSignedNodeInfo.raw.hash) } - // Test fix for ENT-1882 - // This scenario can happen when signing of network map server is performed much longer after the node joined the network. - // Network map will advertise hashes without that node. + //Test fix for ENT-1882 + //This scenario can happen when signing of network map server is performed much longer after the node joined the network. + //Network map will advertise hashes without that node. @Test fun `not remove own node info when it is not in network map yet`() { val (myInfo, signedMyInfo) = createNodeInfoAndSigned("My node info") val (_, signedOtherInfo) = createNodeInfoAndSigned("Other info") setUpdater() - networkMapCache.addNode(myInfo) // Simulate behaviour on node startup when our node info is added to cache + networkMapCache.addNode(myInfo) //Simulate behaviour on node startup when our node info is added to cache networkMapClient.publish(signedOtherInfo) startUpdater(ourNodeInfo = signedMyInfo) - assertAlways("Node must never be removed") { - verify(networkMapCache, never()).removeNode(myInfo) - } + Thread.sleep(2L * cacheExpiryMs) + verify(networkMapCache, never()).removeNode(myInfo) assertThat(server.networkMapHashes()).containsOnly(signedOtherInfo.raw.hash) assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(signedMyInfo.raw.hash, signedOtherInfo.raw.hash) } @@ -395,24 +402,22 @@ class NetworkMapUpdaterTest { val signedNodeInfo1 = builder.buildWithSigned(1).signed val signedNodeInfo2 = builder.buildWithSigned(2).signed - // Test adding new node. + //Test adding new node. networkMapClient.publish(signedNodeInfo1) - // Not subscribed yet. + //Not subscribed yet. verify(networkMapCache, times(0)).addNode(any()) startUpdater() - eventually { verify(networkMapCache, times(1)).addNode(signedNodeInfo1.verified()) } + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) assert(networkMapCache.allNodeHashes.size == 1) networkMapClient.publish(signedNodeInfo2) - + Thread.sleep(2L * cacheExpiryMs) advanceTime() - eventually { - verify(networkMapCache, times(1)).addNode(signedNodeInfo2.verified()) - verify(networkMapCache, times(1)).removeNode(signedNodeInfo1.verified()) - } - assertEquals(1, networkMapCache.allNodeHashes.size) + verify(networkMapCache, times(1)).removeNode(signedNodeInfo1.verified()) + assert(networkMapCache.allNodeHashes.size == 1) } @Test @@ -442,6 +447,14 @@ class NetworkMapUpdaterTest { } private fun createMockNetworkMapCache(): NetworkMapCacheInternal { + + fun addNodeToMockCache(nodeInfo: NodeInfo, data: ConcurrentHashMap) { + val party = nodeInfo.legalIdentities[0] + data.compute(party) { _, current -> + if (current == null || current.serial < nodeInfo.serial) nodeInfo else current + } + } + return mock { on { nodeReady }.thenReturn(nodeReadyFuture) val data = ConcurrentHashMap() @@ -452,6 +465,14 @@ class NetworkMapUpdaterTest { if (current == null || current.serial < nodeInfo.serial) nodeInfo else current } } + + on { addNodes(any>()) }.then { + val nodeInfos = it.arguments[0] as List + nodeInfos.forEach { nodeInfo -> + addNodeToMockCache(nodeInfo, data) + } + } + on { removeNode(any()) }.then { data.remove((it.arguments[0] as NodeInfo).legalIdentities[0]) } on { getNodeByLegalIdentity(any()) }.then { data[it.arguments[0]] } on { allNodeHashes }.then { data.values.map { it.serialize().hash } } @@ -470,24 +491,4 @@ class NetworkMapUpdaterTest { private fun advanceTime() { scheduler.advanceTimeBy(10, TimeUnit.SECONDS) } - - private fun assertNever(condition: String, check: () -> Boolean) { - val timeoutMillis = 2L * cacheExpiryMs - val start = Instant.now() - while (Duration.between(start, Instant.now()).toMillis() < timeoutMillis) { - Thread.sleep(100) - if (check()) fail(condition) - } - } - - private fun assertAlways(condition: String, check: () -> Unit) { - assertNever(condition) { - try { - check() - false - } catch (e: Exception) { - true - } - } - } }