diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index ac4dabae8d..d3a924dfef 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -5,7 +5,6 @@ package net.corda.core.internal import net.corda.core.cordapp.Cordapp import net.corda.core.cordapp.CordappConfig import net.corda.core.cordapp.CordappContext -import net.corda.core.cordapp.CordappProvider import net.corda.core.crypto.* import net.corda.core.flows.NotarisationRequest import net.corda.core.flows.NotarisationRequestSignature @@ -38,6 +37,7 @@ import java.nio.charset.Charset import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.* import java.nio.file.attribute.FileAttribute +import java.nio.file.attribute.FileTime import java.security.KeyPair import java.security.PrivateKey import java.security.cert.X509Certificate @@ -130,6 +130,7 @@ fun Path.moveTo(target: Path, vararg options: CopyOption): Path = Files.move(thi fun Path.isRegularFile(vararg options: LinkOption): Boolean = Files.isRegularFile(this, *options) fun Path.isDirectory(vararg options: LinkOption): Boolean = Files.isDirectory(this, *options) inline val Path.size: Long get() = Files.size(this) +fun Path.lastModifiedTime(vararg options: LinkOption): FileTime = Files.getLastModifiedTime(this, *options) inline fun Path.list(block: (Stream) -> R): R = Files.list(this).use(block) fun Path.deleteIfExists(): Boolean = Files.deleteIfExists(this) fun Path.reader(charset: Charset = UTF_8): BufferedReader = Files.newBufferedReader(this, charset) @@ -257,6 +258,13 @@ fun IntProgression.stream(parallel: Boolean = false): IntStream = StreamSupport. // When toArray has filled in the array, the component type is no longer T? but T (that may itself be nullable): inline fun Stream.toTypedArray(): Array = uncheckedCast(toArray { size -> arrayOfNulls(size) }) +inline fun Stream.mapNotNull(crossinline transform: (T) -> R?): Stream { + return flatMap { + val value = transform(it) + if (value != null) Stream.of(value) else Stream.empty() + } +} + fun Class.castIfPossible(obj: Any): T? = if (isInstance(obj)) cast(obj) else null /** Returns a [DeclaredField] wrapper around the declared (possibly non-public) static field of the receiver [Class]. */ diff --git a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt index 5059276ed5..bbcbb37e89 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt @@ -8,6 +8,7 @@ import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal._contextSerializationEnv import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.core.utilities.seconds import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.SignedNodeInfo @@ -17,13 +18,12 @@ import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import rx.Observable import rx.Scheduler -import java.io.IOException import java.nio.file.Path import java.nio.file.Paths import java.nio.file.StandardCopyOption.REPLACE_EXISTING +import java.nio.file.attribute.FileTime import java.time.Duration import java.util.concurrent.TimeUnit -import java.util.stream.Stream import kotlin.streams.toList /** @@ -58,20 +58,14 @@ class NodeInfoWatcher(private val nodePath: Path, } } - private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY - + private val nodeInfosDir = nodePath / CordformNode.NODE_INFO_DIRECTORY + private val nodeInfoFiles = HashMap() private val _processedNodeInfoHashes = HashSet() val processedNodeInfoHashes: Set get() = _processedNodeInfoHashes init { require(pollInterval >= 5.seconds) { "Poll interval must be 5 seconds or longer." } - if (!nodeInfoDirectory.isDirectory()) { - try { - nodeInfoDirectory.createDirectories() - } catch (e: IOException) { - logger.info("Failed to create $nodeInfoDirectory", e) - } - } + nodeInfosDir.createDirectories() } /** @@ -93,42 +87,32 @@ class NodeInfoWatcher(private val nodePath: Path, return Companion.saveToFile(nodePath, nodeInfoAndSigned) } - /** - * Loads all the files contained in a given path and returns the deserialized [NodeInfo]s. - * Signatures are checked before returning a value. - * - * @return a list of [NodeInfo]s - */ private fun loadFromDirectory(): List { - if (!nodeInfoDirectory.isDirectory()) { - return emptyList() - } - val result = nodeInfoDirectory.list { paths -> + val result = nodeInfosDir.list { paths -> paths .filter { it.isRegularFile() } - .flatMap { path -> - val nodeInfo = processFile(path)?.let { - if (_processedNodeInfoHashes.add(it.signed.raw.hash)) it.nodeInfo else null + .filter { file -> + val lastModifiedTime = file.lastModifiedTime() + val previousLastModifiedTime = nodeInfoFiles[file] + val newOrChangedFile = previousLastModifiedTime == null || lastModifiedTime > previousLastModifiedTime + nodeInfoFiles[file] = lastModifiedTime + newOrChangedFile + } + .mapNotNull { file -> + logger.debug { "Reading SignedNodeInfo from $file" } + try { + NodeInfoAndSigned(file.readObject()) + } catch (e: Exception) { + logger.warn("Unable to read SignedNodeInfo from $file", e) + null } - if (nodeInfo != null) Stream.of(nodeInfo) else Stream.empty() } .toList() } - if (result.isNotEmpty()) { - logger.info("Successfully read ${result.size} NodeInfo files from disk.") - } - return result - } - private fun processFile(file: Path): NodeInfoAndSigned? { - return try { - logger.info("Reading NodeInfo from file: $file") - val signedNodeInfo = file.readObject() - NodeInfoAndSigned(signedNodeInfo) - } catch (e: Exception) { - logger.warn("Exception parsing NodeInfo from file. $file", e) - null - } + logger.debug { "Read ${result.size} NodeInfo files from $nodeInfosDir" } + _processedNodeInfoHashes += result.map { it.signed.raw.hash } + return result.map { it.nodeInfo } } }