CORDA-1160: Only read node-info files if their last modified time has changed. (#2717)

This commit is contained in:
Shams Asari 2018-03-05 11:10:38 +00:00 committed by GitHub
parent 26fe90c8e9
commit 4a73a80b39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 40 deletions

View File

@ -5,7 +5,6 @@ package net.corda.core.internal
import net.corda.core.cordapp.Cordapp
import net.corda.core.cordapp.CordappConfig
import net.corda.core.cordapp.CordappContext
import net.corda.core.cordapp.CordappProvider
import net.corda.core.crypto.*
import net.corda.core.flows.NotarisationRequest
import net.corda.core.flows.NotarisationRequestSignature
@ -38,6 +37,7 @@ import java.nio.charset.Charset
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.*
import java.nio.file.attribute.FileAttribute
import java.nio.file.attribute.FileTime
import java.security.KeyPair
import java.security.PrivateKey
import java.security.cert.X509Certificate
@ -130,6 +130,7 @@ fun Path.moveTo(target: Path, vararg options: CopyOption): Path = Files.move(thi
fun Path.isRegularFile(vararg options: LinkOption): Boolean = Files.isRegularFile(this, *options)
fun Path.isDirectory(vararg options: LinkOption): Boolean = Files.isDirectory(this, *options)
inline val Path.size: Long get() = Files.size(this)
fun Path.lastModifiedTime(vararg options: LinkOption): FileTime = Files.getLastModifiedTime(this, *options)
inline fun <R> Path.list(block: (Stream<Path>) -> R): R = Files.list(this).use(block)
fun Path.deleteIfExists(): Boolean = Files.deleteIfExists(this)
fun Path.reader(charset: Charset = UTF_8): BufferedReader = Files.newBufferedReader(this, charset)
@ -257,6 +258,13 @@ fun IntProgression.stream(parallel: Boolean = false): IntStream = StreamSupport.
// When toArray has filled in the array, the component type is no longer T? but T (that may itself be nullable):
inline fun <reified T> Stream<out T>.toTypedArray(): Array<T> = uncheckedCast(toArray { size -> arrayOfNulls<T>(size) })
inline fun <T, R : Any> Stream<T>.mapNotNull(crossinline transform: (T) -> R?): Stream<R> {
return flatMap {
val value = transform(it)
if (value != null) Stream.of(value) else Stream.empty()
}
}
fun <T> Class<T>.castIfPossible(obj: Any): T? = if (isInstance(obj)) cast(obj) else null
/** Returns a [DeclaredField] wrapper around the declared (possibly non-public) static field of the receiver [Class]. */

View File

@ -8,6 +8,7 @@ import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal._contextSerializationEnv
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
@ -17,13 +18,12 @@ import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import rx.Observable
import rx.Scheduler
import java.io.IOException
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.nio.file.attribute.FileTime
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.stream.Stream
import kotlin.streams.toList
/**
@ -58,20 +58,14 @@ class NodeInfoWatcher(private val nodePath: Path,
}
}
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val nodeInfosDir = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val nodeInfoFiles = HashMap<Path, FileTime>()
private val _processedNodeInfoHashes = HashSet<SecureHash>()
val processedNodeInfoHashes: Set<SecureHash> get() = _processedNodeInfoHashes
init {
require(pollInterval >= 5.seconds) { "Poll interval must be 5 seconds or longer." }
if (!nodeInfoDirectory.isDirectory()) {
try {
nodeInfoDirectory.createDirectories()
} catch (e: IOException) {
logger.info("Failed to create $nodeInfoDirectory", e)
}
}
nodeInfosDir.createDirectories()
}
/**
@ -93,42 +87,32 @@ class NodeInfoWatcher(private val nodePath: Path,
return Companion.saveToFile(nodePath, nodeInfoAndSigned)
}
/**
* Loads all the files contained in a given path and returns the deserialized [NodeInfo]s.
* Signatures are checked before returning a value.
*
* @return a list of [NodeInfo]s
*/
private fun loadFromDirectory(): List<NodeInfo> {
if (!nodeInfoDirectory.isDirectory()) {
return emptyList()
}
val result = nodeInfoDirectory.list { paths ->
val result = nodeInfosDir.list { paths ->
paths
.filter { it.isRegularFile() }
.flatMap { path ->
val nodeInfo = processFile(path)?.let {
if (_processedNodeInfoHashes.add(it.signed.raw.hash)) it.nodeInfo else null
.filter { file ->
val lastModifiedTime = file.lastModifiedTime()
val previousLastModifiedTime = nodeInfoFiles[file]
val newOrChangedFile = previousLastModifiedTime == null || lastModifiedTime > previousLastModifiedTime
nodeInfoFiles[file] = lastModifiedTime
newOrChangedFile
}
.mapNotNull { file ->
logger.debug { "Reading SignedNodeInfo from $file" }
try {
NodeInfoAndSigned(file.readObject())
} catch (e: Exception) {
logger.warn("Unable to read SignedNodeInfo from $file", e)
null
}
if (nodeInfo != null) Stream.of(nodeInfo) else Stream.empty()
}
.toList()
}
if (result.isNotEmpty()) {
logger.info("Successfully read ${result.size} NodeInfo files from disk.")
}
return result
}
private fun processFile(file: Path): NodeInfoAndSigned? {
return try {
logger.info("Reading NodeInfo from file: $file")
val signedNodeInfo = file.readObject<SignedNodeInfo>()
NodeInfoAndSigned(signedNodeInfo)
} catch (e: Exception) {
logger.warn("Exception parsing NodeInfo from file. $file", e)
null
}
logger.debug { "Read ${result.size} NodeInfo files from $nodeInfosDir" }
_processedNodeInfoHashes += result.map { it.signed.raw.hash }
return result.map { it.nodeInfo }
}
}