mirror of
https://github.com/corda/corda.git
synced 2025-01-18 10:46:38 +00:00
EG-3458 - Missing onError implementation message logged in the node l… (#6602)
* EG-3458 - Missing onError implementation message logged in the node log file with ERROR level - the changes made on top of 4.6 branch * EG-3458 - Reducing the number of logs by only logging on first consecutive error. Retry without completing the observable * EG-3458 - Refactor the overly complex method to smaller functions * EG-3458 - Reducing the number of functions in the class
This commit is contained in:
parent
a64a3bd02d
commit
3ce78b813d
@ -39,6 +39,7 @@ import rx.subjects.PublishSubject
|
||||
import java.lang.Integer.max
|
||||
import java.lang.Integer.min
|
||||
import java.lang.reflect.Method
|
||||
import java.nio.file.NoSuchFileException
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.security.cert.X509Certificate
|
||||
@ -48,6 +49,7 @@ import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.function.Consumer
|
||||
import java.util.function.Supplier
|
||||
@ -66,8 +68,9 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
) : AutoCloseable, NetworkParameterUpdateListener {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
private val defaultRetryInterval = 1.minutes
|
||||
private val defaultWatchHttpNetworkMapRetryInterval = 1.minutes
|
||||
private const val bulkNodeInfoFetchThreshold = 50
|
||||
private const val defaultWatchNodeInfoFilesRetryIntervalSeconds = 10L
|
||||
}
|
||||
|
||||
private val parametersUpdatesTrack = PublishSubject.create<ParametersUpdateInfo>()
|
||||
@ -134,26 +137,48 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
}
|
||||
|
||||
private fun watchForNodeInfoFiles(): Subscription {
|
||||
val previousConsecutiveErrors = AtomicBoolean(false)
|
||||
return nodeInfoWatcher
|
||||
.nodeInfoUpdates()
|
||||
.subscribe {
|
||||
for (update in it) {
|
||||
when (update) {
|
||||
is NodeInfoUpdate.Add -> networkMapCache.addOrUpdateNode(update.nodeInfo)
|
||||
is NodeInfoUpdate.Remove -> {
|
||||
if (update.hash != ourNodeInfoHash) {
|
||||
val nodeInfo = networkMapCache.getNodeByHash(update.hash)
|
||||
nodeInfo?.let(networkMapCache::removeNode)
|
||||
}
|
||||
}
|
||||
.doOnError {
|
||||
// only log this error once instead on every retry
|
||||
if (previousConsecutiveErrors.compareAndSet(false, true)) {
|
||||
if (it is NoSuchFileException) {
|
||||
logger.warn("Folder not found while polling directory for network map updates. Create this folder or try " +
|
||||
"restarting node. Retrying every $defaultWatchNodeInfoFilesRetryIntervalSeconds seconds - $it")
|
||||
} else {
|
||||
logger.warn("Error encountered while polling directory for network map updates, " +
|
||||
"retrying every $defaultWatchNodeInfoFilesRetryIntervalSeconds seconds", it)
|
||||
}
|
||||
}
|
||||
if (networkMapClient == null) {
|
||||
// Mark the network map cache as ready on a successful poll of the node infos dir if not using
|
||||
// the HTTP network map even if there aren't any node infos
|
||||
networkMapCache.nodeReady.set(null)
|
||||
}
|
||||
.doOnNext {
|
||||
// log this only if errors occurred
|
||||
if (previousConsecutiveErrors.compareAndSet(true, false)) {
|
||||
logger.info("File polling for network map updates succeeded after one or more retries")
|
||||
}
|
||||
}
|
||||
.retryWhen { t -> t.delay(defaultWatchNodeInfoFilesRetryIntervalSeconds, TimeUnit.SECONDS, nodeInfoWatcher.scheduler) }
|
||||
.subscribe { processNodeInfoUpdates(it) }
|
||||
}
|
||||
|
||||
private fun processNodeInfoUpdates(it: List<NodeInfoUpdate>) {
|
||||
for (update in it) {
|
||||
when (update) {
|
||||
is NodeInfoUpdate.Add -> networkMapCache.addOrUpdateNode(update.nodeInfo)
|
||||
is NodeInfoUpdate.Remove -> {
|
||||
if (update.hash != ourNodeInfoHash) {
|
||||
val nodeInfo = networkMapCache.getNodeByHash(update.hash)
|
||||
nodeInfo?.let(networkMapCache::removeNode)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (networkMapClient == null) {
|
||||
// Mark the network map cache as ready on a successful poll of the node infos dir if not using
|
||||
// the HTTP network map even if there aren't any node infos
|
||||
networkMapCache.nodeReady.set(null)
|
||||
}
|
||||
}
|
||||
|
||||
private fun watchHttpNetworkMap() {
|
||||
@ -163,8 +188,8 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
val nextScheduleDelay = try {
|
||||
updateNetworkMapCache()
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Error encountered while updating network map, will retry in $defaultRetryInterval", e)
|
||||
defaultRetryInterval
|
||||
logger.warn("Error encountered while updating network map, will retry in $defaultWatchHttpNetworkMapRetryInterval", e)
|
||||
defaultWatchHttpNetworkMapRetryInterval
|
||||
}
|
||||
// Schedule the next update.
|
||||
networkMapPoller.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS)
|
||||
@ -380,4 +405,4 @@ internal fun NetworkParameters.valueChanged(newNetworkParameters: NetworkParamet
|
||||
val propertyValue = getter?.invoke(this)
|
||||
val newPropertyValue = getter?.invoke(newNetworkParameters)
|
||||
return propertyValue != newPropertyValue
|
||||
}
|
||||
}
|
@ -35,7 +35,7 @@ sealed class NodeInfoUpdate {
|
||||
*/
|
||||
// TODO: Use NIO watch service instead?
|
||||
class NodeInfoWatcher(private val nodePath: Path,
|
||||
private val scheduler: Scheduler,
|
||||
internal val scheduler: Scheduler,
|
||||
private val pollInterval: Duration = 5.seconds) {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
|
@ -19,6 +19,7 @@ import net.corda.core.internal.NODE_INFO_DIRECTORY
|
||||
import net.corda.core.internal.NetworkParametersStorage
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.createDirectory
|
||||
import net.corda.core.internal.delete
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.exists
|
||||
@ -280,6 +281,40 @@ class NetworkMapUpdaterTest {
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `receive node infos from directory after an error due to missing additional-node-infos directory`() {
|
||||
setUpdater(netMapClient = null)
|
||||
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
|
||||
|
||||
// Not subscribed yet
|
||||
verify(networkMapCache, times(0)).addOrUpdateNode(any())
|
||||
|
||||
nodeInfoDir.delete()
|
||||
assertFalse(nodeInfoDir.exists())
|
||||
|
||||
// Observable will get a NoSuchFileException and log it
|
||||
startUpdater()
|
||||
// Updater will resubscribe to observable with delayed retry. We should see one log warning message despite two retries.
|
||||
advanceTime()
|
||||
advanceTime()
|
||||
|
||||
// no changes will be made to networkMapCache at this point
|
||||
verify(networkMapCache, times(0)).addOrUpdateNode(any())
|
||||
|
||||
nodeInfoDir.createDirectory()
|
||||
assertTrue(nodeInfoDir.exists())
|
||||
|
||||
// Now that directory has been created, save a nodeInfo and assert that the file polling watcher behaves as expected
|
||||
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
|
||||
assertThat(nodeReadyFuture).isNotDone
|
||||
advanceTime()
|
||||
|
||||
verify(networkMapCache, times(1)).addOrUpdateNode(fileNodeInfoAndSigned.nodeInfo)
|
||||
assertThat(nodeReadyFuture).isDone
|
||||
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `emit new parameters update info on parameters update from network map`() {
|
||||
setUpdater()
|
||||
|
Loading…
Reference in New Issue
Block a user