diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index d17ca7cae0..e9ffeb0dcf 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -188,6 +188,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like // a code smell. val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) + persistentNetworkMapCache.start() val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) val signedNodeInfo = signNodeInfo(info) { publicKey, serialised -> val privateKey = keyPairs.single { it.public == publicKey }.private @@ -217,7 +218,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Do all of this in a database transaction so anything that might need a connection has one. val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database -> lh.obj(database) - val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries), identityService) + val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService) val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair) lh.obj(info) identityService.loadIdentities(info.legalIdentitiesAndCerts) diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index 8cb6bd68b5..4bc21ea475 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -22,17 +22,17 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.loggerFor import net.corda.node.services.api.NetworkMapCacheBaseInternal import net.corda.node.services.api.NetworkMapCacheInternal +import net.corda.node.utilities.NonInvalidatingCache +import net.corda.nodeapi.internal.network.NotaryInfo import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction -import net.corda.nodeapi.internal.network.NotaryInfo import org.hibernate.Session import rx.Observable import rx.subjects.PublishSubject import java.security.PublicKey import java.util.* import javax.annotation.concurrent.ThreadSafe -import kotlin.collections.HashMap import kotlin.collections.HashSet class NetworkMapCacheImpl( @@ -80,9 +80,6 @@ open class PersistentNetworkMapCache( private val logger = contextLogger() } - // TODO Cleanup registered and party nodes - protected val registeredNodes: MutableMap = Collections.synchronizedMap(HashMap()) - protected val partyNodes: MutableList get() = registeredNodes.map { it.value }.toMutableList() private val _changed = PublishSubject.create() // We use assignment here so that multiple subscribers share the same wrapped Observable. override val changed: Observable = _changed.wrapWithDatabaseTransaction() @@ -95,13 +92,25 @@ open class PersistentNetworkMapCache( private var _loadDBSuccess: Boolean = false override val loadDBSuccess get() = _loadDBSuccess + fun start(): PersistentNetworkMapCache { + // if we find any network map information in the db, we are good to go - if not + // we have to wait for some being added + synchronized(_changed) { + val allNodes = database.transaction { getAllInfos(session) } + if (allNodes.isNotEmpty()) { + _loadDBSuccess = true + } + allNodes.forEach { + changePublisher.onNext(MapChange.Added(it.toNodeInfo())) + } + _registrationFuture.set(null) + } + return this + } + override val notaryIdentities: List = notaries.map { it.identity } private val validatingNotaries = notaries.mapNotNullTo(HashSet()) { if (it.validating) it.identity else null } - init { - database.transaction { loadFromDB(session) } - } - override val allNodeHashes: List get() { return database.transaction { @@ -130,7 +139,7 @@ open class PersistentNetworkMapCache( override fun isValidatingNotary(party: Party): Boolean = party in validatingNotaries override fun getPartyInfo(party: Party): PartyInfo? { - val nodes = database.transaction { queryByIdentityKey(session, party.owningKey) } + val nodes = getNodesByLegalIdentityKey(party.owningKey) if (nodes.size == 1 && nodes[0].isLegalIdentity(party)) { return PartyInfo.SingleNode(party, nodes[0].addresses) } @@ -146,35 +155,36 @@ open class PersistentNetworkMapCache( override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? = getNodesByLegalName(name).firstOrNull() override fun getNodesByLegalName(name: CordaX500Name): List = database.transaction { queryByLegalName(session, name) } - override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List = - database.transaction { queryByIdentityKey(session, identityKey) } + override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List = nodesByKeyCache[identityKey] + + private val nodesByKeyCache = NonInvalidatingCache>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } }) override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) } - override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = database.transaction { queryIdentityByLegalName(session, name) } + override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name).orElse(null) + + private val identityByLegalNameCache = NonInvalidatingCache>(1024, 8, { name -> Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) }) }) override fun track(): DataFeed, MapChange> { synchronized(_changed) { - return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + val allInfos = database.transaction { getAllInfos(session) }.map { it.toNodeInfo() } + return DataFeed(allInfos, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) } } override fun addNode(node: NodeInfo) { logger.info("Adding node with info: $node") synchronized(_changed) { - registeredNodes[node.legalIdentities.first().owningKey]?.let { - if (it.serial > node.serial) { - logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}") - return - } - } - val previousNode = registeredNodes.put(node.legalIdentities.first().owningKey, node) // TODO hack... we left the first one as special one + val previousNode = getNodesByLegalIdentityKey(node.legalIdentities.first().owningKey).firstOrNull() if (previousNode == null) { logger.info("No previous node found") database.transaction { updateInfoDB(node, session) changePublisher.onNext(MapChange.Added(node)) } + } else if (previousNode.serial > node.serial) { + logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}") + return } else if (previousNode != node) { logger.info("Previous node was found as: $previousNode") database.transaction { @@ -193,7 +203,6 @@ open class PersistentNetworkMapCache( override fun removeNode(node: NodeInfo) { logger.info("Removing node with info: $node") synchronized(_changed) { - registeredNodes.remove(node.legalIdentities.first().owningKey) database.transaction { removeInfoDB(session, node) changePublisher.onNext(MapChange.Removed(node)) @@ -213,23 +222,6 @@ open class PersistentNetworkMapCache( return session.createQuery(criteria).resultList } - /** - * Load NetworkMap data from the database if present. Node can start without having NetworkMapService configured. - */ - private fun loadFromDB(session: Session) { - logger.info("Loading network map from database...") - val result = getAllInfos(session) - for (nodeInfo in result) { - try { - logger.info("Loaded node info: $nodeInfo") - val node = nodeInfo.toNodeInfo() - addNode(node) - } catch (e: Exception) { - logger.warn("Exception parsing network map from the database.", e) - } - } - } - private fun updateInfoDB(nodeInfo: NodeInfo, session: Session) { // TODO For now the main legal identity is left in NodeInfo, this should be set comparision/come up with index for NodeInfo? val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey) @@ -238,11 +230,17 @@ open class PersistentNetworkMapCache( nodeInfoEntry.id = info.first().id } session.merge(nodeInfoEntry) + // invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed + // on the next load + invalidateCaches(nodeInfo) } private fun removeInfoDB(session: Session, nodeInfo: NodeInfo) { val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey).single() session.remove(info) + // invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed + // on the next load + invalidateCaches(nodeInfo) } private fun findByIdentityKey(session: Session, identityKey: PublicKey): List { @@ -303,7 +301,19 @@ open class PersistentNetworkMapCache( ) } + /** We are caching data we get from the db - if we modify the db, they need to be cleared out*/ + private fun invalidateCaches(nodeInfo: NodeInfo) { + nodesByKeyCache.invalidateAll(nodeInfo.legalIdentities.map { it.owningKey }) + identityByLegalNameCache.invalidateAll(nodeInfo.legalIdentities.map { it.name }) + } + + private fun invalidateCaches() { + nodesByKeyCache.invalidateAll() + identityByLegalNameCache.invalidateAll() + } + override fun clearNetworkMapCache() { + invalidateCaches() database.transaction { val result = getAllInfos(session) for (nodeInfo in result) session.remove(nodeInfo) diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index 5ddeddb6eb..279df7ae4a 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -78,7 +78,7 @@ class ArtemisMessagingTests { } LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock()) - networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()), rigorousMock()) + networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()).start(), rigorousMock()) } @After diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt index 14f91e0178..eb95a985dd 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt @@ -1,16 +1,20 @@ package net.corda.node.services.network import net.corda.core.node.services.NetworkMapCache +import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.testing.ALICE_NAME import net.corda.testing.BOB_NAME import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNodeParameters import net.corda.testing.singleIdentity +import net.corda.testing.singleIdentityAndCert import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Test import java.math.BigInteger import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull class NetworkMapCacheTest { private val mockNet = MockNetwork(emptyList()) @@ -62,6 +66,31 @@ class NetworkMapCacheTest { assertEquals(expected, actual) } + @Test + fun `caches get cleared on modification`() { + val aliceNode = mockNet.createPartyNode(ALICE_NAME) + val bobNode = mockNet.createPartyNode(BOB_NAME) + val bobCache: NetworkMapCache = bobNode.services.networkMapCache + val expected = aliceNode.info.singleIdentity() + + val actual = bobNode.database.transaction { bobCache.getPeerByLegalName(ALICE_NAME) } + assertEquals(expected, actual) + assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single()) + + // remove alice + val bobCacheInternal = bobCache as NetworkMapCacheInternal + assertNotNull(bobCacheInternal) + bobCache.removeNode(aliceNode.info) + + assertNull(bobCache.getPeerByLegalName(ALICE_NAME)) + assertThat(bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).isEmpty()) + + bobCacheInternal.addNode(aliceNode.info) + + assertEquals(aliceNode.info.singleIdentity(), bobCache.getPeerByLegalName(ALICE_NAME)) + assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single()) + } + @Test fun `remove node from cache`() { val aliceNode = mockNet.createPartyNode(ALICE_NAME)