mirror of
https://github.com/corda/corda.git
synced 2025-04-07 19:34:41 +00:00
CORDA-866: Implement removal of stale nodes from network - backport (#3128)
* CORDA-866: Implement removal of stale nodes from network Backported * Implement removal of stale nodes from network Add eventHorizon to NetworkParameters structure. Add republishing of node info at 1-day intervals - the network map treats it as a heartbeat from the node indicating whether it is alive. Add removal of old node infos on network map signing. * Add copy method to NetworkParameters data class Add JvmOverloads annotation to the constructor, because it's a data class exposed in the API * Fix test
This commit is contained in:
parent
84d94d44ad
commit
24fa695ca0
@ -3,6 +3,8 @@ package net.corda.core.node
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.days
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
@ -17,18 +19,19 @@ import java.time.Instant
|
||||
* of parameters.
|
||||
* @property whitelistedContractImplementations List of whitelisted jars containing contract code for each contract class.
|
||||
* This will be used by [net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint]. Read more about contract constraints here: <https://docs.corda.net/api-contract-constraints.html>
|
||||
* @property eventHorizon Time after which a node will be removed from the network map if it has not been seen
|
||||
* (i.e. has not republished its node info) during this period.
|
||||
*/
|
||||
// TODO Add eventHorizon - how many days a node can be offline before being automatically ejected from the network.
|
||||
// It needs separate design.
|
||||
@CordaSerializable
|
||||
data class NetworkParameters(
|
||||
data class NetworkParameters @JvmOverloads constructor(
|
||||
val minimumPlatformVersion: Int,
|
||||
val notaries: List<NotaryInfo>,
|
||||
val maxMessageSize: Int,
|
||||
val maxTransactionSize: Int,
|
||||
val modifiedTime: Instant,
|
||||
val epoch: Int,
|
||||
val whitelistedContractImplementations: Map<String, List<AttachmentId>>
|
||||
val whitelistedContractImplementations: Map<String, List<AttachmentId>>,
|
||||
val eventHorizon: Duration = Int.MAX_VALUE.days
|
||||
) {
|
||||
init {
|
||||
require(minimumPlatformVersion > 0) { "minimumPlatformVersion must be at least 1" }
|
||||
@ -36,6 +39,25 @@ data class NetworkParameters(
|
||||
require(epoch > 0) { "epoch must be at least 1" }
|
||||
require(maxMessageSize > 0) { "maxMessageSize must be at least 1" }
|
||||
require(maxTransactionSize > 0) { "maxTransactionSize must be at least 1" }
|
||||
require(!eventHorizon.isNegative) { "eventHorizon must be positive value" }
|
||||
}
|
||||
|
||||
fun copy(minimumPlatformVersion: Int,
|
||||
notaries: List<NotaryInfo>,
|
||||
maxMessageSize: Int,
|
||||
maxTransactionSize: Int,
|
||||
modifiedTime: Instant,
|
||||
epoch: Int,
|
||||
whitelistedContractImplementations: Map<String, List<AttachmentId>>
|
||||
): NetworkParameters {
|
||||
return copy(minimumPlatformVersion = minimumPlatformVersion,
|
||||
notaries = notaries,
|
||||
maxMessageSize = maxMessageSize,
|
||||
maxTransactionSize = maxTransactionSize,
|
||||
modifiedTime = modifiedTime,
|
||||
epoch = epoch,
|
||||
whitelistedContractImplementations = whitelistedContractImplementations,
|
||||
eventHorizon = eventHorizon)
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
@ -47,6 +69,7 @@ data class NetworkParameters(
|
||||
whitelistedContractImplementations {
|
||||
${whitelistedContractImplementations.entries.joinToString("\n ")}
|
||||
}
|
||||
eventHorizon=$eventHorizon
|
||||
modifiedTime=$modifiedTime
|
||||
epoch=$epoch
|
||||
}"""
|
||||
|
@ -121,6 +121,9 @@ The current set of network parameters:
|
||||
For each contract class there is a list of hashes of the approved CorDapp jar versions containing that contract.
|
||||
Read more about *Zone constraints* here :doc:`api-contract-constraints`
|
||||
|
||||
:eventHorizon: Time after which nodes are considered to be unresponsive and removed from the network map. Nodes republish their
|
||||
``NodeInfo`` at a regular interval. The network map treats that as a heartbeat from the node.
|
||||
|
||||
More parameters will be added in future releases to regulate things like allowed port numbers, how long a node can be
|
||||
offline before it is evicted from the zone, whether or not IPv6 connectivity is required for zone members, required
|
||||
cryptographic algorithms and rollout schedules (e.g. for moving to post quantum cryptography), parameters related to
|
||||
|
@ -15,6 +15,7 @@ import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal._contextSerializationEnv
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.nodeapi.internal.ContractsJar
|
||||
@ -222,7 +223,8 @@ class NetworkBootstrapper {
|
||||
maxMessageSize = 10485760,
|
||||
maxTransactionSize = Int.MAX_VALUE,
|
||||
whitelistedContractImplementations = whitelist,
|
||||
epoch = 1
|
||||
epoch = 1,
|
||||
eventHorizon = 30.days
|
||||
)
|
||||
}
|
||||
val copier = NetworkParametersCopier(networkParameters, overwriteFile = true)
|
||||
|
@ -199,6 +199,23 @@ class NetworkMapTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test node heartbeat`() {
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
initialiseSerialization = false,
|
||||
systemProperties = mapOf("net.corda.node.internal.nodeinfo.publish.interval" to 1.seconds.toString())
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, devMode = false).getOrThrow()
|
||||
assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash)
|
||||
networkMapServer.removeNodeInfo(aliceNode.nodeInfo)
|
||||
assertThat(networkMapServer.networkMapHashes()).doesNotContain(aliceNode.nodeInfo.serialize().hash)
|
||||
Thread.sleep(2000)
|
||||
assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash)
|
||||
}
|
||||
}
|
||||
|
||||
private fun NodeHandle.onlySees(vararg nodes: NodeInfo) {
|
||||
// Make sure the nodes aren't getting the node infos from their additional directories
|
||||
val nodeInfosDir = baseDirectory / CordformNode.NODE_INFO_DIRECTORY
|
||||
|
@ -29,9 +29,7 @@ import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.CordaClock
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.classloading.requireAnnotation
|
||||
@ -88,6 +86,7 @@ import java.security.cert.X509Certificate
|
||||
import java.sql.Connection
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.format.DateTimeParseException
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ExecutorService
|
||||
@ -367,6 +366,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
// Write the node-info file even if nothing's changed, just in case the file has been deleted.
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
|
||||
|
||||
// Always republish on startup; the network map server treats it as a heartbeat.
|
||||
if (networkMapClient != null) {
|
||||
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
|
||||
}
|
||||
@ -374,18 +374,31 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
return Pair(keyPairs, nodeInfo)
|
||||
}
|
||||
|
||||
// Publish node info on startup and start a task that sends a daily heartbeat by republishing the node info.
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
|
||||
// By default heartbeat interval should be set to 1 day, but for testing we may change it.
|
||||
val republishProperty = System.getProperty("net.corda.node.internal.nodeinfo.publish.interval")
|
||||
val heartbeatInterval = if (republishProperty != null) {
|
||||
try {
|
||||
Duration.parse(republishProperty)
|
||||
} catch (e: DateTimeParseException) {
|
||||
1.days
|
||||
}
|
||||
} else {
|
||||
1.days
|
||||
}
|
||||
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))
|
||||
|
||||
executor.submit(object : Runnable {
|
||||
override fun run() {
|
||||
try {
|
||||
val republishInterval = try {
|
||||
networkMapClient.publish(signedNodeInfo)
|
||||
heartbeatInterval
|
||||
} catch (t: Throwable) {
|
||||
log.warn("Error encountered while publishing node info, will retry again", t)
|
||||
// TODO: Exponential backoff?
|
||||
executor.schedule(this, 1, TimeUnit.MINUTES)
|
||||
// TODO: Exponential backoff? It should reach max interval of eventHorizon/2.
|
||||
1.minutes
|
||||
}
|
||||
executor.schedule(this, republishInterval.toMinutes(), TimeUnit.MINUTES)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -8,11 +8,14 @@ import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.createDevNetworkMapCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.network.NetworkMap
|
||||
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import org.eclipse.jetty.server.Server
|
||||
import org.eclipse.jetty.server.ServerConnector
|
||||
import org.eclipse.jetty.server.handler.HandlerCollection
|
||||
@ -40,7 +43,7 @@ class NetworkMapServer(private val pollInterval: Duration,
|
||||
private val myHostNameValue: String = "test.host.name",
|
||||
vararg additionalServices: Any) : Closeable {
|
||||
companion object {
|
||||
private val stubNetworkParameters = NetworkParameters(1, emptyList(), 10485760, Int.MAX_VALUE, Instant.now(), 10, emptyMap())
|
||||
private val stubNetworkParameters = testNetworkParameters(epoch = 10)
|
||||
}
|
||||
|
||||
private val server: Server
|
||||
@ -78,6 +81,8 @@ class NetworkMapServer(private val pollInterval: Duration,
|
||||
.let { NetworkHostAndPort(it.host, it.localPort) }
|
||||
}
|
||||
|
||||
fun networkMapHashes(): List<SecureHash> = service.nodeInfoMap.keys.toList()
|
||||
|
||||
fun removeNodeInfo(nodeInfo: NodeInfo) {
|
||||
service.removeNodeInfo(nodeInfo)
|
||||
}
|
||||
@ -108,7 +113,7 @@ class NetworkMapServer(private val pollInterval: Duration,
|
||||
@Path("network-map")
|
||||
inner class InMemoryNetworkMapService {
|
||||
private val nodeNamesUUID = mutableMapOf<CordaX500Name, UUID>()
|
||||
private val nodeInfoMap = mutableMapOf<SecureHash, SignedNodeInfo>()
|
||||
val nodeInfoMap = mutableMapOf<SecureHash, SignedNodeInfo>()
|
||||
// Mapping from the UUID of the network (null for global one) to hashes of the nodes in network
|
||||
private val networkMaps = mutableMapOf<UUID?, MutableSet<SecureHash>>()
|
||||
val latestAcceptedParametersMap = mutableMapOf<PublicKey, SecureHash>()
|
||||
|
@ -3,6 +3,8 @@ package net.corda.testing.common.internal
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.utilities.days
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
|
||||
fun testNetworkParameters(
|
||||
@ -13,7 +15,8 @@ fun testNetworkParameters(
|
||||
// TODO: Make this configurable and consistent across driver, bootstrapper, demobench and NetworkMapServer
|
||||
maxTransactionSize: Int = maxMessageSize,
|
||||
whitelistedContractImplementations: Map<String, List<AttachmentId>> = emptyMap(),
|
||||
epoch: Int = 1
|
||||
epoch: Int = 1,
|
||||
eventHorizon: Duration = 30.days
|
||||
): NetworkParameters {
|
||||
return NetworkParameters(
|
||||
minimumPlatformVersion = minimumPlatformVersion,
|
||||
@ -22,6 +25,7 @@ fun testNetworkParameters(
|
||||
maxTransactionSize = maxTransactionSize,
|
||||
whitelistedContractImplementations = whitelistedContractImplementations,
|
||||
modifiedTime = modifiedTime,
|
||||
epoch = epoch
|
||||
epoch = epoch,
|
||||
eventHorizon = eventHorizon
|
||||
)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user