From 24fa695ca0ef72fa851abc5b1630d722f32577ec Mon Sep 17 00:00:00 2001 From: Katarzyna Streich Date: Tue, 15 May 2018 12:10:04 +0100 Subject: [PATCH] CORDA-866: Implement removal of stale nodes from network - backport (#3128) * CORDA-866: Implement removal of stale nodes from network Backported * Implement removal of stale nodes from network Add eventHorizon to NetworkParameters structure. Add republishing of node info on 1 day intervals - it is treated by network map as heartbeat from node indicating if it's alive or not. Add removal of old node infos on network map signing. * Add copy method to NetworkParameters data class Add JvmOverloads annotation to the constructor, because it's data class exposed in API * Fix test --- .../net/corda/core/node/NetworkParameters.kt | 31 ++++++++++++++++--- docs/source/network-map.rst | 3 ++ .../internal/network/NetworkBootstrapper.kt | 4 ++- .../node/services/network/NetworkMapTest.kt | 17 ++++++++++ .../net/corda/node/internal/AbstractNode.kt | 27 +++++++++++----- .../node/internal/network/NetworkMapServer.kt | 9 ++++-- .../common/internal/ParametersUtilities.kt | 8 +++-- 7 files changed, 83 insertions(+), 16 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt b/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt index 2b8662d348..becd7e9d29 100644 --- a/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt +++ b/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt @@ -3,6 +3,8 @@ package net.corda.core.node import net.corda.core.identity.Party import net.corda.core.node.services.AttachmentId import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.days +import java.time.Duration import java.time.Instant /** @@ -17,18 +19,19 @@ import java.time.Instant * of parameters. * @property whitelistedContractImplementations List of whitelisted jars containing contract code for each contract class. 
* This will be used by [net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint]. Read more about contract constraints here: + * @property eventHorizon Time after which nodes will be removed from the network map if they have not been seen + * during this period */ -// TODO Add eventHorizon - how many days a node can be offline before being automatically ejected from the network. -// It needs separate design. @CordaSerializable -data class NetworkParameters( +data class NetworkParameters @JvmOverloads constructor( val minimumPlatformVersion: Int, val notaries: List, val maxMessageSize: Int, val maxTransactionSize: Int, val modifiedTime: Instant, val epoch: Int, - val whitelistedContractImplementations: Map> + val whitelistedContractImplementations: Map>, + val eventHorizon: Duration = Int.MAX_VALUE.days ) { init { require(minimumPlatformVersion > 0) { "minimumPlatformVersion must be at least 1" } @@ -36,6 +39,25 @@ data class NetworkParameters( require(epoch > 0) { "epoch must be at least 1" } require(maxMessageSize > 0) { "maxMessageSize must be at least 1" } require(maxTransactionSize > 0) { "maxTransactionSize must be at least 1" } + require(!eventHorizon.isNegative) { "eventHorizon must be positive value" } + } + + fun copy(minimumPlatformVersion: Int, + notaries: List, + maxMessageSize: Int, + maxTransactionSize: Int, + modifiedTime: Instant, + epoch: Int, + whitelistedContractImplementations: Map> + ): NetworkParameters { + return copy(minimumPlatformVersion = minimumPlatformVersion, + notaries = notaries, + maxMessageSize = maxMessageSize, + maxTransactionSize = maxTransactionSize, + modifiedTime = modifiedTime, + epoch = epoch, + whitelistedContractImplementations = whitelistedContractImplementations, + eventHorizon = eventHorizon) } override fun toString(): String { @@ -47,6 +69,7 @@ data class NetworkParameters( whitelistedContractImplementations { ${whitelistedContractImplementations.entries.joinToString("\n ")} } + eventHorizon=$eventHorizon 
modifiedTime=$modifiedTime epoch=$epoch }""" diff --git a/docs/source/network-map.rst b/docs/source/network-map.rst index c7691e5e33..1c56b37ca8 100644 --- a/docs/source/network-map.rst +++ b/docs/source/network-map.rst @@ -121,6 +121,9 @@ The current set of network parameters: For each contract class there is a list of hashes of the approved CorDapp jar versions containing that contract. Read more about *Zone constraints* here :doc:`api-contract-constraints` +:eventHorizon: Time after which nodes are considered to be unresponsive and removed from network map. Nodes republish their + ``NodeInfo`` on a regular interval. Network map treats that as a heartbeat from the node. + More parameters will be added in future releases to regulate things like allowed port numbers, how long a node can be offline before it is evicted from the zone, whether or not IPv6 connectivity is required for zone members, required cryptographic algorithms and rollout schedules (e.g. for moving to post quantum cryptography), parameters related to diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index e80bbc31a6..395c08ddfa 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -15,6 +15,7 @@ import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal._contextSerializationEnv +import net.corda.core.utilities.days import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.nodeapi.internal.ContractsJar @@ -222,7 +223,8 @@ class NetworkBootstrapper { maxMessageSize = 10485760, maxTransactionSize = Int.MAX_VALUE, 
whitelistedContractImplementations = whitelist, - epoch = 1 + epoch = 1, + eventHorizon = 30.days ) } val copier = NetworkParametersCopier(networkParameters, overwriteFile = true) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt index 62647424fa..683fc19fce 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt @@ -199,6 +199,23 @@ class NetworkMapTest { } } + @Test + fun `test node heartbeat`() { + internalDriver( + portAllocation = portAllocation, + compatibilityZone = compatibilityZone, + initialiseSerialization = false, + systemProperties = mapOf("net.corda.node.internal.nodeinfo.publish.interval" to 1.seconds.toString()) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, devMode = false).getOrThrow() + assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash) + networkMapServer.removeNodeInfo(aliceNode.nodeInfo) + assertThat(networkMapServer.networkMapHashes()).doesNotContain(aliceNode.nodeInfo.serialize().hash) + Thread.sleep(2000) + assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash) + } + } + private fun NodeHandle.onlySees(vararg nodes: NodeInfo) { // Make sure the nodes aren't getting the node infos from their additional directories val nodeInfosDir = baseDirectory / CordformNode.NODE_INFO_DIRECTORY diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 47db47e03a..96dafeecc8 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -29,9 +29,7 @@ import net.corda.core.serialization.SerializeAsToken import 
net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.debug -import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.* import net.corda.node.CordaClock import net.corda.node.VersionInfo import net.corda.node.internal.classloading.requireAnnotation @@ -88,6 +86,7 @@ import java.security.cert.X509Certificate import java.sql.Connection import java.time.Clock import java.time.Duration +import java.time.format.DateTimeParseException import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService @@ -367,6 +366,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Write the node-info file even if nothing's changed, just in case the file has been deleted. NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned) + // Always republish on startup, it's treated by network map server as a heartbeat. if (networkMapClient != null) { tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient) } @@ -374,18 +374,31 @@ abstract class AbstractNode(val configuration: NodeConfiguration, return Pair(keyPairs, nodeInfo) } + // Publishes node info on startup, then republishes it as a heartbeat: one heartbeat interval after a success, 1 minute after a failure. Delays are scheduled in milliseconds so that sub-minute intervals are not truncated to zero. private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) { + // By default heartbeat interval should be set to 1 day, but for testing we may change it. 
+ val republishProperty = System.getProperty("net.corda.node.internal.nodeinfo.publish.interval") + val heartbeatInterval = if (republishProperty != null) { + try { + Duration.parse(republishProperty) + } catch (e: DateTimeParseException) { + 1.days + } + } else { + 1.days + } val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory())) - executor.submit(object : Runnable { override fun run() { - try { + val republishInterval = try { networkMapClient.publish(signedNodeInfo) + heartbeatInterval } catch (t: Throwable) { log.warn("Error encountered while publishing node info, will retry again", t) - // TODO: Exponential backoff? - executor.schedule(this, 1, TimeUnit.MINUTES) + // TODO: Exponential backoff? It should reach max interval of eventHorizon/2. + 1.minutes } + executor.schedule(this, republishInterval.toMillis(), TimeUnit.MILLISECONDS) } }) } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt index 0af7ff6458..d74b35053c 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt @@ -8,11 +8,14 @@ import net.corda.core.node.NetworkParameters import net.corda.core.node.NodeInfo import net.corda.core.serialization.serialize import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.days import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.createDevNetworkMapCa import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair import net.corda.nodeapi.internal.network.NetworkMap import net.corda.nodeapi.internal.network.ParametersUpdate +import 
net.corda.testing.common.internal.testNetworkParameters import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.ServerConnector import org.eclipse.jetty.server.handler.HandlerCollection @@ -40,7 +43,7 @@ class NetworkMapServer(private val pollInterval: Duration, private val myHostNameValue: String = "test.host.name", vararg additionalServices: Any) : Closeable { companion object { - private val stubNetworkParameters = NetworkParameters(1, emptyList(), 10485760, Int.MAX_VALUE, Instant.now(), 10, emptyMap()) + private val stubNetworkParameters = testNetworkParameters(epoch = 10) } private val server: Server @@ -78,6 +81,8 @@ class NetworkMapServer(private val pollInterval: Duration, .let { NetworkHostAndPort(it.host, it.localPort) } } + fun networkMapHashes(): List = service.nodeInfoMap.keys.toList() + fun removeNodeInfo(nodeInfo: NodeInfo) { service.removeNodeInfo(nodeInfo) } @@ -108,7 +113,7 @@ class NetworkMapServer(private val pollInterval: Duration, @Path("network-map") inner class InMemoryNetworkMapService { private val nodeNamesUUID = mutableMapOf() - private val nodeInfoMap = mutableMapOf() + val nodeInfoMap = mutableMapOf() // Mapping from the UUID of the network (null for global one) to hashes of the nodes in network private val networkMaps = mutableMapOf>() val latestAcceptedParametersMap = mutableMapOf() diff --git a/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt b/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt index a1f51ca2d3..e9f4902c58 100644 --- a/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt +++ b/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt @@ -3,6 +3,8 @@ package net.corda.testing.common.internal import net.corda.core.node.NetworkParameters import net.corda.core.node.NotaryInfo import net.corda.core.node.services.AttachmentId +import 
net.corda.core.utilities.days +import java.time.Duration import java.time.Instant fun testNetworkParameters( @@ -13,7 +15,8 @@ fun testNetworkParameters( // TODO: Make this configurable and consistence across driver, bootstrapper, demobench and NetworkMapServer maxTransactionSize: Int = maxMessageSize, whitelistedContractImplementations: Map> = emptyMap(), - epoch: Int = 1 + epoch: Int = 1, + eventHorizon: Duration = 30.days ): NetworkParameters { return NetworkParameters( minimumPlatformVersion = minimumPlatformVersion, @@ -22,6 +25,7 @@ fun testNetworkParameters( maxTransactionSize = maxTransactionSize, whitelistedContractImplementations = whitelistedContractImplementations, modifiedTime = modifiedTime, - epoch = epoch + epoch = epoch, + eventHorizon = eventHorizon ) } \ No newline at end of file