ENT-1519 - Ensure NodeInfo always has at least one address by checking in the c'tor (#2538) (#2542)

Further, the look up of the node's own node-info from its database has been tightened to ensure there isn't more than one.

Also fixed some brittle code which was assuming exactly one address rather than at least one.

(cherry picked from commit 2864ce1)
This commit is contained in:
Shams Asari 2018-02-16 16:52:08 +00:00 committed by Katelyn Baker
parent b24ec9f680
commit 4bc3f9ffa8
10 changed files with 116 additions and 39 deletions

View File

@ -26,7 +26,9 @@ data class NodeInfo(val addresses: List<NetworkHostAndPort>,
) { ) {
// TODO We currently don't support multi-IP/multi-identity nodes, we only left slots in the data structures. // TODO We currently don't support multi-IP/multi-identity nodes, we only left slots in the data structures.
init { init {
require(legalIdentitiesAndCerts.isNotEmpty()) { "Node should have at least one legal identity" } require(addresses.isNotEmpty()) { "Node must have at least one address" }
require(legalIdentitiesAndCerts.isNotEmpty()) { "Node must have at least one legal identity" }
require(platformVersion > 0) { "Platform version must be at least 1" }
} }
@Transient private var _legalIdentities: List<Party>? = null @Transient private var _legalIdentities: List<Party>? = null

View File

@ -67,12 +67,22 @@ interface NetworkMapCacheBase {
fun track(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> fun track(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange>
/** /**
* Look up the node info for a legal name. * Return a [NodeInfo] which has the given legal name for one of its identities, or null if no such node is found.
* Notice that when there are more than one node for a given name (in case of distributed services) first service node *
* found will be returned. * @throws IllegalArgumentException If more than one matching node is found, in the case of a distributed service identity
* (such as with a notary cluster). For such a scenerio use [getNodesByLegalName] instead.
*/ */
fun getNodeByLegalName(name: CordaX500Name): NodeInfo? fun getNodeByLegalName(name: CordaX500Name): NodeInfo?
/**
* Return a list of [NodeInfo]s which have the given legal name for one of their identities, or an empty list if no
* such nodes are found.
*
* Normally there is at most one node for a legal name, but for distributed service identities (such as with a notary
* cluster) there can be multiple nodes sharing the same identity.
*/
fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo>
/** Look up the node info for a host and port. */ /** Look up the node info for a host and port. */
fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo?
@ -100,13 +110,6 @@ interface NetworkMapCacheBase {
*/ */
fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo>
/**
* Look up the node information entries for a legal name.
* Note that normally there will be only one node for a legal name, but for clusters of nodes or distributed services there
* can be multiple nodes.
*/
fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo>
/** Returns information about the party, which may be a specific node or a service */ /** Returns information about the party, which may be a specific node or a service */
fun getPartyInfo(party: Party): PartyInfo? fun getPartyInfo(party: Party): PartyInfo?

View File

@ -6,6 +6,14 @@ from the previous milestone release.
UNRELEASED UNRELEASED
---------- ----------
* Added ``NetworkMapCache.getNodesByLegalName`` for querying nodes belonging to a distributed service such as a notary cluster
where they all share a common identity. ``NetworkMapCache.getNodeByLegalName`` has been tightened to throw if more than
one node with the legal name is found.
* Per CorDapp configuration is now exposed. ``CordappContext`` now exposes a ``CordappConfig`` object that is populated
at CorDapp context creation time from a file source during runtime.
* Introduced Flow Draining mode, in which a node continues executing existing flows, but does not start new. This is to support graceful node shutdown/restarts. * Introduced Flow Draining mode, in which a node continues executing existing flows, but does not start new. This is to support graceful node shutdown/restarts.
In particular, when this mode is on, new flows through RPC will be rejected, scheduled flows will be ignored, and initial session messages will not be consumed. In particular, when this mode is on, new flows through RPC will be rejected, scheduled flows will be ignored, and initial session messages will not be consumed.
This will ensure that the number of checkpoints will strictly diminish with time, allowing for a clean shutdown. This will ensure that the number of checkpoints will strictly diminish with time, allowing for a clean shutdown.
@ -185,6 +193,9 @@ UNRELEASED
* Marked ``stateMachine`` on ``FlowLogic`` as ``CordaInternal`` to make clear that is it not part of the public api and is * Marked ``stateMachine`` on ``FlowLogic`` as ``CordaInternal`` to make clear that is it not part of the public api and is
only for internal use only for internal use
* Provided experimental support for specifying your own webserver to be used instead of the default development
webserver in ``Cordform`` using the ``webserverJar`` argument
* Created new ``StartedMockNode`` and ``UnstartedMockNode`` classes which are wrappers around our MockNode implementation * Created new ``StartedMockNode`` and ``UnstartedMockNode`` classes which are wrappers around our MockNode implementation
that expose relevant methods for testing without exposing internals, create these using a ``MockNetwork``. that expose relevant methods for testing without exposing internals, create these using a ``MockNetwork``.

View File

@ -155,9 +155,8 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
} }
} }
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> { private fun gatherAddresses(node: NodeInfo): List<ArtemisMessagingComponent.NodeAddress> {
val address = node.addresses.single() return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, node.addresses[0]) }
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
} }
override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) { override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {

View File

@ -9,6 +9,8 @@ import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode import net.corda.node.internal.StartedNode
import net.corda.testing.core.* import net.corda.testing.core.*
import net.corda.testing.node.internal.NodeBasedTest import net.corda.testing.node.internal.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
@ -35,6 +37,45 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
} }
} }
@Test
fun `unknown legal name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
alice.database.transaction {
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
assertThat(netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull()
}
}
@Test
fun `nodes in distributed service`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
val distServiceNodeInfos = alice.database.transaction {
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME).identity
(1..2).map {
val nodeInfo = NodeInfo(
addresses = listOf(NetworkHostAndPort("localhost", 1000 + it)),
legalIdentitiesAndCerts = listOf(TestIdentity.fresh("Org-$it").identity, distributedIdentity),
platformVersion = 3,
serial = 1
)
netMapCache.addNode(nodeInfo)
nodeInfo
}
}
alice.database.transaction {
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos)
assertThatExceptionOfType(IllegalArgumentException::class.java)
.isThrownBy { netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) }
.withMessageContaining(DUMMY_NOTARY_NAME.toString())
}
}
@Test @Test
fun `get nodes by owning key and by name`() { fun `get nodes by owning key and by name`() {
val alice = startNodesWithPort(listOf(ALICE))[0] val alice = startNodesWithPort(listOf(ALICE))[0]

View File

@ -180,13 +180,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// a code smell. // a code smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start() persistentNetworkMapCache.start()
val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = info.sign { publicKey, serialised -> val signedNodeInfo = nodeInfo.sign { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes) privateKey.sign(serialised.bytes)
} }
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo) NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
info nodeInfo
} }
} }
@ -205,11 +205,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Do all of this in a database transaction so anything that might need a connection has one. // Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database -> val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, GlobalProperties.networkParameters.notaries).start(), identityService) val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, GlobalProperties.networkParameters.notaries).start(), identityService)
val (keyPairs, info) = initNodeInfo(networkMapCache, identity, identityKeyPair) val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
identityService.loadIdentities(info.legalIdentitiesAndCerts) identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes) val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database) val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache, nodeProperties) val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, nodeInfo, identityService, networkMapCache, nodeProperties)
val notaryService = makeNotaryService(nodeServices, database) val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database) val smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader) val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
@ -241,7 +241,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
registerCordappFlows(smm) registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } _services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
startShell(rpcOps) startShell(rpcOps)
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
} }
networkMapUpdater = NetworkMapUpdater(services.networkMapCache, networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
@ -297,20 +297,22 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
} }
} }
var info = NodeInfo( val nodeInfoWithBlankSerial = NodeInfo(
myAddresses(), myAddresses(),
setOf(identity, myNotaryIdentity).filterNotNull(), setOf(identity, myNotaryIdentity).filterNotNull(),
versionInfo.platformVersion, versionInfo.platformVersion,
platformClock.instant().toEpochMilli() serial = 0
) )
// Check if we have already stored a version of 'our own' NodeInfo, this is to avoid regenerating it with
// a different timestamp. val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
networkMapCache.getNodesByLegalName(configuration.myLegalName).firstOrNull()?.let {
if (info.copy(serial = it.serial) == it) { val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) {
info = it // The node info hasn't changed. We use the one from the database to preserve the serial.
} nodeInfoFromDb
} else {
nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
} }
return Pair(keyPairs, info) return Pair(keyPairs, nodeInfo)
} }
protected abstract fun myAddresses(): List<NetworkHostAndPort> protected abstract fun myAddresses(): List<NetworkHostAndPort>
@ -530,7 +532,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
* Builds node internal, advertised, and plugin services. * Builds node internal, advertised, and plugin services.
* Returns a list of tokenizable services to be added to the serialisation context. * Returns a list of tokenizable services to be added to the serialisation context.
*/ */
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, database: CordaPersistence, info: NodeInfo, identityService: IdentityService, networkMapCache: NetworkMapCacheInternal, nodeProperties: NodePropertiesStore): MutableList<Any> { private fun makeServices(keyPairs: Set<KeyPair>,
schemaService: SchemaService,
transactionStorage: WritableTransactionStorage,
database: CordaPersistence,
nodeInfo: NodeInfo,
identityService: IdentityService,
networkMapCache: NetworkMapCacheInternal,
nodeProperties: NodePropertiesStore): MutableList<Any> {
checkpointStorage = DBCheckpointStorage() checkpointStorage = DBCheckpointStorage()
val metrics = MetricRegistry() val metrics = MetricRegistry()
attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound) attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound)
@ -544,10 +553,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
MonitoringService(metrics), MonitoringService(metrics),
cordappProvider, cordappProvider,
database, database,
info, nodeInfo,
networkMapCache, networkMapCache,
nodeProperties) nodeProperties)
network = makeMessagingService(database, info, nodeProperties) network = makeMessagingService(database, nodeInfo, nodeProperties)
val tokenizableServices = mutableListOf(attachments, network, services.vaultService, val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
services.keyManagementService, services.identityService, platformClock, services.keyManagementService, services.identityService, platformClock,
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService, services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,

View File

@ -159,7 +159,7 @@ open class Node(configuration: NodeConfiguration,
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker() val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker() val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker()
val advertisedAddress = info.addresses.single() val advertisedAddress = info.addresses[0]
bridgeControlListener = BridgeControlListener(configuration, serverAddress, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) bridgeControlListener = BridgeControlListener(configuration, serverAddress, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE)
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString()) printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())

View File

@ -7,12 +7,11 @@ import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.NotaryInfo
import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.IdentityService import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.PartyInfo
@ -21,6 +20,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.node.services.api.NetworkMapCacheBaseInternal import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NonInvalidatingCache import net.corda.node.utilities.NonInvalidatingCache
@ -153,8 +153,17 @@ open class PersistentNetworkMapCache(
return null return null
} }
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? = getNodesByLegalName(name).firstOrNull() override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? {
val nodeInfos = getNodesByLegalName(name)
return when (nodeInfos.size) {
0 -> null
1 -> nodeInfos[0]
else -> throw IllegalArgumentException("More than one node found with legal name $name")
}
}
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) } override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) }
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey] override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } }) private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })

View File

@ -16,6 +16,7 @@ import net.corda.core.node.services.*
import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.VersionInfo import net.corda.node.VersionInfo
import net.corda.node.internal.configureDatabase import net.corda.node.internal.configureDatabase
import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.internal.cordapp.CordappLoader
@ -206,7 +207,7 @@ open class MockServices private constructor(
override val clock: Clock get() = Clock.systemUTC() override val clock: Clock get() = Clock.systemUTC()
override val myInfo: NodeInfo override val myInfo: NodeInfo
get() { get() {
return NodeInfo(emptyList(), listOf(initialIdentity.identity), 1, serial = 1L) return NodeInfo(listOf(NetworkHostAndPort("mock.node.services", 10000)), listOf(initialIdentity.identity), 1, serial = 1L)
} }
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2) override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)
private val mockCordappProvider: MockCordappProvider = MockCordappProvider(cordappLoader, attachments) private val mockCordappProvider: MockCordappProvider = MockCordappProvider(cordappLoader, attachments)

View File

@ -303,7 +303,9 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1) override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)
override fun myAddresses(): List<NetworkHostAndPort> = emptyList() // NodeInfo requires a non-empty addresses list and so we give it a dummy value for mock nodes.
// The non-empty addresses check is important to have and so we tolerate the ugliness here.
override fun myAddresses(): List<NetworkHostAndPort> = listOf(NetworkHostAndPort("mock.node", 1000))
// Allow unit tests to modify the serialization whitelist list before the node start, // Allow unit tests to modify the serialization whitelist list before the node start,
// so they don't have to ServiceLoad test whitelists into all unit tests. // so they don't have to ServiceLoad test whitelists into all unit tests.