CORDA-2770 - file watcher subscription in NetworkMapUpdater should be unsubscribed on close (#4914)

This commit is contained in:
dazraf 2019-03-22 09:42:29 +00:00 committed by Shams Asari
parent 9ebe464f63
commit a5dd23dd43

View File

@ -29,6 +29,7 @@ import java.time.Duration
import java.util.*
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import kotlin.reflect.KProperty1
import kotlin.reflect.full.declaredMemberProperties
import kotlin.reflect.full.findAnnotation
@ -52,7 +53,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
executeExistingDelayedTasksAfterShutdownPolicy = false
}
private var newNetworkParameters: Pair<ParametersUpdate, SignedNetworkParameters>? = null
private var fileWatcherSubscription: Subscription? = null
private val fileWatcherSubscription = AtomicReference<Subscription?>()
private var autoAcceptNetworkParameters: Boolean = true
private lateinit var trustRoot: X509Certificate
private lateinit var currentParametersHash: SecureHash
@ -63,7 +64,14 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private lateinit var excludedAutoAcceptNetworkParameters: Set<String>
override fun close() {
fileWatcherSubscription?.unsubscribe()
fileWatcherSubscription.updateAndGet { subscription ->
subscription?.apply {
if (!isUnsubscribed) {
unsubscribe()
}
}
null // sets the atomic ref to null
}
MoreExecutors.shutdownAndAwaitTermination(networkMapPoller, 50, TimeUnit.SECONDS)
}
@ -73,28 +81,31 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
networkParameters: NetworkParameters,
keyManagementService: KeyManagementService,
networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings) {
require(fileWatcherSubscription == null) { "Should not call this method twice." }
this.trustRoot = trustRoot
this.currentParametersHash = currentParametersHash
this.ourNodeInfo = ourNodeInfo
this.ourNodeInfoHash = ourNodeInfo.raw.hash
this.networkParameters = networkParameters
this.keyManagementService = keyManagementService
this.autoAcceptNetworkParameters = networkParameterAcceptanceSettings.autoAcceptEnabled
this.excludedAutoAcceptNetworkParameters = networkParameterAcceptanceSettings.excludedAutoAcceptableParameters
fileWatcherSubscription.updateAndGet { subscription ->
require(subscription == null) { "Should not call this method twice" }
this.trustRoot = trustRoot
this.currentParametersHash = currentParametersHash
this.ourNodeInfo = ourNodeInfo
this.ourNodeInfoHash = ourNodeInfo.raw.hash
this.networkParameters = networkParameters
this.keyManagementService = keyManagementService
this.autoAcceptNetworkParameters = networkParameterAcceptanceSettings.autoAcceptEnabled
this.excludedAutoAcceptNetworkParameters = networkParameterAcceptanceSettings.excludedAutoAcceptableParameters
val autoAcceptNetworkParametersNames = autoAcceptablePropertyNames - excludedAutoAcceptNetworkParameters
if (autoAcceptNetworkParameters && autoAcceptNetworkParametersNames.isNotEmpty()) {
logger.info("Auto-accept enabled for network parameter changes which modify only: $autoAcceptNetworkParametersNames")
}
watchForNodeInfoFiles()
if (networkMapClient != null) {
watchHttpNetworkMap()
val autoAcceptNetworkParametersNames = autoAcceptablePropertyNames - excludedAutoAcceptNetworkParameters
if (autoAcceptNetworkParameters && autoAcceptNetworkParametersNames.isNotEmpty()) {
logger.info("Auto-accept enabled for network parameter changes which modify only: $autoAcceptNetworkParametersNames")
}
watchForNodeInfoFiles().also {
if (networkMapClient != null) {
watchHttpNetworkMap()
}
}
}
}
private fun watchForNodeInfoFiles() {
nodeInfoWatcher
private fun watchForNodeInfoFiles(): Subscription {
return nodeInfoWatcher
.nodeInfoUpdates()
.subscribe {
for (update in it) {