mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
CORDA-1276: Cleaned up creation of node-info object on node start and when using --just-generate-node-info (#2909)
This commit is contained in:
parent
2f1b8ff23e
commit
65ff214130
@ -59,9 +59,11 @@ import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.vault.VaultSoftLockManager
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.JVMAgentRegistry
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.node.utilities.NodeBuildProperties
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
@ -87,6 +89,7 @@ import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import kotlin.collections.set
|
||||
import kotlin.reflect.KClass
|
||||
@ -143,7 +146,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
protected val runOnStop = ArrayList<() -> Any?>()
|
||||
private val _nodeReadyFuture = openFuture<Unit>()
|
||||
protected var networkMapClient: NetworkMapClient? = null
|
||||
protected lateinit var networkMapUpdater: NetworkMapUpdater
|
||||
private lateinit var networkMapUpdater: NetworkMapUpdater
|
||||
lateinit var securityManager: RPCSecurityManager
|
||||
|
||||
private val shutdownExecutor = Executors.newSingleThreadExecutor()
|
||||
@ -182,15 +185,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use {
|
||||
it.transaction {
|
||||
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell.
|
||||
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks
|
||||
// like a design smell.
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
|
||||
persistentNetworkMapCache.start()
|
||||
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
|
||||
val privateKey = keyPairs.single { it.public == publicKey }.private
|
||||
privateKey.sign(serialised.bytes)
|
||||
}
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
|
||||
val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair)
|
||||
nodeInfo
|
||||
}
|
||||
}
|
||||
@ -204,15 +203,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null)
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
val identityService = makeIdentityService(identity.certificate)
|
||||
|
||||
networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) }
|
||||
|
||||
val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
|
||||
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
|
||||
"Node's platform version is lower than network's required minimumPlatformVersion"
|
||||
}
|
||||
|
||||
// Do all of this in a database transaction so anything that might need a connection has one.
|
||||
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction {
|
||||
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
|
||||
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
|
||||
val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
|
||||
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
|
||||
val metrics = MetricRegistry()
|
||||
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
|
||||
@ -247,14 +249,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
flowLogicRefFactory = flowLogicRefFactory,
|
||||
drainingModePollPeriod = configuration.drainingModePollPeriod,
|
||||
nodeProperties = nodeProperties)
|
||||
if (serverThread is ExecutorService) {
|
||||
|
||||
(serverThread as? ExecutorService)?.let {
|
||||
runOnStop += {
|
||||
// We wait here, even though any in-flight messages should have been drained away because the
|
||||
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
|
||||
// arbitrary and might be inappropriate.
|
||||
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
|
||||
MoreExecutors.shutdownAndAwaitTermination(it, 50, SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory)
|
||||
val rpcOps = makeRPCOps(flowStarter, database, smm)
|
||||
startMessagingService(rpcOps)
|
||||
@ -266,6 +270,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
startShell()
|
||||
Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
|
||||
}
|
||||
|
||||
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
|
||||
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
|
||||
networkMapClient,
|
||||
@ -273,15 +278,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
configuration.baseDirectory)
|
||||
runOnStop += networkMapUpdater::close
|
||||
|
||||
log.info("Node-info for this node: ${services.myInfo}")
|
||||
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised ->
|
||||
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
|
||||
}
|
||||
networkMapUpdater.updateNodeInfo(nodeInfoAndSigned)
|
||||
networkMapUpdater.subscribeToNetworkMap()
|
||||
|
||||
// If we successfully loaded network data from database, we set this future to Unit.
|
||||
// If we successfully loaded network data from database, we set this future to Unit.
|
||||
_nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit })
|
||||
|
||||
return startedImpl.apply {
|
||||
@ -310,9 +309,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private fun initNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
|
||||
identity: PartyAndCertificate,
|
||||
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
|
||||
private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
|
||||
networkMapClient: NetworkMapClient?,
|
||||
identity: PartyAndCertificate,
|
||||
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
|
||||
val keyPairs = mutableSetOf(identityKeyPair)
|
||||
|
||||
myNotaryIdentity = configuration.notary?.let {
|
||||
@ -326,7 +326,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
val nodeInfoWithBlankSerial = NodeInfo(
|
||||
val potentialNodeInfo = NodeInfo(
|
||||
myAddresses(),
|
||||
setOf(identity, myNotaryIdentity).filterNotNull(),
|
||||
versionInfo.platformVersion,
|
||||
@ -335,16 +335,51 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
|
||||
val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
|
||||
|
||||
val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) {
|
||||
val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) {
|
||||
// The node info hasn't changed. We use the one from the database to preserve the serial.
|
||||
log.debug("Node-info hasn't changed")
|
||||
nodeInfoFromDb
|
||||
} else {
|
||||
nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
|
||||
log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb")
|
||||
val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis())
|
||||
networkMapCache.addNode(newNodeInfo)
|
||||
log.info("New node-info: $newNodeInfo")
|
||||
newNodeInfo
|
||||
}
|
||||
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
|
||||
val privateKey = keyPairs.single { it.public == publicKey }.private
|
||||
privateKey.sign(serialised.bytes)
|
||||
}
|
||||
|
||||
// Write the node-info file even if nothing's changed, just in case the file has been deleted.
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
|
||||
|
||||
if (networkMapClient != null) {
|
||||
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
|
||||
}
|
||||
|
||||
return Pair(keyPairs, nodeInfo)
|
||||
}
|
||||
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
|
||||
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))
|
||||
|
||||
executor.submit(object : Runnable {
|
||||
override fun run() {
|
||||
try {
|
||||
networkMapClient.publish(signedNodeInfo)
|
||||
} catch (t: Throwable) {
|
||||
log.warn("Error encountered while publishing node info, will retry again", t)
|
||||
// TODO: Exponential backoff?
|
||||
executor.schedule(this, 1, TimeUnit.MINUTES)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
protected abstract fun myAddresses(): List<NetworkHostAndPort>
|
||||
|
||||
protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {
|
||||
return StateMachineManagerImpl(
|
||||
services,
|
||||
|
@ -15,12 +15,7 @@ import net.corda.core.utilities.minutes
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||
import net.corda.nodeapi.internal.network.SignedNetworkParameters
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import rx.Subscription
|
||||
import rx.subjects.PublishSubject
|
||||
import java.nio.file.Path
|
||||
@ -28,6 +23,7 @@ import java.nio.file.StandardCopyOption
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.system.exitProcess
|
||||
|
||||
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
private val fileWatcher: NodeInfoWatcher,
|
||||
@ -57,36 +53,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
|
||||
}
|
||||
|
||||
fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) {
|
||||
// TODO We've already done this lookup and check in AbstractNode.initNodeInfo
|
||||
val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0])
|
||||
// Compare node info without timestamp.
|
||||
if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return
|
||||
|
||||
logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo")
|
||||
// Only publish and write to disk if there are changes to the node info.
|
||||
networkMapCache.addNode(nodeInfoAndSigned.nodeInfo)
|
||||
fileWatcher.saveToFile(nodeInfoAndSigned)
|
||||
|
||||
if (networkMapClient != null) {
|
||||
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
|
||||
}
|
||||
}
|
||||
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
|
||||
executor.submit(object : Runnable {
|
||||
override fun run() {
|
||||
try {
|
||||
networkMapClient.publish(signedNodeInfo)
|
||||
} catch (t: Throwable) {
|
||||
logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t)
|
||||
// TODO: Exponential backoff?
|
||||
executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fun subscribeToNetworkMap() {
|
||||
require(fileWatcherSubscription == null) { "Should not call this method twice." }
|
||||
// Subscribe to file based networkMap
|
||||
@ -114,17 +80,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
|
||||
|
||||
if (currentParametersHash != networkMap.networkParameterHash) {
|
||||
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
|
||||
if (acceptedHash == networkMap.networkParameterHash) {
|
||||
logger.info("Flag day occurred. Network map switched to the new network parameters: ${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
|
||||
} else {
|
||||
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
|
||||
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
|
||||
"advertising: ${networkMap.networkParameterHash}.\n" +
|
||||
"Node will shutdown now. Please update node to use correct network parameters file.")
|
||||
}
|
||||
System.exit(1)
|
||||
exitOnParametersMismatch(networkMap)
|
||||
}
|
||||
|
||||
val currentNodeHashes = networkMapCache.allNodeHashes
|
||||
@ -151,6 +107,23 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
return cacheTimeout
|
||||
}
|
||||
|
||||
private fun exitOnParametersMismatch(networkMap: NetworkMap) {
|
||||
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
|
||||
val exitCode = if (acceptedHash == networkMap.networkParameterHash) {
|
||||
logger.info("Flag day occurred. Network map switched to the new network parameters: " +
|
||||
"${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
|
||||
0
|
||||
} else {
|
||||
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
|
||||
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
|
||||
"advertising: ${networkMap.networkParameterHash}.\n" +
|
||||
"Node will shutdown now. Please update node to use correct network parameters file.")
|
||||
1
|
||||
}
|
||||
exitProcess(exitCode)
|
||||
}
|
||||
|
||||
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
|
||||
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) {
|
||||
// This update was handled already.
|
||||
|
@ -9,7 +9,6 @@ import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
@ -21,7 +20,6 @@ import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.nio.file.Files
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotEquals
|
||||
import kotlin.test.assertNull
|
||||
|
||||
class NodeTest {
|
||||
@ -66,16 +64,7 @@ class NodeTest {
|
||||
val node = Node(configuration, rigorousMock<VersionInfo>().also {
|
||||
doReturn(platformVersion).whenever(it).platformVersion
|
||||
}, initialiseSerialization = false)
|
||||
val nodeInfo = node.generateNodeInfo()
|
||||
assertEquals(listOf(nodeAddress), nodeInfo.addresses)
|
||||
assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name })
|
||||
assertEquals(platformVersion, nodeInfo.platformVersion)
|
||||
node.generateNodeInfo().let {
|
||||
assertNotEquals(nodeInfo, it) // Different serial.
|
||||
assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial))
|
||||
}
|
||||
PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo)
|
||||
assertEquals(nodeInfo, node.generateNodeInfo())
|
||||
assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -24,9 +24,11 @@ import net.corda.nodeapi.internal.createDevNetworkMapCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.expect
|
||||
import net.corda.testing.core.expectEvents
|
||||
import net.corda.testing.core.sequence
|
||||
import net.corda.testing.internal.DEV_ROOT_CA
|
||||
import net.corda.testing.internal.TestNodeInfoBuilder
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
@ -56,7 +58,6 @@ class NetworkMapUpdaterTest {
|
||||
private val networkParametersHash = SecureHash.randomSHA256()
|
||||
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir)
|
||||
private val nodeInfoBuilder = TestNodeInfoBuilder()
|
||||
private var parametersUpdate: ParametersUpdate? = null
|
||||
|
||||
@After
|
||||
@ -65,39 +66,6 @@ class NetworkMapUpdaterTest {
|
||||
fs.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `publish node info`() {
|
||||
nodeInfoBuilder.addIdentity(ALICE_NAME)
|
||||
|
||||
val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned()
|
||||
val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
|
||||
|
||||
// Publish node info for the first time.
|
||||
updater.updateNodeInfo(nodeInfo1AndSigned)
|
||||
// Sleep as publish is asynchronous.
|
||||
// TODO: Remove sleep in unit test
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
verify(networkMapClient, times(1)).publish(any())
|
||||
|
||||
networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo)
|
||||
|
||||
// Publish the same node info, but with different serial.
|
||||
updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned)
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
|
||||
// Same node info should not publish twice
|
||||
verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed)
|
||||
|
||||
val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob")
|
||||
|
||||
// Publish different node info.
|
||||
updater.updateNodeInfo(differentNodeInfoAndSigned)
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(200)
|
||||
verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `process add node updates from network map, with additional node infos from dir`() {
|
||||
val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")
|
||||
|
Loading…
Reference in New Issue
Block a user