mirror of
https://github.com/corda/corda.git
synced 2025-01-15 01:10:33 +00:00
Merge commit 'fca0afe5913d880628b9f94c459a04fb785b6c17' into christians/ENT-985-merge
This commit is contained in:
commit
d9fb2ae4a8
@ -2,7 +2,7 @@ Network Map
|
||||
===========
|
||||
|
||||
The network map is a collection of signed ``NodeInfo`` objects (signed by the node it represents and thus tamper-proof)
|
||||
forming the set of reachable nodes in a compatbility zone. A node can receive these objects from two sources:
|
||||
forming the set of reachable nodes in a compatibility zone. A node can receive these objects from two sources:
|
||||
|
||||
1. The HTTP network map service if the ``compatibilityZoneURL`` config key is specified.
|
||||
2. The ``additional-node-infos`` directory within the node's directory.
|
||||
@ -13,7 +13,7 @@ HTTP network map service
|
||||
If the node is configured with the ``compatibilityZoneURL`` config then it first uploads its own signed ``NodeInfo``
|
||||
to the server (and each time it changes on startup) and then proceeds to download the entire network map. The network map
|
||||
consists of a list of ``NodeInfo`` hashes. The node periodically polls for the network map (based on the HTTP cache expiry
|
||||
header) and any new hash entries are downloaded and cached. Entries which no longer exist are deleted from the node's cache.
|
||||
header) and any new entries are downloaded and cached. Entries which no longer exist are deleted from the node's cache.
|
||||
|
||||
The set of REST end-points for the network map service are as follows.
|
||||
|
||||
@ -35,7 +35,7 @@ The ``additional-node-infos`` directory
|
||||
|
||||
Alongside the HTTP network map service, or as a replacement if the node isn't connected to one, the node polls the
|
||||
contents of the ``additional-node-infos`` directory located in its base directory. Each file is expected to be the same
|
||||
signed ``NodeInfo`` object that the network map service vends. These are automtically added to the node's cache and can
|
||||
signed ``NodeInfo`` object that the network map service vends. These are automatically added to the node's cache and can
|
||||
be used to supplement or replace the HTTP network map. If the same node is advertised through both mechanisms then the
|
||||
latest one is taken.
|
||||
|
||||
@ -46,7 +46,7 @@ of every node that's part of this network.
|
||||
Network parameters
|
||||
------------------
|
||||
|
||||
Network parameters are a set of values that every node participating in the network needs to agree on and use to
|
||||
Network parameters are a set of values that every node participating in the zone needs to agree on and use to
|
||||
correctly interoperate with each other. If the node is using the HTTP network map service then on first startup it will
|
||||
download the signed network parameters, cache it in a ``network-parameters`` file and apply them on the node.
|
||||
|
||||
@ -54,7 +54,7 @@ download the signed network parameters, cache it in a ``network-parameters`` fil
|
||||
then the node will automatically shutdown. Resolution to this is to delete the incorrect file and restart the node so
|
||||
that the parameters can be downloaded again.
|
||||
|
||||
.. note:: A future release will support the notion of network parameters changes.
|
||||
.. note:: A future release will support the notion of phased rollout of network parameter changes.
|
||||
|
||||
If the node isn't using a HTTP network map service then it's expected the signed file is provided by some other means.
|
||||
For such a scenario there is the network bootstrapper tool which in addition to generating the network parameters file
|
||||
@ -66,13 +66,14 @@ The current set of network parameters:
|
||||
not start.
|
||||
:notaries: List of identity and validation type (either validating or non-validating) of the notaries which are permitted
|
||||
in the compatibility zone.
|
||||
:maxMessageSize: Maximum allowed P2P message size sent over the wire in bytes. Any message larger than this will be
|
||||
split up.
|
||||
:maxTransactionSize: Maximum permitted transaction size in bytes.
|
||||
:maxMessageSize: Maximum allowed size in bytes of an individual message sent over the wire. Note that attachments are
|
||||
a special case and may be fragmented for streaming transfer, however, an individual transaction or flow message
|
||||
may not be larger than this value.
|
||||
:modifiedTime: The time when the network parameters were last modified by the compatibility zone operator.
|
||||
:epoch: Version number of the network parameters. Starting from 1, this will always increment whenever any of the
|
||||
parameters change.
|
||||
|
||||
.. note:: ``maxTransactionSize`` is currently not enforced in the node, but will be in a later release.
|
||||
|
||||
More parameters may be added in future releases.
|
||||
More parameters will be added in future releases to regulate things like allowed port numbers, how long a node can be
|
||||
offline before it is evicted from the zone, whether or not IPv6 connectivity is required for zone members, required
|
||||
cryptographic algorithms and rollout schedules (e.g. for moving to post quantum cryptography), parameters related to
|
||||
SGX and so on.
|
||||
|
@ -190,6 +190,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like
|
||||
// a code smell.
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
|
||||
persistentNetworkMapCache.start()
|
||||
val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
|
||||
val signedNodeInfo = signNodeInfo(info) { publicKey, serialised ->
|
||||
val privateKey = keyPairs.single { it.public == publicKey }.private
|
||||
@ -233,7 +234,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
// Do all of this in a database transaction so anything that might need a connection has one.
|
||||
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
|
||||
lh.obj(database)
|
||||
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries), identityService)
|
||||
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
|
||||
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair)
|
||||
lh.obj(info)
|
||||
identityService.loadIdentities(info.legalIdentitiesAndCerts)
|
||||
|
@ -22,10 +22,11 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.NetworkMapCacheBaseInternal
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NonInvalidatingCache
|
||||
import net.corda.nodeapi.internal.network.NotaryInfo
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||
import net.corda.nodeapi.internal.network.NotaryInfo
|
||||
import org.hibernate.Session
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
@ -33,7 +34,6 @@ import java.security.PublicKey
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.collections.HashMap
|
||||
import kotlin.collections.HashSet
|
||||
|
||||
class NetworkMapCacheImpl(
|
||||
@ -81,9 +81,6 @@ open class PersistentNetworkMapCache(
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
// TODO Cleanup registered and party nodes
|
||||
protected val registeredNodes: MutableMap<PublicKey, NodeInfo> = Collections.synchronizedMap(HashMap())
|
||||
protected val partyNodes: MutableList<NodeInfo> get() = registeredNodes.map { it.value }.toMutableList()
|
||||
private val _changed = PublishSubject.create<MapChange>()
|
||||
// We use assignment here so that multiple subscribers share the same wrapped Observable.
|
||||
override val changed: Observable<MapChange> = _changed.wrapWithDatabaseTransaction()
|
||||
@ -96,13 +93,25 @@ open class PersistentNetworkMapCache(
|
||||
private var _loadDBSuccess: Boolean = false
|
||||
override val loadDBSuccess get() = _loadDBSuccess
|
||||
|
||||
fun start(): PersistentNetworkMapCache {
|
||||
// if we find any network map information in the db, we are good to go - if not
|
||||
// we have to wait for some being added
|
||||
synchronized(_changed) {
|
||||
val allNodes = database.transaction { getAllInfos(session) }
|
||||
if (allNodes.isNotEmpty()) {
|
||||
_loadDBSuccess = true
|
||||
}
|
||||
allNodes.forEach {
|
||||
changePublisher.onNext(MapChange.Added(it.toNodeInfo()))
|
||||
}
|
||||
_registrationFuture.set(null)
|
||||
}
|
||||
return this
|
||||
}
|
||||
|
||||
override val notaryIdentities: List<Party> = notaries.map { it.identity }
|
||||
private val validatingNotaries = notaries.mapNotNullTo(HashSet()) { if (it.validating) it.identity else null }
|
||||
|
||||
init {
|
||||
database.transaction { loadFromDB(session) }
|
||||
}
|
||||
|
||||
override val allNodeHashes: List<SecureHash>
|
||||
get() {
|
||||
return database.transaction {
|
||||
@ -131,7 +140,7 @@ open class PersistentNetworkMapCache(
|
||||
override fun isValidatingNotary(party: Party): Boolean = party in validatingNotaries
|
||||
|
||||
override fun getPartyInfo(party: Party): PartyInfo? {
|
||||
val nodes = database.transaction { queryByIdentityKey(session, party.owningKey) }
|
||||
val nodes = getNodesByLegalIdentityKey(party.owningKey)
|
||||
if (nodes.size == 1 && nodes[0].isLegalIdentity(party)) {
|
||||
return PartyInfo.SingleNode(party, nodes[0].addresses)
|
||||
}
|
||||
@ -147,35 +156,36 @@ open class PersistentNetworkMapCache(
|
||||
|
||||
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? = getNodesByLegalName(name).firstOrNull()
|
||||
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) }
|
||||
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> =
|
||||
database.transaction { queryByIdentityKey(session, identityKey) }
|
||||
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]
|
||||
|
||||
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })
|
||||
|
||||
override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) }
|
||||
|
||||
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = database.transaction { queryIdentityByLegalName(session, name) }
|
||||
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name).orElse(null)
|
||||
|
||||
private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(1024, 8, { name -> Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) }) })
|
||||
|
||||
override fun track(): DataFeed<List<NodeInfo>, MapChange> {
|
||||
synchronized(_changed) {
|
||||
return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
val allInfos = database.transaction { getAllInfos(session) }.map { it.toNodeInfo() }
|
||||
return DataFeed(allInfos, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
}
|
||||
}
|
||||
|
||||
override fun addNode(node: NodeInfo) {
|
||||
logger.info("Adding node with info: $node")
|
||||
synchronized(_changed) {
|
||||
registeredNodes[node.legalIdentities.first().owningKey]?.let {
|
||||
if (it.serial > node.serial) {
|
||||
logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}")
|
||||
return
|
||||
}
|
||||
}
|
||||
val previousNode = registeredNodes.put(node.legalIdentities.first().owningKey, node) // TODO hack... we left the first one as special one
|
||||
val previousNode = getNodesByLegalIdentityKey(node.legalIdentities.first().owningKey).firstOrNull()
|
||||
if (previousNode == null) {
|
||||
logger.info("No previous node found")
|
||||
database.transaction {
|
||||
updateInfoDB(node, session)
|
||||
changePublisher.onNext(MapChange.Added(node))
|
||||
}
|
||||
} else if (previousNode.serial > node.serial) {
|
||||
logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}")
|
||||
return
|
||||
} else if (previousNode != node) {
|
||||
logger.info("Previous node was found as: $previousNode")
|
||||
database.transaction {
|
||||
@ -194,7 +204,6 @@ open class PersistentNetworkMapCache(
|
||||
override fun removeNode(node: NodeInfo) {
|
||||
logger.info("Removing node with info: $node")
|
||||
synchronized(_changed) {
|
||||
registeredNodes.remove(node.legalIdentities.first().owningKey)
|
||||
database.transaction {
|
||||
removeInfoDB(session, node)
|
||||
changePublisher.onNext(MapChange.Removed(node))
|
||||
@ -214,23 +223,6 @@ open class PersistentNetworkMapCache(
|
||||
return session.createQuery(criteria).resultList
|
||||
}
|
||||
|
||||
/**
|
||||
* Load NetworkMap data from the database if present. Node can start without having NetworkMapService configured.
|
||||
*/
|
||||
private fun loadFromDB(session: Session) {
|
||||
logger.info("Loading network map from database...")
|
||||
val result = getAllInfos(session)
|
||||
for (nodeInfo in result) {
|
||||
try {
|
||||
logger.info("Loaded node info: $nodeInfo")
|
||||
val node = nodeInfo.toNodeInfo()
|
||||
addNode(node)
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Exception parsing network map from the database.", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateInfoDB(nodeInfo: NodeInfo, session: Session) {
|
||||
// TODO For now the main legal identity is left in NodeInfo, this should be set comparision/come up with index for NodeInfo?
|
||||
val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey)
|
||||
@ -239,11 +231,17 @@ open class PersistentNetworkMapCache(
|
||||
nodeInfoEntry.id = info.first().id
|
||||
}
|
||||
session.merge(nodeInfoEntry)
|
||||
// invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed
|
||||
// on the next load
|
||||
invalidateCaches(nodeInfo)
|
||||
}
|
||||
|
||||
private fun removeInfoDB(session: Session, nodeInfo: NodeInfo) {
|
||||
val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey).single()
|
||||
session.remove(info)
|
||||
// invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed
|
||||
// on the next load
|
||||
invalidateCaches(nodeInfo)
|
||||
}
|
||||
|
||||
private fun findByIdentityKey(session: Session, identityKey: PublicKey): List<NodeInfoSchemaV1.PersistentNodeInfo> {
|
||||
@ -304,7 +302,19 @@ open class PersistentNetworkMapCache(
|
||||
)
|
||||
}
|
||||
|
||||
/** We are caching data we get from the db - if we modify the db, they need to be cleared out*/
|
||||
private fun invalidateCaches(nodeInfo: NodeInfo) {
|
||||
nodesByKeyCache.invalidateAll(nodeInfo.legalIdentities.map { it.owningKey })
|
||||
identityByLegalNameCache.invalidateAll(nodeInfo.legalIdentities.map { it.name })
|
||||
}
|
||||
|
||||
private fun invalidateCaches() {
|
||||
nodesByKeyCache.invalidateAll()
|
||||
identityByLegalNameCache.invalidateAll()
|
||||
}
|
||||
|
||||
override fun clearNetworkMapCache() {
|
||||
invalidateCaches()
|
||||
database.transaction {
|
||||
val result = getAllInfos(session)
|
||||
for (nodeInfo in result) session.remove(nodeInfo)
|
||||
|
@ -82,7 +82,7 @@ class ArtemisMessagingTests {
|
||||
}
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), rigorousMock())
|
||||
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()), rigorousMock())
|
||||
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()).start(), rigorousMock())
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -1,16 +1,20 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.testing.ALICE_NAME
|
||||
import net.corda.testing.BOB_NAME
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.MockNodeParameters
|
||||
import net.corda.testing.singleIdentity
|
||||
import net.corda.testing.singleIdentityAndCert
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import java.math.BigInteger
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotNull
|
||||
import kotlin.test.assertNull
|
||||
|
||||
class NetworkMapCacheTest {
|
||||
private val mockNet = MockNetwork(emptyList())
|
||||
@ -62,6 +66,31 @@ class NetworkMapCacheTest {
|
||||
assertEquals(expected, actual)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `caches get cleared on modification`() {
|
||||
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
|
||||
val bobNode = mockNet.createPartyNode(BOB_NAME)
|
||||
val bobCache: NetworkMapCache = bobNode.services.networkMapCache
|
||||
val expected = aliceNode.info.singleIdentity()
|
||||
|
||||
val actual = bobNode.database.transaction { bobCache.getPeerByLegalName(ALICE_NAME) }
|
||||
assertEquals(expected, actual)
|
||||
assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single())
|
||||
|
||||
// remove alice
|
||||
val bobCacheInternal = bobCache as NetworkMapCacheInternal
|
||||
assertNotNull(bobCacheInternal)
|
||||
bobCache.removeNode(aliceNode.info)
|
||||
|
||||
assertNull(bobCache.getPeerByLegalName(ALICE_NAME))
|
||||
assertThat(bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).isEmpty())
|
||||
|
||||
bobCacheInternal.addNode(aliceNode.info)
|
||||
|
||||
assertEquals(aliceNode.info.singleIdentity(), bobCache.getPeerByLegalName(ALICE_NAME))
|
||||
assertEquals(aliceNode.info, bobCache.getNodesByLegalIdentityKey(aliceNode.info.singleIdentity().owningKey).single())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `remove node from cache`() {
|
||||
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
|
||||
|
Loading…
Reference in New Issue
Block a user