CORDA-1276: Cleaned up creation of node-info object on node start and when using --just-generate-node-info (#2909)

This commit is contained in:
Shams Asari 2018-04-03 17:30:53 +01:00 committed by GitHub
parent 2f1b8ff23e
commit 65ff214130
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 119 deletions

View File

@ -59,9 +59,11 @@ import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JVMAgentRegistry
import net.corda.node.utilities.NamedThreadFactory
import net.corda.node.utilities.NodeBuildProperties
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
@ -87,6 +89,7 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.SECONDS
import kotlin.collections.set
import kotlin.reflect.KClass
@ -143,7 +146,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val runOnStop = ArrayList<() -> Any?>()
private val _nodeReadyFuture = openFuture<Unit>()
protected var networkMapClient: NetworkMapClient? = null
protected lateinit var networkMapUpdater: NetworkMapUpdater
private lateinit var networkMapUpdater: NetworkMapUpdater
lateinit var securityManager: RPCSecurityManager
private val shutdownExecutor = Executors.newSingleThreadExecutor()
@ -182,15 +185,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use {
it.transaction {
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell.
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks
// like a design smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start()
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair)
nodeInfo
}
}
@ -204,15 +203,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null)
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
val identityService = makeIdentityService(identity.certificate)
networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) }
val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion"
}
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
val metrics = MetricRegistry()
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
@ -247,14 +249,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
flowLogicRefFactory = flowLogicRefFactory,
drainingModePollPeriod = configuration.drainingModePollPeriod,
nodeProperties = nodeProperties)
if (serverThread is ExecutorService) {
(serverThread as? ExecutorService)?.let {
runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
// arbitrary and might be inappropriate.
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
MoreExecutors.shutdownAndAwaitTermination(it, 50, SECONDS)
}
}
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory)
val rpcOps = makeRPCOps(flowStarter, database, smm)
startMessagingService(rpcOps)
@ -266,6 +270,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
startShell()
Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
networkMapClient,
@ -273,15 +278,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
configuration.baseDirectory)
runOnStop += networkMapUpdater::close
log.info("Node-info for this node: ${services.myInfo}")
val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised ->
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
}
networkMapUpdater.updateNodeInfo(nodeInfoAndSigned)
networkMapUpdater.subscribeToNetworkMap()
// If we successfully loaded network data from database, we set this future to Unit.
// If we successfully loaded network data from database, we set this future to Unit.
_nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit })
return startedImpl.apply {
@ -310,9 +309,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
private fun initNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
identity: PartyAndCertificate,
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
networkMapClient: NetworkMapClient?,
identity: PartyAndCertificate,
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
val keyPairs = mutableSetOf(identityKeyPair)
myNotaryIdentity = configuration.notary?.let {
@ -326,7 +326,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
val nodeInfoWithBlankSerial = NodeInfo(
val potentialNodeInfo = NodeInfo(
myAddresses(),
setOf(identity, myNotaryIdentity).filterNotNull(),
versionInfo.platformVersion,
@ -335,16 +335,51 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) {
val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) {
// The node info hasn't changed. We use the one from the database to preserve the serial.
log.debug("Node-info hasn't changed")
nodeInfoFromDb
} else {
nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb")
val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis())
networkMapCache.addNode(newNodeInfo)
log.info("New node-info: $newNodeInfo")
newNodeInfo
}
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
// Write the node-info file even if nothing's changed, just in case the file has been deleted.
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
}
return Pair(keyPairs, nodeInfo)
}
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))
executor.submit(object : Runnable {
override fun run() {
try {
networkMapClient.publish(signedNodeInfo)
} catch (t: Throwable) {
log.warn("Error encountered while publishing node info, will retry again", t)
// TODO: Exponential backoff?
executor.schedule(this, 1, TimeUnit.MINUTES)
}
}
})
}
protected abstract fun myAddresses(): List<NetworkHostAndPort>
protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {
return StateMachineManagerImpl(
services,

View File

@ -15,12 +15,7 @@ import net.corda.core.utilities.minutes
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NamedThreadFactory
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.ParametersUpdate
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import net.corda.nodeapi.internal.network.*
import rx.Subscription
import rx.subjects.PublishSubject
import java.nio.file.Path
@ -28,6 +23,7 @@ import java.nio.file.StandardCopyOption
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.system.exitProcess
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcher: NodeInfoWatcher,
@ -57,36 +53,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
}
fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) {
// TODO We've already done this lookup and check in AbstractNode.initNodeInfo
val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0])
// Compare node info without timestamp.
if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return
logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo")
// Only publish and write to disk if there are changes to the node info.
networkMapCache.addNode(nodeInfoAndSigned.nodeInfo)
fileWatcher.saveToFile(nodeInfoAndSigned)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
}
}
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
executor.submit(object : Runnable {
override fun run() {
try {
networkMapClient.publish(signedNodeInfo)
} catch (t: Throwable) {
logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t)
// TODO: Exponential backoff?
executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS)
}
}
})
}
fun subscribeToNetworkMap() {
require(fileWatcherSubscription == null) { "Should not call this method twice." }
// Subscribe to file based networkMap
@ -114,17 +80,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
if (currentParametersHash != networkMap.networkParameterHash) {
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
if (acceptedHash == networkMap.networkParameterHash) {
logger.info("Flag day occurred. Network map switched to the new network parameters: ${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
} else {
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
"advertising: ${networkMap.networkParameterHash}.\n" +
"Node will shutdown now. Please update node to use correct network parameters file.")
}
System.exit(1)
exitOnParametersMismatch(networkMap)
}
val currentNodeHashes = networkMapCache.allNodeHashes
@ -151,6 +107,23 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
return cacheTimeout
}
private fun exitOnParametersMismatch(networkMap: NetworkMap) {
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
val exitCode = if (acceptedHash == networkMap.networkParameterHash) {
logger.info("Flag day occurred. Network map switched to the new network parameters: " +
"${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
0
} else {
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
"advertising: ${networkMap.networkParameterHash}.\n" +
"Node will shutdown now. Please update node to use correct network parameters file.")
1
}
exitProcess(exitCode)
}
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) {
// This update was handled already.

View File

@ -9,7 +9,6 @@ import net.corda.core.serialization.deserialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.VersionInfo
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
import net.corda.nodeapi.internal.persistence.DatabaseConfig
@ -21,7 +20,6 @@ import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.nio.file.Files
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import kotlin.test.assertNull
class NodeTest {
@ -66,16 +64,7 @@ class NodeTest {
val node = Node(configuration, rigorousMock<VersionInfo>().also {
doReturn(platformVersion).whenever(it).platformVersion
}, initialiseSerialization = false)
val nodeInfo = node.generateNodeInfo()
assertEquals(listOf(nodeAddress), nodeInfo.addresses)
assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name })
assertEquals(platformVersion, nodeInfo.platformVersion)
node.generateNodeInfo().let {
assertNotEquals(nodeInfo, it) // Different serial.
assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial))
}
PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo)
assertEquals(nodeInfo, node.generateNodeInfo())
assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial)
}
}
}

View File

@ -24,9 +24,11 @@ import net.corda.nodeapi.internal.createDevNetworkMapCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.network.*
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.*
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.expect
import net.corda.testing.core.expectEvents
import net.corda.testing.core.sequence
import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.internal.TestNodeInfoBuilder
import net.corda.testing.internal.createNodeInfoAndSigned
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
@ -56,7 +58,6 @@ class NetworkMapUpdaterTest {
private val networkParametersHash = SecureHash.randomSHA256()
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir)
private val nodeInfoBuilder = TestNodeInfoBuilder()
private var parametersUpdate: ParametersUpdate? = null
@After
@ -65,39 +66,6 @@ class NetworkMapUpdaterTest {
fs.close()
}
@Test
fun `publish node info`() {
nodeInfoBuilder.addIdentity(ALICE_NAME)
val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned()
val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
// Publish node info for the first time.
updater.updateNodeInfo(nodeInfo1AndSigned)
// Sleep as publish is asynchronous.
// TODO: Remove sleep in unit test
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapClient, times(1)).publish(any())
networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo)
// Publish the same node info, but with different serial.
updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned)
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
// Same node info should not publish twice
verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed)
val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob")
// Publish different node info.
updater.updateNodeInfo(differentNodeInfoAndSigned)
// TODO: Remove sleep in unit test.
Thread.sleep(200)
verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed)
}
@Test
fun `process add node updates from network map, with additional node infos from dir`() {
val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")