diff --git a/docs/source/network-map.rst b/docs/source/network-map.rst index f6eca336f4..160f4c0401 100644 --- a/docs/source/network-map.rst +++ b/docs/source/network-map.rst @@ -2,7 +2,7 @@ Network Map =========== The network map is a collection of signed ``NodeInfo`` objects (signed by the node it represents and thus tamper-proof) -forming the set of reachable nodes in a compatbility zone. A node can receive these objects from two sources: +forming the set of reachable nodes in a compatibility zone. A node can receive these objects from two sources: 1. The HTTP network map service if the ``compatibilityZoneURL`` config key is specified. 2. The ``additional-node-infos`` directory within the node's directory. @@ -13,7 +13,7 @@ HTTP network map service If the node is configured with the ``compatibilityZoneURL`` config then it first uploads its own signed ``NodeInfo`` to the server (and each time it changes on startup) and then proceeds to download the entire network map. The network map consists of a list of ``NodeInfo`` hashes. The node periodically polls for the network map (based on the HTTP cache expiry -header) and any new hash entries are downloaded and cached. Entries which no longer exist are deleted from the node's cache. +header) and any new entries are downloaded and cached. Entries which no longer exist are deleted from the node's cache. The set of REST end-points for the network map service are as follows. @@ -35,7 +35,7 @@ The ``additional-node-infos`` directory Alongside the HTTP network map service, or as a replacement if the node isn't connected to one, the node polls the contents of the ``additional-node-infos`` directory located in its base directory. Each file is expected to be the same -signed ``NodeInfo`` object that the network map service vends. These are automtically added to the node's cache and can +signed ``NodeInfo`` object that the network map service vends. These are automatically added to the node's cache and can be used to supplement or replace the HTTP network map. If the same node is advertised through both mechanisms then the latest one is taken. @@ -46,7 +46,7 @@ of every node that's part of this network. Network parameters ------------------ -Network parameters are a set of values that every node participating in the network needs to agree on and use to +Network parameters are a set of values that every node participating in the zone needs to agree on and use to correctly interoperate with each other. If the node is using the HTTP network map service then on first startup it will download the signed network parameters, cache it in a ``network-parameters`` file and apply them on the node. @@ -54,7 +54,7 @@ download the signed network parameters, cache it in a ``network-parameters`` fil then the node will automatically shutdown. Resolution to this is to delete the incorrect file and restart the node so that the parameters can be downloaded again. -.. note:: A future release will support the notion of network parameters changes. +.. note:: A future release will support the notion of phased rollout of network parameter changes. If the node isn't using a HTTP network map service then it's expected the signed file is provided by some other means. For such a scenario there is the network bootstrapper tool which in addition to generating the network parameters file @@ -66,13 +66,14 @@ The current set of network parameters: not start. :notaries: List of identity and validation type (either validating or non-validating) of the notaries which are permitted in the compatibility zone. -:maxMessageSize: Maximum allowed P2P message size sent over the wire in bytes. Any message larger than this will be - split up. -:maxTransactionSize: Maximum permitted transaction size in bytes. +:maxMessageSize: Maximum allowed size in bytes of an individual message sent over the wire. Note that attachments are + a special case and may be fragmented for streaming transfer, however, an individual transaction or flow message + may not be larger than this value. :modifiedTime: The time when the network parameters were last modified by the compatibility zone operator. :epoch: Version number of the network parameters. Starting from 1, this will always increment whenever any of the parameters change. -.. note:: ``maxTransactionSize`` is currently not enforced in the node, but will be in a later release. - -More parameters may be added in future releases. +More parameters will be added in future releases to regulate things like allowed port numbers, how long a node can be +offline before it is evicted from the zone, whether or not IPv6 connectivity is required for zone members, required +cryptographic algorithms and rollout schedules (e.g. for moving to post quantum cryptography), parameters related to +SGX and so on. diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 926a95c0bb..2ad60ac9e0 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -190,6 +190,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like // a code smell. val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) + persistentNetworkMapCache.start() val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) val signedNodeInfo = signNodeInfo(info) { publicKey, serialised -> val privateKey = keyPairs.single { it.public == publicKey }.private @@ -233,7 +234,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Do all of this in a database transaction so anything that might need a connection has one. val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database -> lh.obj(database) - val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries), identityService) + val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService) val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair) lh.obj(info) identityService.loadIdentities(info.legalIdentitiesAndCerts) diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index 13881de63c..e6a1a9f972 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -22,10 +22,11 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.loggerFor import net.corda.node.services.api.NetworkMapCacheBaseInternal import net.corda.node.services.api.NetworkMapCacheInternal +import net.corda.node.utilities.NonInvalidatingCache +import net.corda.nodeapi.internal.network.NotaryInfo import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction -import net.corda.nodeapi.internal.network.NotaryInfo import org.hibernate.Session import rx.Observable import rx.subjects.PublishSubject @@ -33,7 +34,6 @@ import java.security.PublicKey import java.sql.Connection import java.util.* import javax.annotation.concurrent.ThreadSafe -import kotlin.collections.HashMap import kotlin.collections.HashSet class NetworkMapCacheImpl( @@ -81,9 +81,6 @@ open class PersistentNetworkMapCache( private val logger = contextLogger() } - // TODO Cleanup registered and party nodes - protected val registeredNodes: MutableMap = Collections.synchronizedMap(HashMap()) - protected val partyNodes: MutableList get() = registeredNodes.map { it.value }.toMutableList() private val _changed = PublishSubject.create() // We use assignment here so that multiple subscribers share the same wrapped Observable. override val changed: Observable = _changed.wrapWithDatabaseTransaction() @@ -96,13 +93,25 @@ open class PersistentNetworkMapCache( private var _loadDBSuccess: Boolean = false override val loadDBSuccess get() = _loadDBSuccess + fun start(): PersistentNetworkMapCache { + // if we find any network map information in the db, we are good to go - if not + // we have to wait for some being added + synchronized(_changed) { + val allNodes = database.transaction { getAllInfos(session) } + if (allNodes.isNotEmpty()) { + _loadDBSuccess = true + } + allNodes.forEach { + changePublisher.onNext(MapChange.Added(it.toNodeInfo())) + } + _registrationFuture.set(null) + } + return this + } + override val notaryIdentities: List = notaries.map { it.identity } private val validatingNotaries = notaries.mapNotNullTo(HashSet()) { if (it.validating) it.identity else null } - init { - database.transaction { loadFromDB(session) } - } - override val allNodeHashes: List get() { return database.transaction { @@ -131,7 +140,7 @@ open class PersistentNetworkMapCache( override fun isValidatingNotary(party: Party): Boolean = party in validatingNotaries override fun getPartyInfo(party: Party): PartyInfo? { - val nodes = database.transaction { queryByIdentityKey(session, party.owningKey) } + val nodes = getNodesByLegalIdentityKey(party.owningKey) if (nodes.size == 1 && nodes[0].isLegalIdentity(party)) { return PartyInfo.SingleNode(party, nodes[0].addresses) } @@ -147,35 +156,36 @@ open class PersistentNetworkMapCache( override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? = getNodesByLegalName(name).firstOrNull() override fun getNodesByLegalName(name: CordaX500Name): List = database.transaction { queryByLegalName(session, name) } - override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List = - database.transaction { queryByIdentityKey(session, identityKey) } + override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List = nodesByKeyCache[identityKey] + + private val nodesByKeyCache = NonInvalidatingCache>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } }) override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) } - override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = database.transaction { queryIdentityByLegalName(session, name) } + override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name).orElse(null) + + private val identityByLegalNameCache = NonInvalidatingCache>(1024, 8, { name -> Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) }) }) override fun track(): DataFeed, MapChange> { synchronized(_changed) { - return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + val allInfos = database.transaction { getAllInfos(session) }.map { it.toNodeInfo() } + return DataFeed(allInfos, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) } } override fun addNode(node: NodeInfo) { logger.info("Adding node with info: $node") synchronized(_changed) { - registeredNodes[node.legalIdentities.first().owningKey]?.let { - if (it.serial > node.serial) { - logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}") - return - } - } - val previousNode = registeredNodes.put(node.legalIdentities.first().owningKey, node) // TODO hack... we left the first one as special one + val previousNode = getNodesByLegalIdentityKey(node.legalIdentities.first().owningKey).firstOrNull() if (previousNode == null) { logger.info("No previous node found") database.transaction { updateInfoDB(node, session) changePublisher.onNext(MapChange.Added(node)) } + } else if (previousNode.serial > node.serial) { + logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}") + return } else if (previousNode != node) { logger.info("Previous node was found as: $previousNode") database.transaction { @@ -194,7 +204,6 @@ open class PersistentNetworkMapCache( override fun removeNode(node: NodeInfo) { logger.info("Removing node with info: $node") synchronized(_changed) { - registeredNodes.remove(node.legalIdentities.first().owningKey) database.transaction { removeInfoDB(session, node) changePublisher.onNext(MapChange.Removed(node)) @@ -214,23 +223,6 @@ open class PersistentNetworkMapCache( return session.createQuery(criteria).resultList } - /** - * Load NetworkMap data from the database if present. Node can start without having NetworkMapService configured. - */ - private fun loadFromDB(session: Session) { - logger.info("Loading network map from database...") - val result = getAllInfos(session) - for (nodeInfo in result) { - try { - logger.info("Loaded node info: $nodeInfo") - val node = nodeInfo.toNodeInfo() - addNode(node) - } catch (e: Exception) { - logger.warn("Exception parsing network map from the database.", e) - } - } - } - private fun updateInfoDB(nodeInfo: NodeInfo, session: Session) { // TODO For now the main legal identity is left in NodeInfo, this should be set comparision/come up with index for NodeInfo? val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey) @@ -239,11 +231,17 @@ open class PersistentNetworkMapCache( nodeInfoEntry.id = info.first().id } session.merge(nodeInfoEntry) + // invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed + // on the next load + invalidateCaches(nodeInfo) } private fun removeInfoDB(session: Session, nodeInfo: NodeInfo) { val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey).single() session.remove(info) + // invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed + // on the next load + invalidateCaches(nodeInfo) } private fun findByIdentityKey(session: Session, identityKey: PublicKey): List { @@ -304,7 +302,19 @@ open class PersistentNetworkMapCache( ) } + /** We are caching data we get from the db - if we modify the db, they need to be cleared out*/ + private fun invalidateCaches(nodeInfo: NodeInfo) { + nodesByKeyCache.invalidateAll(nodeInfo.legalIdentities.map { it.owningKey }) + identityByLegalNameCache.invalidateAll(nodeInfo.legalIdentities.map { it.name }) + } + + private fun invalidateCaches() { + nodesByKeyCache.invalidateAll() + identityByLegalNameCache.invalidateAll() + } + override fun clearNetworkMapCache() { + invalidateCaches() database.transaction { val result = getAllInfos(session) for (nodeInfo in result) session.remove(nodeInfo) diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index 98d6f0233d..1fb6840b71 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -82,7 +82,7 @@ class ArtemisMessagingTests { } LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), rigorousMock()) - networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()), rigorousMock()) + networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()).start(), rigorousMock()) } @After diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt index 14f91e0178..eb95a985dd 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt @@ -1,16 +1,20 @@ package net.corda.node.services.network import net.corda.core.node.services.NetworkMapCache +import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.testing.ALICE_NAME import net.corda.testing.BOB_NAME import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNodeParameters import net.corda.testing.singleIdentity +import net.corda.testing.singleIdentityAndCert import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Test import java.math.BigInteger import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull class NetworkMapCacheTest { private val mockNet = MockNetwork(emptyList()) @@ -62,6 +66,31 @@ class NetworkMapCacheTest { assertEquals(expected, actual) } + @Test + fun `caches get cleared on modification`() { + val aliceNode = mockNet.createPartyNode(ALICE_NAME) + val bobNode = mockNet.createPartyNode(BOB_NAME) + val bobCache: NetworkMapCache = bobNode.services.networkMapCache + val expected = aliceNode.info.singleIdentity() + + val actual = bobNode.database.transaction { bobCache.getPeerByLegalName(ALICE_NAME) } + assertEquals(expected, actual) + assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single()) + + // remove alice + val bobCacheInternal = bobCache as NetworkMapCacheInternal + assertNotNull(bobCacheInternal) + bobCache.removeNode(aliceNode.info) + + assertNull(bobCache.getPeerByLegalName(ALICE_NAME)) + assertThat(bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).isEmpty()) + + bobCacheInternal.addNode(aliceNode.info) + + assertEquals(aliceNode.info.singleIdentity(), bobCache.getPeerByLegalName(ALICE_NAME)) + assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single()) + } + @Test fun `remove node from cache`() { val aliceNode = mockNet.createPartyNode(ALICE_NAME)