Add a configuration option to set how often should a node check for new nodeinfos on disk (#1851)

* Add a configuration option to set how often should a node check for new NodeInfo files in additional-node-infos
This commit is contained in:
Alberto Arri
2017-10-10 09:55:20 +01:00
committed by GitHub
parent 6166fa8358
commit 7af1f02a2d
5 changed files with 21 additions and 6 deletions

View File

@ -50,7 +50,7 @@ class NodeInfoWatcherTest : NodeBasedTest() {
fun start() { fun start() {
val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT) val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT)
keyManagementService = MockKeyManagementService(identityService, ALICE_KEY) keyManagementService = MockKeyManagementService(identityService, ALICE_KEY)
nodeInfoWatcher = NodeInfoWatcher(folder.root.toPath(), scheduler) nodeInfoWatcher = NodeInfoWatcher(folder.root.toPath(), scheduler = scheduler)
nodeInfoPath = folder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY nodeInfoPath = folder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
} }

View File

@ -2,6 +2,7 @@ package net.corda.node.services.config
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.internal.NetworkMapInfo import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.messaging.CertificateChainCheckPolicy import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.nodeapi.User import net.corda.nodeapi.User
@ -33,6 +34,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val messageRedeliveryDelaySeconds: Int val messageRedeliveryDelaySeconds: Int
val notary: NotaryConfig? val notary: NotaryConfig?
val activeMQServer: ActiveMqServerConfiguration val activeMQServer: ActiveMqServerConfiguration
val additionalNodeInfoPollingFrequencyMsec: Long
} }
data class NotaryConfig(val validating: Boolean, val raft: RaftConfig? = null, val bftSMaRt: BFTSMaRtConfiguration? = null) { data class NotaryConfig(val validating: Boolean, val raft: RaftConfig? = null, val bftSMaRt: BFTSMaRtConfiguration? = null) {
@ -86,7 +88,8 @@ data class FullNodeConfiguration(
override val devMode: Boolean = false, override val devMode: Boolean = false,
val useTestClock: Boolean = false, val useTestClock: Boolean = false,
val detectPublicIp: Boolean = true, val detectPublicIp: Boolean = true,
override val activeMQServer: ActiveMqServerConfiguration override val activeMQServer: ActiveMqServerConfiguration,
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis()
) : NodeConfiguration { ) : NodeConfiguration {
override val exportJMXto: String get() = "http" override val exportJMXto: String get() = "http"

View File

@ -8,10 +8,10 @@ import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.seconds
import rx.Observable import rx.Observable
import rx.Scheduler import rx.Scheduler
import rx.schedulers.Schedulers import rx.schedulers.Schedulers
import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.streams.toList import kotlin.streams.toList
@ -22,13 +22,17 @@ import kotlin.streams.toList
* - Poll a directory for new serialized [NodeInfo] * - Poll a directory for new serialized [NodeInfo]
* *
* @param path the base path of a node. * @param path the base path of a node.
* @param pollFrequencyMsec how often to poll the filesystem in milliseconds. Any value smaller than 5 seconds will
* be treated as 5 seconds.
* @param scheduler a [Scheduler] for the rx [Observable] returned by [nodeInfoUpdates], this is mainly useful for * @param scheduler a [Scheduler] for the rx [Observable] returned by [nodeInfoUpdates], this is mainly useful for
* testing. It defaults to the io scheduler which is the appropriate value for production uses. * testing. It defaults to the io scheduler which is the appropriate value for production uses.
*/ */
class NodeInfoWatcher(private val nodePath: Path, class NodeInfoWatcher(private val nodePath: Path,
pollFrequencyMsec: Long = 5.seconds.toMillis(),
private val scheduler: Scheduler = Schedulers.io()) { private val scheduler: Scheduler = Schedulers.io()) {
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val pollFrequencyMsec: Long
companion object { companion object {
private val logger = loggerFor<NodeInfoWatcher>() private val logger = loggerFor<NodeInfoWatcher>()
@ -56,6 +60,10 @@ class NodeInfoWatcher(private val nodePath: Path,
} }
} }
init {
this.pollFrequencyMsec = maxOf(pollFrequencyMsec, 5.seconds.toMillis())
}
/** /**
* Read all the files contained in [nodePath] / [CordformNode.NODE_INFO_DIRECTORY] and keep watching * Read all the files contained in [nodePath] / [CordformNode.NODE_INFO_DIRECTORY] and keep watching
* the folder for further updates. * the folder for further updates.
@ -67,7 +75,7 @@ class NodeInfoWatcher(private val nodePath: Path,
* than once. * than once.
*/ */
fun nodeInfoUpdates(): Observable<NodeInfo> { fun nodeInfoUpdates(): Observable<NodeInfo> {
return Observable.interval(5, TimeUnit.SECONDS, scheduler) return Observable.interval(pollFrequencyMsec, TimeUnit.MILLISECONDS, scheduler)
.flatMapIterable { loadFromDirectory() } .flatMapIterable { loadFromDirectory() }
} }

View File

@ -39,6 +39,7 @@ import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.security.PublicKey import java.security.PublicKey
import java.security.SignatureException import java.security.SignatureException
import java.time.Duration
import java.util.* import java.util.*
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.HashMap import kotlin.collections.HashMap
@ -87,7 +88,8 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
.sortedBy { it.name.toString() } .sortedBy { it.name.toString() }
} }
private val nodeInfoSerializer = NodeInfoWatcher(serviceHub.configuration.baseDirectory) private val nodeInfoSerializer = NodeInfoWatcher(serviceHub.configuration.baseDirectory,
serviceHub.configuration.additionalNodeInfoPollingFrequencyMsec)
init { init {
loadFromFiles() loadFromFiles()

View File

@ -1,6 +1,7 @@
package net.corda.node.services.config package net.corda.node.services.config
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.nodeapi.User import net.corda.nodeapi.User
import net.corda.testing.ALICE import net.corda.testing.ALICE
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -33,7 +34,8 @@ class FullNodeConfigurationTest {
notary = null, notary = null,
certificateChainCheckPolicies = emptyList(), certificateChainCheckPolicies = emptyList(),
devMode = true, devMode = true,
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))) activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)),
additionalNodeInfoPollingFrequencyMsec = 5.seconds.toMillis())
fun configWithRPCUsername(username: String) { fun configWithRPCUsername(username: String) {
testConfiguration.copy(rpcUsers = listOf(User(username, "pass", emptySet()))) testConfiguration.copy(rpcUsers = listOf(User(username, "pass", emptySet())))