From ebdd40049c66780bde54106afaa87973791166c5 Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Tue, 17 Mar 2020 17:02:08 +0000 Subject: [PATCH 1/4] CORDA-3662: Use an INNER JOIN for network map cache queries, (#6062) - rename add or update function for clarity - put removal of old nodes after retrieval of new ones to avoid gaps in the map - plus add a test --- .../network/PersistentNetworkMapCacheTest.kt | 30 ++--- .../net/corda/node/internal/AbstractNode.kt | 2 +- .../node/services/api/ServiceHubInternal.kt | 7 +- .../services/network/NetworkMapUpdater.kt | 36 ++++-- .../network/PersistentNetworkMapCache.kt | 14 +-- .../corda/node/internal/NodeRestartTests.kt | 4 +- .../services/network/NetworkMapCacheTest.kt | 6 +- .../services/network/NetworkMapUpdaterTest.kt | 107 ++++++++++++++---- .../node/internal/InternalMockNetwork.kt | 4 +- .../testing/node/internal/NodeBasedTest.kt | 2 +- 10 files changed, 146 insertions(+), 66 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt index f9675a94c1..9c701ccd3a 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt @@ -50,7 +50,7 @@ class PersistentNetworkMapCacheTest { @Test(timeout=300_000) fun addNode() { val alice = createNodeInfo(listOf(ALICE)) - charlieNetMapCache.addNode(alice) + charlieNetMapCache.addOrUpdateNode(alice) val fromDb = database.transaction { session.createQuery( "from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name}", @@ -62,7 +62,7 @@ class PersistentNetworkMapCacheTest { @Test(timeout=300_000) fun `unknown legal name`() { - charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) + charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE))) assertThat(charlieNetMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty() assertThat(charlieNetMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull() assertThat(charlieNetMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull() @@ -71,13 +71,13 @@ class PersistentNetworkMapCacheTest { @Test(timeout=300_000) fun `nodes in distributed service`() { - charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) + charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE))) val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME) val distServiceNodeInfos = (1..2).map { val nodeInfo = createNodeInfo(identities = listOf(TestIdentity.fresh("Org-$it"), distributedIdentity)) - charlieNetMapCache.addNode(nodeInfo) + charlieNetMapCache.addOrUpdateNode(nodeInfo) nodeInfo } @@ -90,7 +90,7 @@ class PersistentNetworkMapCacheTest { @Test(timeout=300_000) fun `get nodes by owning key and by name`() { val alice = createNodeInfo(listOf(ALICE)) - charlieNetMapCache.addNode(alice) + charlieNetMapCache.addOrUpdateNode(alice) assertThat(charlieNetMapCache.getNodesByLegalIdentityKey(ALICE.publicKey)).containsOnly(alice) assertThat(charlieNetMapCache.getNodeByLegalName(ALICE.name)).isEqualTo(alice) } @@ -98,31 +98,31 @@ class PersistentNetworkMapCacheTest { @Test(timeout=300_000) fun `get nodes by address`() { val alice = createNodeInfo(listOf(ALICE)) - charlieNetMapCache.addNode(alice) + charlieNetMapCache.addOrUpdateNode(alice) assertThat(charlieNetMapCache.getNodeByAddress(alice.addresses[0])).isEqualTo(alice) } @Test(timeout=300_000) fun `insert two node infos with the same host and port`() { val alice = createNodeInfo(listOf(ALICE)) - charlieNetMapCache.addNode(alice) + charlieNetMapCache.addOrUpdateNode(alice) val bob = createNodeInfo(listOf(BOB), address = alice.addresses[0]) - charlieNetMapCache.addNode(bob) + charlieNetMapCache.addOrUpdateNode(bob) val nodeInfos = charlieNetMapCache.allNodes.filter { alice.addresses[0] in it.addresses } assertThat(nodeInfos).hasSize(2) } @Test(timeout=300_000) fun `negative test - attempt to insert invalid node info`() { - charlieNetMapCache.addNode(createNodeInfo(listOf(LONG_PLC))) + charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(LONG_PLC))) assertThat(charlieNetMapCache.allNodes).hasSize(0) } @Test(timeout=300_000) fun `negative test - attempt to update existing node with invalid node info`() { - charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) + charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE))) val aliceUpdate = TestIdentity(LONG_X500_NAME, ALICE.keyPair) - charlieNetMapCache.addNode(createNodeInfo(listOf(aliceUpdate))) + charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(aliceUpdate))) assertThat(charlieNetMapCache.allNodes).hasSize(1) assertThat(charlieNetMapCache.getNodeByLegalName(ALICE_NAME)).isNotNull assertThat(charlieNetMapCache.getNodeByLegalName(LONG_X500_NAME)).isNull() @@ -130,7 +130,7 @@ class PersistentNetworkMapCacheTest { @Test(timeout=300_000) fun `negative test - insert two valid node infos and one invalid one`() { - charlieNetMapCache.addNodes(listOf(createNodeInfo(listOf(ALICE)), + charlieNetMapCache.addOrUpdateNodes(listOf(createNodeInfo(listOf(ALICE)), createNodeInfo(listOf(BOB)), createNodeInfo(listOf(LONG_PLC)))) assertThat(charlieNetMapCache.allNodes).hasSize(2) @@ -139,7 +139,7 @@ class PersistentNetworkMapCacheTest { @Test(timeout=300_000) fun `negative test - insert three valid node infos and two invalid ones`() { - charlieNetMapCache.addNodes(listOf(createNodeInfo(listOf(LONG_PLC)), + charlieNetMapCache.addOrUpdateNodes(listOf(createNodeInfo(listOf(LONG_PLC)), createNodeInfo(listOf(ALICE)), createNodeInfo(listOf(BOB)), createNodeInfo(listOf(CHARLIE)), @@ -150,9 +150,9 @@ class PersistentNetworkMapCacheTest { @Test(timeout=300_000) fun `negative test - insert one valid node info then attempt to add one invalid node info and update the existing valid nodeinfo`() { - charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) + charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE))) val aliceUpdate = TestIdentity(LONG_X500_NAME, ALICE.keyPair) - charlieNetMapCache.addNodes(listOf(createNodeInfo(listOf(aliceUpdate)), + charlieNetMapCache.addOrUpdateNodes(listOf(createNodeInfo(listOf(aliceUpdate)), createNodeInfo(listOf(LONGER_PLC)), createNodeInfo(listOf(BOB)))) assertThat(charlieNetMapCache.allNodes).hasSize(2) assertThat(charlieNetMapCache.getNodeByLegalName(ALICE_NAME)).isNotNull diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 38675eb979..a89dd09763 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -611,7 +611,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } else { log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb") val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis()) - networkMapCache.addNode(newNodeInfo) + networkMapCache.addOrUpdateNode(newNodeInfo) log.info("New node-info: $newNodeInfo") newNodeInfo } diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 0009424917..6fa3ed5869 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -40,10 +40,11 @@ interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBase { * This is used for Artemis bridge lookup process. */ fun getNodesByOwningKeyIndex(identityKeyIndex: String): List - /** Adds a node to the local cache (generally only used for adding ourselves). */ - fun addNode(node: NodeInfo) + /** Adds (or updates) a node to the local cache (generally only used for adding ourselves). */ + fun addOrUpdateNode(node: NodeInfo) - fun addNodes(nodes: List) + /** Adds (or updates) nodes to the local cache. */ + fun addOrUpdateNodes(nodes: List) /** Removes a node from the local cache. */ fun removeNode(node: NodeInfo) diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index d39f6fbc8f..7ff18232a2 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -4,7 +4,13 @@ import com.google.common.util.concurrent.MoreExecutors import net.corda.core.CordaRuntimeException import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SignedData -import net.corda.core.internal.* +import net.corda.core.internal.NetworkParametersStorage +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.copyTo +import net.corda.core.internal.div +import net.corda.core.internal.exists +import net.corda.core.internal.readObject +import net.corda.core.internal.sign import net.corda.core.messaging.DataFeed import net.corda.core.messaging.ParametersUpdateInfo import net.corda.core.node.AutoAcceptable @@ -20,7 +26,12 @@ import net.corda.node.services.config.NetworkParameterAcceptanceSettings import net.corda.node.utilities.NamedThreadFactory import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException import net.corda.nodeapi.internal.SignedNodeInfo -import net.corda.nodeapi.internal.network.* +import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME +import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME +import net.corda.nodeapi.internal.network.NetworkMap +import net.corda.nodeapi.internal.network.ParametersUpdate +import net.corda.nodeapi.internal.network.SignedNetworkParameters +import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert import rx.Subscription import rx.subjects.PublishSubject import java.lang.Integer.max @@ -118,7 +129,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, .subscribe { for (update in it) { when (update) { - is NodeInfoUpdate.Add -> networkMapCache.addNode(update.nodeInfo) + is NodeInfoUpdate.Add -> networkMapCache.addOrUpdateNode(update.nodeInfo) is NodeInfoUpdate.Remove -> { if (update.hash != ourNodeInfoHash) { val nodeInfo = networkMapCache.getNodeByHash(update.hash) @@ -177,11 +188,12 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, if (currentParametersHash != globalNetworkMap.networkParameterHash) { exitOnParametersMismatch(globalNetworkMap) } - val currentNodeHashes = networkMapCache.allNodeHashes - // Remove node info from network map. - (currentNodeHashes - allHashesFromNetworkMap - nodeInfoWatcher.processedNodeInfoHashes) - .mapNotNull { if (it != ourNodeInfoHash) networkMapCache.getNodeByHash(it) else null } - .forEach(networkMapCache::removeNode) + // Calculate any nodes that are now gone and remove _only_ them from the cache + // NOTE: We won't remove them until after the add/update cycle as only then will we definitely know which nodes are no longer + // in the network + val allNodeHashes = networkMapCache.allNodeHashes + val nodeHashesToBeDeleted = (allNodeHashes - allHashesFromNetworkMap - nodeInfoWatcher.processedNodeInfoHashes) + .filter { it != ourNodeInfoHash } //at the moment we use a blocking HTTP library - but under the covers, the OS will interleave threads waiting for IO //as HTTP GET is mostly IO bound, use more threads than CPU's //maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests @@ -189,7 +201,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(threadsToUseForNetworkMapDownload, NamedThreadFactory("NetworkMapUpdaterNodeInfoDownloadThread")) //DB insert is single threaded - use a single threaded executor for it. val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsertThread")) - val hashesToFetch = (allHashesFromNetworkMap - currentNodeHashes) + val hashesToFetch = (allHashesFromNetworkMap - allNodeHashes) val networkMapDownloadStartTime = System.currentTimeMillis() if (hashesToFetch.isNotEmpty()) { val networkMapDownloadFutures = hashesToFetch.chunked(max(hashesToFetch.size / threadsToUseForNetworkMapDownload, 1)) @@ -207,7 +219,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, } }, executorToUseForDownloadingNodeInfos).thenAcceptAsync(Consumer { retrievedNodeInfos -> // Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes. - networkMapCache.addNodes(retrievedNodeInfos) + networkMapCache.addOrUpdateNodes(retrievedNodeInfos) }, executorToUseForInsertionIntoDB) }.toTypedArray() //wait for all the futures to complete @@ -218,6 +230,10 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, executorToUseForInsertionIntoDB.shutdown() }.getOrThrow() } + // NOTE: We remove nodes after any new/updates because updated nodes will have a new hash and, therefore, any + // nodes that we can actually pull out of the cache (with the old hashes) should be a truly removed node. + nodeHashesToBeDeleted.mapNotNull { networkMapCache.getNodeByHash(it) }.forEach(networkMapCache::removeNode) + // Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that // it's empty networkMapCache.nodeReady.set(null) diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index a201880c16..877a7f2aed 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -159,7 +159,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, } } - override fun addNodes(nodes: List) { + override fun addOrUpdateNodes(nodes: List) { synchronized(_changed) { val newNodes = mutableListOf() val updatedNodes = mutableListOf>() @@ -226,9 +226,9 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, } } - override fun addNode(node: NodeInfo) { + override fun addOrUpdateNode(node: NodeInfo) { logger.info("Adding node with info: $node") - addNodes(listOf(node)) + addOrUpdateNodes(listOf(node)) logger.debug { "Done adding node with info: $node" } } @@ -305,7 +305,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, private fun findByIdentityKeyIndex(session: Session, identityKeyIndex: String): List { val query = session.createQuery( - "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.owningKeyHash = :owningKeyHash", + "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.legalIdentitiesAndCerts l WHERE l.owningKeyHash = :owningKeyHash", NodeInfoSchemaV1.PersistentNodeInfo::class.java) query.setParameter("owningKeyHash", identityKeyIndex) return query.resultList @@ -323,7 +323,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, private fun queryIdentityByLegalName(session: Session, name: CordaX500Name): PartyAndCertificate? { val query = session.createQuery( // We do the JOIN here to restrict results to those present in the network map - "SELECT DISTINCT l FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", + "SELECT DISTINCT l FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", NodeInfoSchemaV1.DBPartyAndCertificate::class.java) query.setParameter("name", name.toString()) val candidates = query.resultList.map { it.toLegalIdentityAndCert() } @@ -333,7 +333,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, private fun queryByLegalName(session: Session, name: CordaX500Name): List { val query = session.createQuery( - "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", + "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", NodeInfoSchemaV1.PersistentNodeInfo::class.java) query.setParameter("name", name.toString()) val result = query.resultList @@ -342,7 +342,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, private fun queryByAddress(session: Session, hostAndPort: NetworkHostAndPort): NodeInfo? { val query = session.createQuery( - "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.addresses a WHERE a.host = :host AND a.port = :port", + "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.addresses a WHERE a.host = :host AND a.port = :port", NodeInfoSchemaV1.PersistentNodeInfo::class.java) query.setParameter("host", hostAndPort.host) query.setParameter("port", hostAndPort.port) diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeRestartTests.kt b/node/src/test/kotlin/net/corda/node/internal/NodeRestartTests.kt index 2da7661dd8..59d6a23d74 100644 --- a/node/src/test/kotlin/net/corda/node/internal/NodeRestartTests.kt +++ b/node/src/test/kotlin/net/corda/node/internal/NodeRestartTests.kt @@ -31,8 +31,8 @@ class NodeRestartTests { val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) val bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME)) bob.registerInitiatedFlow(Responder::class.java) - alice.services.networkMapCache.addNode(bob.info) - bob.services.networkMapCache.addNode(alice.info) + alice.services.networkMapCache.addOrUpdateNode(bob.info) + bob.services.networkMapCache.addOrUpdateNode(alice.info) val alice2 = mockNet.restartNode(alice) val result = alice2.services.startFlow(Initiator(bob.info.singleIdentity())).resultFuture.getOrThrow() assertThat(result).isEqualTo(123) diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt index 0ba75cc799..b852a921db 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt @@ -37,7 +37,7 @@ class NetworkMapCacheTest { val bob = bobNode.info.singleIdentity() assertEquals(alice, bob) - aliceNode.services.networkMapCache.addNode(bobNode.info) + aliceNode.services.networkMapCache.addOrUpdateNode(bobNode.info) // The details of node B write over those for node A assertEquals(aliceNode.services.networkMapCache.getNodesByLegalIdentityKey(alice.owningKey).singleOrNull(), bobNode.info) } @@ -86,7 +86,7 @@ class NetworkMapCacheTest { assertNull(bobCache.getPeerByLegalName(ALICE_NAME)) assertThat(bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).isEmpty()) - bobCacheInternal.addNode(aliceNode.info) + bobCacheInternal.addOrUpdateNode(aliceNode.info) assertEquals(aliceNode.info.singleIdentity(), bobCache.getPeerByLegalName(ALICE_NAME)) assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single()) @@ -113,7 +113,7 @@ class NetworkMapCacheTest { val aliceNode = mockNet.createPartyNode(ALICE_NAME) val aliceCache = aliceNode.services.networkMapCache val alicePartyAndCert2 = getTestPartyAndCertificate(ALICE_NAME, generateKeyPair().public) - aliceCache.addNode(aliceNode.info.copy(legalIdentitiesAndCerts = listOf(alicePartyAndCert2))) + aliceCache.addOrUpdateNode(aliceNode.info.copy(legalIdentitiesAndCerts = listOf(alicePartyAndCert2))) // This is correct behaviour as we may have distributed service nodes. assertEquals(2, aliceCache.getNodesByLegalName(ALICE_NAME).size) } diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index a418b89cc9..48ac12a1f9 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -2,15 +2,26 @@ package net.corda.node.services.network import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Jimfs -import com.nhaarman.mockito_kotlin.* +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never +import com.nhaarman.mockito_kotlin.times +import com.nhaarman.mockito_kotlin.verify import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash import net.corda.core.crypto.generateKeyPair import net.corda.core.crypto.sign import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party -import net.corda.core.internal.* +import net.corda.core.internal.NODE_INFO_DIRECTORY +import net.corda.core.internal.NetworkParametersStorage +import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.delete +import net.corda.core.internal.div +import net.corda.core.internal.exists +import net.corda.core.internal.readObject +import net.corda.core.internal.sign import net.corda.core.messaging.ParametersUpdateInfo import net.corda.core.node.NetworkParameters import net.corda.core.node.NodeInfo @@ -28,10 +39,15 @@ import net.corda.nodeapi.internal.network.NodeInfoFilesCopier import net.corda.nodeapi.internal.network.SignedNetworkParameters import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert import net.corda.testing.common.internal.testNetworkParameters -import net.corda.testing.core.* import net.corda.testing.internal.DEV_ROOT_CA import net.corda.testing.internal.TestNodeInfoBuilder import net.corda.testing.internal.createNodeInfoAndSigned +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.expect +import net.corda.testing.core.expectEvents +import net.corda.testing.core.sequence import net.corda.testing.node.internal.MockKeyManagementService import net.corda.testing.node.internal.MockPublicKeyToOwningIdentityCache import net.corda.testing.node.internal.network.NetworkMapServer @@ -39,7 +55,11 @@ import net.corda.testing.node.makeTestIdentityService import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.hamcrest.collection.IsIterableContainingInAnyOrder -import org.junit.* +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.Rule +import org.junit.Test import rx.schedulers.TestScheduler import java.io.IOException import java.net.URL @@ -119,7 +139,7 @@ class NetworkMapUpdaterTest { //Test adding new node. networkMapClient.publish(signedNodeInfo1) //Not subscribed yet. - verify(networkMapCache, times(0)).addNode(any()) + verify(networkMapCache, times(0)).addOrUpdateNode(any()) startUpdater() networkMapClient.publish(signedNodeInfo2) @@ -195,13 +215,56 @@ class NetworkMapUpdaterTest { assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash) } + @Test(timeout=300_000) + fun `process remove, add, and update node from network map`() { + setUpdater() + val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1") + val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("Info 3") + + val builder = TestNodeInfoBuilder() + builder.addLegalIdentity(CordaX500Name("Test", "London", "GB")) + val (nodeInfo2, signedNodeInfo2) = builder.buildWithSigned(1) + val (nodeInfo2_2, signedNodeInfo2_2) = builder.buildWithSigned(2) + + //Add all nodes. + networkMapClient.publish(signedNodeInfo1) + networkMapClient.publish(signedNodeInfo2) + + startUpdater() + advanceTime() + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) + + Assert.assertThat(networkMapCache.allNodeHashes, IsIterableContainingInAnyOrder.containsInAnyOrder( + signedNodeInfo1.raw.hash, + signedNodeInfo2.raw.hash + )) + + // remove one node, add another and update a third. + server.removeNodeInfo(nodeInfo1) + networkMapClient.publish(signedNodeInfo3) + networkMapClient.publish(signedNodeInfo2_2) + + advanceTime() + //TODO: Remove sleep in unit test. + Thread.sleep(2L * cacheExpiryMs) + verify(networkMapCache, times(1)).removeNode(nodeInfo1) + verify(networkMapCache, times(0)).removeNode(nodeInfo2) + verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo2_2)) + verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo3)) + assertThat(networkMapCache.allNodeHashes).hasSameElementsAs(listOf( + signedNodeInfo2_2.raw.hash, + signedNodeInfo3.raw.hash + )) + } + @Test(timeout=300_000) fun `receive node infos from directory, without a network map`() { setUpdater(netMapClient = null) val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file") //Not subscribed yet. - verify(networkMapCache, times(0)).addNode(any()) + verify(networkMapCache, times(0)).addOrUpdateNode(any()) startUpdater() @@ -209,8 +272,8 @@ class NetworkMapUpdaterTest { assertThat(nodeReadyFuture).isNotDone() advanceTime() - verify(networkMapCache, times(1)).addNode(any()) - verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo) + verify(networkMapCache, times(1)).addOrUpdateNode(any()) + verify(networkMapCache, times(1)).addOrUpdateNode(fileNodeInfoAndSigned.nodeInfo) assertThat(nodeReadyFuture).isDone() assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash) @@ -331,9 +394,9 @@ class NetworkMapUpdaterTest { NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned1) NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned2) advanceTime() - verify(networkMapCache, times(2)).addNode(any()) - verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned1.nodeInfo) - verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned2.nodeInfo) + verify(networkMapCache, times(2)).addOrUpdateNode(any()) + verify(networkMapCache, times(1)).addOrUpdateNode(fileNodeInfoAndSigned1.nodeInfo) + verify(networkMapCache, times(1)).addOrUpdateNode(fileNodeInfoAndSigned2.nodeInfo) assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(fileNodeInfoAndSigned1.signed.raw.hash, fileNodeInfoAndSigned2.signed.raw.hash) //Remove one of the nodes val fileName1 = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${fileNodeInfoAndSigned1.nodeInfo.legalIdentities[0].name.serialize().hash}" @@ -361,7 +424,7 @@ class NetworkMapUpdaterTest { networkMapClient.publish(serverSignedNodeInfo) startUpdater() advanceTime() - verify(networkMapCache, times(1)).addNode(localNodeInfo) + verify(networkMapCache, times(1)).addOrUpdateNode(localNodeInfo) Thread.sleep(2L * cacheExpiryMs) //Node from file has higher serial than the one from NetworkMapServer assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash) @@ -383,7 +446,7 @@ class NetworkMapUpdaterTest { val (myInfo, signedMyInfo) = createNodeInfoAndSigned("My node info") val (_, signedOtherInfo) = createNodeInfoAndSigned("Other info") setUpdater() - networkMapCache.addNode(myInfo) //Simulate behaviour on node startup when our node info is added to cache + networkMapCache.addOrUpdateNode(myInfo) //Simulate behaviour on node startup when our node info is added to cache networkMapClient.publish(signedOtherInfo) startUpdater(ourNodeInfo = signedMyInfo) Thread.sleep(2L * cacheExpiryMs) @@ -406,19 +469,22 @@ class NetworkMapUpdaterTest { //Test adding new node. networkMapClient.publish(signedNodeInfo1) //Not subscribed yet. - verify(networkMapCache, times(0)).addNode(any()) + verify(networkMapCache, times(0)).addOrUpdateNode(any()) startUpdater() //TODO: Remove sleep in unit test. Thread.sleep(2L * cacheExpiryMs) assert(networkMapCache.allNodeHashes.size == 1) + assert(networkMapCache.allNodeHashes.first() == signedNodeInfo1.raw.hash) + verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(signedNodeInfo1.verified())) networkMapClient.publish(signedNodeInfo2) Thread.sleep(2L * cacheExpiryMs) advanceTime() - verify(networkMapCache, times(1)).removeNode(signedNodeInfo1.verified()) + verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(signedNodeInfo1.verified())) assert(networkMapCache.allNodeHashes.size == 1) + assert(networkMapCache.allNodeHashes.first() == signedNodeInfo2.raw.hash) } @Test(timeout=300_000) @@ -459,15 +525,11 @@ class NetworkMapUpdaterTest { return mock { on { nodeReady }.thenReturn(nodeReadyFuture) val data = ConcurrentHashMap() - on { addNode(any()) }.then { + on { addOrUpdateNode(any()) }.then { val nodeInfo = it.arguments[0] as NodeInfo - val party = nodeInfo.legalIdentities[0] - data.compute(party) { _, current -> - if (current == null || current.serial < nodeInfo.serial) nodeInfo else current - } + addNodeToMockCache(nodeInfo, data) } - - on { addNodes(any>()) }.then { + on { addOrUpdateNodes(any()) }.then { @Suppress("UNCHECKED_CAST") val nodeInfos = it.arguments[0] as List nodeInfos.forEach { nodeInfo -> @@ -477,6 +539,7 @@ class NetworkMapUpdaterTest { on { removeNode(any()) }.then { data.remove((it.arguments[0] as NodeInfo).legalIdentities[0]) } on { getNodeByLegalIdentity(any()) }.then { data[it.arguments[0]] } + on { allNodes }.then { data.values.toList() } on { allNodeHashes }.then { data.values.map { it.serialize().hash } } on { getNodeByHash(any()) }.then { mock -> data.values.singleOrNull { it.serialize().hash == mock.arguments[0] } } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 29425cab4e..eb986f88e0 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -352,8 +352,8 @@ open class InternalMockNetwork(cordappPackages: List = emptyList(), mockNet.nodes .mapNotNull { it.started } .forEach { existingNode -> - newNode.services.networkMapCache.addNode(existingNode.info) - existingNode.services.networkMapCache.addNode(newNode.info) + newNode.services.networkMapCache.addOrUpdateNode(existingNode.info) + existingNode.services.networkMapCache.addOrUpdateNode(newNode.info) } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt index 3c43c99a97..ccdb8d0a93 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt @@ -147,7 +147,7 @@ constructor(private val cordappPackages: List = emptyList(), private val val runningNodesInfo = runningNodes.map { it.info } for (node in runningNodes) for (nodeInfo in runningNodesInfo) { - node.services.networkMapCache.addNode(nodeInfo) + node.services.networkMapCache.addOrUpdateNode(nodeInfo) } } } From 55797612b29e0edacca01636abe1f08345d0dbc9 Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Mon, 4 May 2020 15:02:33 +0100 Subject: [PATCH 2/4] ENT-5237: Remove DISTINCT (backport) --- .../corda/node/services/network/PersistentNetworkMapCache.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index 877a7f2aed..f09d9aa8f6 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -323,9 +323,10 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, private fun queryIdentityByLegalName(session: Session, name: CordaX500Name): PartyAndCertificate? { val query = session.createQuery( // We do the JOIN here to restrict results to those present in the network map - "SELECT DISTINCT l FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", + "SELECT l FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", NodeInfoSchemaV1.DBPartyAndCertificate::class.java) query.setParameter("name", name.toString()) + query.maxResults = 1 // instead of DISTINCT in the query, DISTINCT is not supported in Oracle when one of the columns is BLOB val candidates = query.resultList.map { it.toLegalIdentityAndCert() } // The map is restricted to holding a single identity for any X.500 name, so firstOrNull() is correct here. return candidates.firstOrNull() From b6dc1d8c4aba491ea5a53c3a65790c9a6e7ee29d Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Tue, 5 May 2020 09:38:55 +0100 Subject: [PATCH 3/4] NOTICK - Reduce flow count for test to improve run time (#6213) --- .../kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt index 9ec17b4bf8..5c3f103377 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt @@ -54,8 +54,8 @@ import kotlin.test.currentStackTrace class RpcReconnectTests { companion object { - // 150 flows take ~5 minutes - const val NUMBER_OF_FLOWS_TO_RUN = 150 + // this many flows take ~5 minutes + const val NUMBER_OF_FLOWS_TO_RUN = 100 private val log = contextLogger() } From b43e781f274f7e404591cb1a554e7cc574d12e4e Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Mon, 11 May 2020 15:38:23 +0100 Subject: [PATCH 4/4] ENT-5285, ENT-5296 Ignore ForkJoinPool.commonPool threads in `RPCStabilityTests` (#6205) * NOTICK Ignore JUnit time threads in `RPCStabilityTests` * NOTICK - Ignore ForkJoinPool.commonPool as it's not related to our test Co-authored-by: LankyDan --- .../kotlin/net/corda/client/rpc/RPCStabilityTests.kt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 6875840d04..e476e0e581 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -70,7 +70,11 @@ class RPCStabilityTests { private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Map> { val values = ConcurrentLinkedQueue>>() return poll(executorService, "number of threads to become stable", 250.millis) { - values.add(Thread.getAllStackTraces().mapValues { it.value.toList() }) + // Exclude threads which we don't use for timing our tests + val map: Map> = Thread.getAllStackTraces() + .filterKeys { !it.name.contains("ForkJoinPool.commonPool") } + .mapValues { it.value.toList() } + values.add(map) if (values.size > 5) { values.poll() }