CORDA-1276: Cleaned up creation of node-info object on node start and when using --just-generate-node-info (#2909)

This commit is contained in:
Shams Asari
2018-04-03 17:30:53 +01:00
committed by GitHub
parent 2f1b8ff23e
commit 65ff214130
4 changed files with 84 additions and 119 deletions

View File

@ -59,9 +59,11 @@ import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JVMAgentRegistry import net.corda.node.utilities.JVMAgentRegistry
import net.corda.node.utilities.NamedThreadFactory
import net.corda.node.utilities.NodeBuildProperties import net.corda.node.utilities.NodeBuildProperties
import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
@ -87,6 +89,7 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.SECONDS
import kotlin.collections.set import kotlin.collections.set
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -143,7 +146,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val runOnStop = ArrayList<() -> Any?>() protected val runOnStop = ArrayList<() -> Any?>()
private val _nodeReadyFuture = openFuture<Unit>() private val _nodeReadyFuture = openFuture<Unit>()
protected var networkMapClient: NetworkMapClient? = null protected var networkMapClient: NetworkMapClient? = null
protected lateinit var networkMapUpdater: NetworkMapUpdater private lateinit var networkMapUpdater: NetworkMapUpdater
lateinit var securityManager: RPCSecurityManager lateinit var securityManager: RPCSecurityManager
private val shutdownExecutor = Executors.newSingleThreadExecutor() private val shutdownExecutor = Executors.newSingleThreadExecutor()
@ -182,15 +185,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use { return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use {
it.transaction { it.transaction {
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell. // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks
// like a design smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start() persistentNetworkMapCache.start()
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair)
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
nodeInfo nodeInfo
} }
} }
@ -204,15 +203,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null) val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null)
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
val identityService = makeIdentityService(identity.certificate) val identityService = makeIdentityService(identity.certificate)
networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) } networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) }
val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) { check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion" "Node's platform version is lower than network's required minimumPlatformVersion"
} }
// Do all of this in a database transaction so anything that might need a connection has one. // Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction { val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService) val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair) val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts) identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
val metrics = MetricRegistry() val metrics = MetricRegistry()
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes) val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
@ -247,14 +249,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
flowLogicRefFactory = flowLogicRefFactory, flowLogicRefFactory = flowLogicRefFactory,
drainingModePollPeriod = configuration.drainingModePollPeriod, drainingModePollPeriod = configuration.drainingModePollPeriod,
nodeProperties = nodeProperties) nodeProperties = nodeProperties)
if (serverThread is ExecutorService) {
(serverThread as? ExecutorService)?.let {
runOnStop += { runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the // We wait here, even though any in-flight messages should have been drained away because the
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
// arbitrary and might be inappropriate. // arbitrary and might be inappropriate.
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS) MoreExecutors.shutdownAndAwaitTermination(it, 50, SECONDS)
} }
} }
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory) makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory)
val rpcOps = makeRPCOps(flowStarter, database, smm) val rpcOps = makeRPCOps(flowStarter, database, smm)
startMessagingService(rpcOps) startMessagingService(rpcOps)
@ -266,6 +270,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
startShell() startShell()
Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
} }
networkMapUpdater = NetworkMapUpdater(services.networkMapCache, networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
networkMapClient, networkMapClient,
@ -273,15 +278,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
configuration.baseDirectory) configuration.baseDirectory)
runOnStop += networkMapUpdater::close runOnStop += networkMapUpdater::close
log.info("Node-info for this node: ${services.myInfo}")
val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised ->
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
}
networkMapUpdater.updateNodeInfo(nodeInfoAndSigned)
networkMapUpdater.subscribeToNetworkMap() networkMapUpdater.subscribeToNetworkMap()
// If we successfully loaded network data from database, we set this future to Unit. // If we successfully loaded network data from database, we set this future to Unit.
_nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit }) _nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit })
return startedImpl.apply { return startedImpl.apply {
@ -310,9 +309,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
} }
} }
private fun initNodeInfo(networkMapCache: NetworkMapCacheBaseInternal, private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
identity: PartyAndCertificate, networkMapClient: NetworkMapClient?,
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> { identity: PartyAndCertificate,
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
val keyPairs = mutableSetOf(identityKeyPair) val keyPairs = mutableSetOf(identityKeyPair)
myNotaryIdentity = configuration.notary?.let { myNotaryIdentity = configuration.notary?.let {
@ -326,7 +326,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
} }
} }
val nodeInfoWithBlankSerial = NodeInfo( val potentialNodeInfo = NodeInfo(
myAddresses(), myAddresses(),
setOf(identity, myNotaryIdentity).filterNotNull(), setOf(identity, myNotaryIdentity).filterNotNull(),
versionInfo.platformVersion, versionInfo.platformVersion,
@ -335,16 +335,51 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name) val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) { val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) {
// The node info hasn't changed. We use the one from the database to preserve the serial. // The node info hasn't changed. We use the one from the database to preserve the serial.
log.debug("Node-info hasn't changed")
nodeInfoFromDb nodeInfoFromDb
} else { } else {
nodeInfoWithBlankSerial.copy(serial = platformClock.millis()) log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb")
val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis())
networkMapCache.addNode(newNodeInfo)
log.info("New node-info: $newNodeInfo")
newNodeInfo
} }
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
// Write the node-info file even if nothing's changed, just in case the file has been deleted.
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
}
return Pair(keyPairs, nodeInfo) return Pair(keyPairs, nodeInfo)
} }
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))
executor.submit(object : Runnable {
override fun run() {
try {
networkMapClient.publish(signedNodeInfo)
} catch (t: Throwable) {
log.warn("Error encountered while publishing node info, will retry again", t)
// TODO: Exponential backoff?
executor.schedule(this, 1, TimeUnit.MINUTES)
}
}
})
}
protected abstract fun myAddresses(): List<NetworkHostAndPort> protected abstract fun myAddresses(): List<NetworkHostAndPort>
protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager { protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {
return StateMachineManagerImpl( return StateMachineManagerImpl(
services, services,

View File

@ -15,12 +15,7 @@ import net.corda.core.utilities.minutes
import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NamedThreadFactory import net.corda.node.utilities.NamedThreadFactory
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.network.*
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.ParametersUpdate
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import rx.Subscription import rx.Subscription
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.nio.file.Path import java.nio.file.Path
@ -28,6 +23,7 @@ import java.nio.file.StandardCopyOption
import java.time.Duration import java.time.Duration
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.system.exitProcess
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcher: NodeInfoWatcher, private val fileWatcher: NodeInfoWatcher,
@ -57,36 +53,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
return DataFeed(currentUpdateInfo, parametersUpdatesTrack) return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
} }
fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) {
// TODO We've already done this lookup and check in AbstractNode.initNodeInfo
val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0])
// Compare node info without timestamp.
if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return
logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo")
// Only publish and write to disk if there are changes to the node info.
networkMapCache.addNode(nodeInfoAndSigned.nodeInfo)
fileWatcher.saveToFile(nodeInfoAndSigned)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
}
}
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
executor.submit(object : Runnable {
override fun run() {
try {
networkMapClient.publish(signedNodeInfo)
} catch (t: Throwable) {
logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t)
// TODO: Exponential backoff?
executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS)
}
}
})
}
fun subscribeToNetworkMap() { fun subscribeToNetworkMap() {
require(fileWatcherSubscription == null) { "Should not call this method twice." } require(fileWatcherSubscription == null) { "Should not call this method twice." }
// Subscribe to file based networkMap // Subscribe to file based networkMap
@ -114,17 +80,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) } networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
if (currentParametersHash != networkMap.networkParameterHash) { if (currentParametersHash != networkMap.networkParameterHash) {
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME exitOnParametersMismatch(networkMap)
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
if (acceptedHash == networkMap.networkParameterHash) {
logger.info("Flag day occurred. Network map switched to the new network parameters: ${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
} else {
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
"advertising: ${networkMap.networkParameterHash}.\n" +
"Node will shutdown now. Please update node to use correct network parameters file.")
}
System.exit(1)
} }
val currentNodeHashes = networkMapCache.allNodeHashes val currentNodeHashes = networkMapCache.allNodeHashes
@ -151,6 +107,23 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
return cacheTimeout return cacheTimeout
} }
private fun exitOnParametersMismatch(networkMap: NetworkMap) {
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
val exitCode = if (acceptedHash == networkMap.networkParameterHash) {
logger.info("Flag day occurred. Network map switched to the new network parameters: " +
"${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
0
} else {
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
"advertising: ${networkMap.networkParameterHash}.\n" +
"Node will shutdown now. Please update node to use correct network parameters file.")
1
}
exitProcess(exitCode)
}
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) { private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) { if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) {
// This update was handled already. // This update was handled already.

View File

@ -9,7 +9,6 @@ import net.corda.core.serialization.deserialize
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.VersionInfo import net.corda.node.VersionInfo
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
@ -21,7 +20,6 @@ import org.junit.Test
import org.junit.rules.TemporaryFolder import org.junit.rules.TemporaryFolder
import java.nio.file.Files import java.nio.file.Files
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import kotlin.test.assertNull import kotlin.test.assertNull
class NodeTest { class NodeTest {
@ -66,16 +64,7 @@ class NodeTest {
val node = Node(configuration, rigorousMock<VersionInfo>().also { val node = Node(configuration, rigorousMock<VersionInfo>().also {
doReturn(platformVersion).whenever(it).platformVersion doReturn(platformVersion).whenever(it).platformVersion
}, initialiseSerialization = false) }, initialiseSerialization = false)
val nodeInfo = node.generateNodeInfo() assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial)
assertEquals(listOf(nodeAddress), nodeInfo.addresses)
assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name })
assertEquals(platformVersion, nodeInfo.platformVersion)
node.generateNodeInfo().let {
assertNotEquals(nodeInfo, it) // Different serial.
assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial))
}
PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo)
assertEquals(nodeInfo, node.generateNodeInfo())
} }
} }
} }

View File

@ -24,9 +24,11 @@ import net.corda.nodeapi.internal.createDevNetworkMapCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.network.* import net.corda.nodeapi.internal.network.*
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.* import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.expect
import net.corda.testing.core.expectEvents
import net.corda.testing.core.sequence
import net.corda.testing.internal.DEV_ROOT_CA import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.internal.TestNodeInfoBuilder
import net.corda.testing.internal.createNodeInfoAndSigned import net.corda.testing.internal.createNodeInfoAndSigned
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.After import org.junit.After
@ -56,7 +58,6 @@ class NetworkMapUpdaterTest {
private val networkParametersHash = SecureHash.randomSHA256() private val networkParametersHash = SecureHash.randomSHA256()
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler) private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir) private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir)
private val nodeInfoBuilder = TestNodeInfoBuilder()
private var parametersUpdate: ParametersUpdate? = null private var parametersUpdate: ParametersUpdate? = null
@After @After
@ -65,39 +66,6 @@ class NetworkMapUpdaterTest {
fs.close() fs.close()
} }
@Test
fun `publish node info`() {
nodeInfoBuilder.addIdentity(ALICE_NAME)
val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned()
val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
// Publish node info for the first time.
updater.updateNodeInfo(nodeInfo1AndSigned)
// Sleep as publish is asynchronous.
// TODO: Remove sleep in unit test
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapClient, times(1)).publish(any())
networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo)
// Publish the same node info, but with different serial.
updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned)
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
// Same node info should not publish twice
verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed)
val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob")
// Publish different node info.
updater.updateNodeInfo(differentNodeInfoAndSigned)
// TODO: Remove sleep in unit test.
Thread.sleep(200)
verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed)
}
@Test @Test
fun `process add node updates from network map, with additional node infos from dir`() { fun `process add node updates from network map, with additional node infos from dir`() {
val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1") val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")