CORDA-1093 - Deleting node info from directory (#3115) (#3921)

* CORDA-1093 Deleting node info from directory

Deleting NodeInfo from additional-node-infos directory should remove it from cache.
This commit is contained in:
Katelyn Baker 2018-09-13 13:43:20 +01:00 committed by GitHub
parent fadeda73fd
commit eac9d1b93c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 107 additions and 29 deletions

View File

@ -37,7 +37,7 @@ class NodeInfoWatcherTest {
val tempFolder = TemporaryFolder() val tempFolder = TemporaryFolder()
private val scheduler = TestScheduler() private val scheduler = TestScheduler()
private val testSubscriber = TestSubscriber<NodeInfo>() private val testSubscriber = TestSubscriber<NodeInfoUpdate>()
private lateinit var nodeInfoAndSigned: NodeInfoAndSigned private lateinit var nodeInfoAndSigned: NodeInfoAndSigned
private lateinit var nodeInfoPath: Path private lateinit var nodeInfoPath: Path
@ -101,7 +101,7 @@ class NodeInfoWatcherTest {
try { try {
val readNodes = testSubscriber.onNextEvents.distinct() val readNodes = testSubscriber.onNextEvents.distinct()
assertEquals(1, readNodes.size) assertEquals(1, readNodes.size)
assertEquals(nodeInfoAndSigned.nodeInfo, readNodes.first()) assertEquals(nodeInfoAndSigned.nodeInfo, (readNodes.first() as? NodeInfoUpdate.Add)?.nodeInfo)
} finally { } finally {
subscription.unsubscribe() subscription.unsubscribe()
} }
@ -126,7 +126,7 @@ class NodeInfoWatcherTest {
testSubscriber.awaitValueCount(1, 5, TimeUnit.SECONDS) testSubscriber.awaitValueCount(1, 5, TimeUnit.SECONDS)
// The same folder can be reported more than once, so take unique values. // The same folder can be reported more than once, so take unique values.
val readNodes = testSubscriber.onNextEvents.distinct() val readNodes = testSubscriber.onNextEvents.distinct()
assertEquals(nodeInfoAndSigned.nodeInfo, readNodes.first()) assertEquals(nodeInfoAndSigned.nodeInfo, (readNodes.first() as? NodeInfoUpdate.Add)?.nodeInfo)
} finally { } finally {
subscription.unsubscribe() subscription.unsubscribe()
} }

View File

@ -60,7 +60,17 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
fun subscribeToNetworkMap() { fun subscribeToNetworkMap() {
require(fileWatcherSubscription == null) { "Should not call this method twice." } require(fileWatcherSubscription == null) { "Should not call this method twice." }
// Subscribe to file based networkMap // Subscribe to file based networkMap
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe(networkMapCache::addNode) fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe {
when (it) {
is NodeInfoUpdate.Add -> {
networkMapCache.addNode(it.nodeInfo)
}
is NodeInfoUpdate.Remove -> {
val nodeInfo = networkMapCache.getNodeByHash(it.hash)
nodeInfo?.let { networkMapCache.removeNode(it) }
}
}
}
if (networkMapClient == null) return if (networkMapClient == null) return

View File

@ -26,13 +26,18 @@ import java.time.Duration
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.streams.toList import kotlin.streams.toList
sealed class NodeInfoUpdate {
data class Add(val nodeInfo: NodeInfo) : NodeInfoUpdate()
data class Remove(val hash: SecureHash) : NodeInfoUpdate()
}
/** /**
* Class containing the logic to * Class containing the logic to
* - Serialize and de-serialize a [NodeInfo] to disk and reading it back. * - Serialize and de-serialize a [NodeInfo] to disk and reading it back.
* - Poll a directory for new serialized [NodeInfo] * - Poll a directory for new serialized [NodeInfo]
* *
* @param nodePath the base path of a node. * @param nodePath the base path of a node.
* @param pollInterval how often to poll the filesystem in milliseconds. Must be longer then 5 seconds. * @param pollInterval how often to poll the filesystem in milliseconds. Must be longer than 5 seconds.
* @param scheduler a [Scheduler] for the rx [Observable] returned by [nodeInfoUpdates], this is mainly useful for * @param scheduler a [Scheduler] for the rx [Observable] returned by [nodeInfoUpdates], this is mainly useful for
* testing. It defaults to the io scheduler which is the appropriate value for production uses. * testing. It defaults to the io scheduler which is the appropriate value for production uses.
*/ */
@ -58,10 +63,10 @@ class NodeInfoWatcher(private val nodePath: Path,
} }
} }
internal data class NodeInfoFromFile(val nodeInfohash: SecureHash, val lastModified: FileTime)
private val nodeInfosDir = nodePath / CordformNode.NODE_INFO_DIRECTORY private val nodeInfosDir = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val nodeInfoFiles = HashMap<Path, FileTime>() private val nodeInfoFilesMap = HashMap<Path, NodeInfoFromFile>()
private val _processedNodeInfoHashes = HashSet<SecureHash>() val processedNodeInfoHashes: Set<SecureHash> get() = nodeInfoFilesMap.values.map { it.nodeInfohash }.toSet()
val processedNodeInfoHashes: Set<SecureHash> get() = _processedNodeInfoHashes
init { init {
require(pollInterval >= 5.seconds) { "Poll interval must be 5 seconds or longer." } require(pollInterval >= 5.seconds) { "Poll interval must be 5 seconds or longer." }
@ -75,33 +80,31 @@ class NodeInfoWatcher(private val nodePath: Path,
* We simply list the directory content every 5 seconds, the Java implementation of WatchService has been proven to * We simply list the directory content every 5 seconds, the Java implementation of WatchService has been proven to
* be unreliable on MacOs and given the fairly simple use case we have, this simple implementation should do. * be unreliable on MacOs and given the fairly simple use case we have, this simple implementation should do.
* *
* @return an [Observable] returning [NodeInfo]s, at most one [NodeInfo] is returned for each processed file. * @return an [Observable] returning [NodeInfoUpdate]s, at most one [NodeInfo] is returned for each processed file.
*/ */
fun nodeInfoUpdates(): Observable<NodeInfo> { fun nodeInfoUpdates(): Observable<NodeInfoUpdate> {
return Observable.interval(0, pollInterval.toMillis(), TimeUnit.MILLISECONDS, scheduler) return Observable.interval(0, pollInterval.toMillis(), TimeUnit.MILLISECONDS, scheduler)
.flatMapIterable { loadFromDirectory() } .flatMapIterable { loadFromDirectory() }
} }
// TODO This method doesn't belong in this class private fun loadFromDirectory(): List<NodeInfoUpdate> {
fun saveToFile(nodeInfoAndSigned: NodeInfoAndSigned) { val processedPaths = HashSet<Path>()
return Companion.saveToFile(nodePath, nodeInfoAndSigned)
}
private fun loadFromDirectory(): List<NodeInfo> {
val result = nodeInfosDir.list { paths -> val result = nodeInfosDir.list { paths ->
paths paths
.filter { it.isRegularFile() } .filter { it.isRegularFile() }
.filter { file -> .filter { file ->
val lastModifiedTime = file.lastModifiedTime() val lastModifiedTime = file.lastModifiedTime()
val previousLastModifiedTime = nodeInfoFiles[file] val previousLastModifiedTime = nodeInfoFilesMap[file]?.lastModified
val newOrChangedFile = previousLastModifiedTime == null || lastModifiedTime > previousLastModifiedTime val newOrChangedFile = previousLastModifiedTime == null || lastModifiedTime > previousLastModifiedTime
nodeInfoFiles[file] = lastModifiedTime processedPaths.add(file)
newOrChangedFile newOrChangedFile
} }
.mapNotNull { file -> .mapNotNull { file ->
logger.debug { "Reading SignedNodeInfo from $file" } logger.debug { "Reading SignedNodeInfo from $file" }
try { try {
NodeInfoAndSigned(file.readObject()) val nodeInfoSigned = NodeInfoAndSigned(file.readObject())
nodeInfoFilesMap[file] = NodeInfoFromFile(nodeInfoSigned.signed.raw.hash, file.lastModifiedTime())
nodeInfoSigned
} catch (e: Exception) { } catch (e: Exception) {
logger.warn("Unable to read SignedNodeInfo from $file", e) logger.warn("Unable to read SignedNodeInfo from $file", e)
null null
@ -109,10 +112,13 @@ class NodeInfoWatcher(private val nodePath: Path,
} }
.toList() .toList()
} }
val removedFiles = nodeInfoFilesMap.keys - processedPaths
val removedHashes = removedFiles.map { file ->
NodeInfoUpdate.Remove(nodeInfoFilesMap.remove(file)!!.nodeInfohash)
}
logger.debug { "Read ${result.size} NodeInfo files from $nodeInfosDir" } logger.debug { "Read ${result.size} NodeInfo files from $nodeInfosDir" }
_processedNodeInfoHashes += result.map { it.signed.raw.hash } logger.debug { "Number of removed NodeInfo files ${removedHashes.size}" }
return result.map { it.nodeInfo } return result.map { NodeInfoUpdate.Add(it.nodeInfo) } + removedHashes
} }
} }

View File

@ -251,8 +251,8 @@ open class PersistentNetworkMapCache(
} }
private fun removeInfoDB(session: Session, nodeInfo: NodeInfo) { private fun removeInfoDB(session: Session, nodeInfo: NodeInfo) {
val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey).single() val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey).singleOrNull()
session.remove(info) info?.let { session.remove(it) }
// invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed // invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed
// on the next load // on the next load
invalidateCaches(nodeInfo) invalidateCaches(nodeInfo)

View File

@ -9,6 +9,7 @@ import com.nhaarman.mockito_kotlin.verify
import net.corda.cordform.CordformNode.NODE_INFO_DIRECTORY import net.corda.cordform.CordformNode.NODE_INFO_DIRECTORY
import net.corda.core.crypto.Crypto import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sign
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.* import net.corda.core.internal.*
@ -29,18 +30,23 @@ import net.corda.testing.core.expect
import net.corda.testing.core.expectEvents import net.corda.testing.core.expectEvents
import net.corda.testing.core.sequence import net.corda.testing.core.sequence
import net.corda.testing.internal.DEV_ROOT_CA import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.internal.TestNodeInfoBuilder
import net.corda.testing.internal.createNodeInfoAndSigned import net.corda.testing.internal.createNodeInfoAndSigned
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.After import org.junit.After
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import rx.schedulers.TestScheduler import rx.schedulers.TestScheduler
import java.nio.file.Files
import java.nio.file.Path
import java.time.Instant import java.time.Instant
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals import kotlin.test.assertEquals
fun Path.delete(): Unit = Files.delete(this)
class NetworkMapUpdaterTest { class NetworkMapUpdaterTest {
@Rule @Rule
@JvmField @JvmField
@ -54,6 +60,7 @@ class NetworkMapUpdaterTest {
private val networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa() private val networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa()
private val cacheExpiryMs = 100 private val cacheExpiryMs = 100
private val networkMapClient = createMockNetworkMapClient() private val networkMapClient = createMockNetworkMapClient()
private val nodeInfoDir = baseDir / NODE_INFO_DIRECTORY
private val scheduler = TestScheduler() private val scheduler = TestScheduler()
private val networkParametersHash = SecureHash.randomSHA256() private val networkParametersHash = SecureHash.randomSHA256()
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler) private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
@ -88,7 +95,7 @@ class NetworkMapUpdaterTest {
verify(networkMapCache, times(1)).addNode(nodeInfo1) verify(networkMapCache, times(1)).addNode(nodeInfo1)
verify(networkMapCache, times(1)).addNode(nodeInfo2) verify(networkMapCache, times(1)).addNode(nodeInfo2)
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned) NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
networkMapClient.publish(signedNodeInfo3) networkMapClient.publish(signedNodeInfo3)
networkMapClient.publish(signedNodeInfo4) networkMapClient.publish(signedNodeInfo4)
@ -112,7 +119,7 @@ class NetworkMapUpdaterTest {
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file") val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
// Add all nodes. // Add all nodes.
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned) NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
networkMapClient.publish(signedNodeInfo1) networkMapClient.publish(signedNodeInfo1)
networkMapClient.publish(signedNodeInfo2) networkMapClient.publish(signedNodeInfo2)
networkMapClient.publish(signedNodeInfo3) networkMapClient.publish(signedNodeInfo3)
@ -151,7 +158,7 @@ class NetworkMapUpdaterTest {
updater.subscribeToNetworkMap() updater.subscribeToNetworkMap()
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned) NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS) scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
verify(networkMapCache, times(1)).addNode(any()) verify(networkMapCache, times(1)).addNode(any())
@ -230,17 +237,72 @@ class NetworkMapUpdaterTest {
} }
} }
@Test
fun `remove node from filesystem deletes it from network map cache`() {
val fileNodeInfoAndSigned1 = createNodeInfoAndSigned("Info from file 1")
val fileNodeInfoAndSigned2 = createNodeInfoAndSigned("Info from file 2")
updater.subscribeToNetworkMap()
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned1)
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned2)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
verify(networkMapCache, times(2)).addNode(any())
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned1.nodeInfo)
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned2.nodeInfo)
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(fileNodeInfoAndSigned1.signed.raw.hash, fileNodeInfoAndSigned2.signed.raw.hash)
// Remove one of the nodes
val fileName1 = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${fileNodeInfoAndSigned1.nodeInfo.legalIdentities[0].name.serialize().hash}"
(nodeInfoDir / fileName1).delete()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
verify(networkMapCache, times(1)).removeNode(any())
verify(networkMapCache, times(1)).removeNode(fileNodeInfoAndSigned1.nodeInfo)
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned2.signed.raw.hash)
}
@Test
fun `remove node info file, but node in network map server`() {
val nodeInfoBuilder = TestNodeInfoBuilder()
val (_, key) = nodeInfoBuilder.addIdentity(CordaX500Name("Info", "London", "GB"))
val (serverNodeInfo, serverSignedNodeInfo) = nodeInfoBuilder.buildWithSigned(1, 1)
// Construct node for exactly same identity, but different serial. This one will go to additional-node-infos only.
val localNodeInfo = serverNodeInfo.copy(serial = 17)
val localSignedNodeInfo = NodeInfoAndSigned(localNodeInfo) { _, serialised ->
key.sign(serialised.bytes)
}
// The one with higher serial goes to additional-node-infos.
NodeInfoWatcher.saveToFile(nodeInfoDir, localSignedNodeInfo)
// Publish to network map the one with lower serial.
networkMapClient.publish(serverSignedNodeInfo)
updater.subscribeToNetworkMap()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
verify(networkMapCache, times(1)).addNode(localNodeInfo)
Thread.sleep(2L * cacheExpiryMs)
// Node from file has higher serial than the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash)
val fileName = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${localNodeInfo.legalIdentities[0].name.serialize().hash}"
(nodeInfoDir / fileName).delete()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
verify(networkMapCache, times(1)).removeNode(any())
verify(networkMapCache).removeNode(localNodeInfo)
Thread.sleep(2L * cacheExpiryMs)
// Instead of node from file we should have now the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(serverSignedNodeInfo.raw.hash)
}
private fun createMockNetworkMapCache(): NetworkMapCacheInternal { private fun createMockNetworkMapCache(): NetworkMapCacheInternal {
return mock { return mock {
val data = ConcurrentHashMap<Party, NodeInfo>() val data = ConcurrentHashMap<Party, NodeInfo>()
on { addNode(any()) }.then { on { addNode(any()) }.then {
val nodeInfo = it.arguments[0] as NodeInfo val nodeInfo = it.arguments[0] as NodeInfo
data.put(nodeInfo.legalIdentities[0], nodeInfo) val party = nodeInfo.legalIdentities[0]
data.compute(party) { _, current ->
if (current == null || current.serial < nodeInfo.serial) nodeInfo else current
}
} }
on { removeNode(any()) }.then { data.remove((it.arguments[0] as NodeInfo).legalIdentities[0]) } on { removeNode(any()) }.then { data.remove((it.arguments[0] as NodeInfo).legalIdentities[0]) }
on { getNodeByLegalIdentity(any()) }.then { data[it.arguments[0]] } on { getNodeByLegalIdentity(any()) }.then { data[it.arguments[0]] }
on { allNodeHashes }.then { data.values.map { it.serialize().hash } } on { allNodeHashes }.then { data.values.map { it.serialize().hash } }
on { getNodeByHash(any()) }.then { mock -> data.values.single { it.serialize().hash == mock.arguments[0] } } on { getNodeByHash(any()) }.then { mock -> data.values.singleOrNull { it.serialize().hash == mock.arguments[0] } }
} }
} }