diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index ef1bbceeb0..e244117dae 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -59,9 +59,11 @@ import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSoftLockManager import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.JVMAgentRegistry +import net.corda.node.utilities.NamedThreadFactory import net.corda.node.utilities.NodeBuildProperties import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.NodeInfoAndSigned +import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -87,6 +89,7 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit.SECONDS import kotlin.collections.set import kotlin.reflect.KClass @@ -143,7 +146,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected val runOnStop = ArrayList<() -> Any?>() private val _nodeReadyFuture = openFuture() protected var networkMapClient: NetworkMapClient? = null - protected lateinit var networkMapUpdater: NetworkMapUpdater + private lateinit var networkMapUpdater: NetworkMapUpdater lateinit var securityManager: RPCSecurityManager private val shutdownExecutor = Executors.newSingleThreadExecutor() @@ -182,15 +185,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use { it.transaction { - // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell. + // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks + // like a design smell. val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) persistentNetworkMapCache.start() - val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) - val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised -> - val privateKey = keyPairs.single { it.public == publicKey }.private - privateKey.sign(serialised.bytes) - } - NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned) + val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair) nodeInfo } } @@ -204,15 +203,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null) val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) val identityService = makeIdentityService(identity.certificate) + networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) } + val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) { "Node's platform version is lower than network's required minimumPlatformVersion" } + // Do all of this in a database transaction so anything that might need a connection has one. val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction { val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService) - val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair) + val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair) identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts) val metrics = MetricRegistry() val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes) @@ -247,14 +249,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration, flowLogicRefFactory = flowLogicRefFactory, drainingModePollPeriod = configuration.drainingModePollPeriod, nodeProperties = nodeProperties) - if (serverThread is ExecutorService) { + + (serverThread as? ExecutorService)?.let { runOnStop += { // We wait here, even though any in-flight messages should have been drained away because the // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is // arbitrary and might be inappropriate. - MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS) + MoreExecutors.shutdownAndAwaitTermination(it, 50, SECONDS) } } + makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory) val rpcOps = makeRPCOps(flowStarter, database, smm) startMessagingService(rpcOps) @@ -266,6 +270,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, startShell() Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) } + networkMapUpdater = NetworkMapUpdater(services.networkMapCache, NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), networkMapClient, @@ -273,15 +278,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, configuration.baseDirectory) runOnStop += networkMapUpdater::close - log.info("Node-info for this node: ${services.myInfo}") - - val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised -> - services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey() - } - networkMapUpdater.updateNodeInfo(nodeInfoAndSigned) networkMapUpdater.subscribeToNetworkMap() - // If we successfully loaded network data from database, we set this future to Unit. + // If we successfully loaded network data from database, we set this future to Unit. _nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit }) return startedImpl.apply { @@ -310,9 +309,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } - private fun initNodeInfo(networkMapCache: NetworkMapCacheBaseInternal, - identity: PartyAndCertificate, - identityKeyPair: KeyPair): Pair, NodeInfo> { + private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal, + networkMapClient: NetworkMapClient?, + identity: PartyAndCertificate, + identityKeyPair: KeyPair): Pair, NodeInfo> { val keyPairs = mutableSetOf(identityKeyPair) myNotaryIdentity = configuration.notary?.let { @@ -326,7 +326,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } - val nodeInfoWithBlankSerial = NodeInfo( + val potentialNodeInfo = NodeInfo( myAddresses(), setOf(identity, myNotaryIdentity).filterNotNull(), versionInfo.platformVersion, @@ -335,16 +335,51 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name) - val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) { + val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) { // The node info hasn't changed. We use the one from the database to preserve the serial. + log.debug("Node-info hasn't changed") nodeInfoFromDb } else { - nodeInfoWithBlankSerial.copy(serial = platformClock.millis()) + log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb") + val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis()) + networkMapCache.addNode(newNodeInfo) + log.info("New node-info: $newNodeInfo") + newNodeInfo } + + val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised -> + val privateKey = keyPairs.single { it.public == publicKey }.private + privateKey.sign(serialised.bytes) + } + + // Write the node-info file even if nothing's changed, just in case the file has been deleted. + NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned) + + if (networkMapClient != null) { + tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient) + } + return Pair(keyPairs, nodeInfo) } + private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) { + val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory())) + + executor.submit(object : Runnable { + override fun run() { + try { + networkMapClient.publish(signedNodeInfo) + } catch (t: Throwable) { + log.warn("Error encountered while publishing node info, will retry again", t) + // TODO: Exponential backoff? + executor.schedule(this, 1, TimeUnit.MINUTES) + } + } + }) + } + protected abstract fun myAddresses(): List + protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager { return StateMachineManagerImpl( services, diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index a8b3e7c448..806ea71291 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -15,12 +15,7 @@ import net.corda.core.utilities.minutes import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.utilities.NamedThreadFactory import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException -import net.corda.nodeapi.internal.NodeInfoAndSigned -import net.corda.nodeapi.internal.SignedNodeInfo -import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME -import net.corda.nodeapi.internal.network.ParametersUpdate -import net.corda.nodeapi.internal.network.SignedNetworkParameters -import net.corda.nodeapi.internal.network.verifiedNetworkMapCert +import net.corda.nodeapi.internal.network.* import rx.Subscription import rx.subjects.PublishSubject import java.nio.file.Path @@ -28,6 +23,7 @@ import java.nio.file.StandardCopyOption import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import kotlin.system.exitProcess class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, private val fileWatcher: NodeInfoWatcher, @@ -57,36 +53,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, return DataFeed(currentUpdateInfo, parametersUpdatesTrack) } - fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) { - // TODO We've already done this lookup and check in AbstractNode.initNodeInfo - val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0]) - // Compare node info without timestamp. - if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return - - logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo") - // Only publish and write to disk if there are changes to the node info. - networkMapCache.addNode(nodeInfoAndSigned.nodeInfo) - fileWatcher.saveToFile(nodeInfoAndSigned) - - if (networkMapClient != null) { - tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient) - } - } - - private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) { - executor.submit(object : Runnable { - override fun run() { - try { - networkMapClient.publish(signedNodeInfo) - } catch (t: Throwable) { - logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t) - // TODO: Exponential backoff? - executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS) - } - } - }) - } - fun subscribeToNetworkMap() { require(fileWatcherSubscription == null) { "Should not call this method twice." } // Subscribe to file based networkMap @@ -114,17 +80,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) } if (currentParametersHash != networkMap.networkParameterHash) { - val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME - val acceptedHash = if (updatesFile.exists()) updatesFile.readObject().raw.hash else null - if (acceptedHash == networkMap.networkParameterHash) { - logger.info("Flag day occurred. Network map switched to the new network parameters: ${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.") - } else { - // TODO This needs special handling (node omitted update process or didn't accept new parameters) - logger.error("Node is using parameters with hash: $currentParametersHash but network map is " + - "advertising: ${networkMap.networkParameterHash}.\n" + - "Node will shutdown now. Please update node to use correct network parameters file.") - } - System.exit(1) + exitOnParametersMismatch(networkMap) } val currentNodeHashes = networkMapCache.allNodeHashes @@ -151,6 +107,23 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, return cacheTimeout } + private fun exitOnParametersMismatch(networkMap: NetworkMap) { + val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME + val acceptedHash = if (updatesFile.exists()) updatesFile.readObject().raw.hash else null + val exitCode = if (acceptedHash == networkMap.networkParameterHash) { + logger.info("Flag day occurred. Network map switched to the new network parameters: " + + "${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.") + 0 + } else { + // TODO This needs special handling (node omitted update process or didn't accept new parameters) + logger.error("Node is using parameters with hash: $currentParametersHash but network map is " + + "advertising: ${networkMap.networkParameterHash}.\n" + + "Node will shutdown now. Please update node to use correct network parameters file.") + 1 + } + exitProcess(exitCode) + } + private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) { if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) { // This update was handled already. diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt index e47ab67a0d..4c734bf17b 100644 --- a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt +++ b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt @@ -9,7 +9,6 @@ import net.corda.core.serialization.deserialize import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.VersionInfo import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -21,7 +20,6 @@ import org.junit.Test import org.junit.rules.TemporaryFolder import java.nio.file.Files import kotlin.test.assertEquals -import kotlin.test.assertNotEquals import kotlin.test.assertNull class NodeTest { @@ -66,16 +64,7 @@ class NodeTest { val node = Node(configuration, rigorousMock().also { doReturn(platformVersion).whenever(it).platformVersion }, initialiseSerialization = false) - val nodeInfo = node.generateNodeInfo() - assertEquals(listOf(nodeAddress), nodeInfo.addresses) - assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name }) - assertEquals(platformVersion, nodeInfo.platformVersion) - node.generateNodeInfo().let { - assertNotEquals(nodeInfo, it) // Different serial. - assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial)) - } - PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo) - assertEquals(nodeInfo, node.generateNodeInfo()) + assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial) } } } diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index 9fc5698821..64c997237a 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -24,9 +24,11 @@ import net.corda.nodeapi.internal.createDevNetworkMapCa import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair import net.corda.nodeapi.internal.network.* import net.corda.testing.common.internal.testNetworkParameters -import net.corda.testing.core.* +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.expect +import net.corda.testing.core.expectEvents +import net.corda.testing.core.sequence import net.corda.testing.internal.DEV_ROOT_CA -import net.corda.testing.internal.TestNodeInfoBuilder import net.corda.testing.internal.createNodeInfoAndSigned import org.assertj.core.api.Assertions.assertThat import org.junit.After @@ -56,7 +58,6 @@ class NetworkMapUpdaterTest { private val networkParametersHash = SecureHash.randomSHA256() private val fileWatcher = NodeInfoWatcher(baseDir, scheduler) private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir) - private val nodeInfoBuilder = TestNodeInfoBuilder() private var parametersUpdate: ParametersUpdate? = null @After @@ -65,39 +66,6 @@ class NetworkMapUpdaterTest { fs.close() } - @Test - fun `publish node info`() { - nodeInfoBuilder.addIdentity(ALICE_NAME) - - val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned() - val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis()) - - // Publish node info for the first time. - updater.updateNodeInfo(nodeInfo1AndSigned) - // Sleep as publish is asynchronous. - // TODO: Remove sleep in unit test - Thread.sleep(2L * cacheExpiryMs) - verify(networkMapClient, times(1)).publish(any()) - - networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo) - - // Publish the same node info, but with different serial. - updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned) - // TODO: Remove sleep in unit test. - Thread.sleep(2L * cacheExpiryMs) - - // Same node info should not publish twice - verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed) - - val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob") - - // Publish different node info. - updater.updateNodeInfo(differentNodeInfoAndSigned) - // TODO: Remove sleep in unit test. - Thread.sleep(200) - verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed) - } - @Test fun `process add node updates from network map, with additional node infos from dir`() { val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")