From 509a52ad5ee764d9c521ea78aa901598a1bb9219 Mon Sep 17 00:00:00 2001 From: Katarzyna Streich Date: Fri, 11 May 2018 17:11:56 +0100 Subject: [PATCH] CORDA-866: Implement removal of stale nodes from network (#774) * Implement removal of stale nodes from network Add eventHorizon to NetworkParameters structure. Add republishing of node info on 1 day intervals - it is treated by network map as heartbeat from node indicating if it's alive or not. Add removal of old node infos on network map signing. * Add copy method to NetworkParameters data class Add JvmOverloads annotation to the constructor, because it's data class exposed in API --- .../net/corda/core/node/NetworkParameters.kt | 31 ++++++++++++++++--- docs/source/network-map.rst | 3 ++ .../doorman/network-parameters.conf | 1 + network-management/README.md | 1 + network-management/network-parameters.conf | 1 + .../doorman/NetworkParametersUpdateTest.kt | 3 +- .../doorman/NodeRegistrationTest.kt | 3 +- .../common/persistence/NetworkMapStorage.kt | 3 +- .../common/persistence/NodeInfoStorage.kt | 4 +++ .../PersistentNetworkMapStorage.kt | 15 +++++++-- .../persistence/PersistentNodeInfoStorage.kt | 17 +++++----- .../persistence/entity/NodeInfoEntity.kt | 2 ++ .../doorman/DoormanArgsParser.kt | 6 +++- .../doorman/NetworkParametersConfig.kt | 2 ++ .../com/r3/corda/networkmanage/TestUtils.kt | 3 +- .../PersistentNetworkMapStorageTest.kt | 31 +++++++++++++++++++ .../PersistentNodeInfoStorageTest.kt | 4 +-- .../doorman/ParametersUpdateHandlerTest.kt | 3 +- .../internal/network/NetworkBootstrapper.kt | 4 ++- .../node/services/network/NetworkMapTest.kt | 17 ++++++++++ .../net/corda/node/internal/AbstractNode.kt | 27 +++++++++++----- .../services/network/NetworkMapUpdater.kt | 1 - .../node/internal/network/NetworkMapServer.kt | 9 ++++-- .../common/internal/ParametersUtilities.kt | 8 +++-- 24 files changed, 163 insertions(+), 36 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt b/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt index e86e570689..935aae08a6 100644 --- a/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt +++ b/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt @@ -13,6 +13,8 @@ package net.corda.core.node import net.corda.core.identity.Party import net.corda.core.node.services.AttachmentId import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.days +import java.time.Duration import java.time.Instant /** @@ -27,18 +29,19 @@ import java.time.Instant * of parameters. * @property whitelistedContractImplementations List of whitelisted jars containing contract code for each contract class. * This will be used by [net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint]. Read more about contract constraints here: + * @property eventHorizon Time after which nodes will be removed from the network map if they have not been seen + * during this period */ -// TODO Add eventHorizon - how many days a node can be offline before being automatically ejected from the network. -// It needs separate design. @CordaSerializable -data class NetworkParameters( +data class NetworkParameters @JvmOverloads constructor( val minimumPlatformVersion: Int, val notaries: List, val maxMessageSize: Int, val maxTransactionSize: Int, val modifiedTime: Instant, val epoch: Int, - val whitelistedContractImplementations: Map> + val whitelistedContractImplementations: Map>, + val eventHorizon: Duration = Int.MAX_VALUE.days ) { init { require(minimumPlatformVersion > 0) { "minimumPlatformVersion must be at least 1" } @@ -46,6 +49,25 @@ data class NetworkParameters( require(epoch > 0) { "epoch must be at least 1" } require(maxMessageSize > 0) { "maxMessageSize must be at least 1" } require(maxTransactionSize > 0) { "maxTransactionSize must be at least 1" } + require(!eventHorizon.isNegative) { "eventHorizon must be positive value" } + } + + fun copy(minimumPlatformVersion: Int, + notaries: List, + maxMessageSize: Int, + maxTransactionSize: Int, + modifiedTime: Instant, + epoch: Int, + whitelistedContractImplementations: Map> + ): NetworkParameters { + return copy(minimumPlatformVersion = minimumPlatformVersion, + notaries = notaries, + maxMessageSize = maxMessageSize, + maxTransactionSize = maxTransactionSize, + modifiedTime = modifiedTime, + epoch = epoch, + whitelistedContractImplementations = whitelistedContractImplementations, + eventHorizon = eventHorizon) } override fun toString(): String { @@ -57,6 +79,7 @@ data class NetworkParameters( whitelistedContractImplementations { ${whitelistedContractImplementations.entries.joinToString("\n ")} } + eventHorizon=$eventHorizon modifiedTime=$modifiedTime epoch=$epoch }""" diff --git a/docs/source/network-map.rst b/docs/source/network-map.rst index 3233ee3b08..78da763990 100644 --- a/docs/source/network-map.rst +++ b/docs/source/network-map.rst @@ -121,6 +121,9 @@ The current set of network parameters: For each contract class there is a list of hashes of the approved CorDapp jar versions containing that contract. Read more about *Zone constraints* here :doc:`api-contract-constraints` +:eventHorizon: Time after which nodes are considered to be unresponsive and removed from network map. Nodes republish their + ``NodeInfo`` on a regular interval. Network map treats that as a heartbeat from the node. + More parameters will be added in future releases to regulate things like allowed port numbers, how long a node can be offline before it is evicted from the zone, whether or not IPv6 connectivity is required for zone members, required cryptographic algorithms and rollout schedules (e.g. for moving to post quantum cryptography), parameters related to diff --git a/experimental/kubernetes/config-files/doorman/network-parameters.conf b/experimental/kubernetes/config-files/doorman/network-parameters.conf index 60694df675..806d6f62c4 100644 --- a/experimental/kubernetes/config-files/doorman/network-parameters.conf +++ b/experimental/kubernetes/config-files/doorman/network-parameters.conf @@ -5,3 +5,4 @@ notaries : [{ minimumPlatformVersion = 1 maxMessageSize = 10485760 maxTransactionSize = 10485760 + eventHorizonDays = 30 # Duration in days diff --git a/network-management/README.md b/network-management/README.md index fe8a4c048f..353a8fcd15 100644 --- a/network-management/README.md +++ b/network-management/README.md @@ -244,6 +244,7 @@ networkMap { minimumPlatformVersion = 1 maxMessageSize = 10485760 maxTransactionSize = 10485760 + eventHorizonDays = 30 # Duration in days Save the parameters to `network-parameters.conf` diff --git a/network-management/network-parameters.conf b/network-management/network-parameters.conf index 2491be3486..0245c99e4e 100644 --- a/network-management/network-parameters.conf +++ b/network-management/network-parameters.conf @@ -8,3 +8,4 @@ notaries : [{ minimumPlatformVersion = 1 maxMessageSize = 10485760 maxTransactionSize = 10485760 +eventHorizonDays = 30 # Duration in days \ No newline at end of file diff --git a/network-management/src/integration-test/kotlin/com/r3/corda/networkmanage/doorman/NetworkParametersUpdateTest.kt b/network-management/src/integration-test/kotlin/com/r3/corda/networkmanage/doorman/NetworkParametersUpdateTest.kt index 873fc2fa24..76c936cad8 100644 --- a/network-management/src/integration-test/kotlin/com/r3/corda/networkmanage/doorman/NetworkParametersUpdateTest.kt +++ b/network-management/src/integration-test/kotlin/com/r3/corda/networkmanage/doorman/NetworkParametersUpdateTest.kt @@ -79,7 +79,8 @@ class NetworkParametersUpdateTest : IntegrationTest() { minimumPlatformVersion = 1, maxMessageSize = 1_000_000, maxTransactionSize = 1_000_000, - parametersUpdate = null + parametersUpdate = null, + eventHorizonDays = 30 ) applyNetworkParametersAndStart(initialNetParams) diff --git a/network-management/src/integration-test/kotlin/com/r3/corda/networkmanage/doorman/NodeRegistrationTest.kt b/network-management/src/integration-test/kotlin/com/r3/corda/networkmanage/doorman/NodeRegistrationTest.kt index a966c6750d..61d280eaf1 100644 --- a/network-management/src/integration-test/kotlin/com/r3/corda/networkmanage/doorman/NodeRegistrationTest.kt +++ b/network-management/src/integration-test/kotlin/com/r3/corda/networkmanage/doorman/NodeRegistrationTest.kt @@ -117,7 +117,8 @@ class NodeRegistrationTest : IntegrationTest() { minimumPlatformVersion = 1, maxMessageSize = 10485760, maxTransactionSize = 10485760, - parametersUpdate = null + parametersUpdate = null, + eventHorizonDays = 30 ) // Restart the server once we're able to generate the network parameters applyNetworkParametersAndStart(setNetParams) diff --git a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/NetworkMapStorage.kt b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/NetworkMapStorage.kt index eaf8757ab7..c7cf8e16b6 100644 --- a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/NetworkMapStorage.kt +++ b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/NetworkMapStorage.kt @@ -37,7 +37,8 @@ interface NetworkMapStorage { fun saveNewNetworkMap(networkId: String? = null, networkMapAndSigned: NetworkMapAndSigned) /** - * Retrieves node info hashes for both public and private networks where [NodeInfoEntity.isCurrent] is true and the certificate status is [CertificateStatus.VALID] + * Retrieves node info hashes for both public and private networks where [NodeInfoEntity.isCurrent] is true and the certificate status is [CertificateStatus.VALID], + * and that were published less than eventHorizon ago. * Nodes should have declared that they are using correct set of parameters. */ fun getNodeInfoHashes(): NodeInfoHashes diff --git a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/NodeInfoStorage.kt b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/NodeInfoStorage.kt index 9fb43c2d17..ad4f354ddb 100644 --- a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/NodeInfoStorage.kt +++ b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/NodeInfoStorage.kt @@ -42,6 +42,10 @@ interface NodeInfoStorage { /** * The [nodeInfoAndSigned] is keyed by the public key, old node info with the same public key will be replaced by the new node info. + * If republishing of the same nodeInfo happens, then we will record the time it was republished in the database. + * Based on that information we can remove unresponsive nodes from network (event horizon is the parameter that tells how + * long node can be down before it gets removed). If the nodes becomes active again, it will enter back to the network map + * after republishing its [NodeInfo]. * @param nodeInfoAndSigned signed node info data to be stored * @return hash for the newly created node info entry */ diff --git a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorage.kt b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorage.kt index d8432d3a78..6b2cd38acf 100644 --- a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorage.kt +++ b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorage.kt @@ -69,7 +69,9 @@ class PersistentNetworkMapStorage(private val database: CordaPersistence) : Netw override fun getNodeInfoHashes(): NodeInfoHashes { return database.transaction { + val currentParameters = getNetworkMaps().publicNetworkMap?.networkParameters?.networkParameters val builder = session.criteriaBuilder + // TODO Convert this query to JPQL so it's more readable. val query = builder.createTupleQuery().run { from(NodeInfoEntity::class.java).run { val certStatusExpression = get(NodeInfoEntity::certificateSigningRequest.name) @@ -79,12 +81,19 @@ class PersistentNetworkMapStorage(private val database: CordaPersistence) : Netw // isn't needed. val certStatusEq = builder.equal(certStatusExpression, CertificateStatus.VALID) val isCurrentNodeInfo = builder.isTrue(get(NodeInfoEntity::isCurrent.name)) - + // We enable eventHorizon only if minimum platform version is greater than 3, nodes on previous versions + // don't republish their node infos on regular intervals so they shouldn't be evicted from network after eventHorizon. + val eventHorizonAgo = if (currentParameters != null && currentParameters.minimumPlatformVersion >= 4) { + builder.greaterThanOrEqualTo(get(NodeInfoEntity::publishedAt.name), + Instant.now().minus(currentParameters.eventHorizon)) + } else { + builder.and() // This expression is always true. It's needed when eventHorizon isn't enabled. + } val networkIdSelector = get(NodeInfoEntity::certificateSigningRequest.name) .get(CertificateSigningRequestEntity::privateNetwork.name) .get(PrivateNetworkEntity::networkId.name) - - multiselect(networkIdSelector, get(NodeInfoEntity::nodeInfoHash.name)).where(builder.and(certStatusEq, isCurrentNodeInfo)) + multiselect(networkIdSelector, get(NodeInfoEntity::nodeInfoHash.name)) + .where(builder.and(certStatusEq, isCurrentNodeInfo, eventHorizonAgo)) } } val allNodeInfos = session.createQuery(query).resultList.groupBy { it[0]?.toString() ?: PUBLIC_NETWORK_ID }.mapValues { it.value.map { SecureHash.parse(it.get(1, String::class.java)) } } diff --git a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNodeInfoStorage.kt b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNodeInfoStorage.kt index e2a229c4bb..59e3340e84 100644 --- a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNodeInfoStorage.kt +++ b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNodeInfoStorage.kt @@ -16,10 +16,10 @@ import com.r3.corda.networkmanage.common.persistence.entity.ParametersUpdateEnti import com.r3.corda.networkmanage.common.persistence.entity.UpdateStatus import com.r3.corda.networkmanage.common.utils.logger import net.corda.core.crypto.SecureHash -import net.corda.core.crypto.sha256 import net.corda.core.internal.CertRole import net.corda.core.internal.CertRole.NODE_CA import net.corda.core.internal.hash +import net.corda.core.utilities.debug import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.crypto.x509Certificates @@ -27,6 +27,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseTransaction import java.security.PublicKey import java.security.cert.CertPath +import java.time.Instant /** * Database implementation of the [NetworkMapStorage] interface @@ -40,16 +41,16 @@ class PersistentNodeInfoStorage(private val database: CordaPersistence) : NodeIn val registeredIdentities = nodeInfo.legalIdentitiesAndCerts.map { it.certPath.x509Certificates.single { CertRole.extract(it) in setOf(CertRole.SERVICE_IDENTITY, NODE_CA) } } database.transaction { - val count = session.createQuery( - "select count(*) from ${NodeInfoEntity::class.java.name} where nodeInfoHash = :nodeInfoHash and isCurrent = true", java.lang.Long::class.java) + // Record fact of republishing of the node info, it's treated as a heartbeat from the node. + val rowsUpdated = session.createQuery("update ${NodeInfoEntity::class.java.name} n set publishedAt = :now " + + "where n.nodeInfoHash = :nodeInfoHash and n.isCurrent = true") + .setParameter("now", Instant.now()) .setParameter("nodeInfoHash", nodeInfoHash.toString()) - .singleResult - .toLong() - if (count != 0L) { - logger.debug("Ignoring duplicate publish: $nodeInfo") + .executeUpdate() + if (rowsUpdated != 0) { + logger.debug { "Republish of $nodeInfo" } return@transaction nodeInfoHash } - // TODO Move these checks out of data access layer // For each identity known by the doorman, validate against it's CSR. val requests = registeredIdentities.map { diff --git a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/entity/NodeInfoEntity.kt b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/entity/NodeInfoEntity.kt index 100973b572..4f0f224323 100644 --- a/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/entity/NodeInfoEntity.kt +++ b/network-management/src/main/kotlin/com/r3/corda/networkmanage/common/persistence/entity/NodeInfoEntity.kt @@ -12,6 +12,7 @@ package com.r3.corda.networkmanage.common.persistence.entity import net.corda.core.crypto.SecureHash import net.corda.nodeapi.internal.SignedNodeInfo +import org.hibernate.annotations.UpdateTimestamp import java.io.Serializable import java.time.Instant import javax.persistence.* @@ -42,6 +43,7 @@ data class NodeInfoEntity( val isCurrent: Boolean, @Column(name = "published_at", nullable = false) + @UpdateTimestamp val publishedAt: Instant = Instant.now(), @ManyToOne(fetch = FetchType.EAGER) diff --git a/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/DoormanArgsParser.kt b/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/DoormanArgsParser.kt index 044717a9b5..32697dc298 100644 --- a/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/DoormanArgsParser.kt +++ b/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/DoormanArgsParser.kt @@ -8,6 +8,7 @@ import joptsimple.util.PathConverter import joptsimple.util.PathProperties import net.corda.core.node.NetworkParameters import net.corda.core.node.NotaryInfo +import net.corda.core.utilities.days import java.nio.file.Path import java.time.Instant @@ -70,6 +71,7 @@ sealed class NetworkParametersCmd { val notaries: List, val maxMessageSize: Int, val maxTransactionSize: Int, + val eventHorizonDays: Int, val parametersUpdate: ParametersUpdateConfig? ) : NetworkParametersCmd() { companion object { @@ -79,6 +81,7 @@ sealed class NetworkParametersCmd { config.notaries.map { it.toNotaryInfo() }, config.maxMessageSize, config.maxTransactionSize, + config.eventHorizonDays, config.parametersUpdate ) } @@ -103,7 +106,8 @@ sealed class NetworkParametersCmd { modifiedTime, epoch, // TODO: Tudor, Michal - pass the actual network parameters where we figure out how - emptyMap() + emptyMap(), + eventHorizonDays.days ) } } diff --git a/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/NetworkParametersConfig.kt b/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/NetworkParametersConfig.kt index 8fd8795afc..6e3a043185 100644 --- a/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/NetworkParametersConfig.kt +++ b/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/NetworkParametersConfig.kt @@ -18,6 +18,7 @@ import net.corda.core.node.NotaryInfo import net.corda.nodeapi.internal.SignedNodeInfo import java.nio.file.Path import java.time.Instant +import java.time.Duration /** * Data class representing a [NotaryInfo] which can be easily parsed by a typesafe [ConfigFactory]. @@ -53,4 +54,5 @@ data class NetworkParametersConfig(val minimumPlatformVersion: Int, val notaries: List, val maxMessageSize: Int, val maxTransactionSize: Int, + val eventHorizonDays: Int, val parametersUpdate: ParametersUpdateConfig?) diff --git a/network-management/src/test/kotlin/com/r3/corda/networkmanage/TestUtils.kt b/network-management/src/test/kotlin/com/r3/corda/networkmanage/TestUtils.kt index 4388c045d7..59386c26d6 100644 --- a/network-management/src/test/kotlin/com/r3/corda/networkmanage/TestUtils.kt +++ b/network-management/src/test/kotlin/com/r3/corda/networkmanage/TestUtils.kt @@ -80,6 +80,7 @@ fun NetworkParameters.toCmd(parametersUpdate: ParametersUpdateConfig? = null): N notaries = notaries, maxMessageSize = maxMessageSize, maxTransactionSize = maxTransactionSize, - parametersUpdate = parametersUpdate + parametersUpdate = parametersUpdate, + eventHorizonDays = eventHorizon.toDays().toInt() ) } diff --git a/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorageTest.kt b/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorageTest.kt index d5a329e54f..b416b5c765 100644 --- a/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorageTest.kt +++ b/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorageTest.kt @@ -18,6 +18,7 @@ import com.r3.corda.networkmanage.common.persistence.entity.UpdateStatus import net.corda.core.crypto.SecureHash import net.corda.core.serialization.serialize import net.corda.core.utilities.days +import net.corda.core.utilities.seconds import net.corda.nodeapi.internal.createDevNetworkMapCa import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair import net.corda.nodeapi.internal.network.NetworkMap @@ -208,4 +209,34 @@ class PersistentNetworkMapStorageTest : TestBase() { // then assertThat(validNodeInfoHashes).containsOnly(nodeInfoHashB) } + + @Test + fun `remove nodes older than eventHorizon from network map`() { + val networkParameters = testNetworkParameters(eventHorizon = 1.seconds, minimumPlatformVersion = 4) + val (signedNodeInfoA) = createValidSignedNodeInfo("TestA", requestStorage) + val networkParametersEntity = networkMapStorage.saveNetworkParameters(networkParameters, networkMapCertAndKeyPair.sign(networkParameters).sig) + val networkMap = NetworkMap(emptyList(), SecureHash.parse(networkParametersEntity.hash), null) + val networkMapAndSigned = NetworkMapAndSigned(networkMap) { networkMapCertAndKeyPair.sign(networkMap).sig } + networkMapStorage.saveNewNetworkMap(networkMapAndSigned = networkMapAndSigned) + nodeInfoStorage.putNodeInfo(signedNodeInfoA) + assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).containsExactly(signedNodeInfoA.signed.raw.hash) + Thread.sleep(2000) // Wait for node to be older than eventHorizon + assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).doesNotContain(signedNodeInfoA.signed.raw.hash) + nodeInfoStorage.putNodeInfo(signedNodeInfoA) // Republish + assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).containsExactly(signedNodeInfoA.signed.raw.hash) + } + + @Test + fun `don't enable eventHorizon for platform version less than 4`() { + val networkParameters = testNetworkParameters(eventHorizon = 1.seconds, minimumPlatformVersion = 3) + val (signedNodeInfoA) = createValidSignedNodeInfo("TestA", requestStorage) + val networkParametersEntity = networkMapStorage.saveNetworkParameters(networkParameters, networkMapCertAndKeyPair.sign(networkParameters).sig) + val networkMap = NetworkMap(emptyList(), SecureHash.parse(networkParametersEntity.hash), null) + val networkMapAndSigned = NetworkMapAndSigned(networkMap) { networkMapCertAndKeyPair.sign(networkMap).sig } + networkMapStorage.saveNewNetworkMap(networkMapAndSigned = networkMapAndSigned) + nodeInfoStorage.putNodeInfo(signedNodeInfoA) + assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).containsExactly(signedNodeInfoA.signed.raw.hash) + Thread.sleep(2000) // Wait for eventHorizon to pass + assertThat(networkMapStorage.getNodeInfoHashes().publicNodeInfoHashes).containsExactly(signedNodeInfoA.signed.raw.hash) + } } diff --git a/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNodeInfoStorageTest.kt b/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNodeInfoStorageTest.kt index 57835beba1..f4f258820c 100644 --- a/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNodeInfoStorageTest.kt +++ b/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNodeInfoStorageTest.kt @@ -11,7 +11,6 @@ package com.r3.corda.networkmanage.common.persistence import com.r3.corda.networkmanage.TestBase -import com.r3.corda.networkmanage.common.persistence.entity.NetworkMapEntity import com.r3.corda.networkmanage.common.persistence.entity.NodeInfoEntity import net.corda.core.crypto.Crypto import net.corda.core.identity.CordaX500Name @@ -160,8 +159,9 @@ class PersistentNodeInfoStorageTest : TestBase() { val (nodeInfoAndSigned) = createValidSignedNodeInfo("Test", requestStorage) nodeInfoStorage.putNodeInfo(nodeInfoAndSigned) val nodeInfo = singleNodeInfo() + Thread.sleep(500) nodeInfoStorage.putNodeInfo(nodeInfoAndSigned) - assertThat(nodeInfo.publishedAt).isEqualTo(singleNodeInfo().publishedAt) // Check publishAt hasn't changed + assertThat(nodeInfo.publishedAt).isBefore(singleNodeInfo().publishedAt) // Check publishAt has changed assertThat(singleNodeInfo().isCurrent).isTrue() } diff --git a/network-management/src/test/kotlin/com/r3/corda/networkmanage/doorman/ParametersUpdateHandlerTest.kt b/network-management/src/test/kotlin/com/r3/corda/networkmanage/doorman/ParametersUpdateHandlerTest.kt index ca904e8691..0ee01532e0 100644 --- a/network-management/src/test/kotlin/com/r3/corda/networkmanage/doorman/ParametersUpdateHandlerTest.kt +++ b/network-management/src/test/kotlin/com/r3/corda/networkmanage/doorman/ParametersUpdateHandlerTest.kt @@ -157,7 +157,8 @@ class ParametersUpdateHandlerTest { mapOf("minimumPlatformVersion" to 1, "maxMessageSize" to 10485760, "maxTransactionSize" to 10485760, - "notaries" to notaryFiles.map { mapOf("notaryNodeInfoFile" to it.toString(), "validating" to true) } + "notaries" to notaryFiles.map { mapOf("notaryNodeInfoFile" to it.toString(), "validating" to true) }, + "eventHorizonDays" to 7 ) ).toConfig() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index a61fa67c40..5b35321a3b 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -25,6 +25,7 @@ import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal._contextSerializationEnv +import net.corda.core.utilities.days import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.nodeapi.internal.ContractsJar @@ -232,7 +233,8 @@ class NetworkBootstrapper { maxMessageSize = 10485760, maxTransactionSize = Int.MAX_VALUE, whitelistedContractImplementations = whitelist, - epoch = 1 + epoch = 1, + eventHorizon = 30.days ) } val copier = NetworkParametersCopier(networkParameters, overwriteFile = true) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt index ba57aeab6f..c605ac9f3b 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt @@ -211,6 +211,23 @@ class NetworkMapTest : IntegrationTest() { } } + @Test + fun `test node heartbeat`() { + internalDriver( + portAllocation = portAllocation, + compatibilityZone = compatibilityZone, + initialiseSerialization = false, + systemProperties = mapOf("net.corda.node.internal.nodeinfo.publish.interval" to 1.seconds.toString()) + ) { + val aliceNode = startNode(providedName = ALICE_NAME).getOrThrow() + assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash) + networkMapServer.removeNodeInfo(aliceNode.nodeInfo) + assertThat(networkMapServer.networkMapHashes()).doesNotContain(aliceNode.nodeInfo.serialize().hash) + Thread.sleep(2000) + assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash) + } + } + private fun NodeHandle.onlySees(vararg nodes: NodeInfo) { // Make sure the nodes aren't getting the node infos from their additional directories val nodeInfosDir = baseDirectory / CordformNode.NODE_INFO_DIRECTORY diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index d7cbd0b716..61015442b6 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -37,9 +37,7 @@ import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.debug -import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.* import net.corda.node.CordaClock import net.corda.node.VersionInfo import net.corda.node.internal.classloading.requireAnnotation @@ -96,6 +94,7 @@ import java.sql.Connection import java.sql.DriverManager import java.time.Clock import java.time.Duration +import java.time.format.DateTimeParseException import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors @@ -363,6 +362,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Write the node-info file even if nothing's changed, just in case the file has been deleted. NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned) + // Always republish on startup, it's treated by network map server as a heartbeat. if (networkMapClient != null) { tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient) } @@ -370,18 +370,31 @@ abstract class AbstractNode(val configuration: NodeConfiguration, return Pair(keyPairs, nodeInfo) } + // Publish node info on startup and start task that sends every day a heartbeat - republishes node info. private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) { + // By default heartbeat interval should be set to 1 day, but for testing we may change it. + val republishProperty = System.getProperty("net.corda.node.internal.nodeinfo.publish.interval") + val heartbeatInterval = if (republishProperty != null) { + try { + Duration.parse(republishProperty) + } catch (e: DateTimeParseException) { + 1.days + } + } else { + 1.days + } val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory())) - executor.submit(object : Runnable { override fun run() { - try { + val republishInterval = try { networkMapClient.publish(signedNodeInfo) + heartbeatInterval } catch (t: Throwable) { log.warn("Error encountered while publishing node info, will retry again", t) - // TODO: Exponential backoff? - executor.schedule(this, 1, TimeUnit.MINUTES) + // TODO: Exponential backoff? It should reach max interval of eventHorizon/2. + 1.minutes } + executor.schedule(this, republishInterval.toMinutes(), TimeUnit.MINUTES) } }) } diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index 799f87d89f..d6f6512cef 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -28,7 +28,6 @@ import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException import net.corda.nodeapi.internal.network.* import rx.Subscription import rx.subjects.PublishSubject -import java.net.URL import java.nio.file.Path import java.nio.file.StandardCopyOption import java.time.Duration diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt index b65a105163..1f87edfd42 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt @@ -18,11 +18,14 @@ import net.corda.core.node.NetworkParameters import net.corda.core.node.NodeInfo import net.corda.core.serialization.serialize import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.days import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.createDevNetworkMapCa import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair import net.corda.nodeapi.internal.network.NetworkMap import net.corda.nodeapi.internal.network.ParametersUpdate +import net.corda.testing.common.internal.testNetworkParameters import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.ServerConnector import org.eclipse.jetty.server.handler.HandlerCollection @@ -50,7 +53,7 @@ class NetworkMapServer(private val pollInterval: Duration, private val myHostNameValue: String = "test.host.name", vararg additionalServices: Any) : Closeable { companion object { - private val stubNetworkParameters = NetworkParameters(1, emptyList(), 10485760, Int.MAX_VALUE, Instant.now(), 10, emptyMap()) + private val stubNetworkParameters = testNetworkParameters(epoch = 10) } private val server: Server @@ -88,6 +91,8 @@ class NetworkMapServer(private val pollInterval: Duration, .let { NetworkHostAndPort(it.host, it.localPort) } } + fun networkMapHashes(): List = service.nodeInfoMap.keys.toList() + fun removeNodeInfo(nodeInfo: NodeInfo) { service.removeNodeInfo(nodeInfo) } @@ -118,7 +123,7 @@ class NetworkMapServer(private val pollInterval: Duration, @Path("network-map") inner class InMemoryNetworkMapService { private val nodeNamesUUID = mutableMapOf() - private val nodeInfoMap = mutableMapOf() + val nodeInfoMap = mutableMapOf() // Mapping from the UUID of the network (null for global one) to hashes of the nodes in network private val networkMaps = mutableMapOf>() val latestAcceptedParametersMap = mutableMapOf() diff --git a/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt b/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt index a43bf0e035..f0163a54ea 100644 --- a/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt +++ b/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt @@ -13,6 +13,8 @@ package net.corda.testing.common.internal import net.corda.core.node.NetworkParameters import net.corda.core.node.NotaryInfo import net.corda.core.node.services.AttachmentId +import net.corda.core.utilities.days +import java.time.Duration import java.time.Instant fun testNetworkParameters( @@ -23,7 +25,8 @@ fun testNetworkParameters( // TODO: Make this configurable and consistence across driver, bootstrapper, demobench and NetworkMapServer maxTransactionSize: Int = maxMessageSize, whitelistedContractImplementations: Map> = emptyMap(), - epoch: Int = 1 + epoch: Int = 1, + eventHorizon: Duration = 30.days ): NetworkParameters { return NetworkParameters( minimumPlatformVersion = minimumPlatformVersion, @@ -32,6 +35,7 @@ fun testNetworkParameters( maxTransactionSize = maxTransactionSize, whitelistedContractImplementations = whitelistedContractImplementations, modifiedTime = modifiedTime, - epoch = epoch + epoch = epoch, + eventHorizon = eventHorizon ) } \ No newline at end of file