mirror of
https://github.com/corda/corda.git
synced 2025-02-12 05:35:50 +00:00
CORDA-866: Implement removal of stale nodes from network (#774)
* Implement removal of stale nodes from network Add eventHorizon to NetworkParameters structure. Add republishing of node info on 1 day intervals - it is treated by network map as heartbeat from node indicating if it's alive or not. Add removal of old node infos on network map signing. * Add copy method to NetworkParameters data class Add JvmOverloads annotation to the constructor, because it's data class exposed in API
This commit is contained in:
parent
4afc7f3824
commit
509a52ad5e
@ -13,6 +13,8 @@ package net.corda.core.node
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.days
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
@ -27,18 +29,19 @@ import java.time.Instant
|
||||
* of parameters.
|
||||
* @property whitelistedContractImplementations List of whitelisted jars containing contract code for each contract class.
|
||||
* This will be used by [net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint]. Read more about contract constraints here: <https://docs.corda.net/api-contract-constraints.html>
|
||||
* @property eventHorizon Time after which nodes will be removed from the network map if they have not been seen
|
||||
* during this period
|
||||
*/
|
||||
// TODO Add eventHorizon - how many days a node can be offline before being automatically ejected from the network.
|
||||
// It needs separate design.
|
||||
@CordaSerializable
|
||||
data class NetworkParameters(
|
||||
data class NetworkParameters @JvmOverloads constructor(
|
||||
val minimumPlatformVersion: Int,
|
||||
val notaries: List<NotaryInfo>,
|
||||
val maxMessageSize: Int,
|
||||
val maxTransactionSize: Int,
|
||||
val modifiedTime: Instant,
|
||||
val epoch: Int,
|
||||
val whitelistedContractImplementations: Map<String, List<AttachmentId>>
|
||||
val whitelistedContractImplementations: Map<String, List<AttachmentId>>,
|
||||
val eventHorizon: Duration = Int.MAX_VALUE.days
|
||||
) {
|
||||
init {
|
||||
require(minimumPlatformVersion > 0) { "minimumPlatformVersion must be at least 1" }
|
||||
@ -46,6 +49,25 @@ data class NetworkParameters(
|
||||
require(epoch > 0) { "epoch must be at least 1" }
|
||||
require(maxMessageSize > 0) { "maxMessageSize must be at least 1" }
|
||||
require(maxTransactionSize > 0) { "maxTransactionSize must be at least 1" }
|
||||
require(!eventHorizon.isNegative) { "eventHorizon must be positive value" }
|
||||
}
|
||||
|
||||
fun copy(minimumPlatformVersion: Int,
|
||||
notaries: List<NotaryInfo>,
|
||||
maxMessageSize: Int,
|
||||
maxTransactionSize: Int,
|
||||
modifiedTime: Instant,
|
||||
epoch: Int,
|
||||
whitelistedContractImplementations: Map<String, List<AttachmentId>>
|
||||
): NetworkParameters {
|
||||
return copy(minimumPlatformVersion = minimumPlatformVersion,
|
||||
notaries = notaries,
|
||||
maxMessageSize = maxMessageSize,
|
||||
maxTransactionSize = maxTransactionSize,
|
||||
modifiedTime = modifiedTime,
|
||||
epoch = epoch,
|
||||
whitelistedContractImplementations = whitelistedContractImplementations,
|
||||
eventHorizon = eventHorizon)
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
@ -57,6 +79,7 @@ data class NetworkParameters(
|
||||
whitelistedContractImplementations {
|
||||
${whitelistedContractImplementations.entries.joinToString("\n ")}
|
||||
}
|
||||
eventHorizon=$eventHorizon
|
||||
modifiedTime=$modifiedTime
|
||||
epoch=$epoch
|
||||
}"""
|
||||
|
@ -121,6 +121,9 @@ The current set of network parameters:
|
||||
For each contract class there is a list of hashes of the approved CorDapp jar versions containing that contract.
|
||||
Read more about *Zone constraints* here :doc:`api-contract-constraints`
|
||||
|
||||
:eventHorizon: Time after which nodes are considered to be unresponsive and removed from network map. Nodes republish their
|
||||
``NodeInfo`` on a regular interval. Network map treats that as a heartbeat from the node.
|
||||
|
||||
More parameters will be added in future releases to regulate things like allowed port numbers, how long a node can be
|
||||
offline before it is evicted from the zone, whether or not IPv6 connectivity is required for zone members, required
|
||||
cryptographic algorithms and rollout schedules (e.g. for moving to post quantum cryptography), parameters related to
|
||||
|
@ -5,3 +5,4 @@ notaries : [{
|
||||
minimumPlatformVersion = 1
|
||||
maxMessageSize = 10485760
|
||||
maxTransactionSize = 10485760
|
||||
eventHorizonDays = 30 # Duration in days
|
||||
|
@ -244,6 +244,7 @@ networkMap {
|
||||
minimumPlatformVersion = 1
|
||||
maxMessageSize = 10485760
|
||||
maxTransactionSize = 10485760
|
||||
eventHorizonDays = 30 # Duration in days
|
||||
|
||||
Save the parameters to `network-parameters.conf`
|
||||
|
||||
|
@ -8,3 +8,4 @@ notaries : [{
|
||||
minimumPlatformVersion = 1
|
||||
maxMessageSize = 10485760
|
||||
maxTransactionSize = 10485760
|
||||
eventHorizonDays = 30 # Duration in days
|
@ -79,7 +79,8 @@ class NetworkParametersUpdateTest : IntegrationTest() {
|
||||
minimumPlatformVersion = 1,
|
||||
maxMessageSize = 1_000_000,
|
||||
maxTransactionSize = 1_000_000,
|
||||
parametersUpdate = null
|
||||
parametersUpdate = null,
|
||||
eventHorizonDays = 30
|
||||
)
|
||||
applyNetworkParametersAndStart(initialNetParams)
|
||||
|
||||
|
@ -117,7 +117,8 @@ class NodeRegistrationTest : IntegrationTest() {
|
||||
minimumPlatformVersion = 1,
|
||||
maxMessageSize = 10485760,
|
||||
maxTransactionSize = 10485760,
|
||||
parametersUpdate = null
|
||||
parametersUpdate = null,
|
||||
eventHorizonDays = 30
|
||||
)
|
||||
// Restart the server once we're able to generate the network parameters
|
||||
applyNetworkParametersAndStart(setNetParams)
|
||||
|
@ -37,7 +37,8 @@ interface NetworkMapStorage {
|
||||
fun saveNewNetworkMap(networkId: String? = null, networkMapAndSigned: NetworkMapAndSigned)
|
||||
|
||||
/**
|
||||
* Retrieves node info hashes for both public and private networks where [NodeInfoEntity.isCurrent] is true and the certificate status is [CertificateStatus.VALID]
|
||||
* Retrieves node info hashes for both public and private networks where [NodeInfoEntity.isCurrent] is true and the certificate status is [CertificateStatus.VALID],
|
||||
* and that were published less than eventHorizon ago.
|
||||
* Nodes should have declared that they are using correct set of parameters.
|
||||
*/
|
||||
fun getNodeInfoHashes(): NodeInfoHashes
|
||||
|
@ -42,6 +42,10 @@ interface NodeInfoStorage {
|
||||
|
||||
/**
|
||||
* The [nodeInfoAndSigned] is keyed by the public key, old node info with the same public key will be replaced by the new node info.
|
||||
* If republishing of the same nodeInfo happens, then we will record the time it was republished in the database.
|
||||
* Based on that information we can remove unresponsive nodes from network (event horizon is the parameter that tells how
|
||||
* long node can be down before it gets removed). If the nodes becomes active again, it will enter back to the network map
|
||||
* after republishing its [NodeInfo].
|
||||
* @param nodeInfoAndSigned signed node info data to be stored
|
||||
* @return hash for the newly created node info entry
|
||||
*/
|
||||
|
@ -69,7 +69,9 @@ class PersistentNetworkMapStorage(private val database: CordaPersistence) : Netw
|
||||
|
||||
override fun getNodeInfoHashes(): NodeInfoHashes {
|
||||
return database.transaction {
|
||||
val currentParameters = getNetworkMaps().publicNetworkMap?.networkParameters?.networkParameters
|
||||
val builder = session.criteriaBuilder
|
||||
// TODO Convert this query to JPQL so it's more readable.
|
||||
val query = builder.createTupleQuery().run {
|
||||
from(NodeInfoEntity::class.java).run {
|
||||
val certStatusExpression = get<CertificateSigningRequestEntity>(NodeInfoEntity::certificateSigningRequest.name)
|
||||
@ -79,12 +81,19 @@ class PersistentNetworkMapStorage(private val database: CordaPersistence) : Netw
|
||||
// isn't needed.
|
||||
val certStatusEq = builder.equal(certStatusExpression, CertificateStatus.VALID)
|
||||
val isCurrentNodeInfo = builder.isTrue(get<Boolean>(NodeInfoEntity::isCurrent.name))
|
||||
|
||||
// We enable eventHorizon only if minimum platform version is greater than 3, nodes on previous versions
|
||||
// don't republish their node infos on regular intervals so they shouldn't be evicted from network after eventHorizon.
|
||||
val eventHorizonAgo = if (currentParameters != null && currentParameters.minimumPlatformVersion >= 4) {
|
||||
builder.greaterThanOrEqualTo(get<Instant>(NodeInfoEntity::publishedAt.name),
|
||||
Instant.now().minus(currentParameters.eventHorizon))
|
||||
} else {
|
||||
builder.and() // This expression is always true. It's needed when eventHorizon isn't enabled.
|
||||
}
|
||||
val networkIdSelector = get<CertificateSigningRequestEntity>(NodeInfoEntity::certificateSigningRequest.name)
|
||||
.get<PrivateNetworkEntity>(CertificateSigningRequestEntity::privateNetwork.name)
|
||||
.get<String>(PrivateNetworkEntity::networkId.name)
|
||||
|
||||
multiselect(networkIdSelector, get<String>(NodeInfoEntity::nodeInfoHash.name)).where(builder.and(certStatusEq, isCurrentNodeInfo))
|
||||
multiselect(networkIdSelector, get<String>(NodeInfoEntity::nodeInfoHash.name))
|
||||
.where(builder.and(certStatusEq, isCurrentNodeInfo, eventHorizonAgo))
|
||||
}
|
||||
}
|
||||
val allNodeInfos = session.createQuery(query).resultList.groupBy { it[0]?.toString() ?: PUBLIC_NETWORK_ID }.mapValues { it.value.map { SecureHash.parse(it.get(1, String::class.java)) } }
|
||||
|
@ -16,10 +16,10 @@ import com.r3.corda.networkmanage.common.persistence.entity.ParametersUpdateEnti
|
||||
import com.r3.corda.networkmanage.common.persistence.entity.UpdateStatus
|
||||
import com.r3.corda.networkmanage.common.utils.logger
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.internal.CertRole
|
||||
import net.corda.core.internal.CertRole.NODE_CA
|
||||
import net.corda.core.internal.hash
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.crypto.x509Certificates
|
||||
@ -27,6 +27,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||
import java.security.PublicKey
|
||||
import java.security.cert.CertPath
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
* Database implementation of the [NetworkMapStorage] interface
|
||||
@ -40,16 +41,16 @@ class PersistentNodeInfoStorage(private val database: CordaPersistence) : NodeIn
|
||||
val registeredIdentities = nodeInfo.legalIdentitiesAndCerts.map { it.certPath.x509Certificates.single { CertRole.extract(it) in setOf(CertRole.SERVICE_IDENTITY, NODE_CA) } }
|
||||
|
||||
database.transaction {
|
||||
val count = session.createQuery(
|
||||
"select count(*) from ${NodeInfoEntity::class.java.name} where nodeInfoHash = :nodeInfoHash and isCurrent = true", java.lang.Long::class.java)
|
||||
// Record fact of republishing of the node info, it's treated as a heartbeat from the node.
|
||||
val rowsUpdated = session.createQuery("update ${NodeInfoEntity::class.java.name} n set publishedAt = :now " +
|
||||
"where n.nodeInfoHash = :nodeInfoHash and n.isCurrent = true")
|
||||
.setParameter("now", Instant.now())
|
||||
.setParameter("nodeInfoHash", nodeInfoHash.toString())
|
||||
.singleResult
|
||||
.toLong()
|
||||
if (count != 0L) {
|
||||
logger.debug("Ignoring duplicate publish: $nodeInfo")
|
||||
.executeUpdate()
|
||||
if (rowsUpdated != 0) {
|
||||
logger.debug { "Republish of $nodeInfo" }
|
||||
return@transaction nodeInfoHash
|
||||
}
|
||||
|
||||
// TODO Move these checks out of data access layer
|
||||
// For each identity known by the doorman, validate against it's CSR.
|
||||
val requests = registeredIdentities.map {
|
||||
|
@ -12,6 +12,7 @@ package com.r3.corda.networkmanage.common.persistence.entity
|
||||
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import org.hibernate.annotations.UpdateTimestamp
|
||||
import java.io.Serializable
|
||||
import java.time.Instant
|
||||
import javax.persistence.*
|
||||
@ -42,6 +43,7 @@ data class NodeInfoEntity(
|
||||
val isCurrent: Boolean,
|
||||
|
||||
@Column(name = "published_at", nullable = false)
|
||||
@UpdateTimestamp
|
||||
val publishedAt: Instant = Instant.now(),
|
||||
|
||||
@ManyToOne(fetch = FetchType.EAGER)
|
||||
|
@ -8,6 +8,7 @@ import joptsimple.util.PathConverter
|
||||
import joptsimple.util.PathProperties
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.utilities.days
|
||||
import java.nio.file.Path
|
||||
import java.time.Instant
|
||||
|
||||
@ -70,6 +71,7 @@ sealed class NetworkParametersCmd {
|
||||
val notaries: List<NotaryInfo>,
|
||||
val maxMessageSize: Int,
|
||||
val maxTransactionSize: Int,
|
||||
val eventHorizonDays: Int,
|
||||
val parametersUpdate: ParametersUpdateConfig?
|
||||
) : NetworkParametersCmd() {
|
||||
companion object {
|
||||
@ -79,6 +81,7 @@ sealed class NetworkParametersCmd {
|
||||
config.notaries.map { it.toNotaryInfo() },
|
||||
config.maxMessageSize,
|
||||
config.maxTransactionSize,
|
||||
config.eventHorizonDays,
|
||||
config.parametersUpdate
|
||||
)
|
||||
}
|
||||
@ -103,7 +106,8 @@ sealed class NetworkParametersCmd {
|
||||
modifiedTime,
|
||||
epoch,
|
||||
// TODO: Tudor, Michal - pass the actual network parameters where we figure out how
|
||||
emptyMap()
|
||||
emptyMap(),
|
||||
eventHorizonDays.days
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import net.corda.core.node.NotaryInfo
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import java.nio.file.Path
|
||||
import java.time.Instant
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* Data class representing a [NotaryInfo] which can be easily parsed by a typesafe [ConfigFactory].
|
||||
@ -53,4 +54,5 @@ data class NetworkParametersConfig(val minimumPlatformVersion: Int,
|
||||
val notaries: List<NotaryConfig>,
|
||||
val maxMessageSize: Int,
|
||||
val maxTransactionSize: Int,
|
||||
val eventHorizonDays: Int,
|
||||
val parametersUpdate: ParametersUpdateConfig?)
|
||||
|
@ -80,6 +80,7 @@ fun NetworkParameters.toCmd(parametersUpdate: ParametersUpdateConfig? = null): N
|
||||
notaries = notaries,
|
||||
maxMessageSize = maxMessageSize,
|
||||
maxTransactionSize = maxTransactionSize,
|
||||
parametersUpdate = parametersUpdate
|
||||
parametersUpdate = parametersUpdate,
|
||||
eventHorizonDays = eventHorizon.toDays().toInt()
|
||||
)
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import com.r3.corda.networkmanage.common.persistence.entity.UpdateStatus
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.nodeapi.internal.createDevNetworkMapCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.network.NetworkMap
|
||||
@ -208,4 +209,34 @@ class PersistentNetworkMapStorageTest : TestBase() {
|
||||
// then
|
||||
assertThat(validNodeInfoHashes).containsOnly(nodeInfoHashB)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `remove nodes older than eventHorizon from network map`() {
|
||||
val networkParameters = testNetworkParameters(eventHorizon = 1.seconds, minimumPlatformVersion = 4)
|
||||
val (signedNodeInfoA) = createValidSignedNodeInfo("TestA", requestStorage)
|
||||
val networkParametersEntity = networkMapStorage.saveNetworkParameters(networkParameters, networkMapCertAndKeyPair.sign(networkParameters).sig)
|
||||
val networkMap = NetworkMap(emptyList(), SecureHash.parse(networkParametersEntity.hash), null)
|
||||
val networkMapAndSigned = NetworkMapAndSigned(networkMap) { networkMapCertAndKeyPair.sign(networkMap).sig }
|
||||
networkMapStorage.saveNewNetworkMap(networkMapAndSigned = networkMapAndSigned)
|
||||
nodeInfoStorage.putNodeInfo(signedNodeInfoA)
|
||||
assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).containsExactly(signedNodeInfoA.signed.raw.hash)
|
||||
Thread.sleep(2000) // Wait for node to be older than eventHorizon
|
||||
assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).doesNotContain(signedNodeInfoA.signed.raw.hash)
|
||||
nodeInfoStorage.putNodeInfo(signedNodeInfoA) // Republish
|
||||
assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).containsExactly(signedNodeInfoA.signed.raw.hash)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `don't enable eventHorizon for platform version less than 4`() {
|
||||
val networkParameters = testNetworkParameters(eventHorizon = 1.seconds, minimumPlatformVersion = 3)
|
||||
val (signedNodeInfoA) = createValidSignedNodeInfo("TestA", requestStorage)
|
||||
val networkParametersEntity = networkMapStorage.saveNetworkParameters(networkParameters, networkMapCertAndKeyPair.sign(networkParameters).sig)
|
||||
val networkMap = NetworkMap(emptyList(), SecureHash.parse(networkParametersEntity.hash), null)
|
||||
val networkMapAndSigned = NetworkMapAndSigned(networkMap) { networkMapCertAndKeyPair.sign(networkMap).sig }
|
||||
networkMapStorage.saveNewNetworkMap(networkMapAndSigned = networkMapAndSigned)
|
||||
nodeInfoStorage.putNodeInfo(signedNodeInfoA)
|
||||
assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).containsExactly(signedNodeInfoA.signed.raw.hash)
|
||||
Thread.sleep(2000) // Wait for eventHorizon to pass
|
||||
assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).containsExactly(signedNodeInfoA.signed.raw.hash)
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,6 @@
|
||||
package com.r3.corda.networkmanage.common.persistence
|
||||
|
||||
import com.r3.corda.networkmanage.TestBase
|
||||
import com.r3.corda.networkmanage.common.persistence.entity.NetworkMapEntity
|
||||
import com.r3.corda.networkmanage.common.persistence.entity.NodeInfoEntity
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -160,8 +159,9 @@ class PersistentNodeInfoStorageTest : TestBase() {
|
||||
val (nodeInfoAndSigned) = createValidSignedNodeInfo("Test", requestStorage)
|
||||
nodeInfoStorage.putNodeInfo(nodeInfoAndSigned)
|
||||
val nodeInfo = singleNodeInfo()
|
||||
Thread.sleep(500)
|
||||
nodeInfoStorage.putNodeInfo(nodeInfoAndSigned)
|
||||
assertThat(nodeInfo.publishedAt).isEqualTo(singleNodeInfo().publishedAt) // Check publishAt hasn't changed
|
||||
assertThat(nodeInfo.publishedAt).isBefore(singleNodeInfo().publishedAt) // Check publishAt has changed
|
||||
assertThat(singleNodeInfo().isCurrent).isTrue()
|
||||
}
|
||||
|
||||
|
@ -157,7 +157,8 @@ class ParametersUpdateHandlerTest {
|
||||
mapOf("minimumPlatformVersion" to 1,
|
||||
"maxMessageSize" to 10485760,
|
||||
"maxTransactionSize" to 10485760,
|
||||
"notaries" to notaryFiles.map { mapOf("notaryNodeInfoFile" to it.toString(), "validating" to true) }
|
||||
"notaries" to notaryFiles.map { mapOf("notaryNodeInfoFile" to it.toString(), "validating" to true) },
|
||||
"eventHorizonDays" to 7
|
||||
)
|
||||
|
||||
).toConfig()
|
||||
|
@ -25,6 +25,7 @@ import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal._contextSerializationEnv
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.nodeapi.internal.ContractsJar
|
||||
@ -232,7 +233,8 @@ class NetworkBootstrapper {
|
||||
maxMessageSize = 10485760,
|
||||
maxTransactionSize = Int.MAX_VALUE,
|
||||
whitelistedContractImplementations = whitelist,
|
||||
epoch = 1
|
||||
epoch = 1,
|
||||
eventHorizon = 30.days
|
||||
)
|
||||
}
|
||||
val copier = NetworkParametersCopier(networkParameters, overwriteFile = true)
|
||||
|
@ -211,6 +211,23 @@ class NetworkMapTest : IntegrationTest() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test node heartbeat`() {
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
initialiseSerialization = false,
|
||||
systemProperties = mapOf("net.corda.node.internal.nodeinfo.publish.interval" to 1.seconds.toString())
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash)
|
||||
networkMapServer.removeNodeInfo(aliceNode.nodeInfo)
|
||||
assertThat(networkMapServer.networkMapHashes()).doesNotContain(aliceNode.nodeInfo.serialize().hash)
|
||||
Thread.sleep(2000)
|
||||
assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash)
|
||||
}
|
||||
}
|
||||
|
||||
private fun NodeHandle.onlySees(vararg nodes: NodeInfo) {
|
||||
// Make sure the nodes aren't getting the node infos from their additional directories
|
||||
val nodeInfosDir = baseDirectory / CordformNode.NODE_INFO_DIRECTORY
|
||||
|
@ -37,9 +37,7 @@ import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.CordaClock
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.classloading.requireAnnotation
|
||||
@ -96,6 +94,7 @@ import java.sql.Connection
|
||||
import java.sql.DriverManager
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.format.DateTimeParseException
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.Executors
|
||||
@ -363,6 +362,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
// Write the node-info file even if nothing's changed, just in case the file has been deleted.
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
|
||||
|
||||
// Always republish on startup, it's treated by network map server as a heartbeat.
|
||||
if (networkMapClient != null) {
|
||||
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
|
||||
}
|
||||
@ -370,18 +370,31 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
return Pair(keyPairs, nodeInfo)
|
||||
}
|
||||
|
||||
// Publish node info on startup and start task that sends every day a heartbeat - republishes node info.
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
|
||||
// By default heartbeat interval should be set to 1 day, but for testing we may change it.
|
||||
val republishProperty = System.getProperty("net.corda.node.internal.nodeinfo.publish.interval")
|
||||
val heartbeatInterval = if (republishProperty != null) {
|
||||
try {
|
||||
Duration.parse(republishProperty)
|
||||
} catch (e: DateTimeParseException) {
|
||||
1.days
|
||||
}
|
||||
} else {
|
||||
1.days
|
||||
}
|
||||
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))
|
||||
|
||||
executor.submit(object : Runnable {
|
||||
override fun run() {
|
||||
try {
|
||||
val republishInterval = try {
|
||||
networkMapClient.publish(signedNodeInfo)
|
||||
heartbeatInterval
|
||||
} catch (t: Throwable) {
|
||||
log.warn("Error encountered while publishing node info, will retry again", t)
|
||||
// TODO: Exponential backoff?
|
||||
executor.schedule(this, 1, TimeUnit.MINUTES)
|
||||
// TODO: Exponential backoff? It should reach max interval of eventHorizon/2.
|
||||
1.minutes
|
||||
}
|
||||
executor.schedule(this, republishInterval.toMinutes(), TimeUnit.MINUTES)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -28,7 +28,6 @@ import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import rx.Subscription
|
||||
import rx.subjects.PublishSubject
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.time.Duration
|
||||
|
@ -18,11 +18,14 @@ import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.createDevNetworkMapCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.network.NetworkMap
|
||||
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import org.eclipse.jetty.server.Server
|
||||
import org.eclipse.jetty.server.ServerConnector
|
||||
import org.eclipse.jetty.server.handler.HandlerCollection
|
||||
@ -50,7 +53,7 @@ class NetworkMapServer(private val pollInterval: Duration,
|
||||
private val myHostNameValue: String = "test.host.name",
|
||||
vararg additionalServices: Any) : Closeable {
|
||||
companion object {
|
||||
private val stubNetworkParameters = NetworkParameters(1, emptyList(), 10485760, Int.MAX_VALUE, Instant.now(), 10, emptyMap())
|
||||
private val stubNetworkParameters = testNetworkParameters(epoch = 10)
|
||||
}
|
||||
|
||||
private val server: Server
|
||||
@ -88,6 +91,8 @@ class NetworkMapServer(private val pollInterval: Duration,
|
||||
.let { NetworkHostAndPort(it.host, it.localPort) }
|
||||
}
|
||||
|
||||
fun networkMapHashes(): List<SecureHash> = service.nodeInfoMap.keys.toList()
|
||||
|
||||
fun removeNodeInfo(nodeInfo: NodeInfo) {
|
||||
service.removeNodeInfo(nodeInfo)
|
||||
}
|
||||
@ -118,7 +123,7 @@ class NetworkMapServer(private val pollInterval: Duration,
|
||||
@Path("network-map")
|
||||
inner class InMemoryNetworkMapService {
|
||||
private val nodeNamesUUID = mutableMapOf<CordaX500Name, UUID>()
|
||||
private val nodeInfoMap = mutableMapOf<SecureHash, SignedNodeInfo>()
|
||||
val nodeInfoMap = mutableMapOf<SecureHash, SignedNodeInfo>()
|
||||
// Mapping from the UUID of the network (null for global one) to hashes of the nodes in network
|
||||
private val networkMaps = mutableMapOf<UUID?, MutableSet<SecureHash>>()
|
||||
val latestAcceptedParametersMap = mutableMapOf<PublicKey, SecureHash>()
|
||||
|
@ -13,6 +13,8 @@ package net.corda.testing.common.internal
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.utilities.days
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
|
||||
fun testNetworkParameters(
|
||||
@ -23,7 +25,8 @@ fun testNetworkParameters(
|
||||
// TODO: Make this configurable and consistence across driver, bootstrapper, demobench and NetworkMapServer
|
||||
maxTransactionSize: Int = maxMessageSize,
|
||||
whitelistedContractImplementations: Map<String, List<AttachmentId>> = emptyMap(),
|
||||
epoch: Int = 1
|
||||
epoch: Int = 1,
|
||||
eventHorizon: Duration = 30.days
|
||||
): NetworkParameters {
|
||||
return NetworkParameters(
|
||||
minimumPlatformVersion = minimumPlatformVersion,
|
||||
@ -32,6 +35,7 @@ fun testNetworkParameters(
|
||||
maxTransactionSize = maxTransactionSize,
|
||||
whitelistedContractImplementations = whitelistedContractImplementations,
|
||||
modifiedTime = modifiedTime,
|
||||
epoch = epoch
|
||||
epoch = epoch,
|
||||
eventHorizon = eventHorizon
|
||||
)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user