mirror of
https://github.com/corda/corda.git
synced 2025-06-14 05:08:18 +00:00
CORDA-830 Introducing the network bootstrapper
Copying of the node-info files moved out of Cordform and into NetworkParametersGenerator (which is now called NetworkBootstrapper). This class becomes an external tool to enable deployment of nodes in a test setup on a single filesystem.
This commit is contained in:
@ -1,31 +0,0 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.crypto.entropyToKeyPair
|
||||
import net.corda.core.crypto.sign
|
||||
import net.corda.core.internal.copyTo
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.serialization.serialize
|
||||
import java.math.BigInteger
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.Path
|
||||
|
||||
class NetworkParametersCopier(networkParameters: NetworkParameters) {
|
||||
private companion object {
|
||||
val DUMMY_MAP_KEY = entropyToKeyPair(BigInteger.valueOf(123))
|
||||
}
|
||||
|
||||
private val serializedNetworkParameters = networkParameters.let {
|
||||
val serialize = it.serialize()
|
||||
val signature = DUMMY_MAP_KEY.sign(serialize)
|
||||
SignedData(serialize, signature).serialize()
|
||||
}
|
||||
|
||||
fun install(dir: Path) {
|
||||
try {
|
||||
serializedNetworkParameters.open().copyTo(dir / NETWORK_PARAMS_FILE_NAME)
|
||||
} catch (e: FileAlreadyExistsException) {
|
||||
// Leave the file untouched if it already exists
|
||||
}
|
||||
}
|
||||
}
|
@ -1,107 +0,0 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.list
|
||||
import net.corda.core.internal.readAll
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal._contextSerializationEnv
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import java.nio.file.Path
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
* This class is loaded by Cordform using reflection to generate the network parameters. It is assumed that Cordform has
|
||||
* already asked each node to generate its node info file.
|
||||
*/
|
||||
@Suppress("UNUSED")
|
||||
class NetworkParametersGenerator {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
fun run(nodesDirs: List<Path>) {
|
||||
logger.info("NetworkParameters generation using node directories: $nodesDirs")
|
||||
try {
|
||||
initialiseSerialization()
|
||||
val notaryInfos = gatherNotaryIdentities(nodesDirs)
|
||||
val copier = NetworkParametersCopier(NetworkParameters(
|
||||
minimumPlatformVersion = 1,
|
||||
notaries = notaryInfos,
|
||||
modifiedTime = Instant.now(),
|
||||
maxMessageSize = 10485760,
|
||||
maxTransactionSize = 40000,
|
||||
epoch = 1
|
||||
))
|
||||
nodesDirs.forEach(copier::install)
|
||||
} finally {
|
||||
_contextSerializationEnv.set(null)
|
||||
}
|
||||
}
|
||||
|
||||
private fun gatherNotaryIdentities(nodesDirs: List<Path>): List<NotaryInfo> {
|
||||
return nodesDirs.mapNotNull { nodeDir ->
|
||||
val nodeConfig = ConfigFactory.parseFile((nodeDir / "node.conf").toFile())
|
||||
if (nodeConfig.hasPath("notary")) {
|
||||
val validating = nodeConfig.getConfig("notary").getBoolean("validating")
|
||||
val nodeInfoFile = nodeDir.list { paths -> paths.filter { it.fileName.toString().startsWith("nodeInfo-") }.findFirst().get() }
|
||||
processFile(nodeInfoFile)?.let { NotaryInfo(it.notaryIdentity(), validating) }
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}.distinct() // We need distinct as nodes part of a distributed notary share the same notary identity
|
||||
}
|
||||
|
||||
private fun NodeInfo.notaryIdentity(): Party {
|
||||
return when (legalIdentities.size) {
|
||||
// Single node notaries have just one identity like all other nodes. This identity is the notary identity
|
||||
1 -> legalIdentities[0]
|
||||
// Nodes which are part of a distributed notary have a second identity which is the composite identity of the
|
||||
// cluster and is shared by all the other members. This is the notary identity.
|
||||
2 -> legalIdentities[1]
|
||||
else -> throw IllegalArgumentException("Not sure how to get the notary identity in this scenerio: $this")
|
||||
}
|
||||
}
|
||||
|
||||
private fun processFile(file: Path): NodeInfo? {
|
||||
return try {
|
||||
logger.info("Reading NodeInfo from file: $file")
|
||||
val signedData = file.readAll().deserialize<SignedNodeInfo>()
|
||||
signedData.verified()
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Exception parsing NodeInfo from file. $file", e)
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
// We need to to set serialization env, because generation of parameters is run from Cordform.
|
||||
// KryoServerSerializationScheme is not accessible from nodeapi.
|
||||
private fun initialiseSerialization() {
|
||||
_contextSerializationEnv.set(SerializationEnvironmentImpl(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(KryoParametersSerializationScheme)
|
||||
registerScheme(AMQPServerSerializationScheme())
|
||||
},
|
||||
AMQP_P2P_CONTEXT)
|
||||
)
|
||||
}
|
||||
|
||||
private object KryoParametersSerializationScheme : AbstractKryoSerializationScheme() {
|
||||
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
|
||||
return byteSequence == KryoHeaderV0_1 && target == SerializationContext.UseCase.P2P
|
||||
}
|
||||
override fun rpcClientKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
|
||||
override fun rpcServerKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
|
||||
}
|
||||
}
|
@ -0,0 +1,167 @@
|
||||
package net.corda.nodeapi.internal.network
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal._contextSerializationEnv
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import kotlin.streams.toList
|
||||
|
||||
/**
|
||||
* Class to bootstrap a local network of Corda nodes on the same filesystem.
|
||||
*/
|
||||
class NetworkBootstrapper {
|
||||
companion object {
|
||||
// TODO This will probably need to change once we start using a bundled JVM
|
||||
private val nodeInfoGenCmd = listOf(
|
||||
"java",
|
||||
"-jar",
|
||||
"corda.jar",
|
||||
"--just-generate-node-info"
|
||||
)
|
||||
|
||||
private const val LOGS_DIR_NAME = "logs"
|
||||
|
||||
@JvmStatic
|
||||
fun main(args: Array<String>) {
|
||||
val arg = args.singleOrNull() ?: throw IllegalArgumentException("Expecting single argument which is the nodes' parent directory")
|
||||
NetworkBootstrapper().bootstrap(Paths.get(arg).toAbsolutePath().normalize())
|
||||
}
|
||||
}
|
||||
|
||||
fun bootstrap(directory: Path) {
|
||||
directory.createDirectories()
|
||||
println("Bootstrapping local network in $directory")
|
||||
val nodeDirs = directory.list { paths -> paths.filter { (it / "corda.jar").exists() }.toList() }
|
||||
require(nodeDirs.isNotEmpty()) { "No nodes found" }
|
||||
println("Nodes found in the following sub-directories: ${nodeDirs.map { it.fileName }}")
|
||||
val processes = startNodeInfoGeneration(nodeDirs)
|
||||
initialiseSerialization()
|
||||
try {
|
||||
println("Waiting for all nodes to generate their node-info files")
|
||||
val nodeInfoFiles = gatherNodeInfoFiles(processes, nodeDirs)
|
||||
println("Distributing all node info-files to all nodes")
|
||||
distributeNodeInfos(nodeDirs, nodeInfoFiles)
|
||||
println("Gathering notary identities")
|
||||
val notaryInfos = gatherNotaryInfos(nodeInfoFiles)
|
||||
println("Notary identities to be used in network-parameters file: ${notaryInfos.joinToString("; ") { it.prettyPrint() }}")
|
||||
installNetworkParameters(notaryInfos, nodeDirs)
|
||||
println("Bootstrapping complete!")
|
||||
} finally {
|
||||
_contextSerializationEnv.set(null)
|
||||
processes.forEach { if (it.isAlive) it.destroyForcibly() }
|
||||
}
|
||||
}
|
||||
|
||||
private fun startNodeInfoGeneration(nodeDirs: List<Path>): List<Process> {
|
||||
return nodeDirs.map { nodeDir ->
|
||||
val logsDir = (nodeDir / LOGS_DIR_NAME).createDirectories()
|
||||
ProcessBuilder(nodeInfoGenCmd)
|
||||
.directory(nodeDir.toFile())
|
||||
.redirectErrorStream(true)
|
||||
.redirectOutput((logsDir / "node-info-gen.log").toFile())
|
||||
.apply { environment()["CAPSULE_CACHE_DIR"] = "../.cache" }
|
||||
.start()
|
||||
}
|
||||
}
|
||||
|
||||
private fun gatherNodeInfoFiles(processes: List<Process>, nodeDirs: List<Path>): List<Path> {
|
||||
val timeOutInSeconds = 60L
|
||||
return processes.zip(nodeDirs).map { (process, nodeDir) ->
|
||||
check(process.waitFor(timeOutInSeconds, SECONDS)) {
|
||||
"Node in ${nodeDir.fileName} took longer than ${timeOutInSeconds}s to generate its node-info - see logs in ${nodeDir / LOGS_DIR_NAME}"
|
||||
}
|
||||
check(process.exitValue() == 0) {
|
||||
"Node in ${nodeDir.fileName} exited with ${process.exitValue()} when generating its node-info - see logs in ${nodeDir / LOGS_DIR_NAME}"
|
||||
}
|
||||
nodeDir.list { paths -> paths.filter { it.fileName.toString().startsWith("nodeInfo-") }.findFirst().get() }
|
||||
}
|
||||
}
|
||||
|
||||
private fun distributeNodeInfos(nodeDirs: List<Path>, nodeInfoFiles: List<Path>) {
|
||||
for (nodeDir in nodeDirs) {
|
||||
val additionalNodeInfosDir = (nodeDir / CordformNode.NODE_INFO_DIRECTORY).createDirectories()
|
||||
for (nodeInfoFile in nodeInfoFiles) {
|
||||
nodeInfoFile.copyToDirectory(additionalNodeInfosDir, StandardCopyOption.REPLACE_EXISTING)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun gatherNotaryInfos(nodeInfoFiles: List<Path>): List<NotaryInfo> {
|
||||
return nodeInfoFiles.mapNotNull { nodeInfoFile ->
|
||||
// The config contains the notary type
|
||||
val nodeConfig = ConfigFactory.parseFile((nodeInfoFile.parent / "node.conf").toFile())
|
||||
if (nodeConfig.hasPath("notary")) {
|
||||
val validating = nodeConfig.getConfig("notary").getBoolean("validating")
|
||||
// And the node-info file contains the notary's identity
|
||||
val nodeInfo = nodeInfoFile.readAll().deserialize<SignedNodeInfo>().verified()
|
||||
NotaryInfo(nodeInfo.notaryIdentity(), validating)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}.distinct() // We need distinct as nodes part of a distributed notary share the same notary identity
|
||||
}
|
||||
|
||||
private fun installNetworkParameters(notaryInfos: List<NotaryInfo>, nodeDirs: List<Path>) {
|
||||
// TODO Add config for minimumPlatformVersion, maxMessageSize and maxTransactionSize
|
||||
val copier = NetworkParametersCopier(NetworkParameters(
|
||||
minimumPlatformVersion = 1,
|
||||
notaries = notaryInfos,
|
||||
modifiedTime = Instant.now(),
|
||||
maxMessageSize = 10485760,
|
||||
maxTransactionSize = 40000,
|
||||
epoch = 1
|
||||
), overwriteFile = true)
|
||||
|
||||
nodeDirs.forEach(copier::install)
|
||||
}
|
||||
|
||||
private fun NotaryInfo.prettyPrint(): String = "${identity.name} (${if (validating) "" else "non-"}validating)"
|
||||
|
||||
private fun NodeInfo.notaryIdentity(): Party {
|
||||
return when (legalIdentities.size) {
|
||||
// Single node notaries have just one identity like all other nodes. This identity is the notary identity
|
||||
1 -> legalIdentities[0]
|
||||
// Nodes which are part of a distributed notary have a second identity which is the composite identity of the
|
||||
// cluster and is shared by all the other members. This is the notary identity.
|
||||
2 -> legalIdentities[1]
|
||||
else -> throw IllegalArgumentException("Not sure how to get the notary identity in this scenerio: $this")
|
||||
}
|
||||
}
|
||||
|
||||
// We need to to set serialization env, because generation of parameters is run from Cordform.
|
||||
// KryoServerSerializationScheme is not accessible from nodeapi.
|
||||
private fun initialiseSerialization() {
|
||||
_contextSerializationEnv.set(SerializationEnvironmentImpl(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(KryoParametersSerializationScheme)
|
||||
registerScheme(AMQPServerSerializationScheme())
|
||||
},
|
||||
AMQP_P2P_CONTEXT)
|
||||
)
|
||||
}
|
||||
|
||||
private object KryoParametersSerializationScheme : AbstractKryoSerializationScheme() {
|
||||
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
|
||||
return byteSequence == KryoHeaderV0_1 && target == SerializationContext.UseCase.P2P
|
||||
}
|
||||
override fun rpcClientKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
|
||||
override fun rpcServerKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package net.corda.nodeapi.internal
|
||||
package net.corda.nodeapi.internal.network
|
||||
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.SecureHash
|
||||
@ -15,7 +15,7 @@ import java.security.cert.X509Certificate
|
||||
import java.time.Instant
|
||||
|
||||
const val NETWORK_PARAMS_FILE_NAME = "network-parameters"
|
||||
// TODO: Need more discussion on rather we should move this class out of internal.
|
||||
|
||||
/**
|
||||
* Data class containing hash of [NetworkParameters] and network participant's [NodeInfo] hashes.
|
||||
*/
|
@ -0,0 +1,35 @@
|
||||
package net.corda.nodeapi.internal.network
|
||||
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.crypto.sign
|
||||
import net.corda.core.internal.copyTo
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.security.KeyPair
|
||||
|
||||
class NetworkParametersCopier(
|
||||
networkParameters: NetworkParameters,
|
||||
signingKeyPair: KeyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME),
|
||||
overwriteFile: Boolean = false
|
||||
) {
|
||||
private val copyOptions = if (overwriteFile) arrayOf(StandardCopyOption.REPLACE_EXISTING) else emptyArray()
|
||||
private val serializedNetworkParameters = networkParameters.let {
|
||||
val serialize = it.serialize()
|
||||
val signature = signingKeyPair.sign(serialize)
|
||||
SignedData(serialize, signature).serialize()
|
||||
}
|
||||
|
||||
fun install(nodeDir: Path) {
|
||||
try {
|
||||
serializedNetworkParameters.open().copyTo(nodeDir / NETWORK_PARAMS_FILE_NAME, *copyOptions)
|
||||
} catch (e: FileAlreadyExistsException) {
|
||||
// This is only thrown if the file already exists and we didn't specify to overwrite it. In that case we
|
||||
// ignore this exception as we're happy with the existing file.
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package net.corda.nodeapi.internal
|
||||
package net.corda.nodeapi.internal.network
|
||||
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.internal.ThreadBox
|
||||
@ -65,7 +65,6 @@ class NodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()) : AutoCloseabl
|
||||
}
|
||||
|
||||
/**
|
||||
* @param nodeConfig the configuration to be removed.
|
||||
* Remove the configuration of a node which is about to be stopped or already stopped.
|
||||
* No files written by that node will be copied to other nodes, nor files from other nodes will be copied to this
|
||||
* one.
|
@ -1,7 +1,7 @@
|
||||
package net.corda.nodeapi
|
||||
package net.corda.nodeapi.internal.network
|
||||
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.nodeapi.internal.NodeInfoFilesCopier
|
||||
import net.corda.nodeapi.eventually
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
@ -14,14 +14,7 @@ import java.util.concurrent.TimeUnit
|
||||
import kotlin.streams.toList
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
/**
|
||||
* tests for [NodeInfoFilesCopier]
|
||||
*/
|
||||
class NodeInfoFilesCopierTest {
|
||||
|
||||
@Rule @JvmField var folder = TemporaryFolder()
|
||||
private val rootPath get() = folder.root.toPath()
|
||||
private val scheduler = TestScheduler()
|
||||
companion object {
|
||||
private const val ORGANIZATION = "Organization"
|
||||
private const val NODE_1_PATH = "node1"
|
||||
@ -33,6 +26,13 @@ class NodeInfoFilesCopierTest {
|
||||
private val BAD_NODE_INFO_NAME = "something"
|
||||
}
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val folder = TemporaryFolder()
|
||||
|
||||
private val rootPath get() = folder.root.toPath()
|
||||
private val scheduler = TestScheduler()
|
||||
|
||||
private fun nodeDir(nodeBaseDir : String) = rootPath.resolve(nodeBaseDir).resolve(ORGANIZATION.toLowerCase())
|
||||
|
||||
private val node1RootPath by lazy { nodeDir(NODE_1_PATH) }
|
||||
@ -40,7 +40,7 @@ class NodeInfoFilesCopierTest {
|
||||
private val node1AdditionalNodeInfoPath by lazy { node1RootPath.resolve(CordformNode.NODE_INFO_DIRECTORY) }
|
||||
private val node2AdditionalNodeInfoPath by lazy { node2RootPath.resolve(CordformNode.NODE_INFO_DIRECTORY) }
|
||||
|
||||
lateinit var nodeInfoFilesCopier: NodeInfoFilesCopier
|
||||
private lateinit var nodeInfoFilesCopier: NodeInfoFilesCopier
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
Reference in New Issue
Block a user