diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index b36f34d632..504c3df727 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -39,6 +39,7 @@ import rx.subjects.PublishSubject import java.lang.Integer.max import java.lang.Integer.min import java.lang.reflect.Method +import java.nio.file.NoSuchFileException import java.nio.file.Path import java.nio.file.StandardCopyOption import java.security.cert.X509Certificate @@ -48,6 +49,7 @@ import java.util.concurrent.CompletableFuture import java.util.concurrent.Executors import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference import java.util.function.Consumer import java.util.function.Supplier @@ -66,8 +68,9 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, ) : AutoCloseable, NetworkParameterUpdateListener { companion object { private val logger = contextLogger() - private val defaultRetryInterval = 1.minutes + private val defaultWatchHttpNetworkMapRetryInterval = 1.minutes private const val bulkNodeInfoFetchThreshold = 50 + private const val defaultWatchNodeInfoFilesRetryIntervalSeconds = 10L } private val parametersUpdatesTrack = PublishSubject.create() @@ -134,26 +137,48 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, } private fun watchForNodeInfoFiles(): Subscription { + val previousConsecutiveErrors = AtomicBoolean(false) return nodeInfoWatcher .nodeInfoUpdates() - .subscribe { - for (update in it) { - when (update) { - is NodeInfoUpdate.Add -> networkMapCache.addOrUpdateNode(update.nodeInfo) - is NodeInfoUpdate.Remove -> { - if (update.hash != ourNodeInfoHash) { - val nodeInfo = networkMapCache.getNodeByHash(update.hash) - nodeInfo?.let(networkMapCache::removeNode) - } - } + .doOnError { + // only log this error once instead on every retry + if (previousConsecutiveErrors.compareAndSet(false, true)) { + if (it is NoSuchFileException) { + logger.warn("Folder not found while polling directory for network map updates. Create this folder or try " + + "restarting node. Retrying every $defaultWatchNodeInfoFilesRetryIntervalSeconds seconds - $it") + } else { + logger.warn("Error encountered while polling directory for network map updates, " + + "retrying every $defaultWatchNodeInfoFilesRetryIntervalSeconds seconds", it) } } - if (networkMapClient == null) { - // Mark the network map cache as ready on a successful poll of the node infos dir if not using - // the HTTP network map even if there aren't any node infos - networkMapCache.nodeReady.set(null) + } + .doOnNext { + // log this only if errors occurred + if (previousConsecutiveErrors.compareAndSet(true, false)) { + logger.info("File polling for network map updates succeeded after one or more retries") } } + .retryWhen { t -> t.delay(defaultWatchNodeInfoFilesRetryIntervalSeconds, TimeUnit.SECONDS, nodeInfoWatcher.scheduler) } + .subscribe { processNodeInfoUpdates(it) } + } + + private fun processNodeInfoUpdates(it: List) { + for (update in it) { + when (update) { + is NodeInfoUpdate.Add -> networkMapCache.addOrUpdateNode(update.nodeInfo) + is NodeInfoUpdate.Remove -> { + if (update.hash != ourNodeInfoHash) { + val nodeInfo = networkMapCache.getNodeByHash(update.hash) + nodeInfo?.let(networkMapCache::removeNode) + } + } + } + } + if (networkMapClient == null) { + // Mark the network map cache as ready on a successful poll of the node infos dir if not using + // the HTTP network map even if there aren't any node infos + networkMapCache.nodeReady.set(null) + } } private fun watchHttpNetworkMap() { @@ -163,8 +188,8 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, val nextScheduleDelay = try { updateNetworkMapCache() } catch (e: Exception) { - logger.warn("Error encountered while updating network map, will retry in $defaultRetryInterval", e) - defaultRetryInterval + logger.warn("Error encountered while updating network map, will retry in $defaultWatchHttpNetworkMapRetryInterval", e) + defaultWatchHttpNetworkMapRetryInterval } // Schedule the next update. networkMapPoller.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS) @@ -380,4 +405,4 @@ internal fun NetworkParameters.valueChanged(newNetworkParameters: NetworkParamet val propertyValue = getter?.invoke(this) val newPropertyValue = getter?.invoke(newNetworkParameters) return propertyValue != newPropertyValue -} +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt index 428a6163d1..9a2867459c 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt @@ -35,7 +35,7 @@ sealed class NodeInfoUpdate { */ // TODO: Use NIO watch service instead? class NodeInfoWatcher(private val nodePath: Path, - private val scheduler: Scheduler, + internal val scheduler: Scheduler, private val pollInterval: Duration = 5.seconds) { companion object { private val logger = contextLogger() diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index 8e36a97de9..8ef62bbcd9 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -19,6 +19,7 @@ import net.corda.core.internal.NODE_INFO_DIRECTORY import net.corda.core.internal.NetworkParametersStorage import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.createDirectory import net.corda.core.internal.delete import net.corda.core.internal.div import net.corda.core.internal.exists @@ -280,6 +281,40 @@ class NetworkMapUpdaterTest { assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash) } + @Test(timeout=300_000) + fun `receive node infos from directory after an error due to missing additional-node-infos directory`() { + setUpdater(netMapClient = null) + val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file") + + // Not subscribed yet + verify(networkMapCache, times(0)).addOrUpdateNode(any()) + + nodeInfoDir.delete() + assertFalse(nodeInfoDir.exists()) + + // Observable will get a NoSuchFileException and log it + startUpdater() + // Updater will resubscribe to observable with delayed retry. We should see one log warning message despite two retries. + advanceTime() + advanceTime() + + // no changes will be made to networkMapCache at this point + verify(networkMapCache, times(0)).addOrUpdateNode(any()) + + nodeInfoDir.createDirectory() + assertTrue(nodeInfoDir.exists()) + + // Now that directory has been created, save a nodeInfo and assert that the file polling watcher behaves as expected + NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned) + assertThat(nodeReadyFuture).isNotDone + advanceTime() + + verify(networkMapCache, times(1)).addOrUpdateNode(fileNodeInfoAndSigned.nodeInfo) + assertThat(nodeReadyFuture).isDone + + assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash) + } + @Test(timeout=300_000) fun `emit new parameters update info on parameters update from network map`() { setUpdater()