Add filesystem polling to nodes (#1623)

Add the logic in node to poll for new serialized nodes to appear on disk.

Newly discovered nodes are automatically added to the PersistentNetworkMapCache
This commit is contained in:
Alberto Arri 2017-10-05 14:11:10 +01:00 committed by GitHub
parent 257756d862
commit cf83328d5d
7 changed files with 271 additions and 159 deletions

View File

@ -11,7 +11,8 @@ UNRELEASED
* Cordform may not specify a value for ``NetworkMap``, when that happens, during the task execution the following happens:
1. Each node is started and its signed serialized NodeInfo is written to disk in the node base directory.
2. Every serialized ``NodeInfo`` above is copied in every other node "additional-node-info" folder under the NodeInfo folder.
* Nodes read all the nodes stored in ``additional-node-info`` when the ``NetworkMapService`` starts up.
* Nodes read and poll the filesystem for serialized ``NodeInfo`` in the ``additional-node-info`` directory.
* ``Cordapp`` now has a name field for identifying CorDapps and all CorDapp names are printed to console at startup.

View File

@ -170,7 +170,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
private fun saveOwnNodeInfo() {
NodeInfoSerializer().saveToFile(configuration.baseDirectory, info, services.keyManagementService)
NodeInfoWatcher.saveToFile(configuration.baseDirectory, info, services.keyManagementService)
}
private fun initCertificate() {
@ -409,7 +409,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
services.transactionVerifierService, services.validatedTransactions, services.contractUpgradeService,
services, cordappProvider, this)
makeNetworkServices(tokenizableServices)
return tokenizableServices
}

View File

@ -1,85 +0,0 @@
package net.corda.node.services.network
import net.corda.cordform.CordformNode
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.isDirectory
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.loggerFor
import java.io.File
import java.nio.file.Files
import java.nio.file.Path
/**
* Class containing the logic to serialize and de-serialize a [NodeInfo] to disk and reading it back.
*/
class NodeInfoSerializer {
companion object {
val logger = loggerFor<NodeInfoSerializer>()
}
/**
* Saves the given [NodeInfo] to a path.
* The node is 'encoded' as a SignedData<NodeInfo>, signed with the owning key of its first identity.
* The name of the written file will be "nodeInfo-" followed by the hash of the content. The hash in the filename
* is used so that one can freely copy these files without fearing to overwrite another one.
*
* @param path the path where to write the file, if non-existent it will be created.
* @param nodeInfo the NodeInfo to serialize.
* @param keyManager a KeyManagementService used to sign the NodeInfo data.
*/
fun saveToFile(path: Path, nodeInfo: NodeInfo, keyManager: KeyManagementService) {
try {
path.createDirectories()
val serializedBytes = nodeInfo.serialize()
val regSig = keyManager.sign(serializedBytes.bytes, nodeInfo.legalIdentities.first().owningKey)
val signedData = SignedData(serializedBytes, regSig)
val file = (path / ("nodeInfo-" + SecureHash.sha256(serializedBytes.bytes).toString())).toFile()
file.writeBytes(signedData.serialize().bytes)
} catch (e : Exception) {
logger.warn("Couldn't write node info to file: $e")
}
}
/**
* Loads all the files contained in a given path and returns the deserialized [NodeInfo]s.
* Signatures are checked before returning a value.
*
* @param nodePath the node base path. NodeInfo files are searched for in nodePath/[NODE_INFO_FOLDER]
* @return a list of [NodeInfo]s
*/
fun loadFromDirectory(nodePath: Path): List<NodeInfo> {
val result = mutableListOf<NodeInfo>()
val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
if (!nodeInfoDirectory.isDirectory()) {
logger.info("$nodeInfoDirectory isn't a Directory, not loading NodeInfo from files")
return result
}
for (path in Files.list(nodeInfoDirectory)) {
val file = path.toFile()
if (file.isFile) {
try {
logger.info("Reading NodeInfo from file: $file")
val nodeInfo = loadFromFile(file)
result.add(nodeInfo)
} catch (e: Exception) {
logger.error("Exception parsing NodeInfo from file. $file" , e)
}
}
}
logger.info("Succesfully read ${result.size} NodeInfo files.")
return result
}
private fun loadFromFile(file: File): NodeInfo {
val signedData = ByteSequence.of(file.readBytes()).deserialize<SignedData<NodeInfo>>()
return signedData.verified()
}
}

View File

@ -0,0 +1,150 @@
package net.corda.node.services.network
import net.corda.cordform.CordformNode
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.internal.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import rx.Observable
import rx.Scheduler
import rx.schedulers.Schedulers
import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchEvent
import java.nio.file.WatchKey
import java.nio.file.WatchService
import java.util.concurrent.TimeUnit
import kotlin.streams.toList
/**
* Class containing the logic to
* - Serialize and de-serialize a [NodeInfo] to disk and reading it back.
* - Poll a directory for new serialized [NodeInfo]
*
* @param path the base path of a node.
* @param scheduler a [Scheduler] for the rx [Observable] returned by [nodeInfoUpdates], this is mainly useful for
* testing. It defaults to the io scheduler which is the appropriate value for production uses.
*/
class NodeInfoWatcher(private val nodePath: Path,
private val scheduler: Scheduler = Schedulers.io()) {
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val watchService : WatchService? by lazy { initWatch() }
companion object {
private val logger = loggerFor<NodeInfoWatcher>()
/**
* Saves the given [NodeInfo] to a path.
* The node is 'encoded' as a SignedData<NodeInfo>, signed with the owning key of its first identity.
* The name of the written file will be "nodeInfo-" followed by the hash of the content. The hash in the filename
* is used so that one can freely copy these files without fearing to overwrite another one.
*
* @param path the path where to write the file, if non-existent it will be created.
* @param nodeInfo the NodeInfo to serialize.
* @param keyManager a KeyManagementService used to sign the NodeInfo data.
*/
fun saveToFile(path: Path, nodeInfo: NodeInfo, keyManager: KeyManagementService) {
try {
path.createDirectories()
val serializedBytes = nodeInfo.serialize()
val regSig = keyManager.sign(serializedBytes.bytes,
nodeInfo.legalIdentities.first().owningKey)
val signedData = SignedData(serializedBytes, regSig)
val file = (path / ("nodeInfo-" + SecureHash.sha256(serializedBytes.bytes).toString())).toFile()
file.writeBytes(signedData.serialize().bytes)
} catch (e: Exception) {
logger.warn("Couldn't write node info to file", e)
}
}
}
/**
* Read all the files contained in [nodePath] / [CordformNode.NODE_INFO_DIRECTORY] and keep watching
* the folder for further updates.
*
* @return an [Observable] returning [NodeInfo]s, there is no guarantee that the same value isn't returned more
* than once.
*/
fun nodeInfoUpdates(): Observable<NodeInfo> {
val pollForFiles = Observable.interval(5, TimeUnit.SECONDS, scheduler)
.flatMapIterable { pollWatch() }
val readCurrentFiles = Observable.from(loadFromDirectory())
return readCurrentFiles.mergeWith(pollForFiles)
}
/**
* Loads all the files contained in a given path and returns the deserialized [NodeInfo]s.
* Signatures are checked before returning a value.
*
* @return a list of [NodeInfo]s
*/
private fun loadFromDirectory(): List<NodeInfo> {
val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
if (!nodeInfoDirectory.isDirectory()) {
logger.info("$nodeInfoDirectory isn't a Directory, not loading NodeInfo from files")
return emptyList()
}
val result = nodeInfoDirectory.list { paths ->
paths.filter { it.isRegularFile() }
.map { processFile(it) }
.toList()
.filterNotNull()
}
logger.info("Successfully read ${result.size} NodeInfo files.")
return result
}
// Polls the watchService for changes to nodeInfoDirectory, return all the newly read NodeInfos.
private fun pollWatch(): List<NodeInfo> {
if (watchService == null) {
return emptyList()
}
val watchKey: WatchKey = watchService?.poll() ?: return emptyList()
val files = mutableSetOf<Path>()
for (event in watchKey.pollEvents()) {
val kind = event.kind()
if (kind == StandardWatchEventKinds.OVERFLOW) continue
val ev: WatchEvent<Path> = uncheckedCast(event)
val filename = ev.context()
val absolutePath = nodeInfoDirectory.resolve(filename)
if (absolutePath.isRegularFile()) {
files.add(absolutePath)
}
}
val valid = watchKey.reset()
if (!valid) {
logger.warn("Can't poll $nodeInfoDirectory anymore, it was probably deleted.")
}
return files.mapNotNull { processFile(it) }
}
private fun processFile(file: Path) : NodeInfo? {
try {
logger.info("Reading NodeInfo from file: $file")
val signedData = file.readAll().deserialize<SignedData<NodeInfo>>()
return signedData.verified()
} catch (e: Exception) {
logger.warn("Exception parsing NodeInfo from file. $file", e)
return null
}
}
// Create a WatchService watching for changes in nodeInfoDirectory.
private fun initWatch() : WatchService? {
if (!nodeInfoDirectory.isDirectory()) {
logger.warn("Not watching folder $nodeInfoDirectory it doesn't exist or it's not a directory")
return null
}
val watchService = nodeInfoDirectory.fileSystem.newWatchService()
nodeInfoDirectory.register(watchService, StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY)
logger.info("Watching $nodeInfoDirectory for new files")
return watchService
}
}

View File

@ -1,11 +1,11 @@
package net.corda.node.services.network
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
@ -40,6 +40,7 @@ import java.security.PublicKey
import java.security.SignatureException
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.HashMap
/**
* Extremely simple in-memory cache of the network map.
@ -87,6 +88,8 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
.sortedBy { it.name.toString() }
}
private val nodeInfoSerializer = NodeInfoWatcher(serviceHub.configuration.baseDirectory)
init {
loadFromFiles()
serviceHub.database.transaction { loadFromDB() }
@ -94,9 +97,7 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
private fun loadFromFiles() {
logger.info("Loading network map from files..")
for (node in NodeInfoSerializer().loadFromDirectory(serviceHub.configuration.baseDirectory)) {
addNode(node)
}
nodeInfoSerializer.nodeInfoUpdates().subscribe { node -> addNode(node) }
}
override fun getPartyInfo(party: Party): PartyInfo? {

View File

@ -1,67 +0,0 @@
package net.corda.node.services.network
import net.corda.cordform.CordformNode
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.KeyManagementService
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.testing.*
import net.corda.testing.node.MockKeyManagementService
import net.corda.testing.node.NodeBasedTest
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.nio.charset.Charset
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.contentOf
class NodeInfoSerializerTest : NodeBasedTest() {
@Rule @JvmField var folder = TemporaryFolder()
lateinit var keyManagementService: KeyManagementService
// Object under test
val nodeInfoSerializer = NodeInfoSerializer()
companion object {
val nodeInfoFileRegex = Regex("nodeInfo\\-.*")
val nodeInfo = NodeInfo(listOf(), listOf(getTestPartyAndCertificate(ALICE)), 0, 0)
}
@Before
fun start() {
val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT)
keyManagementService = MockKeyManagementService(identityService, ALICE_KEY)
}
@Test
fun `save a NodeInfo`() {
nodeInfoSerializer.saveToFile(folder.root.toPath(), nodeInfo, keyManagementService)
assertEquals(1, folder.root.list().size)
val fileName = folder.root.list()[0]
assertTrue(fileName.matches(nodeInfoFileRegex))
val file = (folder.root.path / fileName).toFile()
// Just check that something is written, another tests verifies that the written value can be read back.
assertThat(contentOf(file)).isNotEmpty()
}
@Test
fun `load an empty Directory`() {
assertEquals(0, nodeInfoSerializer.loadFromDirectory(folder.root.toPath()).size)
}
@Test
fun `load a non empty Directory`() {
val nodeInfoFolder = folder.newFolder(CordformNode.NODE_INFO_DIRECTORY)
nodeInfoSerializer.saveToFile(nodeInfoFolder.toPath(), nodeInfo, keyManagementService)
val nodeInfos = nodeInfoSerializer.loadFromDirectory(folder.root.toPath())
assertEquals(1, nodeInfos.size)
assertEquals(nodeInfo, nodeInfos.first())
}
}

View File

@ -0,0 +1,113 @@
package net.corda.node.services.network
import net.corda.cordform.CordformNode
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.KeyManagementService
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.testing.*
import net.corda.testing.node.MockKeyManagementService
import net.corda.testing.node.NodeBasedTest
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.observers.TestSubscriber
import rx.schedulers.TestScheduler
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.contentOf
import java.nio.file.Path
class NodeInfoWatcherTest : NodeBasedTest() {
@Rule @JvmField var folder = TemporaryFolder()
lateinit var keyManagementService: KeyManagementService
lateinit var nodeInfoPath: Path
val scheduler = TestScheduler();
val testSubscriber = TestSubscriber<NodeInfo>()
// Object under test
lateinit var nodeInfoWatcher: NodeInfoWatcher
companion object {
val nodeInfoFileRegex = Regex("nodeInfo\\-.*")
val nodeInfo = NodeInfo(listOf(), listOf(getTestPartyAndCertificate(ALICE)), 0, 0)
}
@Before
fun start() {
val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT)
keyManagementService = MockKeyManagementService(identityService, ALICE_KEY)
nodeInfoWatcher = NodeInfoWatcher(folder.root.toPath(), scheduler)
nodeInfoPath = folder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
}
@Test
fun `save a NodeInfo`() {
NodeInfoWatcher.saveToFile(folder.root.toPath(), nodeInfo, keyManagementService)
assertEquals(1, folder.root.list().size)
val fileName = folder.root.list()[0]
assertTrue(fileName.matches(nodeInfoFileRegex))
val file = (folder.root.path / fileName).toFile()
// Just check that something is written, another tests verifies that the written value can be read back.
assertThat(contentOf(file)).isNotEmpty()
}
@Test
fun `load an empty Directory`() {
nodeInfoPath.createDirectories()
nodeInfoWatcher.nodeInfoUpdates()
.subscribe(testSubscriber)
val readNodes = testSubscriber.onNextEvents.distinct()
scheduler.advanceTimeBy(1, TimeUnit.HOURS)
assertEquals(0, readNodes.size)
}
@Test
fun `load a non empty Directory`() {
createNodeInfoFileInPath(nodeInfo)
nodeInfoWatcher.nodeInfoUpdates()
.subscribe(testSubscriber)
val readNodes = testSubscriber.onNextEvents.distinct()
assertEquals(1, readNodes.size)
assertEquals(nodeInfo, readNodes.first())
}
@Test
fun `polling folder`() {
nodeInfoPath.createDirectories()
// Start polling with an empty folder.
nodeInfoWatcher.nodeInfoUpdates()
.subscribe(testSubscriber)
// Ensure the watch service is started.
scheduler.advanceTimeBy(1, TimeUnit.HOURS)
// Check no nodeInfos are read.
assertEquals(0, testSubscriber.valueCount)
createNodeInfoFileInPath(nodeInfo)
scheduler.advanceTimeBy(1, TimeUnit.HOURS)
// The same folder can be reported more than once, so take unique values.
val readNodes = testSubscriber.onNextEvents.distinct()
assertEquals(1, readNodes.size)
assertEquals(nodeInfo, readNodes.first())
}
// Write a nodeInfo under the right path.
private fun createNodeInfoFileInPath(nodeInfo: NodeInfo) {
NodeInfoWatcher.saveToFile(nodeInfoPath, nodeInfo, keyManagementService)
}
}