mirror of
https://github.com/corda/corda.git
synced 2025-01-19 11:16:54 +00:00
ENT-1519: Ensure NodeInfo always has at least one address by checking in the c'tor (#2538)
Further, the look up of the node's own node-info from its database has been tightened to ensure there isn't more than one. Also fixed some brittle code which was assuming exactly one address rather than at least one.
This commit is contained in:
parent
308d70c0d1
commit
2864ce1384
@ -26,7 +26,9 @@ data class NodeInfo(val addresses: List<NetworkHostAndPort>,
|
||||
) {
|
||||
// TODO We currently don't support multi-IP/multi-identity nodes, we only left slots in the data structures.
|
||||
init {
|
||||
require(legalIdentitiesAndCerts.isNotEmpty()) { "Node should have at least one legal identity" }
|
||||
require(addresses.isNotEmpty()) { "Node must have at least one address" }
|
||||
require(legalIdentitiesAndCerts.isNotEmpty()) { "Node must have at least one legal identity" }
|
||||
require(platformVersion > 0) { "Platform version must be at least 1" }
|
||||
}
|
||||
|
||||
@Transient private var _legalIdentities: List<Party>? = null
|
||||
|
@ -67,12 +67,22 @@ interface NetworkMapCacheBase {
|
||||
fun track(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange>
|
||||
|
||||
/**
|
||||
* Look up the node info for a legal name.
|
||||
* Notice that when there are more than one node for a given name (in case of distributed services) first service node
|
||||
* found will be returned.
|
||||
* Return a [NodeInfo] which has the given legal name for one of its identities, or null if no such node is found.
|
||||
*
|
||||
* @throws IllegalArgumentException If more than one matching node is found, in the case of a distributed service identity
|
||||
* (such as with a notary cluster). For such a scenerio use [getNodesByLegalName] instead.
|
||||
*/
|
||||
fun getNodeByLegalName(name: CordaX500Name): NodeInfo?
|
||||
|
||||
/**
|
||||
* Return a list of [NodeInfo]s which have the given legal name for one of their identities, or an empty list if no
|
||||
* such nodes are found.
|
||||
*
|
||||
* Normally there is at most one node for a legal name, but for distributed service identities (such as with a notary
|
||||
* cluster) there can be multiple nodes sharing the same identity.
|
||||
*/
|
||||
fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo>
|
||||
|
||||
/** Look up the node info for a host and port. */
|
||||
fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo?
|
||||
|
||||
@ -100,13 +110,6 @@ interface NetworkMapCacheBase {
|
||||
*/
|
||||
fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo>
|
||||
|
||||
/**
|
||||
* Look up the node information entries for a legal name.
|
||||
* Note that normally there will be only one node for a legal name, but for clusters of nodes or distributed services there
|
||||
* can be multiple nodes.
|
||||
*/
|
||||
fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo>
|
||||
|
||||
/** Returns information about the party, which may be a specific node or a service */
|
||||
fun getPartyInfo(party: Party): PartyInfo?
|
||||
|
||||
|
@ -7,12 +7,12 @@ from the previous milestone release.
|
||||
UNRELEASED
|
||||
----------
|
||||
|
||||
* Per CorDapp configuration is now exposed. ``CordappContext`` now exposes a ``CordappConfig`` object that is populated
|
||||
at CorDapp context creation time from a file source during runtime.
|
||||
* Added ``NetworkMapCache.getNodesByLegalName`` for querying nodes belonging to a distributed service such as a notary cluster
|
||||
where they all share a common identity. ``NetworkMapCache.getNodeByLegalName`` has been tightened to throw if more than
|
||||
one node with the legal name is found.
|
||||
|
||||
* Provided experimental support for specifying your own webserver to be used instead of the default development webserver in ``Cordform`` using the ``webserverJar`` argument
|
||||
|
||||
* The test utils in ``Expect.kt``, ``SerializationTestHelpers.kt``, ``TestConstants.kt`` and ``TestUtils.kt`` have moved from the ``net.corda.testing`` package to the ``net.corda.testing.core`` package, and ``FlowStackSnapshot.kt`` has moved to the ``net.corda.testing.services`` package. Moving items out of the ``net.corda.testing.*`` package will help make it clearer which parts of the api are stable. The bash script ``tools\scripts\update-test-packages.sh`` can be used to smooth the upgrade process for existing projects.
|
||||
* Per CorDapp configuration is now exposed. ``CordappContext`` now exposes a ``CordappConfig`` object that is populated
|
||||
at CorDapp context creation time from a file source during runtime.
|
||||
|
||||
* Introduced Flow Draining mode, in which a node continues executing existing flows, but does not start new. This is to support graceful node shutdown/restarts.
|
||||
In particular, when this mode is on, new flows through RPC will be rejected, scheduled flows will be ignored, and initial session messages will not be consumed.
|
||||
@ -195,9 +195,18 @@ at CorDapp context creation time from a file source during runtime.
|
||||
* Marked ``stateMachine`` on ``FlowLogic`` as ``CordaInternal`` to make clear that is it not part of the public api and is
|
||||
only for internal use
|
||||
|
||||
* Provided experimental support for specifying your own webserver to be used instead of the default development
|
||||
webserver in ``Cordform`` using the ``webserverJar`` argument
|
||||
|
||||
* Created new ``StartedMockNode`` and ``UnstartedMockNode`` classes which are wrappers around our MockNode implementation
|
||||
that expose relevant methods for testing without exposing internals, create these using a ``MockNetwork``.
|
||||
|
||||
* The test utils in ``Expect.kt``, ``SerializationTestHelpers.kt``, ``TestConstants.kt`` and ``TestUtils.kt`` have moved
|
||||
from the ``net.corda.testing`` package to the ``net.corda.testing.core`` package, and ``FlowStackSnapshot.kt`` has moved to the
|
||||
``net.corda.testing.services`` package. Moving items out of the ``net.corda.testing.*`` package will help make it clearer which
|
||||
parts of the api are stable. The bash script ``tools\scripts\update-test-packages.sh`` can be used to smooth the upgrade
|
||||
process for existing projects.
|
||||
|
||||
.. _changelog_v1:
|
||||
|
||||
Release 1.0
|
||||
|
@ -155,9 +155,8 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
|
||||
}
|
||||
}
|
||||
|
||||
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> {
|
||||
val address = node.addresses.single()
|
||||
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
|
||||
private fun gatherAddresses(node: NodeInfo): List<ArtemisMessagingComponent.NodeAddress> {
|
||||
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, node.addresses[0]) }
|
||||
}
|
||||
|
||||
override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {
|
||||
|
@ -9,6 +9,8 @@ import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.node.internal.NodeBasedTest
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
@ -35,6 +37,45 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `unknown legal name`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE))[0]
|
||||
val netMapCache = alice.services.networkMapCache
|
||||
alice.database.transaction {
|
||||
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
|
||||
assertThat(netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
|
||||
assertThat(netMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
|
||||
assertThat(netMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `nodes in distributed service`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE))[0]
|
||||
val netMapCache = alice.services.networkMapCache
|
||||
|
||||
val distServiceNodeInfos = alice.database.transaction {
|
||||
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME).identity
|
||||
(1..2).map {
|
||||
val nodeInfo = NodeInfo(
|
||||
addresses = listOf(NetworkHostAndPort("localhost", 1000 + it)),
|
||||
legalIdentitiesAndCerts = listOf(TestIdentity.fresh("Org-$it").identity, distributedIdentity),
|
||||
platformVersion = 3,
|
||||
serial = 1
|
||||
)
|
||||
netMapCache.addNode(nodeInfo)
|
||||
nodeInfo
|
||||
}
|
||||
}
|
||||
|
||||
alice.database.transaction {
|
||||
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos)
|
||||
assertThatExceptionOfType(IllegalArgumentException::class.java)
|
||||
.isThrownBy { netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) }
|
||||
.withMessageContaining(DUMMY_NOTARY_NAME.toString())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `get nodes by owning key and by name`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE))[0]
|
||||
|
@ -178,13 +178,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
// a code smell.
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
|
||||
persistentNetworkMapCache.start()
|
||||
val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
|
||||
val signedNodeInfo = info.sign { publicKey, serialised ->
|
||||
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
|
||||
val signedNodeInfo = nodeInfo.sign { publicKey, serialised ->
|
||||
val privateKey = keyPairs.single { it.public == publicKey }.private
|
||||
privateKey.sign(serialised.bytes)
|
||||
}
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
|
||||
info
|
||||
nodeInfo
|
||||
}
|
||||
}
|
||||
|
||||
@ -203,11 +203,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
// Do all of this in a database transaction so anything that might need a connection has one.
|
||||
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
|
||||
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, GlobalProperties.networkParameters.notaries).start(), identityService)
|
||||
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair)
|
||||
identityService.loadIdentities(info.legalIdentitiesAndCerts)
|
||||
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
|
||||
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
|
||||
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
|
||||
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
|
||||
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache, nodeProperties)
|
||||
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, nodeInfo, identityService, networkMapCache, nodeProperties)
|
||||
val notaryService = makeNotaryService(nodeServices, database)
|
||||
val smm = makeStateMachineManager(database)
|
||||
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
|
||||
@ -239,7 +239,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
registerCordappFlows(smm)
|
||||
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
|
||||
startShell(rpcOps)
|
||||
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
|
||||
Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
|
||||
}
|
||||
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
|
||||
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
|
||||
@ -297,20 +297,22 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
var info = NodeInfo(
|
||||
val nodeInfoWithBlankSerial = NodeInfo(
|
||||
myAddresses(),
|
||||
setOf(identity, myNotaryIdentity).filterNotNull(),
|
||||
versionInfo.platformVersion,
|
||||
platformClock.instant().toEpochMilli()
|
||||
serial = 0
|
||||
)
|
||||
// Check if we have already stored a version of 'our own' NodeInfo, this is to avoid regenerating it with
|
||||
// a different timestamp.
|
||||
networkMapCache.getNodesByLegalName(configuration.myLegalName).firstOrNull()?.let {
|
||||
if (info.copy(serial = it.serial) == it) {
|
||||
info = it
|
||||
}
|
||||
|
||||
val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
|
||||
|
||||
val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) {
|
||||
// The node info hasn't changed. We use the one from the database to preserve the serial.
|
||||
nodeInfoFromDb
|
||||
} else {
|
||||
nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
|
||||
}
|
||||
return Pair(keyPairs, info)
|
||||
return Pair(keyPairs, nodeInfo)
|
||||
}
|
||||
|
||||
protected abstract fun myAddresses(): List<NetworkHostAndPort>
|
||||
@ -534,7 +536,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
schemaService: SchemaService,
|
||||
transactionStorage: WritableTransactionStorage,
|
||||
database: CordaPersistence,
|
||||
info: NodeInfo,
|
||||
nodeInfo: NodeInfo,
|
||||
identityService: IdentityServiceInternal,
|
||||
networkMapCache: NetworkMapCacheInternal,
|
||||
nodeProperties: NodePropertiesStore): MutableList<Any> {
|
||||
@ -551,10 +553,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
MonitoringService(metrics),
|
||||
cordappProvider,
|
||||
database,
|
||||
info,
|
||||
nodeInfo,
|
||||
networkMapCache,
|
||||
nodeProperties)
|
||||
network = makeMessagingService(database, info, nodeProperties)
|
||||
network = makeMessagingService(database, nodeInfo, nodeProperties)
|
||||
val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
|
||||
services.keyManagementService, services.identityService, platformClock,
|
||||
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,
|
||||
|
@ -157,7 +157,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
|
||||
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
|
||||
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker()
|
||||
val advertisedAddress = info.addresses.single()
|
||||
val advertisedAddress = info.addresses[0]
|
||||
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
|
||||
|
||||
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
|
||||
|
@ -7,12 +7,11 @@ import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.node.internal.schemas.NodeInfoSchemaV1
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
@ -21,6 +20,7 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.schemas.NodeInfoSchemaV1
|
||||
import net.corda.node.services.api.NetworkMapCacheBaseInternal
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NonInvalidatingCache
|
||||
@ -153,8 +153,17 @@ open class PersistentNetworkMapCache(
|
||||
return null
|
||||
}
|
||||
|
||||
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? = getNodesByLegalName(name).firstOrNull()
|
||||
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? {
|
||||
val nodeInfos = getNodesByLegalName(name)
|
||||
return when (nodeInfos.size) {
|
||||
0 -> null
|
||||
1 -> nodeInfos[0]
|
||||
else -> throw IllegalArgumentException("More than one node found with legal name $name")
|
||||
}
|
||||
}
|
||||
|
||||
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) }
|
||||
|
||||
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]
|
||||
|
||||
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })
|
||||
|
@ -16,6 +16,7 @@ import net.corda.core.node.services.*
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
@ -206,7 +207,7 @@ open class MockServices private constructor(
|
||||
override val clock: Clock get() = Clock.systemUTC()
|
||||
override val myInfo: NodeInfo
|
||||
get() {
|
||||
return NodeInfo(emptyList(), listOf(initialIdentity.identity), 1, serial = 1L)
|
||||
return NodeInfo(listOf(NetworkHostAndPort("mock.node.services", 10000)), listOf(initialIdentity.identity), 1, serial = 1L)
|
||||
}
|
||||
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)
|
||||
private val mockCordappProvider: MockCordappProvider = MockCordappProvider(cordappLoader, attachments)
|
||||
|
@ -311,7 +311,9 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
|
||||
|
||||
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)
|
||||
|
||||
override fun myAddresses(): List<NetworkHostAndPort> = emptyList()
|
||||
// NodeInfo requires a non-empty addresses list and so we give it a dummy value for mock nodes.
|
||||
// The non-empty addresses check is important to have and so we tolerate the ugliness here.
|
||||
override fun myAddresses(): List<NetworkHostAndPort> = listOf(NetworkHostAndPort("mock.node", 1000))
|
||||
|
||||
// Allow unit tests to modify the serialization whitelist list before the node start,
|
||||
// so they don't have to ServiceLoad test whitelists into all unit tests.
|
||||
|
Loading…
Reference in New Issue
Block a user