CORDA-3263 Improve error handling for registering peer node (#5559)

* Capture and log "nodeInfo" persistence failures, whilst maintaining an optimistic retry mechanism.

* Additional test cases (update and insert)

* Handle both updates and inserts consistently (single transaction for happy path)

* Fix detekt violations and update baseline with false detection.

* Streamline the code a little.

* Update baseline reporting false violation.
This commit is contained in:
josecoll 2019-10-08 11:38:11 +01:00 committed by Rick Parker
parent 246ec8766e
commit efa01410ec
4 changed files with 116 additions and 11 deletions

View File

@ -184,7 +184,6 @@
<ID>ComplexMethod:NodeVaultService.kt$NodeVaultService$private fun makeUpdates(batch: Iterable&lt;CoreTransaction&gt;, statesToRecord: StatesToRecord, previouslySeen: Boolean): List&lt;Vault.Update&lt;ContractState&gt;&gt;</ID>
<ID>ComplexMethod:ObjectDiffer.kt$ObjectDiffer$fun diff(a: Any?, b: Any?): DiffTree?</ID>
<ID>ComplexMethod:Obligation.kt$Obligation$override fun verify(tx: LedgerTransaction)</ID>
<ID>ComplexMethod:PersistentNetworkMapCache.kt$PersistentNetworkMapCache$override fun addNodes(nodes: List&lt;NodeInfo&gt;)</ID>
<ID>ComplexMethod:QuasarInstrumentationHook.kt$QuasarInstrumentationHookAgent.Companion$@JvmStatic fun premain(argumentsString: String?, instrumentation: Instrumentation)</ID>
<ID>ComplexMethod:RPCClientProxyHandler.kt$RPCClientProxyHandler$ private fun close(notify: Boolean = true)</ID>
<ID>ComplexMethod:RPCClientProxyHandler.kt$RPCClientProxyHandler$// The handler for Artemis messages. private fun artemisMessageHandler(message: ClientMessage)</ID>
@ -3030,6 +3029,7 @@
<ID>MaxLineLength:PersistentNetworkMapCache.kt$PersistentNetworkMapCache$"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name"</ID>
<ID>MaxLineLength:PersistentNetworkMapCache.kt$PersistentNetworkMapCache$"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.owningKeyHash = :owningKeyHash"</ID>
<ID>MaxLineLength:PersistentNetworkMapCache.kt$PersistentNetworkMapCache$val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey).singleOrNull { it.serial == nodeInfo.serial }</ID>
<ID>MaxLineLength:PersistentNetworkMapCache.kt$PersistentNetworkMapCache$val newNodes = mutableListOf&lt;NodeInfo&gt;() val updatedNodes = mutableListOf&lt;Pair&lt;NodeInfo, NodeInfo&gt;&gt;() nodes.map { it to getNodesByLegalIdentityKey(it.legalIdentities.first().owningKey).firstOrNull() } .forEach { (node, previousNode) -&gt; when { previousNode == null -&gt; { logger.info("No previous node found for ${node.legalIdentities.first().name}") if (verifyAndRegisterIdentities(node)) { newNodes.add(node) } } previousNode.serial &gt; node.serial -&gt; { logger.info("Discarding older nodeInfo for ${node.legalIdentities.first().name}") } previousNode != node -&gt; { logger.info("Previous node was found for ${node.legalIdentities.first().name} as: $previousNode") // TODO We should be adding any new identities as well if (verifyIdentities(node)) { updatedNodes.add(node to previousNode) } } else -&gt; logger.info("Previous node was identical to incoming one - doing nothing") } } /** * This algorithm protects against database failure (eg. attempt to persist a nodeInfo entry larger than permissible by the * database X500Name) without sacrificing performance incurred by attempting to flush nodeInfo's individually. * Upon database transaction failure, the list of new nodeInfo's is split in half, and then each half is persisted independently. * This continues recursively until all valid nodeInfo's are persisted, and failed ones reported as warnings. */ recursivelyUpdateNodes(newNodes.map { nodeInfo -&gt; Pair(nodeInfo, MapChange.Added(nodeInfo)) } + updatedNodes.map { (nodeInfo, previousNodeInfo) -&gt; Pair(nodeInfo, MapChange.Modified(nodeInfo, previousNodeInfo)) })</ID>
<ID>MaxLineLength:PersistentNetworkMapCacheTest.kt$PersistentNetworkMapCacheTest$private val charlieNetMapCache = PersistentNetworkMapCache(TestingNamedCacheFactory(), database, InMemoryIdentityService(trustRoot = DEV_ROOT_CA.certificate))</ID>
<ID>MaxLineLength:PersistentScheduledFlowRepository.kt$PersistentScheduledFlowRepository$private</ID>
<ID>MaxLineLength:PersistentScheduledFlowRepository.kt$PersistentScheduledFlowRepository$return Pair(StateRef(SecureHash.parse(txId), index), ScheduledStateRef(StateRef(SecureHash.parse(txId), index), scheduledStateRecord.scheduledAt))</ID>

View File

@ -1,5 +1,6 @@
package net.corda.node.services.network
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.schemas.NodeInfoSchemaV1
@ -20,6 +21,17 @@ class PersistentNetworkMapCacheTest {
private companion object {
val ALICE = TestIdentity(ALICE_NAME, 70)
val BOB = TestIdentity(BOB_NAME, 80)
val CHARLIE = TestIdentity(CHARLIE_NAME, 90)
val LONG_X500_NAME = CordaX500Name(
commonName = "AB123456789012345678901234567890123456789012345678901234567890",
organisationUnit = "AB123456789012345678901234567890123456789012345678901234567890",
organisation = "Long Plc",
locality = "AB123456789012345678901234567890123456789012345678901234567890",
state = "AB123456789012345678901234567890123456789012345678901234567890",
country= "IT")
val LONG_PLC = TestIdentity(LONG_X500_NAME, 95)
val LONGER_PLC = TestIdentity(LONG_X500_NAME.copy(organisation = "Longer Plc"), 96)
}
@Rule
@ -100,6 +112,53 @@ class PersistentNetworkMapCacheTest {
assertThat(nodeInfos).hasSize(2)
}
@Test
fun `negative test - attempt to insert invalid node info`() {
charlieNetMapCache.addNode(createNodeInfo(listOf(LONG_PLC)))
assertThat(charlieNetMapCache.allNodes).hasSize(0)
}
@Test
fun `negative test - attempt to update existing node with invalid node info`() {
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE)))
val aliceUpdate = TestIdentity(LONG_X500_NAME, ALICE.keyPair)
charlieNetMapCache.addNode(createNodeInfo(listOf(aliceUpdate)))
assertThat(charlieNetMapCache.allNodes).hasSize(1)
assertThat(charlieNetMapCache.getNodeByLegalName(ALICE_NAME)).isNotNull
assertThat(charlieNetMapCache.getNodeByLegalName(LONG_X500_NAME)).isNull()
}
@Test
fun `negative test - insert two valid node infos and one invalid one`() {
charlieNetMapCache.addNodes(listOf(createNodeInfo(listOf(ALICE)),
createNodeInfo(listOf(BOB)),
createNodeInfo(listOf(LONG_PLC))))
assertThat(charlieNetMapCache.allNodes).hasSize(2)
assertThat(charlieNetMapCache.allNodes.flatMap { it.legalIdentities }).isEqualTo(listOf(ALICE.party, BOB.party))
}
@Test
fun `negative test - insert three valid node infos and two invalid ones`() {
charlieNetMapCache.addNodes(listOf(createNodeInfo(listOf(LONG_PLC)),
createNodeInfo(listOf(ALICE)),
createNodeInfo(listOf(BOB)),
createNodeInfo(listOf(CHARLIE)),
createNodeInfo(listOf(LONGER_PLC))))
assertThat(charlieNetMapCache.allNodes).hasSize(3)
assertThat(charlieNetMapCache.allNodes.flatMap { it.legalIdentities }).isEqualTo(listOf(ALICE.party, BOB.party, CHARLIE.party))
}
@Test
fun `negative test - insert one valid node info then attempt to add one invalid node info and update the existing valid nodeinfo`() {
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE)))
val aliceUpdate = TestIdentity(LONG_X500_NAME, ALICE.keyPair)
charlieNetMapCache.addNodes(listOf(createNodeInfo(listOf(aliceUpdate)),
createNodeInfo(listOf(LONGER_PLC)), createNodeInfo(listOf(BOB))))
assertThat(charlieNetMapCache.allNodes).hasSize(2)
assertThat(charlieNetMapCache.getNodeByLegalName(ALICE_NAME)).isNotNull
assertThat(charlieNetMapCache.getNodeByLegalName(BOB_NAME)).isNotNull
}
private fun createNodeInfo(identities: List<TestIdentity>,
address: NetworkHostAndPort = NetworkHostAndPort("localhost", portCounter++)): NodeInfo {
return NodeInfo(

View File

@ -34,6 +34,7 @@ import rx.subjects.PublishSubject
import java.security.PublicKey
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.PersistenceException
/** Database-based network map cache. */
@ThreadSafe
@ -184,20 +185,44 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
else -> logger.info("Previous node was identical to incoming one - doing nothing")
}
}
/**
* This algorithm protects against database failure (eg. attempt to persist a nodeInfo entry larger than permissible by the
* database X500Name) without sacrificing performance incurred by attempting to flush nodeInfo's individually.
* Upon database transaction failure, the list of new nodeInfo's is split in half, and then each half is persisted independently.
* This continues recursively until all valid nodeInfo's are persisted, and failed ones reported as warnings.
*/
recursivelyUpdateNodes(newNodes.map { nodeInfo -> Pair(nodeInfo, MapChange.Added(nodeInfo)) } +
updatedNodes.map { (nodeInfo, previousNodeInfo) -> Pair(nodeInfo, MapChange.Modified(nodeInfo, previousNodeInfo)) })
}
}
database.transaction {
updatedNodes.forEach { (node, previousNode) ->
//updated
updateInfoDB(node, session)
changePublisher.onNext(MapChange.Modified(node, previousNode))
}
newNodes.forEach { node ->
//new
updateInfoDB(node, session)
changePublisher.onNext(MapChange.Added(node))
private fun recursivelyUpdateNodes(nodeUpdates: List<Pair<NodeInfo, MapChange>>) {
try {
persistNodeUpdates(nodeUpdates)
}
catch (e: PersistenceException) {
if (nodeUpdates.isNotEmpty()) {
when {
nodeUpdates.size > 1 -> {
// persist first half
val nodeUpdatesLow = nodeUpdates.subList(0, (nodeUpdates.size / 2))
recursivelyUpdateNodes(nodeUpdatesLow)
// persist second half
val nodeUpdatesHigh = nodeUpdates.subList((nodeUpdates.size / 2), nodeUpdates.size)
recursivelyUpdateNodes(nodeUpdatesHigh)
}
else -> logger.warn("Failed to add or update node with info: ${nodeUpdates.single()}")
}
}
}
}
private fun persistNodeUpdates(nodeUpdates: List<Pair<NodeInfo, MapChange>>) {
database.transaction {
nodeUpdates.forEach { (nodeInfo, change) ->
updateInfoDB(nodeInfo, session)
changePublisher.onNext(change)
}
}
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.network
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.sha256
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.sign
import net.corda.core.serialization.serialize
import net.corda.core.utilities.seconds
@ -71,6 +72,26 @@ class NetworkMapClientTest {
assertEquals(nodeInfo2, networkMapClient.getNodeInfo(nodeInfoHash2))
}
@Test
fun `negative test - registered invalid node is added to the network map`() {
val invalidLongNodeName = CordaX500Name(
commonName = "AB123456789012345678901234567890123456789012345678901234567890",
organisationUnit = "AB123456789012345678901234567890123456789012345678901234567890",
organisation = "Long Plc",
locality = "AB123456789012345678901234567890123456789012345678901234567890",
state = "AB123456789012345678901234567890123456789012345678901234567890",
country= "IT")
val (nodeInfo, signedNodeInfo) = createNodeInfoAndSigned(invalidLongNodeName)
networkMapClient.publish(signedNodeInfo)
val nodeInfoHash = nodeInfo.serialize().sha256()
assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(nodeInfoHash)
assertEquals(nodeInfo, networkMapClient.getNodeInfo(nodeInfoHash))
}
@Test
fun `errors return a meaningful error message`() {
val nodeInfoBuilder = TestNodeInfoBuilder()