NOTICK - fix network builder for v4 (#5205)

make builder more parallel by making docker node building async
This commit is contained in:
Stefano Franz 2019-06-26 17:53:32 +00:00 committed by Katelyn Baker
parent 5713865702
commit 6c05197954
4 changed files with 65 additions and 32 deletions

View File

@ -5,8 +5,10 @@ import net.corda.bootstrapper.context.Context
import net.corda.bootstrapper.nodes.*
import net.corda.bootstrapper.notaries.NotaryCopier
import net.corda.bootstrapper.notaries.NotaryFinder
import net.corda.node.utilities.NamedThreadFactory
import java.io.File
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
interface NetworkBuilder {
@ -31,12 +33,10 @@ interface NetworkBuilder {
fun onNodeStartBuild(callback: (FoundNode) -> Unit): NetworkBuilder
fun onNodePushStart(callback: (BuiltNode) -> Unit): NetworkBuilder
fun onNodeInstancesRequested(callback: (List<NodeInstanceRequest>) -> Unit): NetworkBuilder
}
private class NetworkBuilderImpl : NetworkBuilder {
@Volatile
private var onNodeLocatedCallback: ((FoundNode) -> Unit) = {}
@Volatile
@ -76,7 +76,6 @@ private class NetworkBuilderImpl : NetworkBuilder {
return this
}
override fun onNodeStartBuild(callback: (FoundNode) -> Unit): NetworkBuilder {
this.onNodeBuildStartCallback = callback
return this;
@ -127,8 +126,10 @@ private class NetworkBuilderImpl : NetworkBuilder {
return this;
}
override fun build(): CompletableFuture<Pair<List<NodeInstance>, Context>> {
val executor = Executors.newCachedThreadPool(NamedThreadFactory("network-builder"))
val cacheDir = File(workingDir, cacheDirName)
val baseDir = workingDir!!
val context = Context(networkName, backendType, backendOptions)
@ -163,12 +164,21 @@ private class NetworkBuilderImpl : NetworkBuilder {
val notariesFuture = notaryDiscoveryFuture.thenCompose { copiedNotaries ->
copiedNotaries
.map { copiedNotary ->
nodeBuilder.buildNode(copiedNotary).also(onNodeBuiltCallback)
}.map { builtNotary ->
onNodePushStartCallback(builtNotary)
nodePusher.pushNode(builtNotary).thenApply { it.also(onNodePushedCallback) }
nodeBuilder.buildNode(copiedNotary).thenAlsoAsync {
onNodeBuildStartCallback.invoke(it)
}
}.map { builtNotaryFuture ->
builtNotaryFuture.thenComposeAsync { builtNotary ->
onNodeBuiltCallback(builtNotary)
onNodePushStartCallback(builtNotary)
nodePusher.pushNode(builtNotary).thenAlsoAsync { pushedNotary ->
onNodePushedCallback(pushedNotary)
}
}
}.map { pushedNotary ->
pushedNotary.thenApplyAsync { nodeInstantiator.createInstanceRequest(it).also { onNodeInstanceRequestedCallback.invoke(listOf(it)) } }
pushedNotary.thenApplyAsync {
nodeInstantiator.createInstanceRequest(it).also { onNodeInstanceRequestedCallback(listOf(it)) }
}
}.map { instanceRequest ->
instanceRequest.thenComposeAsync { request ->
nodeInstantiator.instantiateNotaryInstance(request).thenApply { it.also(onNodeInstanceCallback) }
@ -185,17 +195,16 @@ private class NetworkBuilderImpl : NetworkBuilder {
}
}.map { copiedNode: CopiedNode ->
onNodeBuildStartCallback.invoke(copiedNode)
nodeBuilder.buildNode(copiedNode).let {
onNodeBuiltCallback.invoke(it)
it
nodeBuilder.buildNode(copiedNode)
}.map { builtNodeFuture ->
builtNodeFuture.thenComposeAsync { builtNode ->
onNodeBuiltCallback.invoke(builtNode)
nodePusher.pushNode(builtNode).thenAlsoAsync { pushedNode ->
onNodePushedCallback.invoke(pushedNode)
}
}
}.map { builtNode ->
nodePusher.pushNode(builtNode).thenApplyAsync {
onNodePushedCallback.invoke(it)
it
}
}.map { pushedNode ->
pushedNode.thenApplyAsync {
}.map { pushedNodeFuture ->
pushedNodeFuture.thenApplyAsync {
nodeInstantiator.createInstanceRequests(it, nodeCount).also(onNodeInstanceRequestedCallback)
}
@ -213,10 +222,10 @@ private class NetworkBuilderImpl : NetworkBuilder {
}.toSingleFuture()
}.thenCompose { it }.thenApplyAsync { it.flatten() }
return notariesFuture.thenCombineAsync(nodesFuture, { _, nodeInstances ->
return notariesFuture.thenCombineAsync(nodesFuture) { _, nodeInstances ->
context.networkInitiated = true
nodeInstances to context
})
}
}
}
@ -225,3 +234,10 @@ fun <T> List<CompletableFuture<T>>.toSingleFuture(): CompletableFuture<List<T>>
this.map { it.getNow(null) }
}
}
fun <T> CompletableFuture<T>.thenAlsoAsync(consumer: (T) -> Unit): CompletableFuture<T> {
return this.thenApplyAsync {
consumer(it)
it
}
}

View File

@ -1,5 +1,7 @@
package net.corda.bootstrapper.nodes
import com.github.dockerjava.api.model.BuildResponseItem
import com.github.dockerjava.core.async.ResultCallbackTemplate
import com.github.dockerjava.core.command.BuildImageResultCallback
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@ -11,6 +13,7 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.parseAsNodeConfiguration
import org.slf4j.LoggerFactory
import java.io.File
import java.util.concurrent.CompletableFuture
open class NodeBuilder {
@ -18,7 +21,10 @@ open class NodeBuilder {
val LOG = LoggerFactory.getLogger(NodeBuilder::class.java)
}
fun buildNode(copiedNode: CopiedNode): BuiltNode {
fun buildNode(copiedNode: CopiedNode): CompletableFuture<BuiltNode> {
val future: CompletableFuture<BuiltNode> = CompletableFuture()
val localDockerClient = DockerUtils.createLocalDockerClient()
val copiedNodeConfig = copiedNode.copiedNodeConfig
val nodeDir = copiedNodeConfig.parentFile
@ -27,15 +33,27 @@ open class NodeBuilder {
}
val nodeConfig = ConfigFactory.parseFile(copiedNodeConfig)
LOG.info("starting to build docker image for: $nodeDir")
val nodeImageId = localDockerClient.buildImageCmd()
localDockerClient.buildImageCmd()
.withDockerfile(File(nodeDir, "Dockerfile"))
.withBaseDirectory(nodeDir)
.exec(BuildImageResultCallback()).awaitImageId()
LOG.info("finished building docker image for: $nodeDir with id: $nodeImageId")
val config = nodeConfig.parseAsNodeConfigWithFallback(ConfigFactory.parseFile(copiedNode.configFile)).value()
return copiedNode.builtNode(config, nodeImageId)
}
.withBaseDirectory(nodeDir).exec(object : ResultCallbackTemplate<BuildImageResultCallback, BuildResponseItem>() {
var result: BuildResponseItem? = null
override fun onNext(`object`: BuildResponseItem?) {
this.result = `object`
}
override fun onError(throwable: Throwable?) {
future.completeExceptionally(throwable)
}
override fun onComplete() {
super.onComplete()
LOG.info("finished building docker image for: $nodeDir with id: ${result?.imageId}")
val config = nodeConfig.parseAsNodeConfigWithFallback(ConfigFactory.parseFile(copiedNode.configFile)).value()
future.complete(copiedNode.builtNode(config, result?.imageId!!))
}
})
return future
}
}
fun Config.parseAsNodeConfigWithFallback(preCopyConfig: Config): Validated<NodeConfiguration, Configuration.Validation.Error> {

View File

@ -1,5 +1,4 @@
# Base image from (http://phusion.github.io/baseimage-docker)
FROM openjdk:8u151-jre-alpine
FROM openjdk:8u212-jre-alpine
RUN apk upgrade --update && \
apk add --update --no-cache bash iputils && \

View File

@ -1,5 +1,5 @@
# Base image from (http://phusion.github.io/baseimage-docker)
FROM openjdk:8u151-jre-alpine
FROM openjdk:8u212-jre-alpine
RUN apk upgrade --update && \
apk add --update --no-cache bash iputils && \