ENT-1519: Ensure NodeInfo always has at least one address by checking in the c'tor (#2538)

Further, the look up of the node's own node-info from its database has been tightened to ensure there isn't more than one.

Also fixed some brittle code which was assuming exactly one address rather than at least one.
This commit is contained in:
Shams Asari
2018-02-15 14:06:41 +00:00
committed by GitHub
parent 308d70c0d1
commit 2864ce1384
10 changed files with 112 additions and 44 deletions

View File

@ -9,6 +9,8 @@ import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.testing.core.*
import net.corda.testing.node.internal.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before
import org.junit.Test
import kotlin.test.assertEquals
@ -35,6 +37,45 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
}
}
@Test
fun `unknown legal name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
alice.database.transaction {
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
assertThat(netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull()
}
}
@Test
fun `nodes in distributed service`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
val distServiceNodeInfos = alice.database.transaction {
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME).identity
(1..2).map {
val nodeInfo = NodeInfo(
addresses = listOf(NetworkHostAndPort("localhost", 1000 + it)),
legalIdentitiesAndCerts = listOf(TestIdentity.fresh("Org-$it").identity, distributedIdentity),
platformVersion = 3,
serial = 1
)
netMapCache.addNode(nodeInfo)
nodeInfo
}
}
alice.database.transaction {
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos)
assertThatExceptionOfType(IllegalArgumentException::class.java)
.isThrownBy { netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) }
.withMessageContaining(DUMMY_NOTARY_NAME.toString())
}
}
@Test
fun `get nodes by owning key and by name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]

View File

@ -178,13 +178,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// a code smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start()
val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = info.sign { publicKey, serialised ->
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = nodeInfo.sign { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
info
nodeInfo
}
}
@ -203,11 +203,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, GlobalProperties.networkParameters.notaries).start(), identityService)
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair)
identityService.loadIdentities(info.legalIdentitiesAndCerts)
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache, nodeProperties)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, nodeInfo, identityService, networkMapCache, nodeProperties)
val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
@ -239,7 +239,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
startShell(rpcOps)
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
@ -297,20 +297,22 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
var info = NodeInfo(
val nodeInfoWithBlankSerial = NodeInfo(
myAddresses(),
setOf(identity, myNotaryIdentity).filterNotNull(),
versionInfo.platformVersion,
platformClock.instant().toEpochMilli()
serial = 0
)
// Check if we have already stored a version of 'our own' NodeInfo, this is to avoid regenerating it with
// a different timestamp.
networkMapCache.getNodesByLegalName(configuration.myLegalName).firstOrNull()?.let {
if (info.copy(serial = it.serial) == it) {
info = it
}
val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) {
// The node info hasn't changed. We use the one from the database to preserve the serial.
nodeInfoFromDb
} else {
nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
}
return Pair(keyPairs, info)
return Pair(keyPairs, nodeInfo)
}
protected abstract fun myAddresses(): List<NetworkHostAndPort>
@ -534,7 +536,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
schemaService: SchemaService,
transactionStorage: WritableTransactionStorage,
database: CordaPersistence,
info: NodeInfo,
nodeInfo: NodeInfo,
identityService: IdentityServiceInternal,
networkMapCache: NetworkMapCacheInternal,
nodeProperties: NodePropertiesStore): MutableList<Any> {
@ -551,10 +553,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
MonitoringService(metrics),
cordappProvider,
database,
info,
nodeInfo,
networkMapCache,
nodeProperties)
network = makeMessagingService(database, info, nodeProperties)
network = makeMessagingService(database, nodeInfo, nodeProperties)
val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
services.keyManagementService, services.identityService, platformClock,
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,

View File

@ -157,7 +157,7 @@ open class Node(configuration: NodeConfiguration,
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker()
val advertisedAddress = info.addresses.single()
val advertisedAddress = info.addresses[0]
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())

View File

@ -7,12 +7,11 @@ import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.NotaryInfo
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.openFuture
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.PartyInfo
@ -21,6 +20,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NonInvalidatingCache
@ -153,8 +153,17 @@ open class PersistentNetworkMapCache(
return null
}
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? = getNodesByLegalName(name).firstOrNull()
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? {
val nodeInfos = getNodesByLegalName(name)
return when (nodeInfos.size) {
0 -> null
1 -> nodeInfos[0]
else -> throw IllegalArgumentException("More than one node found with legal name $name")
}
}
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) }
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })