mirror of
https://github.com/corda/corda.git
synced 2024-12-21 22:07:55 +00:00
CORDA-1093 Deleting node info from directory (#3115)
* CORDA-1093 Deleting node info from directory Deleting NodeInfo from additional-node-infos directory should remove it from cache.
This commit is contained in:
parent
0c38a63486
commit
b06738b371
@ -6,7 +6,6 @@ import net.corda.cordform.CordformNode
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.size
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
|
||||
@ -37,7 +36,7 @@ class NodeInfoWatcherTest {
|
||||
val tempFolder = TemporaryFolder()
|
||||
|
||||
private val scheduler = TestScheduler()
|
||||
private val testSubscriber = TestSubscriber<NodeInfo>()
|
||||
private val testSubscriber = TestSubscriber<NodeInfoUpdate>()
|
||||
|
||||
private lateinit var nodeInfoAndSigned: NodeInfoAndSigned
|
||||
private lateinit var nodeInfoPath: Path
|
||||
@ -101,7 +100,7 @@ class NodeInfoWatcherTest {
|
||||
try {
|
||||
val readNodes = testSubscriber.onNextEvents.distinct()
|
||||
assertEquals(1, readNodes.size)
|
||||
assertEquals(nodeInfoAndSigned.nodeInfo, readNodes.first())
|
||||
assertEquals(nodeInfoAndSigned.nodeInfo, (readNodes.first() as? NodeInfoUpdate.Add)?.nodeInfo)
|
||||
} finally {
|
||||
subscription.unsubscribe()
|
||||
}
|
||||
@ -126,7 +125,7 @@ class NodeInfoWatcherTest {
|
||||
testSubscriber.awaitValueCount(1, 5, TimeUnit.SECONDS)
|
||||
// The same folder can be reported more than once, so take unique values.
|
||||
val readNodes = testSubscriber.onNextEvents.distinct()
|
||||
assertEquals(nodeInfoAndSigned.nodeInfo, readNodes.first())
|
||||
assertEquals(nodeInfoAndSigned.nodeInfo, (readNodes.first() as? NodeInfoUpdate.Add)?.nodeInfo)
|
||||
} finally {
|
||||
subscription.unsubscribe()
|
||||
}
|
||||
|
@ -63,7 +63,17 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
fun subscribeToNetworkMap() {
|
||||
require(fileWatcherSubscription == null) { "Should not call this method twice." }
|
||||
// Subscribe to file based networkMap
|
||||
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe(networkMapCache::addNode)
|
||||
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe {
|
||||
when (it) {
|
||||
is NodeInfoUpdate.Add -> {
|
||||
networkMapCache.addNode(it.nodeInfo)
|
||||
}
|
||||
is NodeInfoUpdate.Remove -> {
|
||||
val nodeInfo = networkMapCache.getNodeByHash(it.hash)
|
||||
nodeInfo?.let { networkMapCache.removeNode(it) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (networkMapClient == null) return
|
||||
|
||||
|
@ -26,13 +26,18 @@ import java.time.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.streams.toList
|
||||
|
||||
sealed class NodeInfoUpdate {
|
||||
data class Add(val nodeInfo: NodeInfo) : NodeInfoUpdate()
|
||||
data class Remove(val hash: SecureHash) : NodeInfoUpdate()
|
||||
}
|
||||
|
||||
/**
|
||||
* Class containing the logic to
|
||||
* - Serialize and de-serialize a [NodeInfo] to disk and reading it back.
|
||||
* - Poll a directory for new serialized [NodeInfo]
|
||||
*
|
||||
* @param nodePath the base path of a node.
|
||||
* @param pollInterval how often to poll the filesystem in milliseconds. Must be longer then 5 seconds.
|
||||
* @param pollInterval how often to poll the filesystem in milliseconds. Must be longer than 5 seconds.
|
||||
* @param scheduler a [Scheduler] for the rx [Observable] returned by [nodeInfoUpdates], this is mainly useful for
|
||||
* testing. It defaults to the io scheduler which is the appropriate value for production uses.
|
||||
*/
|
||||
@ -58,10 +63,10 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
}
|
||||
}
|
||||
|
||||
internal data class NodeInfoFromFile(val nodeInfohash: SecureHash, val lastModified: FileTime)
|
||||
private val nodeInfosDir = nodePath / CordformNode.NODE_INFO_DIRECTORY
|
||||
private val nodeInfoFiles = HashMap<Path, FileTime>()
|
||||
private val _processedNodeInfoHashes = HashSet<SecureHash>()
|
||||
val processedNodeInfoHashes: Set<SecureHash> get() = _processedNodeInfoHashes
|
||||
private val nodeInfoFilesMap = HashMap<Path, NodeInfoFromFile>()
|
||||
val processedNodeInfoHashes: Set<SecureHash> get() = nodeInfoFilesMap.values.map { it.nodeInfohash }.toSet()
|
||||
|
||||
init {
|
||||
require(pollInterval >= 5.seconds) { "Poll interval must be 5 seconds or longer." }
|
||||
@ -75,33 +80,31 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
* We simply list the directory content every 5 seconds, the Java implementation of WatchService has been proven to
|
||||
* be unreliable on MacOs and given the fairly simple use case we have, this simple implementation should do.
|
||||
*
|
||||
* @return an [Observable] returning [NodeInfo]s, at most one [NodeInfo] is returned for each processed file.
|
||||
* @return an [Observable] returning [NodeInfoUpdate]s, at most one [NodeInfo] is returned for each processed file.
|
||||
*/
|
||||
fun nodeInfoUpdates(): Observable<NodeInfo> {
|
||||
fun nodeInfoUpdates(): Observable<NodeInfoUpdate> {
|
||||
return Observable.interval(pollInterval.toMillis(), TimeUnit.MILLISECONDS, scheduler)
|
||||
.flatMapIterable { loadFromDirectory() }
|
||||
}
|
||||
|
||||
// TODO This method doesn't belong in this class
|
||||
fun saveToFile(nodeInfoAndSigned: NodeInfoAndSigned) {
|
||||
return Companion.saveToFile(nodePath, nodeInfoAndSigned)
|
||||
}
|
||||
|
||||
private fun loadFromDirectory(): List<NodeInfo> {
|
||||
private fun loadFromDirectory(): List<NodeInfoUpdate> {
|
||||
val processedPaths = HashSet<Path>()
|
||||
val result = nodeInfosDir.list { paths ->
|
||||
paths
|
||||
.filter { it.isRegularFile() }
|
||||
.filter { file ->
|
||||
val lastModifiedTime = file.lastModifiedTime()
|
||||
val previousLastModifiedTime = nodeInfoFiles[file]
|
||||
val previousLastModifiedTime = nodeInfoFilesMap[file]?.lastModified
|
||||
val newOrChangedFile = previousLastModifiedTime == null || lastModifiedTime > previousLastModifiedTime
|
||||
nodeInfoFiles[file] = lastModifiedTime
|
||||
processedPaths.add(file)
|
||||
newOrChangedFile
|
||||
}
|
||||
.mapNotNull { file ->
|
||||
logger.debug { "Reading SignedNodeInfo from $file" }
|
||||
try {
|
||||
NodeInfoAndSigned(file.readObject())
|
||||
val nodeInfoSigned = NodeInfoAndSigned(file.readObject())
|
||||
nodeInfoFilesMap[file] = NodeInfoFromFile(nodeInfoSigned.signed.raw.hash, file.lastModifiedTime())
|
||||
nodeInfoSigned
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Unable to read SignedNodeInfo from $file", e)
|
||||
null
|
||||
@ -109,10 +112,13 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
}
|
||||
.toList()
|
||||
}
|
||||
|
||||
val removedFiles = nodeInfoFilesMap.keys - processedPaths
|
||||
val removedHashes = removedFiles.map { file ->
|
||||
NodeInfoUpdate.Remove(nodeInfoFilesMap.remove(file)!!.nodeInfohash)
|
||||
}
|
||||
logger.debug { "Read ${result.size} NodeInfo files from $nodeInfosDir" }
|
||||
_processedNodeInfoHashes += result.map { it.signed.raw.hash }
|
||||
return result.map { it.nodeInfo }
|
||||
logger.debug { "Number of removed NodeInfo files ${removedHashes.size}" }
|
||||
return result.map { NodeInfoUpdate.Add(it.nodeInfo) } + removedHashes
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -251,8 +251,8 @@ open class PersistentNetworkMapCache(
|
||||
}
|
||||
|
||||
private fun removeInfoDB(session: Session, nodeInfo: NodeInfo) {
|
||||
val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey).single()
|
||||
session.remove(info)
|
||||
val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey).singleOrNull()
|
||||
info?.let { session.remove(it) }
|
||||
// invalidate cache last - this way, we might serve up the wrong info for a short time, but it will get refreshed
|
||||
// on the next load
|
||||
invalidateCaches(nodeInfo)
|
||||
|
@ -5,6 +5,7 @@ import com.google.common.jimfs.Jimfs
|
||||
import com.nhaarman.mockito_kotlin.*
|
||||
import net.corda.cordform.CordformNode.NODE_INFO_DIRECTORY
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.sign
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
@ -19,6 +20,7 @@ import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.internal.DEV_ROOT_CA
|
||||
import net.corda.testing.internal.TestNodeInfoBuilder
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
import net.corda.testing.node.internal.network.NetworkMapServer
|
||||
import org.assertj.core.api.Assertions
|
||||
@ -46,6 +48,7 @@ class NetworkMapUpdaterTest {
|
||||
private val privateNetUUID = UUID.randomUUID()
|
||||
private val fs = Jimfs.newFileSystem(unix())
|
||||
private val baseDir = fs.getPath("/node")
|
||||
private val nodeInfoDir = baseDir / NODE_INFO_DIRECTORY
|
||||
private val scheduler = TestScheduler()
|
||||
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
private val networkMapCache = createMockNetworkMapCache()
|
||||
@ -90,7 +93,7 @@ class NetworkMapUpdaterTest {
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo1)
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo2)
|
||||
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
|
||||
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
|
||||
networkMapClient.publish(signedNodeInfo3)
|
||||
networkMapClient.publish(signedNodeInfo4)
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
@ -112,7 +115,7 @@ class NetworkMapUpdaterTest {
|
||||
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
|
||||
|
||||
// Add all nodes.
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
|
||||
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
|
||||
networkMapClient.publish(signedNodeInfo1)
|
||||
networkMapClient.publish(signedNodeInfo2)
|
||||
networkMapClient.publish(signedNodeInfo3)
|
||||
@ -152,7 +155,7 @@ class NetworkMapUpdaterTest {
|
||||
|
||||
updater.subscribeToNetworkMap()
|
||||
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
|
||||
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
|
||||
verify(networkMapCache, times(1)).addNode(any())
|
||||
@ -215,17 +218,72 @@ class NetworkMapUpdaterTest {
|
||||
assertEquals(aliceInfo, networkMapClient.getNodeInfo(aliceHash))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `remove node from filesystem deletes it from network map cache`() {
|
||||
val fileNodeInfoAndSigned1 = createNodeInfoAndSigned("Info from file 1")
|
||||
val fileNodeInfoAndSigned2 = createNodeInfoAndSigned("Info from file 2")
|
||||
updater.subscribeToNetworkMap()
|
||||
|
||||
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned1)
|
||||
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned2)
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
verify(networkMapCache, times(2)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned1.nodeInfo)
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned2.nodeInfo)
|
||||
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(fileNodeInfoAndSigned1.signed.raw.hash, fileNodeInfoAndSigned2.signed.raw.hash)
|
||||
// Remove one of the nodes
|
||||
val fileName1 = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${fileNodeInfoAndSigned1.nodeInfo.legalIdentities[0].name.serialize().hash}"
|
||||
(nodeInfoDir / fileName1).delete()
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
verify(networkMapCache, times(1)).removeNode(any())
|
||||
verify(networkMapCache, times(1)).removeNode(fileNodeInfoAndSigned1.nodeInfo)
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned2.signed.raw.hash)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `remove node info file, but node in network map server`() {
|
||||
val nodeInfoBuilder = TestNodeInfoBuilder()
|
||||
val (_, key) = nodeInfoBuilder.addLegalIdentity(CordaX500Name("Info", "London", "GB"))
|
||||
val (serverNodeInfo, serverSignedNodeInfo) = nodeInfoBuilder.buildWithSigned(1, 1)
|
||||
// Construct node for exactly same identity, but different serial. This one will go to additional-node-infos only.
|
||||
val localNodeInfo = serverNodeInfo.copy(serial = 17)
|
||||
val localSignedNodeInfo = NodeInfoAndSigned(localNodeInfo) { _, serialised ->
|
||||
key.sign(serialised.bytes)
|
||||
}
|
||||
// The one with higher serial goes to additional-node-infos.
|
||||
NodeInfoWatcher.saveToFile(nodeInfoDir, localSignedNodeInfo)
|
||||
// Publish to network map the one with lower serial.
|
||||
networkMapClient.publish(serverSignedNodeInfo)
|
||||
updater.subscribeToNetworkMap()
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
verify(networkMapCache, times(1)).addNode(localNodeInfo)
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
// Node from file has higher serial than the one from NetworkMapServer
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash)
|
||||
val fileName = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${localNodeInfo.legalIdentities[0].name.serialize().hash}"
|
||||
(nodeInfoDir / fileName).delete()
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
verify(networkMapCache, times(1)).removeNode(any())
|
||||
verify(networkMapCache).removeNode(localNodeInfo)
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
// Instead of node from file we should have now the one from NetworkMapServer
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(serverSignedNodeInfo.raw.hash)
|
||||
}
|
||||
|
||||
private fun createMockNetworkMapCache(): NetworkMapCacheInternal {
|
||||
return mock {
|
||||
val data = ConcurrentHashMap<Party, NodeInfo>()
|
||||
on { addNode(any()) }.then {
|
||||
val nodeInfo = it.arguments[0] as NodeInfo
|
||||
data.put(nodeInfo.legalIdentities[0], nodeInfo)
|
||||
val party = nodeInfo.legalIdentities[0]
|
||||
data.compute(party) { _, current ->
|
||||
if (current == null || current.serial < nodeInfo.serial) nodeInfo else current
|
||||
}
|
||||
}
|
||||
on { removeNode(any()) }.then { data.remove((it.arguments[0] as NodeInfo).legalIdentities[0]) }
|
||||
on { getNodeByLegalIdentity(any()) }.then { data[it.arguments[0]] }
|
||||
on { allNodeHashes }.then { data.values.map { it.serialize().hash } }
|
||||
on { getNodeByHash(any()) }.then { mock -> data.values.single { it.serialize().hash == mock.arguments[0] } }
|
||||
on { getNodeByHash(any()) }.then { mock -> data.values.singleOrNull { it.serialize().hash == mock.arguments[0] } }
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user