CORDA-3662: Use an INNER JOIN for network map cache queries, (#6062)

- rename add or update function for clarity
- put removal of old nodes after retrieval of new ones to avoid gaps in the map
- plus add a test
This commit is contained in:
Ryan Fowler 2020-03-17 17:02:08 +00:00
parent 8eda8b744f
commit ebdd40049c
10 changed files with 146 additions and 66 deletions

View File

@ -50,7 +50,7 @@ class PersistentNetworkMapCacheTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun addNode() { fun addNode() {
val alice = createNodeInfo(listOf(ALICE)) val alice = createNodeInfo(listOf(ALICE))
charlieNetMapCache.addNode(alice) charlieNetMapCache.addOrUpdateNode(alice)
val fromDb = database.transaction { val fromDb = database.transaction {
session.createQuery( session.createQuery(
"from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name}", "from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name}",
@ -62,7 +62,7 @@ class PersistentNetworkMapCacheTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `unknown legal name`() { fun `unknown legal name`() {
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE)))
assertThat(charlieNetMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty() assertThat(charlieNetMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
assertThat(charlieNetMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull() assertThat(charlieNetMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(charlieNetMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull() assertThat(charlieNetMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
@ -71,13 +71,13 @@ class PersistentNetworkMapCacheTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `nodes in distributed service`() { fun `nodes in distributed service`() {
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE)))
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME) val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME)
val distServiceNodeInfos = (1..2).map { val distServiceNodeInfos = (1..2).map {
val nodeInfo = createNodeInfo(identities = listOf(TestIdentity.fresh("Org-$it"), distributedIdentity)) val nodeInfo = createNodeInfo(identities = listOf(TestIdentity.fresh("Org-$it"), distributedIdentity))
charlieNetMapCache.addNode(nodeInfo) charlieNetMapCache.addOrUpdateNode(nodeInfo)
nodeInfo nodeInfo
} }
@ -90,7 +90,7 @@ class PersistentNetworkMapCacheTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `get nodes by owning key and by name`() { fun `get nodes by owning key and by name`() {
val alice = createNodeInfo(listOf(ALICE)) val alice = createNodeInfo(listOf(ALICE))
charlieNetMapCache.addNode(alice) charlieNetMapCache.addOrUpdateNode(alice)
assertThat(charlieNetMapCache.getNodesByLegalIdentityKey(ALICE.publicKey)).containsOnly(alice) assertThat(charlieNetMapCache.getNodesByLegalIdentityKey(ALICE.publicKey)).containsOnly(alice)
assertThat(charlieNetMapCache.getNodeByLegalName(ALICE.name)).isEqualTo(alice) assertThat(charlieNetMapCache.getNodeByLegalName(ALICE.name)).isEqualTo(alice)
} }
@ -98,31 +98,31 @@ class PersistentNetworkMapCacheTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `get nodes by address`() { fun `get nodes by address`() {
val alice = createNodeInfo(listOf(ALICE)) val alice = createNodeInfo(listOf(ALICE))
charlieNetMapCache.addNode(alice) charlieNetMapCache.addOrUpdateNode(alice)
assertThat(charlieNetMapCache.getNodeByAddress(alice.addresses[0])).isEqualTo(alice) assertThat(charlieNetMapCache.getNodeByAddress(alice.addresses[0])).isEqualTo(alice)
} }
@Test(timeout=300_000) @Test(timeout=300_000)
fun `insert two node infos with the same host and port`() { fun `insert two node infos with the same host and port`() {
val alice = createNodeInfo(listOf(ALICE)) val alice = createNodeInfo(listOf(ALICE))
charlieNetMapCache.addNode(alice) charlieNetMapCache.addOrUpdateNode(alice)
val bob = createNodeInfo(listOf(BOB), address = alice.addresses[0]) val bob = createNodeInfo(listOf(BOB), address = alice.addresses[0])
charlieNetMapCache.addNode(bob) charlieNetMapCache.addOrUpdateNode(bob)
val nodeInfos = charlieNetMapCache.allNodes.filter { alice.addresses[0] in it.addresses } val nodeInfos = charlieNetMapCache.allNodes.filter { alice.addresses[0] in it.addresses }
assertThat(nodeInfos).hasSize(2) assertThat(nodeInfos).hasSize(2)
} }
@Test(timeout=300_000) @Test(timeout=300_000)
fun `negative test - attempt to insert invalid node info`() { fun `negative test - attempt to insert invalid node info`() {
charlieNetMapCache.addNode(createNodeInfo(listOf(LONG_PLC))) charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(LONG_PLC)))
assertThat(charlieNetMapCache.allNodes).hasSize(0) assertThat(charlieNetMapCache.allNodes).hasSize(0)
} }
@Test(timeout=300_000) @Test(timeout=300_000)
fun `negative test - attempt to update existing node with invalid node info`() { fun `negative test - attempt to update existing node with invalid node info`() {
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE)))
val aliceUpdate = TestIdentity(LONG_X500_NAME, ALICE.keyPair) val aliceUpdate = TestIdentity(LONG_X500_NAME, ALICE.keyPair)
charlieNetMapCache.addNode(createNodeInfo(listOf(aliceUpdate))) charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(aliceUpdate)))
assertThat(charlieNetMapCache.allNodes).hasSize(1) assertThat(charlieNetMapCache.allNodes).hasSize(1)
assertThat(charlieNetMapCache.getNodeByLegalName(ALICE_NAME)).isNotNull assertThat(charlieNetMapCache.getNodeByLegalName(ALICE_NAME)).isNotNull
assertThat(charlieNetMapCache.getNodeByLegalName(LONG_X500_NAME)).isNull() assertThat(charlieNetMapCache.getNodeByLegalName(LONG_X500_NAME)).isNull()
@ -130,7 +130,7 @@ class PersistentNetworkMapCacheTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `negative test - insert two valid node infos and one invalid one`() { fun `negative test - insert two valid node infos and one invalid one`() {
charlieNetMapCache.addNodes(listOf(createNodeInfo(listOf(ALICE)), charlieNetMapCache.addOrUpdateNodes(listOf(createNodeInfo(listOf(ALICE)),
createNodeInfo(listOf(BOB)), createNodeInfo(listOf(BOB)),
createNodeInfo(listOf(LONG_PLC)))) createNodeInfo(listOf(LONG_PLC))))
assertThat(charlieNetMapCache.allNodes).hasSize(2) assertThat(charlieNetMapCache.allNodes).hasSize(2)
@ -139,7 +139,7 @@ class PersistentNetworkMapCacheTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `negative test - insert three valid node infos and two invalid ones`() { fun `negative test - insert three valid node infos and two invalid ones`() {
charlieNetMapCache.addNodes(listOf(createNodeInfo(listOf(LONG_PLC)), charlieNetMapCache.addOrUpdateNodes(listOf(createNodeInfo(listOf(LONG_PLC)),
createNodeInfo(listOf(ALICE)), createNodeInfo(listOf(ALICE)),
createNodeInfo(listOf(BOB)), createNodeInfo(listOf(BOB)),
createNodeInfo(listOf(CHARLIE)), createNodeInfo(listOf(CHARLIE)),
@ -150,9 +150,9 @@ class PersistentNetworkMapCacheTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `negative test - insert one valid node info then attempt to add one invalid node info and update the existing valid nodeinfo`() { fun `negative test - insert one valid node info then attempt to add one invalid node info and update the existing valid nodeinfo`() {
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) charlieNetMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE)))
val aliceUpdate = TestIdentity(LONG_X500_NAME, ALICE.keyPair) val aliceUpdate = TestIdentity(LONG_X500_NAME, ALICE.keyPair)
charlieNetMapCache.addNodes(listOf(createNodeInfo(listOf(aliceUpdate)), charlieNetMapCache.addOrUpdateNodes(listOf(createNodeInfo(listOf(aliceUpdate)),
createNodeInfo(listOf(LONGER_PLC)), createNodeInfo(listOf(BOB)))) createNodeInfo(listOf(LONGER_PLC)), createNodeInfo(listOf(BOB))))
assertThat(charlieNetMapCache.allNodes).hasSize(2) assertThat(charlieNetMapCache.allNodes).hasSize(2)
assertThat(charlieNetMapCache.getNodeByLegalName(ALICE_NAME)).isNotNull assertThat(charlieNetMapCache.getNodeByLegalName(ALICE_NAME)).isNotNull

View File

@ -611,7 +611,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
} else { } else {
log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb") log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb")
val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis()) val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis())
networkMapCache.addNode(newNodeInfo) networkMapCache.addOrUpdateNode(newNodeInfo)
log.info("New node-info: $newNodeInfo") log.info("New node-info: $newNodeInfo")
newNodeInfo newNodeInfo
} }

View File

@ -40,10 +40,11 @@ interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBase {
* This is used for Artemis bridge lookup process. */ * This is used for Artemis bridge lookup process. */
fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo> fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo>
/** Adds a node to the local cache (generally only used for adding ourselves). */ /** Adds (or updates) a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo) fun addOrUpdateNode(node: NodeInfo)
fun addNodes(nodes: List<NodeInfo>) /** Adds (or updates) nodes to the local cache. */
fun addOrUpdateNodes(nodes: List<NodeInfo>)
/** Removes a node from the local cache. */ /** Removes a node from the local cache. */
fun removeNode(node: NodeInfo) fun removeNode(node: NodeInfo)

View File

@ -4,7 +4,13 @@ import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.CordaRuntimeException import net.corda.core.CordaRuntimeException
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData import net.corda.core.crypto.SignedData
import net.corda.core.internal.* import net.corda.core.internal.NetworkParametersStorage
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.copyTo
import net.corda.core.internal.div
import net.corda.core.internal.exists
import net.corda.core.internal.readObject
import net.corda.core.internal.sign
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.ParametersUpdateInfo import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.AutoAcceptable import net.corda.core.node.AutoAcceptable
@ -20,7 +26,12 @@ import net.corda.node.services.config.NetworkParameterAcceptanceSettings
import net.corda.node.utilities.NamedThreadFactory import net.corda.node.utilities.NamedThreadFactory
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.* import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.NetworkMap
import net.corda.nodeapi.internal.network.ParametersUpdate
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert
import rx.Subscription import rx.Subscription
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.lang.Integer.max import java.lang.Integer.max
@ -118,7 +129,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
.subscribe { .subscribe {
for (update in it) { for (update in it) {
when (update) { when (update) {
is NodeInfoUpdate.Add -> networkMapCache.addNode(update.nodeInfo) is NodeInfoUpdate.Add -> networkMapCache.addOrUpdateNode(update.nodeInfo)
is NodeInfoUpdate.Remove -> { is NodeInfoUpdate.Remove -> {
if (update.hash != ourNodeInfoHash) { if (update.hash != ourNodeInfoHash) {
val nodeInfo = networkMapCache.getNodeByHash(update.hash) val nodeInfo = networkMapCache.getNodeByHash(update.hash)
@ -177,11 +188,12 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
if (currentParametersHash != globalNetworkMap.networkParameterHash) { if (currentParametersHash != globalNetworkMap.networkParameterHash) {
exitOnParametersMismatch(globalNetworkMap) exitOnParametersMismatch(globalNetworkMap)
} }
val currentNodeHashes = networkMapCache.allNodeHashes // Calculate any nodes that are now gone and remove _only_ them from the cache
// Remove node info from network map. // NOTE: We won't remove them until after the add/update cycle as only then will we definitely know which nodes are no longer
(currentNodeHashes - allHashesFromNetworkMap - nodeInfoWatcher.processedNodeInfoHashes) // in the network
.mapNotNull { if (it != ourNodeInfoHash) networkMapCache.getNodeByHash(it) else null } val allNodeHashes = networkMapCache.allNodeHashes
.forEach(networkMapCache::removeNode) val nodeHashesToBeDeleted = (allNodeHashes - allHashesFromNetworkMap - nodeInfoWatcher.processedNodeInfoHashes)
.filter { it != ourNodeInfoHash }
//at the moment we use a blocking HTTP library - but under the covers, the OS will interleave threads waiting for IO //at the moment we use a blocking HTTP library - but under the covers, the OS will interleave threads waiting for IO
//as HTTP GET is mostly IO bound, use more threads than CPU's //as HTTP GET is mostly IO bound, use more threads than CPU's
//maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests //maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests
@ -189,7 +201,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(threadsToUseForNetworkMapDownload, NamedThreadFactory("NetworkMapUpdaterNodeInfoDownloadThread")) val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(threadsToUseForNetworkMapDownload, NamedThreadFactory("NetworkMapUpdaterNodeInfoDownloadThread"))
//DB insert is single threaded - use a single threaded executor for it. //DB insert is single threaded - use a single threaded executor for it.
val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsertThread")) val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsertThread"))
val hashesToFetch = (allHashesFromNetworkMap - currentNodeHashes) val hashesToFetch = (allHashesFromNetworkMap - allNodeHashes)
val networkMapDownloadStartTime = System.currentTimeMillis() val networkMapDownloadStartTime = System.currentTimeMillis()
if (hashesToFetch.isNotEmpty()) { if (hashesToFetch.isNotEmpty()) {
val networkMapDownloadFutures = hashesToFetch.chunked(max(hashesToFetch.size / threadsToUseForNetworkMapDownload, 1)) val networkMapDownloadFutures = hashesToFetch.chunked(max(hashesToFetch.size / threadsToUseForNetworkMapDownload, 1))
@ -207,7 +219,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
} }
}, executorToUseForDownloadingNodeInfos).thenAcceptAsync(Consumer { retrievedNodeInfos -> }, executorToUseForDownloadingNodeInfos).thenAcceptAsync(Consumer { retrievedNodeInfos ->
// Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes. // Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes.
networkMapCache.addNodes(retrievedNodeInfos) networkMapCache.addOrUpdateNodes(retrievedNodeInfos)
}, executorToUseForInsertionIntoDB) }, executorToUseForInsertionIntoDB)
}.toTypedArray() }.toTypedArray()
//wait for all the futures to complete //wait for all the futures to complete
@ -218,6 +230,10 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
executorToUseForInsertionIntoDB.shutdown() executorToUseForInsertionIntoDB.shutdown()
}.getOrThrow() }.getOrThrow()
} }
// NOTE: We remove nodes after any new/updates because updated nodes will have a new hash and, therefore, any
// nodes that we can actually pull out of the cache (with the old hashes) should be a truly removed node.
nodeHashesToBeDeleted.mapNotNull { networkMapCache.getNodeByHash(it) }.forEach(networkMapCache::removeNode)
// Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that // Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that
// it's empty // it's empty
networkMapCache.nodeReady.set(null) networkMapCache.nodeReady.set(null)

View File

@ -159,7 +159,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
} }
} }
override fun addNodes(nodes: List<NodeInfo>) { override fun addOrUpdateNodes(nodes: List<NodeInfo>) {
synchronized(_changed) { synchronized(_changed) {
val newNodes = mutableListOf<NodeInfo>() val newNodes = mutableListOf<NodeInfo>()
val updatedNodes = mutableListOf<Pair<NodeInfo, NodeInfo>>() val updatedNodes = mutableListOf<Pair<NodeInfo, NodeInfo>>()
@ -226,9 +226,9 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
} }
} }
override fun addNode(node: NodeInfo) { override fun addOrUpdateNode(node: NodeInfo) {
logger.info("Adding node with info: $node") logger.info("Adding node with info: $node")
addNodes(listOf(node)) addOrUpdateNodes(listOf(node))
logger.debug { "Done adding node with info: $node" } logger.debug { "Done adding node with info: $node" }
} }
@ -305,7 +305,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
private fun findByIdentityKeyIndex(session: Session, identityKeyIndex: String): List<NodeInfoSchemaV1.PersistentNodeInfo> { private fun findByIdentityKeyIndex(session: Session, identityKeyIndex: String): List<NodeInfoSchemaV1.PersistentNodeInfo> {
val query = session.createQuery( val query = session.createQuery(
"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.owningKeyHash = :owningKeyHash", "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.legalIdentitiesAndCerts l WHERE l.owningKeyHash = :owningKeyHash",
NodeInfoSchemaV1.PersistentNodeInfo::class.java) NodeInfoSchemaV1.PersistentNodeInfo::class.java)
query.setParameter("owningKeyHash", identityKeyIndex) query.setParameter("owningKeyHash", identityKeyIndex)
return query.resultList return query.resultList
@ -323,7 +323,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
private fun queryIdentityByLegalName(session: Session, name: CordaX500Name): PartyAndCertificate? { private fun queryIdentityByLegalName(session: Session, name: CordaX500Name): PartyAndCertificate? {
val query = session.createQuery( val query = session.createQuery(
// We do the JOIN here to restrict results to those present in the network map // We do the JOIN here to restrict results to those present in the network map
"SELECT DISTINCT l FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", "SELECT DISTINCT l FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name",
NodeInfoSchemaV1.DBPartyAndCertificate::class.java) NodeInfoSchemaV1.DBPartyAndCertificate::class.java)
query.setParameter("name", name.toString()) query.setParameter("name", name.toString())
val candidates = query.resultList.map { it.toLegalIdentityAndCert() } val candidates = query.resultList.map { it.toLegalIdentityAndCert() }
@ -333,7 +333,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
private fun queryByLegalName(session: Session, name: CordaX500Name): List<NodeInfo> { private fun queryByLegalName(session: Session, name: CordaX500Name): List<NodeInfo> {
val query = session.createQuery( val query = session.createQuery(
"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name",
NodeInfoSchemaV1.PersistentNodeInfo::class.java) NodeInfoSchemaV1.PersistentNodeInfo::class.java)
query.setParameter("name", name.toString()) query.setParameter("name", name.toString())
val result = query.resultList val result = query.resultList
@ -342,7 +342,7 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
private fun queryByAddress(session: Session, hostAndPort: NetworkHostAndPort): NodeInfo? { private fun queryByAddress(session: Session, hostAndPort: NetworkHostAndPort): NodeInfo? {
val query = session.createQuery( val query = session.createQuery(
"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.addresses a WHERE a.host = :host AND a.port = :port", "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n INNER JOIN n.addresses a WHERE a.host = :host AND a.port = :port",
NodeInfoSchemaV1.PersistentNodeInfo::class.java) NodeInfoSchemaV1.PersistentNodeInfo::class.java)
query.setParameter("host", hostAndPort.host) query.setParameter("host", hostAndPort.host)
query.setParameter("port", hostAndPort.port) query.setParameter("port", hostAndPort.port)

View File

@ -31,8 +31,8 @@ class NodeRestartTests {
val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
val bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME)) val bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
bob.registerInitiatedFlow(Responder::class.java) bob.registerInitiatedFlow(Responder::class.java)
alice.services.networkMapCache.addNode(bob.info) alice.services.networkMapCache.addOrUpdateNode(bob.info)
bob.services.networkMapCache.addNode(alice.info) bob.services.networkMapCache.addOrUpdateNode(alice.info)
val alice2 = mockNet.restartNode(alice) val alice2 = mockNet.restartNode(alice)
val result = alice2.services.startFlow(Initiator(bob.info.singleIdentity())).resultFuture.getOrThrow() val result = alice2.services.startFlow(Initiator(bob.info.singleIdentity())).resultFuture.getOrThrow()
assertThat(result).isEqualTo(123) assertThat(result).isEqualTo(123)

View File

@ -37,7 +37,7 @@ class NetworkMapCacheTest {
val bob = bobNode.info.singleIdentity() val bob = bobNode.info.singleIdentity()
assertEquals(alice, bob) assertEquals(alice, bob)
aliceNode.services.networkMapCache.addNode(bobNode.info) aliceNode.services.networkMapCache.addOrUpdateNode(bobNode.info)
// The details of node B write over those for node A // The details of node B write over those for node A
assertEquals(aliceNode.services.networkMapCache.getNodesByLegalIdentityKey(alice.owningKey).singleOrNull(), bobNode.info) assertEquals(aliceNode.services.networkMapCache.getNodesByLegalIdentityKey(alice.owningKey).singleOrNull(), bobNode.info)
} }
@ -86,7 +86,7 @@ class NetworkMapCacheTest {
assertNull(bobCache.getPeerByLegalName(ALICE_NAME)) assertNull(bobCache.getPeerByLegalName(ALICE_NAME))
assertThat(bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).isEmpty()) assertThat(bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).isEmpty())
bobCacheInternal.addNode(aliceNode.info) bobCacheInternal.addOrUpdateNode(aliceNode.info)
assertEquals(aliceNode.info.singleIdentity(), bobCache.getPeerByLegalName(ALICE_NAME)) assertEquals(aliceNode.info.singleIdentity(), bobCache.getPeerByLegalName(ALICE_NAME))
assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single()) assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single())
@ -113,7 +113,7 @@ class NetworkMapCacheTest {
val aliceNode = mockNet.createPartyNode(ALICE_NAME) val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val aliceCache = aliceNode.services.networkMapCache val aliceCache = aliceNode.services.networkMapCache
val alicePartyAndCert2 = getTestPartyAndCertificate(ALICE_NAME, generateKeyPair().public) val alicePartyAndCert2 = getTestPartyAndCertificate(ALICE_NAME, generateKeyPair().public)
aliceCache.addNode(aliceNode.info.copy(legalIdentitiesAndCerts = listOf(alicePartyAndCert2))) aliceCache.addOrUpdateNode(aliceNode.info.copy(legalIdentitiesAndCerts = listOf(alicePartyAndCert2)))
// This is correct behaviour as we may have distributed service nodes. // This is correct behaviour as we may have distributed service nodes.
assertEquals(2, aliceCache.getNodesByLegalName(ALICE_NAME).size) assertEquals(2, aliceCache.getNodesByLegalName(ALICE_NAME).size)
} }

View File

@ -2,15 +2,26 @@ package net.corda.node.services.network
import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs import com.google.common.jimfs.Jimfs
import com.nhaarman.mockito_kotlin.* import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.never
import com.nhaarman.mockito_kotlin.times
import com.nhaarman.mockito_kotlin.verify
import net.corda.core.crypto.Crypto import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.generateKeyPair import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.sign import net.corda.core.crypto.sign
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.* import net.corda.core.internal.NODE_INFO_DIRECTORY
import net.corda.core.internal.NetworkParametersStorage
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.delete
import net.corda.core.internal.div
import net.corda.core.internal.exists
import net.corda.core.internal.readObject
import net.corda.core.internal.sign
import net.corda.core.messaging.ParametersUpdateInfo import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
@ -28,10 +39,15 @@ import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.nodeapi.internal.network.SignedNetworkParameters import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.*
import net.corda.testing.internal.DEV_ROOT_CA import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.internal.TestNodeInfoBuilder import net.corda.testing.internal.TestNodeInfoBuilder
import net.corda.testing.internal.createNodeInfoAndSigned import net.corda.testing.internal.createNodeInfoAndSigned
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.expect
import net.corda.testing.core.expectEvents
import net.corda.testing.core.sequence
import net.corda.testing.node.internal.MockKeyManagementService import net.corda.testing.node.internal.MockKeyManagementService
import net.corda.testing.node.internal.MockPublicKeyToOwningIdentityCache import net.corda.testing.node.internal.MockPublicKeyToOwningIdentityCache
import net.corda.testing.node.internal.network.NetworkMapServer import net.corda.testing.node.internal.network.NetworkMapServer
@ -39,7 +55,11 @@ import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.hamcrest.collection.IsIterableContainingInAnyOrder import org.hamcrest.collection.IsIterableContainingInAnyOrder
import org.junit.* import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import rx.schedulers.TestScheduler import rx.schedulers.TestScheduler
import java.io.IOException import java.io.IOException
import java.net.URL import java.net.URL
@ -119,7 +139,7 @@ class NetworkMapUpdaterTest {
//Test adding new node. //Test adding new node.
networkMapClient.publish(signedNodeInfo1) networkMapClient.publish(signedNodeInfo1)
//Not subscribed yet. //Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any()) verify(networkMapCache, times(0)).addOrUpdateNode(any())
startUpdater() startUpdater()
networkMapClient.publish(signedNodeInfo2) networkMapClient.publish(signedNodeInfo2)
@ -195,13 +215,56 @@ class NetworkMapUpdaterTest {
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash) assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
} }
@Test(timeout=300_000)
fun `process remove, add, and update node from network map`() {
setUpdater()
val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")
val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("Info 3")
val builder = TestNodeInfoBuilder()
builder.addLegalIdentity(CordaX500Name("Test", "London", "GB"))
val (nodeInfo2, signedNodeInfo2) = builder.buildWithSigned(1)
val (nodeInfo2_2, signedNodeInfo2_2) = builder.buildWithSigned(2)
//Add all nodes.
networkMapClient.publish(signedNodeInfo1)
networkMapClient.publish(signedNodeInfo2)
startUpdater()
advanceTime()
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
Assert.assertThat(networkMapCache.allNodeHashes, IsIterableContainingInAnyOrder.containsInAnyOrder(
signedNodeInfo1.raw.hash,
signedNodeInfo2.raw.hash
))
// remove one node, add another and update a third.
server.removeNodeInfo(nodeInfo1)
networkMapClient.publish(signedNodeInfo3)
networkMapClient.publish(signedNodeInfo2_2)
advanceTime()
//TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, times(1)).removeNode(nodeInfo1)
verify(networkMapCache, times(0)).removeNode(nodeInfo2)
verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo2_2))
verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(nodeInfo3))
assertThat(networkMapCache.allNodeHashes).hasSameElementsAs(listOf(
signedNodeInfo2_2.raw.hash,
signedNodeInfo3.raw.hash
))
}
@Test(timeout=300_000) @Test(timeout=300_000)
fun `receive node infos from directory, without a network map`() { fun `receive node infos from directory, without a network map`() {
setUpdater(netMapClient = null) setUpdater(netMapClient = null)
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file") val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
//Not subscribed yet. //Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any()) verify(networkMapCache, times(0)).addOrUpdateNode(any())
startUpdater() startUpdater()
@ -209,8 +272,8 @@ class NetworkMapUpdaterTest {
assertThat(nodeReadyFuture).isNotDone() assertThat(nodeReadyFuture).isNotDone()
advanceTime() advanceTime()
verify(networkMapCache, times(1)).addNode(any()) verify(networkMapCache, times(1)).addOrUpdateNode(any())
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo) verify(networkMapCache, times(1)).addOrUpdateNode(fileNodeInfoAndSigned.nodeInfo)
assertThat(nodeReadyFuture).isDone() assertThat(nodeReadyFuture).isDone()
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash) assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
@ -331,9 +394,9 @@ class NetworkMapUpdaterTest {
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned1) NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned1)
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned2) NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned2)
advanceTime() advanceTime()
verify(networkMapCache, times(2)).addNode(any()) verify(networkMapCache, times(2)).addOrUpdateNode(any())
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned1.nodeInfo) verify(networkMapCache, times(1)).addOrUpdateNode(fileNodeInfoAndSigned1.nodeInfo)
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned2.nodeInfo) verify(networkMapCache, times(1)).addOrUpdateNode(fileNodeInfoAndSigned2.nodeInfo)
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(fileNodeInfoAndSigned1.signed.raw.hash, fileNodeInfoAndSigned2.signed.raw.hash) assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(fileNodeInfoAndSigned1.signed.raw.hash, fileNodeInfoAndSigned2.signed.raw.hash)
//Remove one of the nodes //Remove one of the nodes
val fileName1 = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${fileNodeInfoAndSigned1.nodeInfo.legalIdentities[0].name.serialize().hash}" val fileName1 = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${fileNodeInfoAndSigned1.nodeInfo.legalIdentities[0].name.serialize().hash}"
@ -361,7 +424,7 @@ class NetworkMapUpdaterTest {
networkMapClient.publish(serverSignedNodeInfo) networkMapClient.publish(serverSignedNodeInfo)
startUpdater() startUpdater()
advanceTime() advanceTime()
verify(networkMapCache, times(1)).addNode(localNodeInfo) verify(networkMapCache, times(1)).addOrUpdateNode(localNodeInfo)
Thread.sleep(2L * cacheExpiryMs) Thread.sleep(2L * cacheExpiryMs)
//Node from file has higher serial than the one from NetworkMapServer //Node from file has higher serial than the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash) assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash)
@ -383,7 +446,7 @@ class NetworkMapUpdaterTest {
val (myInfo, signedMyInfo) = createNodeInfoAndSigned("My node info") val (myInfo, signedMyInfo) = createNodeInfoAndSigned("My node info")
val (_, signedOtherInfo) = createNodeInfoAndSigned("Other info") val (_, signedOtherInfo) = createNodeInfoAndSigned("Other info")
setUpdater() setUpdater()
networkMapCache.addNode(myInfo) //Simulate behaviour on node startup when our node info is added to cache networkMapCache.addOrUpdateNode(myInfo) //Simulate behaviour on node startup when our node info is added to cache
networkMapClient.publish(signedOtherInfo) networkMapClient.publish(signedOtherInfo)
startUpdater(ourNodeInfo = signedMyInfo) startUpdater(ourNodeInfo = signedMyInfo)
Thread.sleep(2L * cacheExpiryMs) Thread.sleep(2L * cacheExpiryMs)
@ -406,19 +469,22 @@ class NetworkMapUpdaterTest {
//Test adding new node. //Test adding new node.
networkMapClient.publish(signedNodeInfo1) networkMapClient.publish(signedNodeInfo1)
//Not subscribed yet. //Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any()) verify(networkMapCache, times(0)).addOrUpdateNode(any())
startUpdater() startUpdater()
//TODO: Remove sleep in unit test. //TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs) Thread.sleep(2L * cacheExpiryMs)
assert(networkMapCache.allNodeHashes.size == 1) assert(networkMapCache.allNodeHashes.size == 1)
assert(networkMapCache.allNodeHashes.first() == signedNodeInfo1.raw.hash)
verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(signedNodeInfo1.verified()))
networkMapClient.publish(signedNodeInfo2) networkMapClient.publish(signedNodeInfo2)
Thread.sleep(2L * cacheExpiryMs) Thread.sleep(2L * cacheExpiryMs)
advanceTime() advanceTime()
verify(networkMapCache, times(1)).removeNode(signedNodeInfo1.verified()) verify(networkMapCache, times(1)).addOrUpdateNodes(listOf(signedNodeInfo1.verified()))
assert(networkMapCache.allNodeHashes.size == 1) assert(networkMapCache.allNodeHashes.size == 1)
assert(networkMapCache.allNodeHashes.first() == signedNodeInfo2.raw.hash)
} }
@Test(timeout=300_000) @Test(timeout=300_000)
@ -459,15 +525,11 @@ class NetworkMapUpdaterTest {
return mock { return mock {
on { nodeReady }.thenReturn(nodeReadyFuture) on { nodeReady }.thenReturn(nodeReadyFuture)
val data = ConcurrentHashMap<Party, NodeInfo>() val data = ConcurrentHashMap<Party, NodeInfo>()
on { addNode(any()) }.then { on { addOrUpdateNode(any()) }.then {
val nodeInfo = it.arguments[0] as NodeInfo val nodeInfo = it.arguments[0] as NodeInfo
val party = nodeInfo.legalIdentities[0] addNodeToMockCache(nodeInfo, data)
data.compute(party) { _, current ->
if (current == null || current.serial < nodeInfo.serial) nodeInfo else current
}
} }
on { addOrUpdateNodes(any()) }.then {
on { addNodes(any<List<NodeInfo>>()) }.then {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
val nodeInfos = it.arguments[0] as List<NodeInfo> val nodeInfos = it.arguments[0] as List<NodeInfo>
nodeInfos.forEach { nodeInfo -> nodeInfos.forEach { nodeInfo ->
@ -477,6 +539,7 @@ class NetworkMapUpdaterTest {
on { removeNode(any()) }.then { data.remove((it.arguments[0] as NodeInfo).legalIdentities[0]) } on { removeNode(any()) }.then { data.remove((it.arguments[0] as NodeInfo).legalIdentities[0]) }
on { getNodeByLegalIdentity(any()) }.then { data[it.arguments[0]] } on { getNodeByLegalIdentity(any()) }.then { data[it.arguments[0]] }
on { allNodes }.then { data.values.toList() }
on { allNodeHashes }.then { data.values.map { it.serialize().hash } } on { allNodeHashes }.then { data.values.map { it.serialize().hash } }
on { getNodeByHash(any()) }.then { mock -> data.values.singleOrNull { it.serialize().hash == mock.arguments[0] } } on { getNodeByHash(any()) }.then { mock -> data.values.singleOrNull { it.serialize().hash == mock.arguments[0] } }
} }

View File

@ -352,8 +352,8 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
mockNet.nodes mockNet.nodes
.mapNotNull { it.started } .mapNotNull { it.started }
.forEach { existingNode -> .forEach { existingNode ->
newNode.services.networkMapCache.addNode(existingNode.info) newNode.services.networkMapCache.addOrUpdateNode(existingNode.info)
existingNode.services.networkMapCache.addNode(newNode.info) existingNode.services.networkMapCache.addOrUpdateNode(newNode.info)
} }
} }

View File

@ -147,7 +147,7 @@ constructor(private val cordappPackages: List<String> = emptyList(), private val
val runningNodesInfo = runningNodes.map { it.info } val runningNodesInfo = runningNodes.map { it.info }
for (node in runningNodes) for (node in runningNodes)
for (nodeInfo in runningNodesInfo) { for (nodeInfo in runningNodesInfo) {
node.services.networkMapCache.addNode(nodeInfo) node.services.networkMapCache.addOrUpdateNode(nodeInfo)
} }
} }
} }