diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index e3f2682f65..982abcfc25 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -60,8 +60,10 @@ import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSoftLockManager import net.corda.node.shell.InteractiveShell import net.corda.node.utilities.AffinityExecutor +import net.corda.node.utilities.NamedThreadFactory import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.NodeInfoAndSigned +import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -84,6 +86,8 @@ import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit.SECONDS import kotlin.collections.set import kotlin.reflect.KClass @@ -140,7 +144,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected val runOnStop = ArrayList<() -> Any?>() private val _nodeReadyFuture = openFuture() protected var networkMapClient: NetworkMapClient? = null - protected lateinit var networkMapUpdater: NetworkMapUpdater + private lateinit var networkMapUpdater: NetworkMapUpdater lateinit var securityManager: RPCSecurityManager /** Completes once the node has successfully registered with the network map service @@ -174,18 +178,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration, initCertificate() val schemaService = NodeSchemaService(cordappLoader.cordappSchemas) val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) - return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database -> - // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like - // a code smell. - val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) - persistentNetworkMapCache.start() - val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) - val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised -> - val privateKey = keyPairs.single { it.public == publicKey }.private - privateKey.sign(serialised.bytes) - } - NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned) - nodeInfo + return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)){ database -> + // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks// like// a design smell. + val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) + persistentNetworkMapCache.start() + val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null,identity, identityKeyPair) + + nodeInfo + } } @@ -196,15 +196,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null) val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) val identityService = makeIdentityService(identity.certificate) + networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) } + val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) { "Node's platform version is lower than network's required minimumPlatformVersion" } + // Do all of this in a database transaction so anything that might need a connection has one. val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database -> val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService) - val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair) + val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair) identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts) val metrics = MetricRegistry() val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes) @@ -239,14 +242,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration, flowLogicRefFactory = flowLogicRefFactory, drainingModePollPeriod = configuration.drainingModePollPeriod, nodeProperties = nodeProperties) - if (serverThread is ExecutorService) { + + (serverThread as? ExecutorService)?.let { runOnStop += { // We wait here, even though any in-flight messages should have been drained away because the // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is // arbitrary and might be inappropriate. - MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS) + MoreExecutors.shutdownAndAwaitTermination(it, 50, SECONDS) } } + makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory) val rpcOps = makeRPCOps(flowStarter, database, smm) startMessagingService(rpcOps) @@ -258,6 +263,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, startShell(rpcOps) Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) } + networkMapUpdater = NetworkMapUpdater(services.networkMapCache, NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), networkMapClient, @@ -265,15 +271,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, configuration.baseDirectory) runOnStop += networkMapUpdater::close - log.info("Node-info for this node: ${services.myInfo}") - - val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised -> - services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey() - } - networkMapUpdater.updateNodeInfo(nodeInfoAndSigned) networkMapUpdater.subscribeToNetworkMap() - // If we successfully loaded network data from database, we set this future to Unit. + // If we successfully loaded network data from database, we set this future to Unit. _nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit }) return startedImpl.apply { @@ -297,9 +297,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, InteractiveShell.startShell(configuration, rpcOps, securityManager, _services.identityService, _services.database) } - private fun initNodeInfo(networkMapCache: NetworkMapCacheBaseInternal, - identity: PartyAndCertificate, - identityKeyPair: KeyPair): Pair, NodeInfo> { + private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal, + networkMapClient: NetworkMapClient?, + identity: PartyAndCertificate, + identityKeyPair: KeyPair): Pair, NodeInfo> { val keyPairs = mutableSetOf(identityKeyPair) myNotaryIdentity = configuration.notary?.let { @@ -313,7 +314,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } - val nodeInfoWithBlankSerial = NodeInfo( + val potentialNodeInfo = NodeInfo( myAddresses(), setOf(identity, myNotaryIdentity).filterNotNull(), versionInfo.platformVersion, @@ -322,16 +323,51 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name) - val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) { + val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) { // The node info hasn't changed. We use the one from the database to preserve the serial. + log.debug("Node-info hasn't changed") nodeInfoFromDb } else { - nodeInfoWithBlankSerial.copy(serial = platformClock.millis()) + log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb") + val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis()) + networkMapCache.addNode(newNodeInfo) + log.info("New node-info: $newNodeInfo") + newNodeInfo } + + val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised -> + val privateKey = keyPairs.single { it.public == publicKey }.private + privateKey.sign(serialised.bytes) + } + + // Write the node-info file even if nothing's changed, just in case the file has been deleted. + NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned) + + if (networkMapClient != null) { + tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient) + } + return Pair(keyPairs, nodeInfo) } + private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) { + val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory())) + + executor.submit(object : Runnable { + override fun run() { + try { + networkMapClient.publish(signedNodeInfo) + } catch (t: Throwable) { + log.warn("Error encountered while publishing node info, will retry again", t) + // TODO: Exponential backoff? + executor.schedule(this, 1, TimeUnit.MINUTES) + } + } + }) + } + protected abstract fun myAddresses(): List + protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager { return StateMachineManagerImpl( services, diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index 5bcc61f1e1..7125b0f6be 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -15,12 +15,7 @@ import net.corda.core.utilities.minutes import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.utilities.NamedThreadFactory import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException -import net.corda.nodeapi.internal.NodeInfoAndSigned -import net.corda.nodeapi.internal.SignedNodeInfo -import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME -import net.corda.nodeapi.internal.network.ParametersUpdate -import net.corda.nodeapi.internal.network.SignedNetworkParameters -import net.corda.nodeapi.internal.network.verifiedNetworkMapCert +import net.corda.nodeapi.internal.network.* import rx.Subscription import rx.subjects.PublishSubject import java.nio.file.Path @@ -28,6 +23,7 @@ import java.nio.file.StandardCopyOption import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import kotlin.system.exitProcess class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, private val fileWatcher: NodeInfoWatcher, @@ -57,36 +53,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, return DataFeed(currentUpdateInfo, parametersUpdatesTrack) } - fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) { - // TODO We've already done this lookup and check in AbstractNode.initNodeInfo - val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0]) - // Compare node info without timestamp. - if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return - - logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo") - // Only publish and write to disk if there are changes to the node info. - networkMapCache.addNode(nodeInfoAndSigned.nodeInfo) - fileWatcher.saveToFile(nodeInfoAndSigned) - - if (networkMapClient != null) { - tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient) - } - } - - private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) { - executor.submit(object : Runnable { - override fun run() { - try { - networkMapClient.publish(signedNodeInfo) - } catch (t: Throwable) { - logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t) - // TODO: Exponential backoff? - executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS) - } - } - }) - } - fun subscribeToNetworkMap() { require(fileWatcherSubscription == null) { "Should not call this method twice." } // Subscribe to file based networkMap @@ -114,17 +80,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) } if (currentParametersHash != networkMap.networkParameterHash) { - val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME - val acceptedHash = if (updatesFile.exists()) updatesFile.readObject().raw.hash else null - if (acceptedHash == networkMap.networkParameterHash) { - logger.info("Flag day occurred. Network map switched to the new network parameters: ${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.") - } else { - // TODO This needs special handling (node omitted update process or didn't accept new parameters) - logger.error("Node is using parameters with hash: $currentParametersHash but network map is " + - "advertising: ${networkMap.networkParameterHash}.\n" + - "Node will shutdown now. Please update node to use correct network parameters file.") - } - System.exit(1) + exitOnParametersMismatch(networkMap) } val currentNodeHashes = networkMapCache.allNodeHashes @@ -151,6 +107,23 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, return cacheTimeout } + private fun exitOnParametersMismatch(networkMap: NetworkMap) { + val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME + val acceptedHash = if (updatesFile.exists()) updatesFile.readObject().raw.hash else null + val exitCode = if (acceptedHash == networkMap.networkParameterHash) { + logger.info("Flag day occurred. Network map switched to the new network parameters: " + + "${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.") + 0 + } else { + // TODO This needs special handling (node omitted update process or didn't accept new parameters) + logger.error("Node is using parameters with hash: $currentParametersHash but network map is " + + "advertising: ${networkMap.networkParameterHash}.\n" + + "Node will shutdown now. Please update node to use correct network parameters file.") + 1 + } + exitProcess(exitCode) + } + private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) { if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) { // This update was handled already. diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt new file mode 100644 index 0000000000..4c734bf17b --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt @@ -0,0 +1,70 @@ +package net.corda.node.internal + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.readObject +import net.corda.core.node.NodeInfo +import net.corda.core.serialization.deserialize +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.VersionInfo +import net.corda.node.services.config.NodeConfiguration +import net.corda.nodeapi.internal.SignedNodeInfo +import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX +import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.internal.rigorousMock +import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import java.nio.file.Files +import kotlin.test.assertEquals +import kotlin.test.assertNull + +class NodeTest { + private abstract class AbstractNodeConfiguration : NodeConfiguration + + @Rule + @JvmField + val temporaryFolder = TemporaryFolder() + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule() + + private fun nodeInfoFile() = temporaryFolder.root.listFiles().singleOrNull { it.name.startsWith(NODE_INFO_FILE_NAME_PREFIX) } + private fun AbstractNode.generateNodeInfo(): NodeInfo { + assertNull(nodeInfoFile()) + generateAndSaveNodeInfo() + val path = nodeInfoFile()!!.toPath() + val nodeInfo = path.readObject().raw.deserialize() + Files.delete(path) + return nodeInfo + } + + @Test + fun `generateAndSaveNodeInfo works`() { + val nodeAddress = NetworkHostAndPort("0.1.2.3", 456) + val nodeName = CordaX500Name("Manx Blockchain Corp", "Douglas", "IM") + val platformVersion = 789 + val dataSourceProperties = makeTestDataSourceProperties() + val databaseConfig = DatabaseConfig() + val configuration = rigorousMock().also { + doReturn(nodeAddress).whenever(it).p2pAddress + doReturn(nodeName).whenever(it).myLegalName + doReturn(null).whenever(it).notary // Don't add notary identity. + doReturn(dataSourceProperties).whenever(it).dataSourceProperties + doReturn(databaseConfig).whenever(it).database + doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory + doReturn(true).whenever(it).devMode // Needed for identity cert. + doReturn("tsp").whenever(it).trustStorePassword + doReturn("ksp").whenever(it).keyStorePassword + } + configureDatabase(dataSourceProperties, databaseConfig, rigorousMock()).use { database -> + val node = Node(configuration, rigorousMock().also { + doReturn(platformVersion).whenever(it).platformVersion + }, initialiseSerialization = false) + assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial) + } + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index 2e77979c24..0b061be3b0 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -24,9 +24,11 @@ import net.corda.nodeapi.internal.createDevNetworkMapCa import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair import net.corda.nodeapi.internal.network.* import net.corda.testing.common.internal.testNetworkParameters -import net.corda.testing.core.* +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.expect +import net.corda.testing.core.expectEvents +import net.corda.testing.core.sequence import net.corda.testing.internal.DEV_ROOT_CA -import net.corda.testing.internal.TestNodeInfoBuilder import net.corda.testing.internal.createNodeInfoAndSigned import org.assertj.core.api.Assertions.assertThat import org.junit.After @@ -56,7 +58,6 @@ class NetworkMapUpdaterTest { private val networkParametersHash = SecureHash.randomSHA256() private val fileWatcher = NodeInfoWatcher(baseDir, scheduler) private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir) - private val nodeInfoBuilder = TestNodeInfoBuilder() private var parametersUpdate: ParametersUpdate? = null @After @@ -65,39 +66,6 @@ class NetworkMapUpdaterTest { fs.close() } - @Test - fun `publish node info`() { - nodeInfoBuilder.addIdentity(ALICE_NAME) - - val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned() - val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis()) - - // Publish node info for the first time. - updater.updateNodeInfo(nodeInfo1AndSigned) - // Sleep as publish is asynchronous. - // TODO: Remove sleep in unit test - Thread.sleep(2L * cacheExpiryMs) - verify(networkMapClient, times(1)).publish(any()) - - networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo) - - // Publish the same node info, but with different serial. - updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned) - // TODO: Remove sleep in unit test. - Thread.sleep(2L * cacheExpiryMs) - - // Same node info should not publish twice - verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed) - - val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob") - - // Publish different node info. - updater.updateNodeInfo(differentNodeInfoAndSigned) - // TODO: Remove sleep in unit test. - Thread.sleep(200) - verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed) - } - @Test fun `process add node updates from network map, with additional node infos from dir`() { val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")