ENT-1519: Ensure NodeInfo always has at least one address by checking in the c'tor (#2538)

Further, the look up of the node's own node-info from its database has been tightened to ensure there isn't more than one.

Also fixed some brittle code which was assuming exactly one address rather than at least one.
This commit is contained in:
Shams Asari 2018-02-15 14:06:41 +00:00 committed by GitHub
parent 308d70c0d1
commit 2864ce1384
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 112 additions and 44 deletions

View File

@ -26,7 +26,9 @@ data class NodeInfo(val addresses: List<NetworkHostAndPort>,
) {
// TODO We currently don't support multi-IP/multi-identity nodes, we only left slots in the data structures.
init {
require(legalIdentitiesAndCerts.isNotEmpty()) { "Node should have at least one legal identity" }
require(addresses.isNotEmpty()) { "Node must have at least one address" }
require(legalIdentitiesAndCerts.isNotEmpty()) { "Node must have at least one legal identity" }
require(platformVersion > 0) { "Platform version must be at least 1" }
}
@Transient private var _legalIdentities: List<Party>? = null

View File

@ -67,12 +67,22 @@ interface NetworkMapCacheBase {
fun track(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange>
/**
* Look up the node info for a legal name.
* Notice that when there are more than one node for a given name (in case of distributed services) first service node
* found will be returned.
* Return a [NodeInfo] which has the given legal name for one of its identities, or null if no such node is found.
*
* @throws IllegalArgumentException If more than one matching node is found, in the case of a distributed service identity
* (such as with a notary cluster). For such a scenerio use [getNodesByLegalName] instead.
*/
fun getNodeByLegalName(name: CordaX500Name): NodeInfo?
/**
* Return a list of [NodeInfo]s which have the given legal name for one of their identities, or an empty list if no
* such nodes are found.
*
* Normally there is at most one node for a legal name, but for distributed service identities (such as with a notary
* cluster) there can be multiple nodes sharing the same identity.
*/
fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo>
/** Look up the node info for a host and port. */
fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo?
@ -100,13 +110,6 @@ interface NetworkMapCacheBase {
*/
fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo>
/**
* Look up the node information entries for a legal name.
* Note that normally there will be only one node for a legal name, but for clusters of nodes or distributed services there
* can be multiple nodes.
*/
fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo>
/** Returns information about the party, which may be a specific node or a service */
fun getPartyInfo(party: Party): PartyInfo?

View File

@ -7,13 +7,13 @@ from the previous milestone release.
UNRELEASED
----------
* Added ``NetworkMapCache.getNodesByLegalName`` for querying nodes belonging to a distributed service such as a notary cluster
where they all share a common identity. ``NetworkMapCache.getNodeByLegalName`` has been tightened to throw if more than
one node with the legal name is found.
* Per CorDapp configuration is now exposed. ``CordappContext`` now exposes a ``CordappConfig`` object that is populated
at CorDapp context creation time from a file source during runtime.
* Provided experimental support for specifying your own webserver to be used instead of the default development webserver in ``Cordform`` using the ``webserverJar`` argument
* The test utils in ``Expect.kt``, ``SerializationTestHelpers.kt``, ``TestConstants.kt`` and ``TestUtils.kt`` have moved from the ``net.corda.testing`` package to the ``net.corda.testing.core`` package, and ``FlowStackSnapshot.kt`` has moved to the ``net.corda.testing.services`` package. Moving items out of the ``net.corda.testing.*`` package will help make it clearer which parts of the api are stable. The bash script ``tools\scripts\update-test-packages.sh`` can be used to smooth the upgrade process for existing projects.
* Introduced Flow Draining mode, in which a node continues executing existing flows, but does not start new. This is to support graceful node shutdown/restarts.
In particular, when this mode is on, new flows through RPC will be rejected, scheduled flows will be ignored, and initial session messages will not be consumed.
This will ensure that the number of checkpoints will strictly diminish with time, allowing for a clean shutdown.
@ -195,9 +195,18 @@ at CorDapp context creation time from a file source during runtime.
* Marked ``stateMachine`` on ``FlowLogic`` as ``CordaInternal`` to make clear that is it not part of the public api and is
only for internal use
* Provided experimental support for specifying your own webserver to be used instead of the default development
webserver in ``Cordform`` using the ``webserverJar`` argument
* Created new ``StartedMockNode`` and ``UnstartedMockNode`` classes which are wrappers around our MockNode implementation
that expose relevant methods for testing without exposing internals, create these using a ``MockNetwork``.
* The test utils in ``Expect.kt``, ``SerializationTestHelpers.kt``, ``TestConstants.kt`` and ``TestUtils.kt`` have moved
from the ``net.corda.testing`` package to the ``net.corda.testing.core`` package, and ``FlowStackSnapshot.kt`` has moved to the
``net.corda.testing.services`` package. Moving items out of the ``net.corda.testing.*`` package will help make it clearer which
parts of the api are stable. The bash script ``tools\scripts\update-test-packages.sh`` can be used to smooth the upgrade
process for existing projects.
.. _changelog_v1:
Release 1.0

View File

@ -155,9 +155,8 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
}
}
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> {
val address = node.addresses.single()
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
private fun gatherAddresses(node: NodeInfo): List<ArtemisMessagingComponent.NodeAddress> {
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, node.addresses[0]) }
}
override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {

View File

@ -9,6 +9,8 @@ import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.testing.core.*
import net.corda.testing.node.internal.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before
import org.junit.Test
import kotlin.test.assertEquals
@ -35,6 +37,45 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
}
}
@Test
fun `unknown legal name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
alice.database.transaction {
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
assertThat(netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull()
}
}
@Test
fun `nodes in distributed service`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
val distServiceNodeInfos = alice.database.transaction {
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME).identity
(1..2).map {
val nodeInfo = NodeInfo(
addresses = listOf(NetworkHostAndPort("localhost", 1000 + it)),
legalIdentitiesAndCerts = listOf(TestIdentity.fresh("Org-$it").identity, distributedIdentity),
platformVersion = 3,
serial = 1
)
netMapCache.addNode(nodeInfo)
nodeInfo
}
}
alice.database.transaction {
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos)
assertThatExceptionOfType(IllegalArgumentException::class.java)
.isThrownBy { netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) }
.withMessageContaining(DUMMY_NOTARY_NAME.toString())
}
}
@Test
fun `get nodes by owning key and by name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]

View File

@ -178,13 +178,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// a code smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start()
val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = info.sign { publicKey, serialised ->
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = nodeInfo.sign { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
info
nodeInfo
}
}
@ -203,11 +203,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, GlobalProperties.networkParameters.notaries).start(), identityService)
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair)
identityService.loadIdentities(info.legalIdentitiesAndCerts)
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache, nodeProperties)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, nodeInfo, identityService, networkMapCache, nodeProperties)
val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
@ -239,7 +239,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
startShell(rpcOps)
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
@ -297,20 +297,22 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
var info = NodeInfo(
val nodeInfoWithBlankSerial = NodeInfo(
myAddresses(),
setOf(identity, myNotaryIdentity).filterNotNull(),
versionInfo.platformVersion,
platformClock.instant().toEpochMilli()
serial = 0
)
// Check if we have already stored a version of 'our own' NodeInfo, this is to avoid regenerating it with
// a different timestamp.
networkMapCache.getNodesByLegalName(configuration.myLegalName).firstOrNull()?.let {
if (info.copy(serial = it.serial) == it) {
info = it
val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) {
// The node info hasn't changed. We use the one from the database to preserve the serial.
nodeInfoFromDb
} else {
nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
}
}
return Pair(keyPairs, info)
return Pair(keyPairs, nodeInfo)
}
protected abstract fun myAddresses(): List<NetworkHostAndPort>
@ -534,7 +536,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
schemaService: SchemaService,
transactionStorage: WritableTransactionStorage,
database: CordaPersistence,
info: NodeInfo,
nodeInfo: NodeInfo,
identityService: IdentityServiceInternal,
networkMapCache: NetworkMapCacheInternal,
nodeProperties: NodePropertiesStore): MutableList<Any> {
@ -551,10 +553,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
MonitoringService(metrics),
cordappProvider,
database,
info,
nodeInfo,
networkMapCache,
nodeProperties)
network = makeMessagingService(database, info, nodeProperties)
network = makeMessagingService(database, nodeInfo, nodeProperties)
val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
services.keyManagementService, services.identityService, platformClock,
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,

View File

@ -157,7 +157,7 @@ open class Node(configuration: NodeConfiguration,
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker()
val advertisedAddress = info.addresses.single()
val advertisedAddress = info.addresses[0]
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())

View File

@ -7,12 +7,11 @@ import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.NotaryInfo
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.openFuture
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.PartyInfo
@ -21,6 +20,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NonInvalidatingCache
@ -153,8 +153,17 @@ open class PersistentNetworkMapCache(
return null
}
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? = getNodesByLegalName(name).firstOrNull()
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? {
val nodeInfos = getNodesByLegalName(name)
return when (nodeInfos.size) {
0 -> null
1 -> nodeInfos[0]
else -> throw IllegalArgumentException("More than one node found with legal name $name")
}
}
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) }
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })

View File

@ -16,6 +16,7 @@ import net.corda.core.node.services.*
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.VersionInfo
import net.corda.node.internal.configureDatabase
import net.corda.node.internal.cordapp.CordappLoader
@ -206,7 +207,7 @@ open class MockServices private constructor(
override val clock: Clock get() = Clock.systemUTC()
override val myInfo: NodeInfo
get() {
return NodeInfo(emptyList(), listOf(initialIdentity.identity), 1, serial = 1L)
return NodeInfo(listOf(NetworkHostAndPort("mock.node.services", 10000)), listOf(initialIdentity.identity), 1, serial = 1L)
}
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)
private val mockCordappProvider: MockCordappProvider = MockCordappProvider(cordappLoader, attachments)

View File

@ -311,7 +311,9 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)
override fun myAddresses(): List<NetworkHostAndPort> = emptyList()
// NodeInfo requires a non-empty addresses list and so we give it a dummy value for mock nodes.
// The non-empty addresses check is important to have and so we tolerate the ugliness here.
override fun myAddresses(): List<NetworkHostAndPort> = listOf(NetworkHostAndPort("mock.node", 1000))
// Allow unit tests to modify the serialization whitelist list before the node start,
// so they don't have to ServiceLoad test whitelists into all unit tests.