diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt
index 5f6b1d5c0c..6636b2374c 100644
--- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt
+++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt
@@ -8,6 +8,7 @@ import net.corda.core.identity.CordaX500Name
 import net.corda.core.identity.Party
 import net.corda.core.internal.*
 import net.corda.core.internal.concurrent.fork
+import net.corda.core.internal.concurrent.transpose
 import net.corda.core.node.NetworkParameters
 import net.corda.core.node.NodeInfo
 import net.corda.core.node.NotaryInfo
@@ -31,11 +32,13 @@ import java.nio.file.Path
 import java.nio.file.Paths
 import java.nio.file.StandardCopyOption.REPLACE_EXISTING
 import java.time.Instant
+import java.util.*
 import java.util.concurrent.Executors
-import java.util.concurrent.TimeoutException
+import java.util.concurrent.TimeUnit
 import kotlin.collections.component1
 import kotlin.collections.component2
 import kotlin.collections.set
+import kotlin.concurrent.schedule
 import kotlin.streams.toList
 
 /**
@@ -108,11 +111,10 @@ class NetworkBootstrapper {
         println("Nodes found in the following sub-directories: ${nodeDirs.map { it.fileName }}")
         val configs = nodeDirs.associateBy({ it }, { ConfigFactory.parseFile((it / "node.conf").toFile()) })
         generateServiceIdentitiesForNotaryClusters(configs)
-        val processes = startNodeInfoGeneration(nodeDirs)
         initialiseSerialization()
         try {
             println("Waiting for all nodes to generate their node-info files...")
-            val nodeInfoFiles = gatherNodeInfoFiles(processes, nodeDirs)
+            val nodeInfoFiles = generateNodeInfos(nodeDirs)
             println("Checking for duplicate nodes")
             checkForDuplicateLegalNames(nodeInfoFiles)
             println("Distributing all node-info files to all nodes")
@@ -129,10 +131,64 @@ class NetworkBootstrapper {
             println("Bootstrapping complete!")
         } finally {
             _contextSerializationEnv.set(null)
-            processes.forEach { if (it.isAlive) it.destroyForcibly() }
         }
     }
 
+    /**
+     * Generates the node-info file of every node in [nodeDirs] by running [generateNodeInfo] on a thread pool
+     * sized to the number of available processors, and returns the resulting files.
+     *
+     * Prints a one-off hint if the whole run takes longer than expected.
+     */
+    private fun generateNodeInfos(nodeDirs: List<Path>): List<Path> {
+        val numParallelProcesses = Runtime.getRuntime().availableProcessors()
+        val timePerNode = 40.seconds // On the test machine, generating the node info takes 7 seconds for a single node.
+        val tExpected = maxOf(timePerNode, timePerNode * nodeDirs.size.toLong() / numParallelProcesses.toLong())
+        // Hold on to the Timer itself: schedule() returns only the TimerTask, and cancelling just the task leaves the
+        // timer thread alive. Timer.cancel() stops that thread; the daemon flag keeps it from ever blocking JVM exit.
+        val warningTimer = Timer("WarnOnSlowMachines", true)
+        warningTimer.schedule(tExpected.toMillis()) {
+            println("...still waiting. If this is taking longer than usual, check the node logs.")
+        }
+        val executor = Executors.newFixedThreadPool(numParallelProcesses)
+        return try {
+            nodeDirs.map { executor.fork { generateNodeInfo(it) } }.transpose().getOrThrow()
+        } finally {
+            warningTimer.cancel()
+            executor.shutdownNow()
+        }
+    }
+
+    /**
+     * Runs [nodeInfoGenCmd] in [nodeDir], waits up to three minutes for it to exit and returns the node-info file
+     * found in [nodeDir]. The process output is redirected to node-info-gen.log under the [LOGS_DIR_NAME] directory.
+     *
+     * @throws IllegalStateException if the process times out, exits with a non-zero code, or leaves no node-info file.
+     */
+    private fun generateNodeInfo(nodeDir: Path): Path {
+        val logsDir = (nodeDir / LOGS_DIR_NAME).createDirectories()
+        val process = ProcessBuilder(nodeInfoGenCmd)
+                .directory(nodeDir.toFile())
+                .redirectErrorStream(true)
+                .redirectOutput((logsDir / "node-info-gen.log").toFile())
+                .apply { environment()["CAPSULE_CACHE_DIR"] = "../.cache" }
+                .start()
+        try {
+            check(process.waitFor(3, TimeUnit.MINUTES) && process.exitValue() == 0) {
+                "Error while generating node info file. Please check the logs in $logsDir."
+            }
+        } finally {
+            // Reached on timeout and also when the wait is interrupted (e.g. by executor.shutdownNow()), so a
+            // process that is still running is never left behind.
+            if (process.isAlive) process.destroyForcibly()
+        }
+        return nodeDir.list { paths ->
+            paths.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }.findFirst().orElseThrow {
+                IllegalStateException("No node info file was generated in $nodeDir. Please check the logs in $logsDir.")
+            }
+        }
+    }
+
     private fun generateDirectoriesIfNeeded(directory: Path, cordappJars: List<Path>) {
         val confFiles = directory.list { it.filter { it.toString().endsWith("_node.conf") }.toList() }
         val webServerConfFiles = directory.list { it.filter { it.toString().endsWith("_web-server.conf") }.toList() }
@@ -160,38 +216,6 @@ class NetworkBootstrapper {
         return cordaJarPath
     }
 
-    private fun startNodeInfoGeneration(nodeDirs: List<Path>): List<Process> {
-        return nodeDirs.map { nodeDir ->
-            val logsDir = (nodeDir / LOGS_DIR_NAME).createDirectories()
-            ProcessBuilder(nodeInfoGenCmd)
-                    .directory(nodeDir.toFile())
-                    .redirectErrorStream(true)
-                    .redirectOutput((logsDir / "node-info-gen.log").toFile())
-                    .apply { environment()["CAPSULE_CACHE_DIR"] = "../.cache" }
-                    .start()
-        }
-    }
-
-    private fun gatherNodeInfoFiles(processes: List<Process>, nodeDirs: List<Path>): List<Path> {
-        val executor = Executors.newSingleThreadExecutor()
-
-        val future = executor.fork {
-            processes.zip(nodeDirs).map { (process, nodeDir) ->
-                check(process.waitFor() == 0) {
-                    "Node in ${nodeDir.fileName} exited with ${process.exitValue()} when generating its node-info - see logs in ${nodeDir / LOGS_DIR_NAME}"
-                }
-                nodeDir.list { paths -> paths.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get() }
-            }
-        }
-
-        return try {
-            future.getOrThrow(timeout = 60.seconds)
-        } catch (e: TimeoutException) {
-            println("...still waiting. If this is taking longer than usual, check the node logs.")
-            future.getOrThrow()
-        }
-    }
-
     private fun distributeNodeInfos(nodeDirs: List<Path>, nodeInfoFiles: List<Path>) {
         for (nodeDir in nodeDirs) {
             val additionalNodeInfosDir = (nodeDir / CordformNode.NODE_INFO_DIRECTORY).createDirectories()