CORDA-928 Caching in the NetworkMapCache (#2358)

* CORDA-928 cache query results via `getPeerByLegalName` and `getNodesByIndentityKey` to avoid hitting the DB hard in RPC handling.

* Skip cache invalidation during init() - caches are still null.

* Remove registeredNodes/partyNodes caching of data feed.
Rewrite data feed to be initialised off the DB.
Add start method to trigger readyness/artemis listeners if there are nodes in the DB.

* Invalidate cache last rather than first when updating
This commit is contained in:
Christian Sailer 2018-01-16 21:15:15 +00:00 committed by GitHub
parent d247e643ae
commit fca0afe591
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 41 deletions

View File

@ -188,6 +188,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like
// a code smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start()
val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = signNodeInfo(info) { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
@ -217,7 +218,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
lh.obj(database)
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries), identityService)
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair)
lh.obj(info)
identityService.loadIdentities(info.legalIdentitiesAndCerts)

View File

@ -22,17 +22,17 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NonInvalidatingCache
import net.corda.nodeapi.internal.network.NotaryInfo
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.network.NotaryInfo
import org.hibernate.Session
import rx.Observable
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.HashMap
import kotlin.collections.HashSet
class NetworkMapCacheImpl(
@ -80,9 +80,6 @@ open class PersistentNetworkMapCache(
private val logger = contextLogger()
}
// TODO Cleanup registered and party nodes
protected val registeredNodes: MutableMap<PublicKey, NodeInfo> = Collections.synchronizedMap(HashMap())
protected val partyNodes: MutableList<NodeInfo> get() = registeredNodes.map { it.value }.toMutableList()
private val _changed = PublishSubject.create<MapChange>()
// We use assignment here so that multiple subscribers share the same wrapped Observable.
override val changed: Observable<MapChange> = _changed.wrapWithDatabaseTransaction()
@ -95,13 +92,25 @@ open class PersistentNetworkMapCache(
private var _loadDBSuccess: Boolean = false
override val loadDBSuccess get() = _loadDBSuccess
fun start(): PersistentNetworkMapCache {
// if we find any network map information in the db, we are good to go - if not
// we have to wait for some being added
synchronized(_changed) {
val allNodes = database.transaction { getAllInfos(session) }
if (allNodes.isNotEmpty()) {
_loadDBSuccess = true
}
allNodes.forEach {
changePublisher.onNext(MapChange.Added(it.toNodeInfo()))
}
_registrationFuture.set(null)
}
return this
}
override val notaryIdentities: List<Party> = notaries.map { it.identity }
private val validatingNotaries = notaries.mapNotNullTo(HashSet()) { if (it.validating) it.identity else null }
init {
database.transaction { loadFromDB(session) }
}
override val allNodeHashes: List<SecureHash>
get() {
return database.transaction {
@ -130,7 +139,7 @@ open class PersistentNetworkMapCache(
override fun isValidatingNotary(party: Party): Boolean = party in validatingNotaries
override fun getPartyInfo(party: Party): PartyInfo? {
val nodes = database.transaction { queryByIdentityKey(session, party.owningKey) }
val nodes = getNodesByLegalIdentityKey(party.owningKey)
if (nodes.size == 1 && nodes[0].isLegalIdentity(party)) {
return PartyInfo.SingleNode(party, nodes[0].addresses)
}
@ -146,35 +155,36 @@ open class PersistentNetworkMapCache(
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? = getNodesByLegalName(name).firstOrNull()
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) }
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> =
database.transaction { queryByIdentityKey(session, identityKey) }
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })
override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) }
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = database.transaction { queryIdentityByLegalName(session, name) }
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name).orElse(null)
private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(1024, 8, { name -> Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) }) })
override fun track(): DataFeed<List<NodeInfo>, MapChange> {
synchronized(_changed) {
return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
val allInfos = database.transaction { getAllInfos(session) }.map { it.toNodeInfo() }
return DataFeed(allInfos, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}
override fun addNode(node: NodeInfo) {
logger.info("Adding node with info: $node")
synchronized(_changed) {
registeredNodes[node.legalIdentities.first().owningKey]?.let {
if (it.serial > node.serial) {
logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}")
return
}
}
val previousNode = registeredNodes.put(node.legalIdentities.first().owningKey, node) // TODO hack... we left the first one as special one
val previousNode = getNodesByLegalIdentityKey(node.legalIdentities.first().owningKey).firstOrNull()
if (previousNode == null) {
logger.info("No previous node found")
database.transaction {
updateInfoDB(node, session)
changePublisher.onNext(MapChange.Added(node))
}
} else if (previousNode.serial > node.serial) {
logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}")
return
} else if (previousNode != node) {
logger.info("Previous node was found as: $previousNode")
database.transaction {
@ -193,7 +203,6 @@ open class PersistentNetworkMapCache(
override fun removeNode(node: NodeInfo) {
logger.info("Removing node with info: $node")
synchronized(_changed) {
registeredNodes.remove(node.legalIdentities.first().owningKey)
database.transaction {
removeInfoDB(session, node)
changePublisher.onNext(MapChange.Removed(node))
@ -213,23 +222,6 @@ open class PersistentNetworkMapCache(
return session.createQuery(criteria).resultList
}
/**
* Load NetworkMap data from the database if present. Node can start without having NetworkMapService configured.
*/
private fun loadFromDB(session: Session) {
logger.info("Loading network map from database...")
val result = getAllInfos(session)
for (nodeInfo in result) {
try {
logger.info("Loaded node info: $nodeInfo")
val node = nodeInfo.toNodeInfo()
addNode(node)
} catch (e: Exception) {
logger.warn("Exception parsing network map from the database.", e)
}
}
}
private fun updateInfoDB(nodeInfo: NodeInfo, session: Session) {
// TODO For now the main legal identity is left in NodeInfo, this should be set comparision/come up with index for NodeInfo?
val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey)
@ -238,11 +230,17 @@ open class PersistentNetworkMapCache(
nodeInfoEntry.id = info.first().id
}
session.merge(nodeInfoEntry)
// invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed
// on the next load
invalidateCaches(nodeInfo)
}
private fun removeInfoDB(session: Session, nodeInfo: NodeInfo) {
val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey).single()
session.remove(info)
// invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed
// on the next load
invalidateCaches(nodeInfo)
}
private fun findByIdentityKey(session: Session, identityKey: PublicKey): List<NodeInfoSchemaV1.PersistentNodeInfo> {
@ -303,7 +301,19 @@ open class PersistentNetworkMapCache(
)
}
/** We are caching data we get from the db - if we modify the db, they need to be cleared out*/
private fun invalidateCaches(nodeInfo: NodeInfo) {
nodesByKeyCache.invalidateAll(nodeInfo.legalIdentities.map { it.owningKey })
identityByLegalNameCache.invalidateAll(nodeInfo.legalIdentities.map { it.name })
}
private fun invalidateCaches() {
nodesByKeyCache.invalidateAll()
identityByLegalNameCache.invalidateAll()
}
override fun clearNetworkMapCache() {
invalidateCaches()
database.transaction {
val result = getAllInfos(session)
for (nodeInfo in result) session.remove(nodeInfo)

View File

@ -78,7 +78,7 @@ class ArtemisMessagingTests {
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()), rigorousMock())
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()).start(), rigorousMock())
}
@After

View File

@ -1,16 +1,20 @@
package net.corda.node.services.network
import net.corda.core.node.services.NetworkMapCache
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.testing.ALICE_NAME
import net.corda.testing.BOB_NAME
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNodeParameters
import net.corda.testing.singleIdentity
import net.corda.testing.singleIdentityAndCert
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
import java.math.BigInteger
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
class NetworkMapCacheTest {
private val mockNet = MockNetwork(emptyList())
@ -62,6 +66,31 @@ class NetworkMapCacheTest {
assertEquals(expected, actual)
}
@Test
fun `caches get cleared on modification`() {
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val bobNode = mockNet.createPartyNode(BOB_NAME)
val bobCache: NetworkMapCache = bobNode.services.networkMapCache
val expected = aliceNode.info.singleIdentity()
val actual = bobNode.database.transaction { bobCache.getPeerByLegalName(ALICE_NAME) }
assertEquals(expected, actual)
assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single())
// remove alice
val bobCacheInternal = bobCache as NetworkMapCacheInternal
assertNotNull(bobCacheInternal)
bobCache.removeNode(aliceNode.info)
assertNull(bobCache.getPeerByLegalName(ALICE_NAME))
assertThat(bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).isEmpty())
bobCacheInternal.addNode(aliceNode.info)
assertEquals(aliceNode.info.singleIdentity(), bobCache.getPeerByLegalName(ALICE_NAME))
assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single())
}
@Test
fun `remove node from cache`() {
val aliceNode = mockNet.createPartyNode(ALICE_NAME)