mirror of
https://github.com/corda/corda.git
synced 2025-02-20 09:26:41 +00:00
Limit concurrency of the bootstrapper (#3271)
This commit is contained in:
parent
aafa548454
commit
4bc9151d5d
@ -8,6 +8,7 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.concurrent.fork
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.NotaryInfo
|
||||
@ -31,11 +32,13 @@ import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.collections.component1
|
||||
import kotlin.collections.component2
|
||||
import kotlin.collections.set
|
||||
import kotlin.concurrent.schedule
|
||||
import kotlin.streams.toList
|
||||
|
||||
/**
|
||||
@ -108,11 +111,10 @@ class NetworkBootstrapper {
|
||||
println("Nodes found in the following sub-directories: ${nodeDirs.map { it.fileName }}")
|
||||
val configs = nodeDirs.associateBy({ it }, { ConfigFactory.parseFile((it / "node.conf").toFile()) })
|
||||
generateServiceIdentitiesForNotaryClusters(configs)
|
||||
val processes = startNodeInfoGeneration(nodeDirs)
|
||||
initialiseSerialization()
|
||||
try {
|
||||
println("Waiting for all nodes to generate their node-info files...")
|
||||
val nodeInfoFiles = gatherNodeInfoFiles(processes, nodeDirs)
|
||||
val nodeInfoFiles = generateNodeInfos(nodeDirs)
|
||||
println("Checking for duplicate nodes")
|
||||
checkForDuplicateLegalNames(nodeInfoFiles)
|
||||
println("Distributing all node-info files to all nodes")
|
||||
@ -129,10 +131,41 @@ class NetworkBootstrapper {
|
||||
println("Bootstrapping complete!")
|
||||
} finally {
|
||||
_contextSerializationEnv.set(null)
|
||||
processes.forEach { if (it.isAlive) it.destroyForcibly() }
|
||||
}
|
||||
}
|
||||
|
||||
private fun generateNodeInfos(nodeDirs: List<Path>): List<Path> {
|
||||
val numParallelProcesses = Runtime.getRuntime().availableProcessors()
|
||||
val timePerNode = 40.seconds // On the test machine, generating the node info takes 7 seconds for a single node.
|
||||
val tExpected = maxOf(timePerNode, timePerNode * nodeDirs.size.toLong() / numParallelProcesses.toLong())
|
||||
val warningTimer = Timer("WarnOnSlowMachines", false).schedule(tExpected.toMillis()) {
|
||||
println("...still waiting. If this is taking longer than usual, check the node logs.")
|
||||
}
|
||||
val executor = Executors.newFixedThreadPool(numParallelProcesses)
|
||||
return try {
|
||||
nodeDirs.map { executor.fork { generateNodeInfo(it) } }.transpose().getOrThrow()
|
||||
} finally {
|
||||
warningTimer.cancel()
|
||||
executor.shutdownNow()
|
||||
}
|
||||
}
|
||||
|
||||
private fun generateNodeInfo(nodeDir: Path): Path {
|
||||
val logsDir = (nodeDir / LOGS_DIR_NAME).createDirectories()
|
||||
val process = ProcessBuilder(nodeInfoGenCmd)
|
||||
.directory(nodeDir.toFile())
|
||||
.redirectErrorStream(true)
|
||||
.redirectOutput((logsDir / "node-info-gen.log").toFile())
|
||||
.apply { environment()["CAPSULE_CACHE_DIR"] = "../.cache" }
|
||||
.start()
|
||||
if (!process.waitFor(3, TimeUnit.MINUTES)) {
|
||||
process.destroyForcibly()
|
||||
throw IllegalStateException("Error while generating node info file. Please check the logs in $logsDir.")
|
||||
}
|
||||
check(process.exitValue() == 0) { "Error while generating node info file. Please check the logs in $logsDir." }
|
||||
return nodeDir.list { paths -> paths.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get() }
|
||||
}
|
||||
|
||||
private fun generateDirectoriesIfNeeded(directory: Path, cordappJars: List<Path>) {
|
||||
val confFiles = directory.list { it.filter { it.toString().endsWith("_node.conf") }.toList() }
|
||||
val webServerConfFiles = directory.list { it.filter { it.toString().endsWith("_web-server.conf") }.toList() }
|
||||
@ -160,38 +193,6 @@ class NetworkBootstrapper {
|
||||
return cordaJarPath
|
||||
}
|
||||
|
||||
private fun startNodeInfoGeneration(nodeDirs: List<Path>): List<Process> {
|
||||
return nodeDirs.map { nodeDir ->
|
||||
val logsDir = (nodeDir / LOGS_DIR_NAME).createDirectories()
|
||||
ProcessBuilder(nodeInfoGenCmd)
|
||||
.directory(nodeDir.toFile())
|
||||
.redirectErrorStream(true)
|
||||
.redirectOutput((logsDir / "node-info-gen.log").toFile())
|
||||
.apply { environment()["CAPSULE_CACHE_DIR"] = "../.cache" }
|
||||
.start()
|
||||
}
|
||||
}
|
||||
|
||||
private fun gatherNodeInfoFiles(processes: List<Process>, nodeDirs: List<Path>): List<Path> {
|
||||
val executor = Executors.newSingleThreadExecutor()
|
||||
|
||||
val future = executor.fork {
|
||||
processes.zip(nodeDirs).map { (process, nodeDir) ->
|
||||
check(process.waitFor() == 0) {
|
||||
"Node in ${nodeDir.fileName} exited with ${process.exitValue()} when generating its node-info - see logs in ${nodeDir / LOGS_DIR_NAME}"
|
||||
}
|
||||
nodeDir.list { paths -> paths.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get() }
|
||||
}
|
||||
}
|
||||
|
||||
return try {
|
||||
future.getOrThrow(timeout = 60.seconds)
|
||||
} catch (e: TimeoutException) {
|
||||
println("...still waiting. If this is taking longer than usual, check the node logs.")
|
||||
future.getOrThrow()
|
||||
}
|
||||
}
|
||||
|
||||
private fun distributeNodeInfos(nodeDirs: List<Path>, nodeInfoFiles: List<Path>) {
|
||||
for (nodeDir in nodeDirs) {
|
||||
val additionalNodeInfosDir = (nodeDir / CordformNode.NODE_INFO_DIRECTORY).createDirectories()
|
||||
|
Loading…
x
Reference in New Issue
Block a user