ENT-2359: Complete the nodeReady future if there are already node-infos in the db on startup (#3745)

Otherwise PersistentNetworkMapCache only completed the future if a node-info was added, which may not occur after a node restart and thus prevented the SMM from registering its listener with the messaging layer.

Further, the future is only completed if the node-info is other than the node's own one.
This commit is contained in:
Shams Asari 2018-08-07 16:17:06 +01:00 committed by GitHub
parent dd4923e80e
commit 3f27e8e0be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 233 additions and 119 deletions

View File

@ -80,7 +80,7 @@ class ArtemisMessagingTest {
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
val persistentNetworkMapCache = PersistentNetworkMapCache(database).apply { start(emptyList()) }
val persistentNetworkMapCache = PersistentNetworkMapCache(database, ALICE_NAME).apply { start(emptyList()) }
networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, rigorousMock(), database).apply { start() }
}

View File

@ -1,118 +1,153 @@
package net.corda.node.services.network
import net.corda.core.crypto.generateKeyPair
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.NodeWithInfo
import net.corda.node.internal.configureDatabase
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.*
import net.corda.testing.node.internal.NodeBasedTest
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import org.junit.After
import org.junit.Rule
import org.junit.Test
import kotlin.test.assertEquals
// TODO Clean up these tests, they were written with old network map design in place.
class PersistentNetworkMapCacheTest : NodeBasedTest() {
class PersistentNetworkMapCacheTest {
private companion object {
val ALICE = TestIdentity(ALICE_NAME, 70).party
val BOB = TestIdentity(BOB_NAME, 80).party
val DUMMY_REGULATOR = TestIdentity(CordaX500Name("Regulator A", "Paris", "FR"), 100).party
val ALICE = TestIdentity(ALICE_NAME, 70)
val BOB = TestIdentity(BOB_NAME, 80)
val CHARLIE = TestIdentity(CHARLIE_NAME, 90)
}
private val partiesList = listOf(DUMMY_REGULATOR, ALICE, BOB)
private val addressesMap = HashMap<CordaX500Name, NetworkHostAndPort>()
private val infos = HashSet<NodeInfo>()
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
@Before
fun start() {
val nodes = startNodesWithPort(partiesList)
nodes.forEach {
infos.add(it.info)
addressesMap[it.info.singleIdentity().name] = it.info.addresses[0]
it.dispose() // We want them to communicate with NetworkMapService to save data to cache.
private var portCounter = 1000
private val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
private val charlieNetMapCache = PersistentNetworkMapCache(database, CHARLIE.name)
@After
fun cleanUp() {
database.close()
}
@Test
fun addNode() {
val alice = createNodeInfo(listOf(ALICE))
charlieNetMapCache.addNode(alice)
assertThat(charlieNetMapCache.nodeReady).isDone()
val fromDb = database.transaction {
session.createQuery(
"from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name}",
NodeInfoSchemaV1.PersistentNodeInfo::class.java
).resultList.map { it.toNodeInfo() }
}
assertThat(fromDb).containsOnly(alice)
}
@Test
fun `adding the node's own node-info doesn't complete the nodeReady future`() {
val charlie = createNodeInfo(listOf(CHARLIE))
charlieNetMapCache.addNode(charlie)
assertThat(charlieNetMapCache.nodeReady).isNotDone()
assertThat(charlieNetMapCache.getNodeByLegalName(CHARLIE.name)).isEqualTo(charlie)
}
@Test
fun `starting with just the node's own node-info in the db`() {
val charlie = createNodeInfo(listOf(CHARLIE))
saveNodeInfoIntoDb(charlie)
assertThat(charlieNetMapCache.allNodes).containsOnly(charlie)
charlieNetMapCache.start(emptyList())
assertThat(charlieNetMapCache.nodeReady).isNotDone()
}
@Test
fun `starting with another node-info in the db`() {
val alice = createNodeInfo(listOf(ALICE))
saveNodeInfoIntoDb(alice)
assertThat(charlieNetMapCache.allNodes).containsOnly(alice)
charlieNetMapCache.start(emptyList())
assertThat(charlieNetMapCache.nodeReady).isDone()
}
@Test
fun `unknown legal name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
assertThat(netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull()
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE)))
assertThat(charlieNetMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
assertThat(charlieNetMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(charlieNetMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(charlieNetMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull()
}
@Test
fun `nodes in distributed service`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE)))
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME)
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME).identity
val distServiceNodeInfos = (1..2).map {
val nodeInfo = NodeInfo(
addresses = listOf(NetworkHostAndPort("localhost", 1000 + it)),
legalIdentitiesAndCerts = listOf(TestIdentity.fresh("Org-$it").identity, distributedIdentity),
platformVersion = 3,
serial = 1
)
netMapCache.addNode(nodeInfo)
val nodeInfo = createNodeInfo(identities = listOf(TestIdentity.fresh("Org-$it"), distributedIdentity))
charlieNetMapCache.addNode(nodeInfo)
nodeInfo
}
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos)
assertThatExceptionOfType(IllegalArgumentException::class.java)
.isThrownBy { netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) }
assertThat(charlieNetMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos)
assertThatIllegalArgumentException()
.isThrownBy { charlieNetMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) }
.withMessageContaining(DUMMY_NOTARY_NAME.toString())
}
@Test
fun `get nodes by owning key and by name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netCache = alice.services.networkMapCache
val res = netCache.getNodeByLegalIdentity(alice.info.singleIdentity())
assertEquals(alice.info, res)
val res2 = netCache.getNodeByLegalName(DUMMY_REGULATOR.name)
assertEquals(infos.singleOrNull { DUMMY_REGULATOR.name in it.legalIdentities.map { it.name } }, res2)
val alice = createNodeInfo(listOf(ALICE))
charlieNetMapCache.addNode(alice)
assertThat(charlieNetMapCache.getNodesByLegalIdentityKey(ALICE.publicKey)).containsOnly(alice)
assertThat(charlieNetMapCache.getNodeByLegalName(ALICE.name)).isEqualTo(alice)
}
@Test
fun `get nodes by address`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netCache = alice.services.networkMapCache
val res = netCache.getNodeByAddress(alice.info.addresses[0])
assertEquals(alice.info, res)
val alice = createNodeInfo(listOf(ALICE))
charlieNetMapCache.addNode(alice)
assertThat(charlieNetMapCache.getNodeByAddress(alice.addresses[0])).isEqualTo(alice)
}
// This test has to be done as normal node not mock, because MockNodes don't have addresses.
@Test
fun `insert two node infos with the same host and port`() {
val aliceNode = startNode(ALICE_NAME)
val charliePartyCert = getTestPartyAndCertificate(CHARLIE_NAME, generateKeyPair().public)
val aliceCache = aliceNode.services.networkMapCache
aliceCache.addNode(aliceNode.info.copy(legalIdentitiesAndCerts = listOf(charliePartyCert)))
val res = aliceCache.allNodes.filter { aliceNode.info.addresses[0] in it.addresses }
assertEquals(2, res.size)
val alice = createNodeInfo(listOf(ALICE))
charlieNetMapCache.addNode(alice)
val bob = createNodeInfo(listOf(BOB), address = alice.addresses[0])
charlieNetMapCache.addNode(bob)
val nodeInfos = charlieNetMapCache.allNodes.filter { alice.addresses[0] in it.addresses }
assertThat(nodeInfos).hasSize(2)
}
@Test
fun `restart node with DB map cache`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val partyNodes = alice.services.networkMapCache.allNodes
assertEquals(infos.size, partyNodes.size)
assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet())
private fun createNodeInfo(identities: List<TestIdentity>,
address: NetworkHostAndPort = NetworkHostAndPort("localhost", portCounter++)): NodeInfo {
return NodeInfo(
addresses = listOf(address),
legalIdentitiesAndCerts = identities.map { it.identity },
platformVersion = 3,
serial = 1
)
}
// HELPERS
// Helper function to restart nodes with the same host and port.
private fun startNodesWithPort(nodesToStart: List<Party>, customRetryIntervalMs: Long? = null): List<NodeWithInfo> {
return nodesToStart.map { party ->
val configOverrides = (addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()) +
(customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap())
startNode(party.name, configOverrides = configOverrides)
private fun saveNodeInfoIntoDb(nodeInfo: NodeInfo) {
database.transaction {
session.save(NodeInfoSchemaV1.PersistentNodeInfo(
id = 0,
hash = nodeInfo.serialize().hash.toString(),
addresses = nodeInfo.addresses.map { NodeInfoSchemaV1.DBHostAndPort.fromHostAndPort(it) },
legalIdentitiesAndCerts = nodeInfo.legalIdentitiesAndCerts.mapIndexed { idx, elem ->
NodeInfoSchemaV1.DBPartyAndCertificate(elem, isMain = idx == 0)
},
platformVersion = nodeInfo.platformVersion,
serial = nodeInfo.serial
))
}
}
}

View File

@ -143,7 +143,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
// TODO Break cyclic dependency
identityService.database = database
}
private val persistentNetworkMapCache = PersistentNetworkMapCache(database)
private val persistentNetworkMapCache = PersistentNetworkMapCache(database, configuration.myLegalName)
val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database).tokenize()
val checkpointStorage = DBCheckpointStorage()
@Suppress("LeakingThis")
@ -247,7 +247,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
open fun generateAndSaveNodeInfo(): NodeInfo {
check(started == null) { "Node has already been started" }
log.info("Generating nodeInfo ...")
persistentNetworkMapCache.start(notaries = emptyList())
val trustRoot = initKeyStore()
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
startDatabase()
@ -255,6 +254,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
identityService.start(trustRoot, listOf(identity.certificate, nodeCa))
return database.use {
it.transaction {
persistentNetworkMapCache.start(notaries = emptyList())
val (_, nodeInfoAndSigned) = updateNodeInfo(identity, identityKeyPair, publish = false)
nodeInfoAndSigned.nodeInfo
}
@ -264,9 +264,9 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
fun clearNetworkMapCache() {
Node.printBasicNodeInfo("Clearing network map cache entries")
log.info("Starting clearing of network map cache entries...")
persistentNetworkMapCache.start(notaries = emptyList())
startDatabase()
database.use {
persistentNetworkMapCache.start(notaries = emptyList())
persistentNetworkMapCache.clearNetworkMapCache()
}
}
@ -302,13 +302,13 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
"Node's platform version is lower than network's required minimumPlatformVersion"
}
servicesForResolution.start(netParams)
persistentNetworkMapCache.start(netParams.notaries)
startDatabase()
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
identityService.start(trustRoot, listOf(identity.certificate, nodeCa))
val (keyPairs, nodeInfoAndSigned, myNotaryIdentity) = database.transaction {
persistentNetworkMapCache.start(netParams.notaries)
networkMapCache.start()
updateNodeInfo(identity, identityKeyPair, publish = true)
}

View File

@ -0,0 +1,51 @@
package net.corda.node.services.network
import net.corda.core.identity.AbstractParty
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.nodeapi.internal.persistence.CordaPersistence
class NetworkMapCacheImpl(
private val networkMapCacheBase: NetworkMapCacheBaseInternal,
private val identityService: IdentityService,
private val database: CordaPersistence
) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal, SingletonSerializeAsToken() {
companion object {
private val logger = contextLogger()
}
fun start() {
for (nodeInfo in networkMapCacheBase.allNodes) {
for (identity in nodeInfo.legalIdentitiesAndCerts) {
identityService.verifyAndRegisterIdentity(identity)
}
}
networkMapCacheBase.changed.subscribe { mapChange ->
// TODO how should we handle network map removal
if (mapChange is NetworkMapCache.MapChange.Added) {
mapChange.node.legalIdentitiesAndCerts.forEach {
try {
identityService.verifyAndRegisterIdentity(it)
} catch (ignore: Exception) {
// Log a warning to indicate node info is not added to the network map cache.
logger.warn("Node info for :'${it.name}' is not added to the network map due to verification error.")
}
}
}
}
}
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
return database.transaction {
val wellKnownParty = identityService.wellKnownPartyFromAnonymous(party)
wellKnownParty?.let {
getNodesByLegalIdentityKey(it.owningKey).firstOrNull()
}
}
}
}

View File

@ -3,7 +3,6 @@ package net.corda.node.services.network
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
@ -12,7 +11,6 @@ import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SingletonSerializeAsToken
@ -22,7 +20,6 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NonInvalidatingCache
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
@ -34,45 +31,10 @@ import java.security.PublicKey
import java.util.*
import javax.annotation.concurrent.ThreadSafe
class NetworkMapCacheImpl(
private val networkMapCacheBase: NetworkMapCacheBaseInternal,
private val identityService: IdentityService,
private val database: CordaPersistence
) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal, SingletonSerializeAsToken() {
companion object {
private val logger = contextLogger()
}
fun start() {
networkMapCacheBase.allNodes.forEach { it.legalIdentitiesAndCerts.forEach { identityService.verifyAndRegisterIdentity(it) } }
networkMapCacheBase.changed.subscribe { mapChange ->
// TODO how should we handle network map removal
if (mapChange is MapChange.Added) {
mapChange.node.legalIdentitiesAndCerts.forEach {
try {
identityService.verifyAndRegisterIdentity(it)
} catch (ignore: Exception) {
// Log a warning to indicate node info is not added to the network map cache.
logger.warn("Node info for :'${it.name}' is not added to the network map due to verification error.")
}
}
}
}
}
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
return database.transaction {
val wellKnownParty = identityService.wellKnownPartyFromAnonymous(party)
wellKnownParty?.let {
getNodesByLegalIdentityKey(it.owningKey).firstOrNull()
}
}
}
}
/** Database-based network map cache. */
@ThreadSafe
open class PersistentNetworkMapCache(private val database: CordaPersistence) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal {
open class PersistentNetworkMapCache(private val database: CordaPersistence,
private val myLegalName: CordaX500Name) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal {
companion object {
private val logger = contextLogger()
}
@ -105,6 +67,15 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence) : S
fun start(notaries: List<NotaryInfo>) {
this.notaries = notaries
val otherNodeInfoCount = database.transaction {
session.createQuery(
"select count(*) from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n join n.legalIdentitiesAndCerts i where i.name != :myLegalName")
.setParameter("myLegalName", myLegalName.toString())
.singleResult as Long
}
if (otherNodeInfoCount > 0) {
_nodeReady.set(null)
}
}
override fun getNodeByHash(nodeHash: SecureHash): NodeInfo? {
@ -202,7 +173,9 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence) : S
logger.info("Previous node was identical to incoming one - doing nothing")
}
}
_nodeReady.set(null)
if (node.legalIdentities[0].name != myLegalName) {
_nodeReady.set(null)
}
logger.debug { "Done adding node with info: $node" }
}

View File

@ -0,0 +1,52 @@
package net.corda.node.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
class NodeRestartTests {
private val mockNet = InternalMockNetwork(threadPerNode = true, autoVisibleNodes = false, notarySpecs = emptyList())
@After
fun cleanUp() {
mockNet.close()
}
@Test
fun `restart with no network map cache update`() {
val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
val bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
bob.registerInitiatedFlow(Responder::class.java)
alice.services.networkMapCache.addNode(bob.info)
bob.services.networkMapCache.addNode(alice.info)
val alice2 = mockNet.restartNode(alice)
val result = alice2.services.startFlow(Initiator(bob.info.singleIdentity())).resultFuture.getOrThrow()
assertThat(result).isEqualTo(123)
}
@InitiatingFlow
private class Initiator(private val otherSide: Party) : FlowLogic<Int>() {
@Suspendable
override fun call(): Int = initiateFlow(otherSide).receive<Int>().unwrap { it }
}
@InitiatedBy(Initiator::class)
private class Responder(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() = otherSide.send(123)
}
}

View File

@ -82,6 +82,7 @@ data class MockNodeArgs(
val version: VersionInfo = MOCK_VERSION_INFO
)
// TODO We don't need a parameters object as this is internal only
data class InternalMockNodeParameters(
val forcedID: Int? = null,
val legalName: CordaX500Name? = null,
@ -147,7 +148,8 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
val testDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()),
val networkParameters: NetworkParameters = testNetworkParameters(),
val defaultFactory: (MockNodeArgs, CordappLoader?) -> MockNode = { args, cordappLoader -> cordappLoader?.let { MockNode(args, it) } ?: MockNode(args) },
val cordappsForAllNodes: Set<TestCorDapp> = emptySet()) : AutoCloseable {
val cordappsForAllNodes: Set<TestCorDapp> = emptySet(),
val autoVisibleNodes: Boolean = true) : AutoCloseable {
init {
// Apache SSHD for whatever reason registers a SFTP FileSystemProvider - which gets loaded by JimFS.
// This SFTP support loads BouncyCastle, which we want to avoid.
@ -359,6 +361,7 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
}
private fun advertiseNodeToNetwork(newNode: TestStartedNode) {
if (!mockNet.autoVisibleNodes) return
mockNet.nodes
.mapNotNull { it.started }
.forEach { existingNode ->