[CORDA-442] let Driver run without network map (#1890)

* [CORDA-442] let Driver run without network map

- Nodes started by driver run without a networkMapNode.

- Driver does not take a networkMapStartStrategy anymore

- a new parameter in the configuration "noNetworkMapServiceMode" allows for a node not to be a networkMapNode nor to connect to one.

- Driver now waits for each node to write its own NodeInfo file to disk and then copies it into each other node.

- When driver starts a node N, it waits for every node to be have N nodes in their network map.

Note: the code to copy around the NodeInfo files was already in DemoBench, the NodeInfoFilesCopier class was just moved from DemoBench into core (I'm very open to core not being the best place, please advise)
This commit is contained in:
Alberto Arri 2017-10-18 13:49:32 +01:00 committed by GitHub
parent b4c53647cd
commit b33b013284
18 changed files with 316 additions and 279 deletions

View File

@ -92,6 +92,7 @@ class RPCStabilityTests {
startAndStop()
}
val numberOfThreadsAfter = waitUntilNumberOfThreadsStable(executor)
assertTrue(numberOfThreadsBefore >= numberOfThreadsAfter)
executor.shutdownNow()
}

View File

@ -18,6 +18,8 @@ dependencies {
// TODO: Remove this dependency and the code that requires it
compile "commons-fileupload:commons-fileupload:$fileupload_version"
compile "net.corda.plugins:cordform-common:$gradle_plugins_version"
// TypeSafe Config: for simple and human friendly config files.
compile "com.typesafe:config:$typesafe_config_version"

View File

@ -0,0 +1,165 @@
package net.corda.nodeapi
import net.corda.cordform.CordformNode
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.createDirectories
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.list
import net.corda.core.utilities.loggerFor
import rx.Observable
import rx.Scheduler
import rx.Subscription
import rx.schedulers.Schedulers
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption.COPY_ATTRIBUTES
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.attribute.FileTime
import java.util.concurrent.TimeUnit
/**
* Utility class which copies nodeInfo files across a set of running nodes.
*
* This class will create paths that it needs to poll and to where it needs to copy files in case those
* don't exist yet.
*/
class NodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()) : AutoCloseable {
companion object {
private val log = loggerFor<NodeInfoFilesCopier>()
const val NODE_INFO_FILE_NAME_PREFIX = "nodeInfo-"
}
private val nodeDataMapBox = ThreadBox(mutableMapOf<Path, NodeData>())
/**
* Whether the NodeInfoFilesCopier is closed. When the NodeInfoFilesCopier is closed it will stop polling the
* filesystem and all the public methods except [#close] will throw.
*/
private var closed = false
private val subscription: Subscription
init {
this.subscription = Observable.interval(5, TimeUnit.SECONDS, scheduler)
.subscribe { poll() }
}
/**
* @param nodeDir a path to be watched for NodeInfos
* Add a path of a node which is about to be started.
* Its nodeInfo file will be copied to other nodes' additional-node-infos directory, and conversely,
* other nodes' nodeInfo files will be copied to this node additional-node-infos directory.
*/
fun addConfig(nodeDir: Path) {
require(!closed) { "NodeInfoFilesCopier is already closed" }
nodeDataMapBox.locked {
val newNodeFile = NodeData(nodeDir)
put(nodeDir, newNodeFile)
for (previouslySeenFile in allPreviouslySeenFiles()) {
atomicCopy(previouslySeenFile, newNodeFile.additionalNodeInfoDirectory.resolve(previouslySeenFile.fileName))
}
log.info("Now watching: $nodeDir")
}
}
/**
* @param nodeConfig the configuration to be removed.
* Remove the configuration of a node which is about to be stopped or already stopped.
* No files written by that node will be copied to other nodes, nor files from other nodes will be copied to this
* one.
*/
fun removeConfig(nodeDir: Path) {
require(!closed) { "NodeInfoFilesCopier is already closed" }
nodeDataMapBox.locked {
remove(nodeDir) ?: return
log.info("Stopped watching: $nodeDir")
}
}
fun reset() {
require(!closed) { "NodeInfoFilesCopier is already closed" }
nodeDataMapBox.locked {
clear()
}
}
/**
* Stops polling the filesystem.
* This function can be called as many times as one wants.
*/
override fun close() {
if (!closed) {
closed = true
subscription.unsubscribe()
}
}
private fun allPreviouslySeenFiles() = nodeDataMapBox.alreadyLocked { values.flatMap { it.previouslySeenFiles.keys } }
private fun poll() {
nodeDataMapBox.locked {
for (nodeData in values) {
nodeData.nodeDir.list { paths ->
paths.filter { it.isRegularFile() }
.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }
.forEach { path -> processPath(nodeData, path) }
}
}
}
}
// Takes a path under nodeData config dir and decides whether the file represented by that path needs to
// be copied.
private fun processPath(nodeData: NodeData, path: Path) {
nodeDataMapBox.alreadyLocked {
val newTimestamp = Files.readAttributes(path, BasicFileAttributes::class.java).lastModifiedTime()
val previousTimestamp = nodeData.previouslySeenFiles.put(path, newTimestamp) ?: FileTime.fromMillis(-1)
if (newTimestamp > previousTimestamp) {
for (destination in this.values.filter { it.nodeDir != nodeData.nodeDir }.map { it.additionalNodeInfoDirectory }) {
val fullDestinationPath = destination.resolve(path.fileName)
atomicCopy(path, fullDestinationPath)
}
}
}
}
private fun atomicCopy(source: Path, destination: Path) {
val tempDestination = try {
Files.createTempFile(destination.parent, "", null)
} catch (exception: IOException) {
log.warn("Couldn't create a temporary file to copy $source", exception)
throw exception
}
try {
// First copy the file to a temporary file within the appropriate directory.
Files.copy(source, tempDestination, COPY_ATTRIBUTES, REPLACE_EXISTING)
} catch (exception: IOException) {
log.warn("Couldn't copy $source to $tempDestination.", exception)
Files.delete(tempDestination)
throw exception
}
try {
// Then rename it to the desired name. This way the file 'appears' on the filesystem as an atomic operation.
Files.move(tempDestination, destination, REPLACE_EXISTING)
} catch (exception: IOException) {
log.warn("Couldn't move $tempDestination to $destination.", exception)
Files.delete(tempDestination)
throw exception
}
}
/**
* Convenience holder for all the paths and files relative to a single node.
*/
private class NodeData(val nodeDir: Path) {
val additionalNodeInfoDirectory: Path = nodeDir.resolve(CordformNode.NODE_INFO_DIRECTORY)
// Map from Path to its lastModifiedTime.
val previouslySeenFiles = mutableMapOf<Path, FileTime>()
init {
additionalNodeInfoDirectory.createDirectories()
}
}
}

View File

@ -1,8 +1,6 @@
package net.corda.demobench.model
package net.corda.nodeapi
import net.corda.cordform.CordformNode
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.eventually
import org.junit.Before
import org.junit.Rule
@ -30,17 +28,13 @@ class NodeInfoFilesCopierTest {
private const val NODE_2_PATH = "node2"
private val content = "blah".toByteArray(Charsets.UTF_8)
private val GOOD_NODE_INFO_NAME = "nodeInfo-test"
private val GOOD_NODE_INFO_NAME_2 = "nodeInfo-anotherNode"
private val GOOD_NODE_INFO_NAME = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}test"
private val GOOD_NODE_INFO_NAME_2 = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}anotherNode"
private val BAD_NODE_INFO_NAME = "something"
private val legalName = CordaX500Name(organisation = ORGANIZATION, locality = "Nowhere", country = "GB")
private val hostAndPort = NetworkHostAndPort("localhost", 1)
}
private fun nodeDir(nodeBaseDir : String) = rootPath.resolve(nodeBaseDir).resolve(ORGANIZATION.toLowerCase())
private val node1Config by lazy { createConfig(NODE_1_PATH) }
private val node2Config by lazy { createConfig(NODE_2_PATH) }
private val node1RootPath by lazy { nodeDir(NODE_1_PATH) }
private val node2RootPath by lazy { nodeDir(NODE_2_PATH) }
private val node1AdditionalNodeInfoPath by lazy { node1RootPath.resolve(CordformNode.NODE_INFO_DIRECTORY) }
@ -56,7 +50,7 @@ class NodeInfoFilesCopierTest {
@Test
fun `files created before a node is started are copied to that node`() {
// Configure the first node.
nodeInfoFilesCopier.addConfig(node1Config)
nodeInfoFilesCopier.addConfig(node1RootPath)
// Ensure directories are created.
advanceTime()
@ -65,7 +59,7 @@ class NodeInfoFilesCopierTest {
Files.write(node1RootPath.resolve(BAD_NODE_INFO_NAME), content)
// Configure the second node.
nodeInfoFilesCopier.addConfig(node2Config)
nodeInfoFilesCopier.addConfig(node2RootPath)
advanceTime()
eventually<AssertionError, Unit>(Duration.ofMinutes(1)) {
@ -77,8 +71,8 @@ class NodeInfoFilesCopierTest {
@Test
fun `polling of running nodes`() {
// Configure 2 nodes.
nodeInfoFilesCopier.addConfig(node1Config)
nodeInfoFilesCopier.addConfig(node2Config)
nodeInfoFilesCopier.addConfig(node1RootPath)
nodeInfoFilesCopier.addConfig(node2RootPath)
advanceTime()
// Create 2 files, one of which to be copied, in a node root path.
@ -95,8 +89,8 @@ class NodeInfoFilesCopierTest {
@Test
fun `remove nodes`() {
// Configure 2 nodes.
nodeInfoFilesCopier.addConfig(node1Config)
nodeInfoFilesCopier.addConfig(node2Config)
nodeInfoFilesCopier.addConfig(node1RootPath)
nodeInfoFilesCopier.addConfig(node2RootPath)
advanceTime()
// Create a file, in node 2 root path.
@ -104,7 +98,7 @@ class NodeInfoFilesCopierTest {
advanceTime()
// Remove node 2
nodeInfoFilesCopier.removeConfig(node2Config)
nodeInfoFilesCopier.removeConfig(node2RootPath)
// Create another file in node 2 directory.
Files.write(node2RootPath.resolve(GOOD_NODE_INFO_NAME_2), content)
@ -119,8 +113,8 @@ class NodeInfoFilesCopierTest {
@Test
fun `clear`() {
// Configure 2 nodes.
nodeInfoFilesCopier.addConfig(node1Config)
nodeInfoFilesCopier.addConfig(node2Config)
nodeInfoFilesCopier.addConfig(node1RootPath)
nodeInfoFilesCopier.addConfig(node2RootPath)
advanceTime()
nodeInfoFilesCopier.reset()
@ -142,15 +136,4 @@ class NodeInfoFilesCopierTest {
val onlyFileName = Files.list(path).toList().first().fileName.toString()
assertEquals(filename, onlyFileName)
}
private fun createConfig(relativePath: String) =
NodeConfigWrapper(rootPath.resolve(relativePath),
NodeConfig(myLegalName = legalName,
p2pAddress = hostAndPort,
rpcAddress = hostAndPort,
webAddress = hostAndPort,
h2port = -1,
notary = null,
networkMapService = null,
rpcUsers = listOf()))
}

View File

@ -14,7 +14,6 @@ import net.corda.nodeapi.internal.ServiceType
import net.corda.testing.ALICE
import net.corda.testing.ProjectStructure.projectRootDir
import net.corda.testing.driver.ListenProcessDeathException
import net.corda.testing.driver.NetworkMapStartStrategy
import net.corda.testing.driver.driver
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
@ -59,7 +58,7 @@ class BootTests {
@Test
fun `node quits on failure to register with network map`() {
val tooManyAdvertisedServices = (1..100).map { ServiceInfo(ServiceType.notary.getSubType("$it")) }.toSet()
driver(networkMapStartStrategy = NetworkMapStartStrategy.Nominated(ALICE.name)) {
driver {
val future = startNode(providedName = ALICE.name)
assertFailsWith(ListenProcessDeathException::class) { future.getOrThrow() }
}

View File

@ -1,7 +1,6 @@
package net.corda.node
import com.google.common.base.Stopwatch
import net.corda.testing.driver.NetworkMapStartStrategy
import net.corda.testing.driver.driver
import org.junit.Ignore
import org.junit.Test
@ -14,8 +13,7 @@ class NodeStartupPerformanceTests {
// Measure the startup time of nodes. Note that this includes an RPC roundtrip, which causes e.g. Kryo initialisation.
@Test
fun `single node startup time`() {
driver(networkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false)) {
startDedicatedNetworkMapService().get()
driver {
val times = ArrayList<Long>()
for (i in 1..10) {
val time = Stopwatch.createStarted().apply {

View File

@ -8,6 +8,7 @@ import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.KeyManagementService
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.nodeapi.NodeInfoFilesCopier
import net.corda.testing.ALICE
import net.corda.testing.ALICE_KEY
import net.corda.testing.DEV_TRUST_ROOT
@ -42,7 +43,6 @@ class NodeInfoWatcherTest : NodeBasedTest() {
lateinit var nodeInfoWatcher: NodeInfoWatcher
companion object {
val nodeInfoFileRegex = Regex("nodeInfo\\-.*")
val nodeInfo = NodeInfo(listOf(), listOf(getTestPartyAndCertificate(ALICE)), 0, 0)
}
@ -56,13 +56,14 @@ class NodeInfoWatcherTest : NodeBasedTest() {
@Test
fun `save a NodeInfo`() {
assertEquals(0, folder.root.list().filter { it.matches(nodeInfoFileRegex) }.size)
assertEquals(0,
folder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size)
NodeInfoWatcher.saveToFile(folder.root.toPath(), nodeInfo, keyManagementService)
val nodeInfoFiles = folder.root.list().filter { it.matches(nodeInfoFileRegex) }
val nodeInfoFiles = folder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }
assertEquals(1, nodeInfoFiles.size)
val fileName = nodeInfoFiles.first()
assertTrue(fileName.matches(nodeInfoFileRegex))
assertTrue(fileName.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX))
val file = (folder.root.path / fileName).toFile()
// Just check that something is written, another tests verifies that the written value can be read back.
assertThat(contentOf(file)).isNotEmpty()

View File

@ -552,8 +552,16 @@ abstract class AbstractNode(config: NodeConfiguration,
}
}
private fun setupInNodeNetworkMapService(networkMapCache: NetworkMapCacheInternal) {
inNodeNetworkMapService =
if (configuration.networkMapService == null && !configuration.noNetworkMapServiceMode)
makeNetworkMapService(network, networkMapCache)
else
NullNetworkMapService
}
private fun makeNetworkServices(network: MessagingService, networkMapCache: NetworkMapCacheInternal, tokenizableServices: MutableList<Any>) {
inNodeNetworkMapService = if (configuration.networkMapService == null) makeNetworkMapService(network, networkMapCache) else NullNetworkMapService
setupInNodeNetworkMapService(networkMapCache)
configuration.notary?.let {
val notaryService = makeCoreNotaryService(it)
tokenizableServices.add(notaryService)
@ -612,7 +620,7 @@ abstract class AbstractNode(config: NodeConfiguration,
/** This is overriden by the mock node implementation to enable operation without any network map service */
protected open fun noNetworkMapConfigured(): CordaFuture<Unit> {
if (services.networkMapCache.loadDBSuccess) {
if (services.networkMapCache.loadDBSuccess || configuration.noNetworkMapServiceMode) {
return doneFuture(Unit)
} else {
// TODO: There should be a consistent approach to configuration error exceptions.

View File

@ -20,6 +20,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
* service.
*/
val networkMapService: NetworkMapInfo?
val noNetworkMapServiceMode: Boolean
val minimumPlatformVersion: Int
val emailAddress: String
val exportJMXto: String
@ -78,6 +79,7 @@ data class FullNodeConfiguration(
override val database: Properties?,
override val certificateSigningService: URL,
override val networkMapService: NetworkMapInfo?,
override val noNetworkMapServiceMode: Boolean = false,
override val minimumPlatformVersion: Int = 1,
override val rpcUsers: List<User>,
override val verifierType: VerifierType,

View File

@ -9,6 +9,7 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.seconds
import net.corda.nodeapi.NodeInfoFilesCopier
import rx.Observable
import rx.Scheduler
import rx.schedulers.Schedulers
@ -55,7 +56,8 @@ class NodeInfoWatcher(private val nodePath: Path,
val serializedBytes = nodeInfo.serialize()
val regSig = keyManager.sign(serializedBytes.bytes, nodeInfo.legalIdentities.first().owningKey)
val signedData = SignedData(serializedBytes, regSig)
signedData.serialize().open().copyTo(path / "nodeInfo-${serializedBytes.hash}")
signedData.serialize().open().copyTo(
path / "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${serializedBytes.hash}")
} catch (e: Exception) {
logger.warn("Couldn't write node info to file", e)
}

View File

@ -228,7 +228,6 @@ fun <A> rpcDriver(
systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
initialiseSerialization: Boolean = true,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false),
startNodesInProcess: Boolean = false,
extraCordappPackagesToScan: List<String> = emptyList(),
dsl: RPCDriverExposedDSLInterface.() -> A
@ -240,7 +239,6 @@ fun <A> rpcDriver(
systemProperties = systemProperties,
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
networkMapStartStrategy = networkMapStartStrategy,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
extraCordappPackagesToScan = extraCordappPackagesToScan

View File

@ -8,7 +8,6 @@ import com.typesafe.config.ConfigRenderOptions
import net.corda.client.rpc.CordaRPCClient
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformNode
import net.corda.cordform.NodeDefinition
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.firstOf
@ -20,6 +19,8 @@ import net.corda.core.internal.div
import net.corda.core.internal.times
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.toFuture
import net.corda.core.utilities.*
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
@ -28,6 +29,7 @@ import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.config.*
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.nodeapi.NodeInfoFilesCopier
import net.corda.nodeapi.User
import net.corda.nodeapi.config.parseAs
import net.corda.nodeapi.config.toConfig
@ -37,6 +39,8 @@ import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import okhttp3.OkHttpClient
import okhttp3.Request
import org.slf4j.Logger
import rx.Observable
import rx.observables.ConnectableObservable
import java.io.File
import java.net.*
import java.nio.file.Path
@ -149,14 +153,6 @@ interface DriverDSLExposedInterface : CordformContext {
*/
fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle>
/**
* Starts a network map service node. Note that only a single one should ever be running, so you will probably want
* to set networkMapStartStrategy to Dedicated(false) in your [driver] call.
* @param startInProcess Determines if the node should be started inside this process. If null the Driver-level
* value will be used.
*/
fun startDedicatedNetworkMapService(startInProcess: Boolean? = null, maximumHeapSize: String = "200m"): CordaFuture<NodeHandle>
fun waitForAllNodesToFinish()
/**
@ -211,13 +207,15 @@ sealed class NodeHandle {
override val configuration: FullNodeConfiguration,
override val webAddress: NetworkHostAndPort,
val debugPort: Int?,
val process: Process
val process: Process,
private val onStopCallback: () -> Unit
) : NodeHandle() {
override fun stop(): CordaFuture<Unit> {
with(process) {
destroy()
waitFor()
}
onStopCallback()
return doneFuture(Unit)
}
}
@ -228,7 +226,8 @@ sealed class NodeHandle {
override val configuration: FullNodeConfiguration,
override val webAddress: NetworkHostAndPort,
val node: StartedNode<Node>,
val nodeThread: Thread
val nodeThread: Thread,
private val onStopCallback: () -> Unit
) : NodeHandle() {
override fun stop(): CordaFuture<Unit> {
node.dispose()
@ -236,6 +235,7 @@ sealed class NodeHandle {
interrupt()
join()
}
onStopCallback()
return doneFuture(Unit)
}
}
@ -316,7 +316,6 @@ data class NodeParameters(
* @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental.
* @param systemProperties A Map of extra system properties which will be given to each new node. Defaults to empty.
* @param useTestClock If true the test clock will be used in Node.
* @param networkMapStartStrategy Determines whether a network map node is started automatically.
* @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or
* not. Note that this may be overridden in [DriverDSLExposedInterface.startNode].
* @param dsl The dsl itself.
@ -331,7 +330,7 @@ fun <A> driver(
systemProperties: Map<String, String> = defaultParameters.systemProperties,
useTestClock: Boolean = defaultParameters.useTestClock,
initialiseSerialization: Boolean = defaultParameters.initialiseSerialization,
networkMapStartStrategy: NetworkMapStartStrategy = defaultParameters.networkMapStartStrategy,
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
dsl: DriverDSLExposedInterface.() -> A
@ -344,7 +343,6 @@ fun <A> driver(
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
isDebug = isDebug,
networkMapStartStrategy = networkMapStartStrategy,
startNodesInProcess = startNodesInProcess,
extraCordappPackagesToScan = extraCordappPackagesToScan
),
@ -379,7 +377,6 @@ data class DriverParameters(
val systemProperties: Map<String, String> = emptyMap(),
val useTestClock: Boolean = false,
val initialiseSerialization: Boolean = true,
val networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = true),
val startNodesInProcess: Boolean = false,
val extraCordappPackagesToScan: List<String> = emptyList()
) {
@ -390,7 +387,6 @@ data class DriverParameters(
fun setSystemProperties(systemProperties: Map<String, String>) = copy(systemProperties = systemProperties)
fun setUseTestClock(useTestClock: Boolean) = copy(useTestClock = useTestClock)
fun setInitialiseSerialization(initialiseSerialization: Boolean) = copy(initialiseSerialization = initialiseSerialization)
fun setNetworkMapStartStrategy(networkMapStartStrategy: NetworkMapStartStrategy) = copy(networkMapStartStrategy = networkMapStartStrategy)
fun setStartNodesInProcess(startNodesInProcess: Boolean) = copy(startNodesInProcess = startNodesInProcess)
fun setExtraCordappPackagesToScan(extraCordappPackagesToScan: List<String>) = copy(extraCordappPackagesToScan = extraCordappPackagesToScan)
}
@ -605,16 +601,20 @@ class DriverDSL(
val driverDirectory: Path,
val useTestClock: Boolean,
val isDebug: Boolean,
val networkMapStartStrategy: NetworkMapStartStrategy,
val startNodesInProcess: Boolean,
extraCordappPackagesToScan: List<String>
) : DriverDSLInternalInterface {
private val dedicatedNetworkMapAddress = portAllocation.nextHostAndPort()
private var _executorService: ScheduledExecutorService? = null
val executorService get() = _executorService!!
private var _shutdownManager: ShutdownManager? = null
override val shutdownManager get() = _shutdownManager!!
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
// Investigate whether we can avoid that.
private val nodeInfoFilesCopier = NodeInfoFilesCopier()
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
class State {
val processes = ArrayList<CordaFuture<Process>>()
@ -671,25 +671,6 @@ class DriverDSL(
}
}
private fun networkMapServiceConfigLookup(networkMapCandidates: List<NodeDefinition>): (CordaX500Name) -> Map<String, String>? {
return networkMapStartStrategy.run {
when (this) {
is NetworkMapStartStrategy.Dedicated -> {
serviceConfig(dedicatedNetworkMapAddress).let {
{ _: CordaX500Name -> it }
}
}
is NetworkMapStartStrategy.Nominated -> {
serviceConfig(networkMapCandidates.single {
it.name == legalName.toString()
}.config.getString("p2pAddress").let(NetworkHostAndPort.Companion::parse)).let {
{ nodeName: CordaX500Name -> if (nodeName == legalName) null else it }
}
}
}
}
}
override fun startNode(
defaultParameters: NodeParameters,
providedName: CordaX500Name?,
@ -704,10 +685,6 @@ class DriverDSL(
val webAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB")
val networkMapServiceConfigLookup = networkMapServiceConfigLookup(listOf(object : NodeDefinition {
override fun getName() = name.toString()
override fun getConfig() = configOf("p2pAddress" to p2pAddress.toString())
}))
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
@ -716,10 +693,10 @@ class DriverDSL(
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"networkMapService" to networkMapServiceConfigLookup(name),
"useTestClock" to useTestClock,
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
"verifierType" to verifierType.name,
"noNetworkMapServiceMode" to true
) + customOverrides
)
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
@ -735,7 +712,6 @@ class DriverDSL(
}
override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?, maximumHeapSize: String): List<CordaFuture<NodeHandle>> {
val networkMapServiceConfigLookup = networkMapServiceConfigLookup(nodes)
return nodes.map { node ->
portAllocation.nextHostAndPort() // rpcAddress
val webAddress = portAllocation.nextHostAndPort()
@ -746,8 +722,8 @@ class DriverDSL(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = node.config + notary + mapOf(
"networkMapService" to networkMapServiceConfigLookup(name),
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers,
"noNetworkMapServiceMode" to true
)
)
startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
@ -833,9 +809,7 @@ class DriverDSL(
override fun start() {
_executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
_shutdownManager = ShutdownManager(executorService)
if (networkMapStartStrategy.startDedicated) {
startDedicatedNetworkMapService().andForget(log) // Allow it to start concurrently with other nodes.
}
shutdownManager.registerShutdown { nodeInfoFilesCopier.close() }
}
fun baseDirectory(nodeName: CordaX500Name): Path {
@ -846,28 +820,57 @@ class DriverDSL(
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
override fun startDedicatedNetworkMapService(startInProcess: Boolean?, maximumHeapSize: String): CordaFuture<NodeHandle> {
val webAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort()
val networkMapLegalName = networkMapStartStrategy.legalName
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(networkMapLegalName),
allowMissingConfig = true,
configOverrides = configOf(
"myLegalName" to networkMapLegalName.toString(),
// TODO: remove the webAddress as NMS doesn't need to run a web server. This will cause all
// node port numbers to be shifted, so all demos and docs need to be updated accordingly.
"webAddress" to webAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"rpcUsers" to defaultRpcUserList,
"p2pAddress" to dedicatedNetworkMapAddress.toString(),
"useTestClock" to useTestClock)
)
return startNodeInternal(config, webAddress, startInProcess, maximumHeapSize)
/**
* @param initial number of nodes currently in the network map of a running node.
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
* the initial value emitted is always [initial]
*/
private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
ConnectableObservable<Int> {
val count = AtomicInteger(initial)
return networkMapCacheChangeObservable.map { it ->
when (it) {
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
is NetworkMapCache.MapChange.Modified -> count.get()
}
}.startWith(initial).replay()
}
/**
* @param rpc the [CordaRPCOps] of a newly started node.
* @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes
* equal to the number of running nodes. The future will yield the number of connected nodes.
*/
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
val (snapshot, updates) = rpc.networkMapFeed()
val counterObservable = nodeCountObservable(snapshot.size, updates)
countObservables.put(rpc.nodeInfo().legalIdentities.first().name, counterObservable)
/* TODO: this might not always be the exact number of nodes one has to wait for,
* for example in the following sequence
* 1 start 3 nodes in order, A, B, C.
* 2 before the future returned by this function resolves, kill B
* At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes.
*/
val requiredNodes = countObservables.size
// This is an observable which yield the minimum number of nodes in each node network map.
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args : Array<Any> ->
args.map { it as Int }.min() ?: 0
}
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
counterObservable.connect()
return future
}
private fun startNodeInternal(config: Config, webAddress: NetworkHostAndPort, startInProcess: Boolean?, maximumHeapSize: String): CordaFuture<NodeHandle> {
val nodeConfiguration = config.parseAs<FullNodeConfiguration>()
nodeInfoFilesCopier.addConfig(nodeConfiguration.baseDirectory)
val onNodeExit: () -> Unit = {
nodeInfoFilesCopier.removeConfig(nodeConfiguration.baseDirectory)
countObservables.remove(nodeConfiguration.myLegalName)
}
if (startInProcess ?: startNodesInProcess) {
val nodeAndThreadFuture = startInProcessNode(executorService, nodeConfiguration, config, cordappPackages)
shutdownManager.registerShutdown(
@ -880,8 +883,8 @@ class DriverDSL(
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(nodeConfiguration, openFuture()).flatMap { rpc ->
rpc.waitUntilNetworkReady().map {
NodeHandle.InProcess(rpc.nodeInfo(), rpc, nodeConfiguration, webAddress, node, thread)
allNodesConnected(rpc).map {
NodeHandle.InProcess(rpc.nodeInfo(), rpc, nodeConfiguration, webAddress, node, thread, onNodeExit)
}
}
}
@ -896,7 +899,7 @@ class DriverDSL(
establishRpc(nodeConfiguration, processDeathFuture).flatMap { rpc ->
// Call waitUntilNetworkReady in background in case RPC is failing over:
val forked = executorService.fork {
rpc.waitUntilNetworkReady()
allNodesConnected(rpc)
}
val networkMapFuture = forked.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) {
@ -905,7 +908,8 @@ class DriverDSL(
}
processDeathFuture.cancel(false)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: ${webAddress}")
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, nodeConfiguration, webAddress, debugPort, process)
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, nodeConfiguration, webAddress, debugPort, process,
onNodeExit)
}
}
}

View File

@ -1,23 +0,0 @@
package net.corda.testing.driver
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.DUMMY_MAP
sealed class NetworkMapStartStrategy {
internal abstract val startDedicated: Boolean
internal abstract val legalName: CordaX500Name
internal fun serviceConfig(address: NetworkHostAndPort) = mapOf(
"address" to address.toString(),
"legalName" to legalName.toString()
)
class Dedicated(startAutomatically: Boolean) : NetworkMapStartStrategy() {
override val startDedicated = startAutomatically
override val legalName = DUMMY_MAP.name
}
class Nominated(override val legalName: CordaX500Name) : NetworkMapStartStrategy() {
override val startDedicated = false
}
}

View File

@ -0,0 +1,35 @@
package net.corda.demobench.model
import net.corda.nodeapi.NodeInfoFilesCopier
import rx.Scheduler
import rx.schedulers.Schedulers
import tornadofx.*
/**
* Utility class which copies nodeInfo files across a set of running nodes.
*
* This class will create paths that it needs to poll and to where it needs to copy files in case those
* don't exist yet.
*/
class DemoBenchNodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()): Controller() {
private val nodeInfoFilesCopier = NodeInfoFilesCopier(scheduler)
/**
* @param nodeConfig the configuration to be added.
* Add a [NodeConfig] for a node which is about to be started.
* Its nodeInfo file will be copied to other nodes' additional-node-infos directory, and conversely,
* other nodes' nodeInfo files will be copied to this node additional-node-infos directory.
*/
fun addConfig(nodeConfig: NodeConfigWrapper) : Unit = nodeInfoFilesCopier.addConfig(nodeConfig.nodeDir)
/**
* @param nodeConfig the configuration to be removed.
* Remove the configuration of a node which is about to be stopped or already stopped.
* No files written by that node will be copied to other nodes, nor files from other nodes will be copied to this
* one.
*/
fun removeConfig(nodeConfig: NodeConfigWrapper) : Unit = nodeInfoFilesCopier.removeConfig(nodeConfig.nodeDir)
fun reset() : Unit = nodeInfoFilesCopier.reset()
}

View File

@ -28,7 +28,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
private val jvm by inject<JVMConfig>()
private val cordappController by inject<CordappController>()
private val nodeInfoFilesCopier by inject<NodeInfoFilesCopier>()
private val nodeInfoFilesCopier by inject<DemoBenchNodeInfoFilesCopier>()
private var baseDir: Path = baseDirFor(ManagementFactory.getRuntimeMXBean().startTime)
private val cordaPath: Path = jvm.applicationDir.resolve("corda").resolve("corda.jar")

View File

@ -1,133 +0,0 @@
package net.corda.demobench.model
import net.corda.cordform.CordformNode
import net.corda.core.internal.createDirectories
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.list
import rx.Observable
import rx.Scheduler
import rx.schedulers.Schedulers
import tornadofx.*
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption.COPY_ATTRIBUTES
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.attribute.FileTime
import java.util.concurrent.TimeUnit
import java.util.logging.Level
/**
* Utility class which copies nodeInfo files across a set of running nodes.
*
* This class will create paths that it needs to poll and to where it needs to copy files in case those
* don't exist yet.
*/
class NodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()): Controller() {
private val nodeDataMap = mutableMapOf<Path, NodeData>()
init {
Observable.interval(5, TimeUnit.SECONDS, scheduler)
.subscribe { poll() }
}
/**
* @param nodeConfig the configuration to be added.
* Add a [NodeConfig] for a node which is about to be started.
* Its nodeInfo file will be copied to other nodes' additional-node-infos directory, and conversely,
* other nodes' nodeInfo files will be copied to this node additional-node-infos directory.
*/
@Synchronized
fun addConfig(nodeConfig: NodeConfigWrapper) {
val newNodeFile = NodeData(nodeConfig.nodeDir)
nodeDataMap[nodeConfig.nodeDir] = newNodeFile
for (previouslySeenFile in allPreviouslySeenFiles()) {
copy(previouslySeenFile, newNodeFile.destination.resolve(previouslySeenFile.fileName))
}
log.info("Now watching: ${nodeConfig.nodeDir}")
}
/**
* @param nodeConfig the configuration to be removed.
* Remove the configuration of a node which is about to be stopped or already stopped.
* No files written by that node will be copied to other nodes, nor files from other nodes will be copied to this
* one.
*/
@Synchronized
fun removeConfig(nodeConfig: NodeConfigWrapper) {
nodeDataMap.remove(nodeConfig.nodeDir) ?: return
log.info("Stopped watching: ${nodeConfig.nodeDir}")
}
@Synchronized
fun reset() {
nodeDataMap.clear()
}
private fun allPreviouslySeenFiles() = nodeDataMap.values.map { it.previouslySeenFiles.keys }.flatten()
@Synchronized
private fun poll() {
for (nodeData in nodeDataMap.values) {
nodeData.nodeDir.list { paths ->
paths.filter { it.isRegularFile() }
.filter { it.fileName.toString().startsWith("nodeInfo-") }
.forEach { path -> processPath(nodeData, path) }
}
}
}
// Takes a path under nodeData config dir and decides whether the file represented by that path needs to
// be copied.
private fun processPath(nodeData: NodeData, path: Path) {
val newTimestamp = Files.readAttributes(path, BasicFileAttributes::class.java).lastModifiedTime()
val previousTimestamp = nodeData.previouslySeenFiles.put(path, newTimestamp) ?: FileTime.fromMillis(-1)
if (newTimestamp > previousTimestamp) {
for (destination in nodeDataMap.values.filter { it.nodeDir != nodeData.nodeDir }.map { it.destination }) {
val fullDestinationPath = destination.resolve(path.fileName)
copy(path, fullDestinationPath)
}
}
}
private fun copy(source: Path, destination: Path) {
val tempDestination = try {
Files.createTempFile(destination.parent, ".", null)
} catch (exception: IOException) {
log.log(Level.WARNING, "Couldn't create a temporary file to copy $source", exception)
throw exception
}
try {
// First copy the file to a temporary file within the appropriate directory.
Files.copy(source, tempDestination, COPY_ATTRIBUTES, REPLACE_EXISTING)
} catch (exception: IOException) {
log.log(Level.WARNING, "Couldn't copy $source to $tempDestination.", exception)
Files.delete(tempDestination)
throw exception
}
try {
// Then rename it to the desired name. This way the file 'appears' on the filesystem as an atomic operation.
Files.move(tempDestination, destination, REPLACE_EXISTING)
} catch (exception: IOException) {
log.log(Level.WARNING, "Couldn't move $tempDestination to $destination.", exception)
Files.delete(tempDestination)
throw exception
}
}
/**
* Convenience holder for all the paths and files relative to a single node.
*/
private class NodeData(val nodeDir: Path) {
val destination: Path = nodeDir.resolve(CordformNode.NODE_INFO_DIRECTORY)
// Map from Path to its lastModifiedTime.
val previouslySeenFiles = mutableMapOf<Path, FileTime>()
init {
destination.createDirectories()
}
}
}

View File

@ -74,7 +74,6 @@ fun <A> verifierDriver(
debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005),
systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false),
startNodesInProcess: Boolean = false,
extraCordappPackagesToScan: List<String> = emptyList(),
dsl: VerifierExposedDSLInterface.() -> A
@ -86,7 +85,6 @@ fun <A> verifierDriver(
systemProperties = systemProperties,
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
networkMapStartStrategy = networkMapStartStrategy,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
extraCordappPackagesToScan = extraCordappPackagesToScan

View File

@ -13,9 +13,9 @@ import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.services.config.VerifierType
import net.corda.testing.ALICE
import net.corda.testing.ALICE_NAME
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.*
import net.corda.testing.driver.NetworkMapStartStrategy
import net.corda.testing.DUMMY_NOTARY_SERVICE_NAME
import org.junit.Test
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
@ -129,10 +129,7 @@ class VerifierTests {
@Test
fun `single verifier works with a node`() {
verifierDriver(
networkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = true),
extraCordappPackagesToScan = listOf("net.corda.finance.contracts")
) {
verifierDriver(extraCordappPackagesToScan = listOf("net.corda.finance.contracts")) {
val aliceFuture = startNode(providedName = ALICE.name)
val notaryFuture = startNotaryNode(DUMMY_NOTARY.name, verifierType = VerifierType.OutOfProcess)
val aliceNode = aliceFuture.get()