CORDA-1958: The node ready future completes on the first poll of the network map sources, even if they return empty. (#3904)

This is to allow the first node in a test environment to fully start up.
This commit is contained in:
Shams Asari 2018-09-10 10:43:40 +01:00 committed by GitHub
parent 83e66d542d
commit d56a80d159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 104 additions and 159 deletions

View File

@ -88,7 +88,7 @@ class ArtemisMessagingTest {
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
networkMapCache = PersistentNetworkMapCache(database, rigorousMock(), ALICE_NAME).apply { start(emptyList()) }
networkMapCache = PersistentNetworkMapCache(database, rigorousMock()).apply { start(emptyList()) }
}
@After

View File

@ -1,7 +1,6 @@
package net.corda.node.services.network
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.configureDatabase
import net.corda.node.internal.schemas.NodeInfoSchemaV1
@ -20,7 +19,6 @@ class PersistentNetworkMapCacheTest {
private companion object {
val ALICE = TestIdentity(ALICE_NAME, 70)
val BOB = TestIdentity(BOB_NAME, 80)
val CHARLIE = TestIdentity(CHARLIE_NAME, 90)
}
@Rule
@ -29,7 +27,7 @@ class PersistentNetworkMapCacheTest {
private var portCounter = 1000
private val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
private val charlieNetMapCache = PersistentNetworkMapCache(database, InMemoryIdentityService(trustRoot = DEV_ROOT_CA.certificate), CHARLIE.name)
private val charlieNetMapCache = PersistentNetworkMapCache(database, InMemoryIdentityService(trustRoot = DEV_ROOT_CA.certificate))
@After
fun cleanUp() {
@ -40,7 +38,6 @@ class PersistentNetworkMapCacheTest {
fun addNode() {
val alice = createNodeInfo(listOf(ALICE))
charlieNetMapCache.addNode(alice)
assertThat(charlieNetMapCache.nodeReady).isDone()
val fromDb = database.transaction {
session.createQuery(
"from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name}",
@ -50,32 +47,6 @@ class PersistentNetworkMapCacheTest {
assertThat(fromDb).containsOnly(alice)
}
@Test
fun `adding the node's own node-info doesn't complete the nodeReady future`() {
val charlie = createNodeInfo(listOf(CHARLIE))
charlieNetMapCache.addNode(charlie)
assertThat(charlieNetMapCache.nodeReady).isNotDone()
assertThat(charlieNetMapCache.getNodeByLegalName(CHARLIE.name)).isEqualTo(charlie)
}
@Test
fun `starting with just the node's own node-info in the db`() {
val charlie = createNodeInfo(listOf(CHARLIE))
saveNodeInfoIntoDb(charlie)
assertThat(charlieNetMapCache.allNodes).containsOnly(charlie)
charlieNetMapCache.start(emptyList())
assertThat(charlieNetMapCache.nodeReady).isNotDone()
}
@Test
fun `starting with another node-info in the db`() {
val alice = createNodeInfo(listOf(ALICE))
saveNodeInfoIntoDb(alice)
assertThat(charlieNetMapCache.allNodes).containsOnly(alice)
charlieNetMapCache.start(emptyList())
assertThat(charlieNetMapCache.nodeReady).isDone()
}
@Test
fun `unknown legal name`() {
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE)))
@ -137,19 +108,4 @@ class PersistentNetworkMapCacheTest {
serial = 1
)
}
private fun saveNodeInfoIntoDb(nodeInfo: NodeInfo) {
database.transaction {
session.save(NodeInfoSchemaV1.PersistentNodeInfo(
id = 0,
hash = nodeInfo.serialize().hash.toString(),
addresses = nodeInfo.addresses.map { NodeInfoSchemaV1.DBHostAndPort.fromHostAndPort(it) },
legalIdentitiesAndCerts = nodeInfo.legalIdentitiesAndCerts.mapIndexed { idx, elem ->
NodeInfoSchemaV1.DBPartyAndCertificate(elem, isMain = idx == 0)
},
platformVersion = nodeInfo.platformVersion,
serial = nodeInfo.serial
))
}
}
}

View File

@ -146,7 +146,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
// TODO Break cyclic dependency
identityService.database = database
}
val networkMapCache = PersistentNetworkMapCache(database, identityService, configuration.myLegalName).tokenize()
val networkMapCache = PersistentNetworkMapCache(database, identityService).tokenize()
val checkpointStorage = DBCheckpointStorage()
@Suppress("LeakingThis")
val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize()
@ -217,7 +217,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
/** Set to non-null once [start] has been successfully called. */
open val started get() = _started
open val started: S? get() = _started
@Volatile
private var _started: S? = null
@ -302,13 +302,13 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
"Node's platform version is lower than network's required minimumPlatformVersion"
}
servicesForResolution.start(netParams)
networkMapCache.start(netParams.notaries)
startDatabase()
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
identityService.start(trustRoot, listOf(identity.certificate, nodeCa))
val (keyPairs, nodeInfoAndSigned, myNotaryIdentity) = database.transaction {
networkMapCache.start(netParams.notaries)
updateNodeInfo(identity, identityKeyPair, publish = true)
}
@ -456,7 +456,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
} else {
1.days
}
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater"))
executor.submit(object : Runnable {
override fun run() {
val republishInterval = try {

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.NodeInfo
@ -28,6 +29,8 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import java.security.PublicKey
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBase {
override val nodeReady: OpenFuture<Void?>
val allNodeHashes: List<SecureHash>
fun getNodeByHash(nodeHash: SecureHash): NodeInfo?

View File

@ -24,13 +24,12 @@ import java.nio.file.StandardCopyOption
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
import kotlin.system.exitProcess
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcher: NodeInfoWatcher,
private val nodeInfoWatcher: NodeInfoWatcher,
private val networkMapClient: NetworkMapClient?,
private val baseDirectory: Path,
private val extraNetworkMapKeys: List<UUID>
@ -40,8 +39,10 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val defaultRetryInterval = 1.minutes
}
private val parametersUpdatesTrack: PublishSubject<ParametersUpdateInfo> = PublishSubject.create<ParametersUpdateInfo>()
private val executor = ScheduledThreadPoolExecutor(1, NamedThreadFactory("Network Map Updater Thread", Executors.defaultThreadFactory()))
private val parametersUpdatesTrack = PublishSubject.create<ParametersUpdateInfo>()
private val networkMapPoller = ScheduledThreadPoolExecutor(1, NamedThreadFactory("Network Map Updater Thread")).apply {
executeExistingDelayedTasksAfterShutdownPolicy = false
}
private var newNetworkParameters: Pair<ParametersUpdate, SignedNetworkParameters>? = null
private var fileWatcherSubscription: Subscription? = null
private lateinit var trustRoot: X509Certificate
@ -50,7 +51,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
override fun close() {
fileWatcherSubscription?.unsubscribe()
MoreExecutors.shutdownAndAwaitTermination(executor, 50, TimeUnit.SECONDS)
MoreExecutors.shutdownAndAwaitTermination(networkMapPoller, 50, TimeUnit.SECONDS)
}
fun start(trustRoot: X509Certificate, currentParametersHash: SecureHash, ourNodeInfoHash: SecureHash) {
@ -58,26 +59,38 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
this.trustRoot = trustRoot
this.currentParametersHash = currentParametersHash
this.ourNodeInfoHash = ourNodeInfoHash
// Subscribe to file based networkMap
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe {
when (it) {
is NodeInfoUpdate.Add -> {
networkMapCache.addNode(it.nodeInfo)
}
is NodeInfoUpdate.Remove -> {
if (it.hash != ourNodeInfoHash) {
val nodeInfo = networkMapCache.getNodeByHash(it.hash)
nodeInfo?.let { networkMapCache.removeNode(it) }
watchForNodeInfoFiles()
if (networkMapClient != null) {
watchHttpNetworkMap()
}
}
private fun watchForNodeInfoFiles() {
nodeInfoWatcher
.nodeInfoUpdates()
.subscribe {
for (update in it) {
when (update) {
is NodeInfoUpdate.Add -> networkMapCache.addNode(update.nodeInfo)
is NodeInfoUpdate.Remove -> {
if (update.hash != ourNodeInfoHash) {
val nodeInfo = networkMapCache.getNodeByHash(update.hash)
nodeInfo?.let(networkMapCache::removeNode)
}
}
}
}
if (networkMapClient == null) {
// Mark the network map cache as ready on a successful poll of the node infos dir if not using
// the HTTP network map even if there aren't any node infos
networkMapCache.nodeReady.set(null)
}
}
}
}
}
if (networkMapClient == null) return
// Subscribe to remote network map if configured.
executor.executeExistingDelayedTasksAfterShutdownPolicy = false
executor.submit(object : Runnable {
private fun watchHttpNetworkMap() {
// The check may be expensive, so always run it in the background even the first time.
networkMapPoller.submit(object : Runnable {
override fun run() {
val nextScheduleDelay = try {
updateNetworkMapCache()
@ -86,9 +99,9 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
defaultRetryInterval
}
// Schedule the next update.
executor.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS)
networkMapPoller.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS)
}
}) // The check may be expensive, so always run it in the background even the first time.
})
}
fun trackParametersUpdate(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
@ -99,9 +112,13 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
}
fun updateNetworkMapCache(): Duration {
if (networkMapClient == null) throw CordaRuntimeException("Network map cache can be updated only if network map/compatibility zone URL is specified")
if (networkMapClient == null) {
throw CordaRuntimeException("Network map cache can be updated only if network map/compatibility zone URL is specified")
}
val (globalNetworkMap, cacheTimeout) = networkMapClient.getNetworkMap()
globalNetworkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
val additionalHashes = extraNetworkMapKeys.flatMap {
try {
networkMapClient.getNetworkMap(it).payload.nodeInfoHashes
@ -111,6 +128,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
emptyList<SecureHash>()
}
}
val allHashesFromNetworkMap = (globalNetworkMap.nodeInfoHashes + additionalHashes).toSet()
if (currentParametersHash != globalNetworkMap.networkParameterHash) {
@ -120,12 +138,9 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
val currentNodeHashes = networkMapCache.allNodeHashes
// Remove node info from network map.
(currentNodeHashes - allHashesFromNetworkMap - fileWatcher.processedNodeInfoHashes)
.mapNotNull {
if (it != ourNodeInfoHash) {
networkMapCache.getNodeByHash(it)
} else null
}.forEach(networkMapCache::removeNode)
(currentNodeHashes - allHashesFromNetworkMap - nodeInfoWatcher.processedNodeInfoHashes)
.mapNotNull { if (it != ourNodeInfoHash) networkMapCache.getNodeByHash(it) else null }
.forEach(networkMapCache::removeNode)
(allHashesFromNetworkMap - currentNodeHashes).mapNotNull {
// Download new node info from network map
@ -141,6 +156,10 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
networkMapCache.addNode(it)
}
// Mark the network map cache as ready on a successful poll of the HTTP network map, even on the odd chance that
// it's empty
networkMapCache.nodeReady.set(null)
return cacheTimeout
}

View File

@ -3,23 +3,16 @@ package net.corda.node.services.network
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.*
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal._contextSerializationEnv
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.seconds
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.serialization.internal.AMQP_P2P_CONTEXT
import net.corda.serialization.internal.SerializationFactoryImpl
import rx.Observable
import rx.Scheduler
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.nio.file.attribute.FileTime
import java.time.Duration
@ -63,7 +56,7 @@ class NodeInfoWatcher(private val nodePath: Path,
}
}
internal data class NodeInfoFromFile(val nodeInfohash: SecureHash, val lastModified: FileTime)
private data class NodeInfoFromFile(val nodeInfohash: SecureHash, val lastModified: FileTime)
private val nodeInfosDir = nodePath / NODE_INFO_DIRECTORY
private val nodeInfoFilesMap = HashMap<Path, NodeInfoFromFile>()
@ -75,20 +68,16 @@ class NodeInfoWatcher(private val nodePath: Path,
}
/**
* Read all the files contained in [nodePath] / [NODE_INFO_DIRECTORY] and keep watching
* the folder for further updates.
* Read all the files contained in [nodePath] / [NODE_INFO_DIRECTORY] and keep watching the folder for further updates.
*
* We simply list the directory content every 5 seconds, the Java implementation of WatchService has been proven to
* be unreliable on MacOs and given the fairly simple use case we have, this simple implementation should do.
*
* @return an [Observable] returning [NodeInfoUpdate]s, at most one [NodeInfo] is returned for each processed file.
* @return an [Observable] that emits lists of [NodeInfoUpdate]s. Each emitted list is a poll event of the folder and
* may be empty if no changes were detected.
*/
fun nodeInfoUpdates(): Observable<NodeInfoUpdate> {
return Observable.interval(0, pollInterval.toMillis(), TimeUnit.MILLISECONDS, scheduler)
.flatMapIterable { loadFromDirectory() }
fun nodeInfoUpdates(): Observable<List<NodeInfoUpdate>> {
return Observable.interval(0, pollInterval.toMillis(), TimeUnit.MILLISECONDS, scheduler).map { pollDirectory() }
}
private fun loadFromDirectory(): List<NodeInfoUpdate> {
private fun pollDirectory(): List<NodeInfoUpdate> {
val processedPaths = HashSet<Path>()
val result = nodeInfosDir.list { paths ->
paths
@ -122,14 +111,3 @@ class NodeInfoWatcher(private val nodePath: Path,
return result.map { NodeInfoUpdate.Add(it.nodeInfo) } + removedHashes
}
}
// TODO Remove this once we have a tool that can read AMQP serialised files
fun main(args: Array<String>) {
_contextSerializationEnv.set(SerializationEnvironmentImpl(
SerializationFactoryImpl().apply {
registerScheme(AMQPServerSerializationScheme())
},
AMQP_P2P_CONTEXT)
)
println(Paths.get(args[0]).readObject<SignedNodeInfo>().verified())
}

View File

@ -1,6 +1,5 @@
package net.corda.node.services.network
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.AbstractParty
@ -8,6 +7,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
@ -37,8 +37,7 @@ import javax.annotation.concurrent.ThreadSafe
/** Database-based network map cache. */
@ThreadSafe
open class PersistentNetworkMapCache(private val database: CordaPersistence,
private val identityService: IdentityService,
private val myLegalName: CordaX500Name) : NetworkMapCacheInternal, SingletonSerializeAsToken() {
private val identityService: IdentityService) : NetworkMapCacheInternal, SingletonSerializeAsToken() {
companion object {
private val logger = contextLogger()
}
@ -48,10 +47,8 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence,
override val changed: Observable<MapChange> = _changed.wrapWithDatabaseTransaction()
private val changePublisher: rx.Observer<MapChange> get() = _changed.bufferUntilDatabaseCommit()
// TODO revisit the logic under which nodeReady and loadDBSuccess are set.
// with the NetworkMapService redesign their meaning is not too well defined.
private val _nodeReady = openFuture<Void?>()
override val nodeReady: CordaFuture<Void?> = _nodeReady
override val nodeReady: OpenFuture<Void?> = openFuture()
private lateinit var notaries: List<NotaryInfo>
override val notaryIdentities: List<Party> get() = notaries.map { it.identity }
@ -71,15 +68,6 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence,
fun start(notaries: List<NotaryInfo>) {
this.notaries = notaries
val otherNodeInfoCount = database.transaction {
session.createQuery(
"select count(*) from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n join n.legalIdentitiesAndCerts i where i.name != :myLegalName")
.setParameter("myLegalName", myLegalName.toString())
.singleResult as Long
}
if (otherNodeInfoCount > 0) {
_nodeReady.set(null)
}
}
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
@ -193,9 +181,6 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence,
logger.info("Previous node was identical to incoming one - doing nothing")
}
}
if (node.legalIdentities[0].name != myLegalName) {
_nodeReady.set(null)
}
logger.debug { "Done adding node with info: $node" }
}

View File

@ -1,6 +1,5 @@
package net.corda.node.utilities
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger
@ -10,19 +9,12 @@ import java.util.concurrent.atomic.AtomicInteger
* via an executor. It will use an underlying thread factory to create the actual thread
* and then override the thread name with the prefix and an ever increasing number
*/
class NamedThreadFactory(private val name: String, private val underlyingFactory: ThreadFactory) : ThreadFactory {
val threadNumber = AtomicInteger(1)
override fun newThread(runnable: Runnable?): Thread {
val thread = underlyingFactory.newThread(runnable)
class NamedThreadFactory(private val name: String,
private val delegate: ThreadFactory = Executors.defaultThreadFactory()) : ThreadFactory {
private val threadNumber = AtomicInteger(1)
override fun newThread(runnable: Runnable): Thread {
val thread = delegate.newThread(runnable)
thread.name = name + "-" + threadNumber.getAndIncrement()
return thread
}
}
/**
* Create a single thread executor with a NamedThreadFactory based on the default thread factory
* defined in java.util.concurrent.Executors
*/
fun newNamedSingleThreadExecutor(name: String): ExecutorService {
return Executors.newSingleThreadExecutor(NamedThreadFactory(name, Executors.defaultThreadFactory()))
}

View File

@ -9,6 +9,7 @@ import net.corda.core.crypto.sign
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
@ -27,8 +28,8 @@ import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.internal.TestNodeInfoBuilder
import net.corda.testing.internal.createNodeInfoAndSigned
import net.corda.testing.node.internal.network.NetworkMapServer
import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Before
import org.junit.Rule
@ -55,6 +56,7 @@ class NetworkMapUpdaterTest {
private val nodeInfoDir = baseDir / NODE_INFO_DIRECTORY
private val scheduler = TestScheduler()
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
private val nodeReadyFuture = openFuture<Void?>()
private val networkMapCache = createMockNetworkMapCache()
private lateinit var server: NetworkMapServer
private lateinit var networkMapClient: NetworkMapClient
@ -100,16 +102,18 @@ class NetworkMapUpdaterTest {
startUpdater()
networkMapClient.publish(signedNodeInfo2)
assertThat(nodeReadyFuture).isNotDone()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, times(2)).addNode(any())
verify(networkMapCache, times(1)).addNode(nodeInfo1)
verify(networkMapCache, times(1)).addNode(nodeInfo2)
assertThat(nodeReadyFuture).isDone()
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
networkMapClient.publish(signedNodeInfo3)
networkMapClient.publish(signedNodeInfo4)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
advanceTime()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
// 4 node info from network map, and 1 from file.
@ -136,7 +140,7 @@ class NetworkMapUpdaterTest {
networkMapClient.publish(signedNodeInfo4)
startUpdater()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
advanceTime()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
@ -162,7 +166,7 @@ class NetworkMapUpdaterTest {
@Test
fun `receive node infos from directory, without a network map`() {
setUpdater()
setUpdater(netMapClient = null)
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
// Not subscribed yet.
@ -171,10 +175,12 @@ class NetworkMapUpdaterTest {
startUpdater()
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
assertThat(nodeReadyFuture).isNotDone()
advanceTime()
verify(networkMapCache, times(1)).addNode(any())
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
assertThat(nodeReadyFuture).isDone()
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
}
@ -223,7 +229,7 @@ class NetworkMapUpdaterTest {
fun `fetch nodes from private network`() {
setUpdater(extraNetworkMapKeys = listOf(privateNetUUID))
server.addNodesToPrivateNetwork(privateNetUUID, listOf(ALICE_NAME))
Assertions.assertThatThrownBy { networkMapClient.getNetworkMap(privateNetUUID).payload.nodeInfoHashes }
assertThatThrownBy { networkMapClient.getNetworkMap(privateNetUUID).payload.nodeInfoHashes }
.isInstanceOf(IOException::class.java)
.hasMessageContaining("Response Code 404")
val (aliceInfo, signedAliceInfo) = createNodeInfoAndSigned(ALICE_NAME) // Goes to private network map
@ -245,7 +251,7 @@ class NetworkMapUpdaterTest {
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned1)
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned2)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
advanceTime()
verify(networkMapCache, times(2)).addNode(any())
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned1.nodeInfo)
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned2.nodeInfo)
@ -253,7 +259,7 @@ class NetworkMapUpdaterTest {
// Remove one of the nodes
val fileName1 = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${fileNodeInfoAndSigned1.nodeInfo.legalIdentities[0].name.serialize().hash}"
(nodeInfoDir / fileName1).delete()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
advanceTime()
verify(networkMapCache, times(1)).removeNode(any())
verify(networkMapCache, times(1)).removeNode(fileNodeInfoAndSigned1.nodeInfo)
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned2.signed.raw.hash)
@ -275,14 +281,14 @@ class NetworkMapUpdaterTest {
// Publish to network map the one with lower serial.
networkMapClient.publish(serverSignedNodeInfo)
startUpdater()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
advanceTime()
verify(networkMapCache, times(1)).addNode(localNodeInfo)
Thread.sleep(2L * cacheExpiryMs)
// Node from file has higher serial than the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash)
val fileName = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${localNodeInfo.legalIdentities[0].name.serialize().hash}"
(nodeInfoDir / fileName).delete()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
advanceTime()
verify(networkMapCache, times(1)).removeNode(any())
verify(networkMapCache).removeNode(localNodeInfo)
Thread.sleep(2L * cacheExpiryMs)
@ -331,7 +337,7 @@ class NetworkMapUpdaterTest {
assert(networkMapCache.allNodeHashes.size == 1)
networkMapClient.publish(signedNodeInfo2)
Thread.sleep(2L * cacheExpiryMs)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
advanceTime()
verify(networkMapCache, times(1)).addNode(signedNodeInfo2.verified())
verify(networkMapCache, times(1)).removeNode(signedNodeInfo1.verified())
@ -341,6 +347,7 @@ class NetworkMapUpdaterTest {
private fun createMockNetworkMapCache(): NetworkMapCacheInternal {
return mock {
on { nodeReady }.thenReturn(nodeReadyFuture)
val data = ConcurrentHashMap<Party, NodeInfo>()
on { addNode(any()) }.then {
val nodeInfo = it.arguments[0] as NodeInfo
@ -359,4 +366,8 @@ class NetworkMapUpdaterTest {
private fun createNodeInfoAndSigned(org: String): NodeInfoAndSigned {
return createNodeInfoAndSigned(CordaX500Name(org, "London", "GB"))
}
}
private fun advanceTime() {
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
}
}

View File

@ -36,7 +36,7 @@ class NodeInfoWatcherTest {
val tempFolder = TemporaryFolder()
private val scheduler = TestScheduler()
private val testSubscriber = TestSubscriber<NodeInfoUpdate>()
private val testSubscriber = TestSubscriber<List<NodeInfoUpdate>>()
private lateinit var nodeInfoAndSigned: NodeInfoAndSigned
private lateinit var nodeInfoPath: Path
@ -83,7 +83,7 @@ class NodeInfoWatcherTest {
val subscription = nodeInfoWatcher.nodeInfoUpdates().subscribe(testSubscriber)
try {
advanceTime()
val readNodes = testSubscriber.onNextEvents.distinct()
val readNodes = testSubscriber.onNextEvents.distinct().flatten()
assertEquals(0, readNodes.size)
} finally {
subscription.unsubscribe()
@ -98,7 +98,7 @@ class NodeInfoWatcherTest {
advanceTime()
try {
val readNodes = testSubscriber.onNextEvents.distinct()
val readNodes = testSubscriber.onNextEvents.distinct().flatten()
assertEquals(1, readNodes.size)
assertEquals(nodeInfoAndSigned.nodeInfo, (readNodes.first() as? NodeInfoUpdate.Add)?.nodeInfo)
} finally {
@ -116,7 +116,8 @@ class NodeInfoWatcherTest {
// Ensure the watch service is started.
advanceTime()
// Check no nodeInfos are read.
assertEquals(0, testSubscriber.valueCount)
assertEquals(0, testSubscriber.onNextEvents.distinct().flatten().size)
createNodeInfoFileInPath()
advanceTime()
@ -124,7 +125,7 @@ class NodeInfoWatcherTest {
// We need the WatchService to report a change and that might not happen immediately.
testSubscriber.awaitValueCount(1, 5, TimeUnit.SECONDS)
// The same folder can be reported more than once, so take unique values.
val readNodes = testSubscriber.onNextEvents.distinct()
val readNodes = testSubscriber.onNextEvents.distinct().flatten()
assertEquals(nodeInfoAndSigned.nodeInfo, (readNodes.first() as? NodeInfoUpdate.Add)?.nodeInfo)
} finally {
subscription.unsubscribe()