[CORDA-442] DemoBench copies around the NodeInfos for running nodes. (#1796)

[CORDA-442] DemoBench copies around the NodeInfos for running nodes.
This commit is contained in:
Alberto Arri 2017-10-16 09:42:13 +01:00 committed by GitHub
parent 99b509cb68
commit d3d87c2497
4 changed files with 298 additions and 0 deletions

View File

@ -46,6 +46,8 @@ dependencies {
// Controls FX: more java FX components http://fxexperience.com/controlsfx/
compile "org.controlsfx:controlsfx:$controlsfx_version"
compile "net.corda.plugins:cordform-common:$gradle_plugins_version"
compile project(':client:rpc')
compile project(':finance')

View File

@ -28,6 +28,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
private val jvm by inject<JVMConfig>()
private val cordappController by inject<CordappController>()
private val nodeInfoFilesCopier by inject<NodeInfoFilesCopier>()
private var baseDir: Path = baseDirFor(ManagementFactory.getRuntimeMXBean().startTime)
private val cordaPath: Path = jvm.applicationDir.resolve("corda").resolve("corda.jar")
@ -86,12 +87,16 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
log.info("Network map provided by: ${nodeConfig.myLegalName}")
}
nodeInfoFilesCopier.addConfig(wrapper)
return wrapper
}
fun dispose(config: NodeConfigWrapper) {
config.state = NodeState.DEAD
nodeInfoFilesCopier.removeConfig(config)
if (config.nodeConfig.isNetworkMap) {
log.warning("Network map service (Node '${config.nodeConfig.myLegalName}') has exited.")
}
@ -138,6 +143,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
// Wipe out any knowledge of previous nodes.
networkMapConfig = null
nodes.clear()
nodeInfoFilesCopier.reset()
}
/**
@ -147,6 +153,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
if (nodes.putIfAbsent(config.key, config) != null) {
return false
}
nodeInfoFilesCopier.addConfig(config)
updatePort(config.nodeConfig)

View File

@ -0,0 +1,133 @@
package net.corda.demobench.model
import net.corda.cordform.CordformNode
import net.corda.core.internal.createDirectories
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.list
import rx.Observable
import rx.Scheduler
import rx.schedulers.Schedulers
import tornadofx.*
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption.COPY_ATTRIBUTES
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.attribute.FileTime
import java.util.concurrent.TimeUnit
import java.util.logging.Level
/**
* Utility class which copies nodeInfo files across a set of running nodes.
*
* This class will create paths that it needs to poll and to where it needs to copy files in case those
* don't exist yet.
*/
class NodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()): Controller() {
private val nodeDataMap = mutableMapOf<Path, NodeData>()
init {
Observable.interval(5, TimeUnit.SECONDS, scheduler)
.subscribe { poll() }
}
/**
* @param nodeConfig the configuration to be added.
* Add a [NodeConfig] for a node which is about to be started.
* Its nodeInfo file will be copied to other nodes' additional-node-infos directory, and conversely,
* other nodes' nodeInfo files will be copied to this node additional-node-infos directory.
*/
@Synchronized
fun addConfig(nodeConfig: NodeConfigWrapper) {
val newNodeFile = NodeData(nodeConfig.nodeDir)
nodeDataMap[nodeConfig.nodeDir] = newNodeFile
for (previouslySeenFile in allPreviouslySeenFiles()) {
copy(previouslySeenFile, newNodeFile.destination.resolve(previouslySeenFile.fileName))
}
log.info("Now watching: ${nodeConfig.nodeDir}")
}
/**
* @param nodeConfig the configuration to be removed.
* Remove the configuration of a node which is about to be stopped or already stopped.
* No files written by that node will be copied to other nodes, nor files from other nodes will be copied to this
* one.
*/
@Synchronized
fun removeConfig(nodeConfig: NodeConfigWrapper) {
nodeDataMap.remove(nodeConfig.nodeDir) ?: return
log.info("Stopped watching: ${nodeConfig.nodeDir}")
}
@Synchronized
fun reset() {
nodeDataMap.clear()
}
private fun allPreviouslySeenFiles() = nodeDataMap.values.map { it.previouslySeenFiles.keys }.flatten()
@Synchronized
private fun poll() {
for (nodeData in nodeDataMap.values) {
nodeData.nodeDir.list { paths ->
paths.filter { it.isRegularFile() }
.filter { it.fileName.toString().startsWith("nodeInfo-") }
.forEach { path -> processPath(nodeData, path) }
}
}
}
// Takes a path under nodeData config dir and decides whether the file represented by that path needs to
// be copied.
private fun processPath(nodeData: NodeData, path: Path) {
val newTimestamp = Files.readAttributes(path, BasicFileAttributes::class.java).lastModifiedTime()
val previousTimestamp = nodeData.previouslySeenFiles.put(path, newTimestamp) ?: FileTime.fromMillis(-1)
if (newTimestamp > previousTimestamp) {
for (destination in nodeDataMap.values.filter { it.nodeDir != nodeData.nodeDir }.map { it.destination }) {
val fullDestinationPath = destination.resolve(path.fileName)
copy(path, fullDestinationPath)
}
}
}
private fun copy(source: Path, destination: Path) {
val tempDestination = try {
Files.createTempFile(destination.parent, ".", null)
} catch (exception: IOException) {
log.log(Level.WARNING, "Couldn't create a temporary file to copy $source", exception)
throw exception
}
try {
// First copy the file to a temporary file within the appropriate directory.
Files.copy(source, tempDestination, COPY_ATTRIBUTES, REPLACE_EXISTING)
} catch (exception: IOException) {
log.log(Level.WARNING, "Couldn't copy $source to $tempDestination.", exception)
Files.delete(tempDestination)
throw exception
}
try {
// Then rename it to the desired name. This way the file 'appears' on the filesystem as an atomic operation.
Files.move(tempDestination, destination, REPLACE_EXISTING)
} catch (exception: IOException) {
log.log(Level.WARNING, "Couldn't move $tempDestination to $destination.", exception)
Files.delete(tempDestination)
throw exception
}
}
/**
* Convenience holder for all the paths and files relative to a single node.
*/
private class NodeData(val nodeDir: Path) {
val destination: Path = nodeDir.resolve(CordformNode.NODE_INFO_DIRECTORY)
// Map from Path to its lastModifiedTime.
val previouslySeenFiles = mutableMapOf<Path, FileTime>()
init {
destination.createDirectories()
}
}
}

View File

@ -0,0 +1,156 @@
package net.corda.demobench.model
import net.corda.cordform.CordformNode
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.eventually
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.schedulers.TestScheduler
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.streams.toList
import kotlin.test.assertEquals
/**
* tests for [NodeInfoFilesCopier]
*/
class NodeInfoFilesCopierTest {
@Rule @JvmField var folder = TemporaryFolder()
private val rootPath get() = folder.root.toPath()
private val scheduler = TestScheduler()
companion object {
private const val ORGANIZATION = "Organization"
private const val NODE_1_PATH = "node1"
private const val NODE_2_PATH = "node2"
private val content = "blah".toByteArray(Charsets.UTF_8)
private val GOOD_NODE_INFO_NAME = "nodeInfo-test"
private val GOOD_NODE_INFO_NAME_2 = "nodeInfo-anotherNode"
private val BAD_NODE_INFO_NAME = "something"
private val legalName = CordaX500Name(organisation = ORGANIZATION, locality = "Nowhere", country = "GB")
private val hostAndPort = NetworkHostAndPort("localhost", 1)
}
private fun nodeDir(nodeBaseDir : String) = rootPath.resolve(nodeBaseDir).resolve(ORGANIZATION.toLowerCase())
private val node1Config by lazy { createConfig(NODE_1_PATH) }
private val node2Config by lazy { createConfig(NODE_2_PATH) }
private val node1RootPath by lazy { nodeDir(NODE_1_PATH) }
private val node2RootPath by lazy { nodeDir(NODE_2_PATH) }
private val node1AdditionalNodeInfoPath by lazy { node1RootPath.resolve(CordformNode.NODE_INFO_DIRECTORY) }
private val node2AdditionalNodeInfoPath by lazy { node2RootPath.resolve(CordformNode.NODE_INFO_DIRECTORY) }
lateinit var nodeInfoFilesCopier: NodeInfoFilesCopier
@Before
fun setUp() {
nodeInfoFilesCopier = NodeInfoFilesCopier(scheduler)
}
@Test
fun `files created before a node is started are copied to that node`() {
// Configure the first node.
nodeInfoFilesCopier.addConfig(node1Config)
// Ensure directories are created.
advanceTime()
// Create 2 files, a nodeInfo and another file in node1 folder.
Files.write(node1RootPath.resolve(GOOD_NODE_INFO_NAME), content)
Files.write(node1RootPath.resolve(BAD_NODE_INFO_NAME), content)
// Configure the second node.
nodeInfoFilesCopier.addConfig(node2Config)
advanceTime()
eventually<AssertionError, Unit>(Duration.ofMinutes(1)) {
// Check only one file is copied.
checkDirectoryContainsSingleFile(node2AdditionalNodeInfoPath, GOOD_NODE_INFO_NAME)
}
}
@Test
fun `polling of running nodes`() {
// Configure 2 nodes.
nodeInfoFilesCopier.addConfig(node1Config)
nodeInfoFilesCopier.addConfig(node2Config)
advanceTime()
// Create 2 files, one of which to be copied, in a node root path.
Files.write(node2RootPath.resolve(GOOD_NODE_INFO_NAME), content)
Files.write(node2RootPath.resolve(BAD_NODE_INFO_NAME), content)
advanceTime()
eventually<AssertionError, Unit>(Duration.ofMinutes(1)) {
// Check only one file is copied to the other node.
checkDirectoryContainsSingleFile(node1AdditionalNodeInfoPath, GOOD_NODE_INFO_NAME)
}
}
@Test
fun `remove nodes`() {
// Configure 2 nodes.
nodeInfoFilesCopier.addConfig(node1Config)
nodeInfoFilesCopier.addConfig(node2Config)
advanceTime()
// Create a file, in node 2 root path.
Files.write(node2RootPath.resolve(GOOD_NODE_INFO_NAME), content)
advanceTime()
// Remove node 2
nodeInfoFilesCopier.removeConfig(node2Config)
// Create another file in node 2 directory.
Files.write(node2RootPath.resolve(GOOD_NODE_INFO_NAME_2), content)
advanceTime()
eventually<AssertionError, Unit>(Duration.ofMinutes(1)) {
// Check only one file is copied to the other node.
checkDirectoryContainsSingleFile(node1AdditionalNodeInfoPath, GOOD_NODE_INFO_NAME)
}
}
@Test
fun `clear`() {
// Configure 2 nodes.
nodeInfoFilesCopier.addConfig(node1Config)
nodeInfoFilesCopier.addConfig(node2Config)
advanceTime()
nodeInfoFilesCopier.reset()
advanceTime()
Files.write(node2RootPath.resolve(GOOD_NODE_INFO_NAME_2), content)
// Give some time to the filesystem to report the change.
Thread.sleep(100)
assertEquals(0, Files.list(node1AdditionalNodeInfoPath).toList().size)
}
private fun advanceTime() {
scheduler.advanceTimeBy(1, TimeUnit.HOURS)
}
private fun checkDirectoryContainsSingleFile(path: Path, filename: String) {
assertEquals(1, Files.list(path).toList().size)
val onlyFileName = Files.list(path).toList().first().fileName.toString()
assertEquals(filename, onlyFileName)
}
private fun createConfig(relativePath: String) =
NodeConfigWrapper(rootPath.resolve(relativePath),
NodeConfig(myLegalName = legalName,
p2pAddress = hostAndPort,
rpcAddress = hostAndPort,
webAddress = hostAndPort,
h2port = -1,
notary = null,
networkMapService = null,
rpcUsers = listOf()))
}