mirror of
https://github.com/corda/corda.git
synced 2025-01-18 18:56:28 +00:00
Deflake node infoWatcherTest (#1836)
* Stop using the watch service, just re-read the whole directory every time. On macOS it's quite unpredictable and the tests are almost always failing.
This commit is contained in:
parent
29a101c378
commit
69ad52cf5c
@ -70,9 +70,9 @@ class NodeInfoWatcherTest : NodeBasedTest() {
|
||||
|
||||
nodeInfoWatcher.nodeInfoUpdates()
|
||||
.subscribe(testSubscriber)
|
||||
advanceTime()
|
||||
|
||||
val readNodes = testSubscriber.onNextEvents.distinct()
|
||||
advanceTime()
|
||||
assertEquals(0, readNodes.size)
|
||||
}
|
||||
|
||||
@ -82,6 +82,7 @@ class NodeInfoWatcherTest : NodeBasedTest() {
|
||||
|
||||
nodeInfoWatcher.nodeInfoUpdates()
|
||||
.subscribe(testSubscriber)
|
||||
advanceTime()
|
||||
|
||||
val readNodes = testSubscriber.onNextEvents.distinct()
|
||||
|
||||
@ -114,7 +115,7 @@ class NodeInfoWatcherTest : NodeBasedTest() {
|
||||
}
|
||||
|
||||
private fun advanceTime() {
|
||||
scheduler.advanceTimeBy(1, TimeUnit.HOURS)
|
||||
scheduler.advanceTimeBy(1, TimeUnit.MINUTES)
|
||||
}
|
||||
|
||||
// Write a nodeInfo under the right path.
|
||||
|
@ -3,7 +3,12 @@ package net.corda.node.services.network
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.isDirectory
|
||||
import net.corda.core.internal.isRegularFile
|
||||
import net.corda.core.internal.list
|
||||
import net.corda.core.internal.readAll
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.core.serialization.deserialize
|
||||
@ -13,10 +18,6 @@ import rx.Observable
|
||||
import rx.Scheduler
|
||||
import rx.schedulers.Schedulers
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardWatchEventKinds
|
||||
import java.nio.file.WatchEvent
|
||||
import java.nio.file.WatchKey
|
||||
import java.nio.file.WatchService
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.streams.toList
|
||||
|
||||
@ -33,7 +34,6 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
private val scheduler: Scheduler = Schedulers.io()) {
|
||||
|
||||
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
|
||||
private val watchService : WatchService? by lazy { initWatch() }
|
||||
|
||||
companion object {
|
||||
private val logger = loggerFor<NodeInfoWatcher>()
|
||||
@ -67,14 +67,15 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
* Read all the files contained in [nodePath] / [CordformNode.NODE_INFO_DIRECTORY] and keep watching
|
||||
* the folder for further updates.
|
||||
*
|
||||
* We simply list the directory content every 5 seconds, the Java implementation of WatchService has been proven to
|
||||
* be unreliable on MacOs and given the fairly simple use case we have, this simple implementation should do.
|
||||
*
|
||||
* @return an [Observable] returning [NodeInfo]s, there is no guarantee that the same value isn't returned more
|
||||
* than once.
|
||||
*/
|
||||
fun nodeInfoUpdates(): Observable<NodeInfo> {
|
||||
val pollForFiles = Observable.interval(5, TimeUnit.SECONDS, scheduler)
|
||||
.flatMapIterable { pollWatch() }
|
||||
val readCurrentFiles = Observable.from(loadFromDirectory())
|
||||
return readCurrentFiles.mergeWith(pollForFiles)
|
||||
return Observable.interval(5, TimeUnit.SECONDS, scheduler)
|
||||
.flatMapIterable { loadFromDirectory() }
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,7 +85,6 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
* @return a list of [NodeInfo]s
|
||||
*/
|
||||
private fun loadFromDirectory(): List<NodeInfo> {
|
||||
val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
|
||||
if (!nodeInfoDirectory.isDirectory()) {
|
||||
logger.info("$nodeInfoDirectory isn't a Directory, not loading NodeInfo from files")
|
||||
return emptyList()
|
||||
@ -99,31 +99,6 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
return result
|
||||
}
|
||||
|
||||
// Polls the watchService for changes to nodeInfoDirectory, return all the newly read NodeInfos.
|
||||
private fun pollWatch(): List<NodeInfo> {
|
||||
if (watchService == null) {
|
||||
return emptyList()
|
||||
}
|
||||
val watchKey: WatchKey = watchService?.poll() ?: return emptyList()
|
||||
val files = mutableSetOf<Path>()
|
||||
for (event in watchKey.pollEvents()) {
|
||||
val kind = event.kind()
|
||||
if (kind == StandardWatchEventKinds.OVERFLOW) continue
|
||||
|
||||
val ev: WatchEvent<Path> = uncheckedCast(event)
|
||||
val filename = ev.context()
|
||||
val absolutePath = nodeInfoDirectory.resolve(filename)
|
||||
if (absolutePath.isRegularFile()) {
|
||||
files.add(absolutePath)
|
||||
}
|
||||
}
|
||||
val valid = watchKey.reset()
|
||||
if (!valid) {
|
||||
logger.warn("Can't poll $nodeInfoDirectory anymore, it was probably deleted.")
|
||||
}
|
||||
return files.mapNotNull { processFile(it) }
|
||||
}
|
||||
|
||||
private fun processFile(file: Path) : NodeInfo? {
|
||||
try {
|
||||
logger.info("Reading NodeInfo from file: $file")
|
||||
@ -134,17 +109,4 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
// Create a WatchService watching for changes in nodeInfoDirectory.
|
||||
private fun initWatch() : WatchService? {
|
||||
if (!nodeInfoDirectory.isDirectory()) {
|
||||
logger.warn("Not watching folder $nodeInfoDirectory it doesn't exist or it's not a directory")
|
||||
return null
|
||||
}
|
||||
val watchService = nodeInfoDirectory.fileSystem.newWatchService()
|
||||
nodeInfoDirectory.register(watchService, StandardWatchEventKinds.ENTRY_CREATE,
|
||||
StandardWatchEventKinds.ENTRY_MODIFY)
|
||||
logger.info("Watching $nodeInfoDirectory for new files")
|
||||
return watchService
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user