From 59ff476a409270999cf63cce7adaa273a544d959 Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Tue, 28 Nov 2023 11:15:21 +0000 Subject: [PATCH] ENT-11050: Support flag to indicate if in transaction back chain resolution, and use it to drop deserialization errors to warn (#7591) * Support flag to indicate if in transaction back chain resolution, and use it to drop deserialization errors to warn * Fix and rename test that has different outcome now --- .../node/VaultUpdateDeserializationTest.kt | 33 ++++++++----------- .../node/services/DbTransactionsResolver.kt | 3 +- .../node/services/api/ServiceHubInternal.kt | 14 ++++---- .../node/services/api/VaultServiceInternal.kt | 3 +- .../node/services/vault/NodeVaultService.kt | 19 ++++++----- 5 files changed, 36 insertions(+), 36 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt b/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt index 9b0bc3f0e9..48aadf45d4 100644 --- a/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt @@ -4,7 +4,6 @@ import co.paralleluniverse.strands.Strand import junit.framework.TestCase.assertEquals import junit.framework.TestCase.assertNotNull import junit.framework.TestCase.assertTrue -import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.internal.InputStreamAndHash import net.corda.core.internal.deleteRecursively import net.corda.core.internal.div @@ -55,13 +54,15 @@ class VaultUpdateDeserializationTest { /* * Transaction sent from A -> B with Notarisation * Test that a deserialization error is raised where the receiver node of a transaction has an incompatible contract jar. - * In the case of a notarised transaction, a deserialisation error is thrown in the receiver SignTransactionFlow (before finality) - * upon receiving the transaction to be signed and attempting to record its dependencies. - * The ledger will not record any transactions, and the flow must be retried by the sender upon installing the correct contract jar + * But only on new transactions, and not in the back chain. + * In the case of a notarised transaction, a deserialisation error is thrown in the receiver in the second phase of finality + * when updating the vault. The sender will not block, and the back chain is successfully recorded + * on the receiver even though those states have deserialization errors too. The flow on the receiver is hospitalised. + * The flow will be retried by the receiver upon installing the correct contract jar * version at the receiver and re-starting the node. */ - @Test(timeout=300_000) - fun `Notarised transaction fails completely upon receiver deserialization failure collecting signatures when using incompatible contract jar`() { + @Test(timeout = 300_000) + fun `Notarised transaction fails but back chain succeeds upon receiver deserialization failure when using incompatible contract jar`() { driver(driverParameters(listOf(flowVersion1, contractVersion1))) { val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), providedName = ALICE_NAME).getOrThrow() @@ -75,20 +76,15 @@ class VaultUpdateDeserializationTest { val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds) val spendableState = stx.coreTransaction.outRef(0) - // NOTE: exception is propagated from Receiver - try { - alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds) - } - catch(e: UnexpectedFlowEndException) { - println("Bob fails to deserialise transaction upon receipt of transaction for signing.") - } + alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds) + assertEquals(0, bob.rpc.vaultQueryBy().states.size) assertEquals(1, alice.rpc.vaultQueryBy().states.size) // check transaction records @Suppress("DEPRECATION") - assertEquals(1, alice.rpc.internalVerifiedTransactionsSnapshot().size) // issuance only + assertEquals(2, alice.rpc.internalVerifiedTransactionsSnapshot().size) // both @Suppress("DEPRECATION") - assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty()) + assertEquals(1, bob.rpc.internalVerifiedTransactionsSnapshot().size) // issuance only // restart Bob with correct contract jar version (bob as OutOfProcess).process.destroyForcibly() @@ -97,13 +93,12 @@ class VaultUpdateDeserializationTest { val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), providedName = BOB_NAME).getOrThrow() - // re-run failed flow - alice.rpc.startFlow(::AttachmentFlowV1, restartedBob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds) - + // original hospitalized transaction should now have been re-processed with correct contract jar assertEquals(1, waitForVaultUpdate(restartedBob)) assertEquals(1, alice.rpc.vaultQueryBy().states.size) @Suppress("DEPRECATION") - assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty()) + assertEquals(2, restartedBob.rpc.internalVerifiedTransactionsSnapshot().size) // both + assertEquals(1, restartedBob.rpc.vaultQueryBy().states.size) } } diff --git a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt index 668b95ff92..9a8e9d5eda 100644 --- a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt +++ b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt @@ -13,6 +13,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.debug import net.corda.core.utilities.seconds import net.corda.core.utilities.trace +import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.WritableTransactionStorage import java.util.* @@ -107,7 +108,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa } if (txStatus == TransactionStatus.UNVERIFIED) { tx.verify(flow.serviceHub) - flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx)) + (flow.serviceHub as ServiceHubInternal).recordTransactions(usedStatesToRecord, listOf(tx), false, disableSoftLocking = true) } else { logger.debug { "No need to record $txId as it's already been verified" } } diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 0ee732b210..b283c137c4 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -5,8 +5,8 @@ import net.corda.core.context.InvocationContext import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.FlowLogic -import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.StateMachineRunId +import net.corda.core.flows.TransactionMetadata import net.corda.core.identity.CordaX500Name import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.NamedCacheFactory @@ -38,7 +38,6 @@ import net.corda.node.services.persistence.AttachmentStorageInternal import net.corda.node.services.statemachine.ExternalEvent import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.nodeapi.internal.persistence.CordaPersistence -import java.lang.IllegalStateException import java.security.SignatureException import java.util.ArrayList import java.util.Collections @@ -88,6 +87,7 @@ interface ServiceHubInternal : ServiceHubCoreInternal { stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage, vaultService: VaultServiceInternal, database: CordaPersistence, + disableSoftLocking: Boolean = false, updateFn: (SignedTransaction) -> Boolean = validatedTransactions::addTransaction ) { @@ -147,7 +147,7 @@ interface ServiceHubInternal : ServiceHubCoreInternal { // // Because the primary use case for recording irrelevant states is observer/regulator nodes, who are unlikely // to make writes to the ledger very often or at all, we choose to punt this issue for the time being. - vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }, previouslySeenTxs.map { it.coreTransaction }) + vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }, previouslySeenTxs.map { it.coreTransaction }, disableSoftLocking) } } @@ -202,15 +202,14 @@ interface ServiceHubInternal : ServiceHubCoreInternal { @Suppress("NestedBlockDepth") @VisibleForTesting - fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable, disableSignatureVerification: Boolean) { + fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable, disableSignatureVerification: Boolean, disableSoftLocking: Boolean = false) { txs.forEach { requireSupportedHashType(it) if (it.coreTransaction is WireTransaction) { if (disableSignatureVerification) { log.warnOnce("The current usage of recordTransactions is unsafe." + "Recording transactions without signature verification may lead to severe problems with ledger consistency.") - } - else { + } else { try { it.verifyRequiredSignatures() } @@ -226,7 +225,8 @@ interface ServiceHubInternal : ServiceHubCoreInternal { validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, - database + database, + disableSoftLocking ) } diff --git a/node/src/main/kotlin/net/corda/node/services/api/VaultServiceInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/VaultServiceInternal.kt index e95ba4b015..3c54a3321d 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/VaultServiceInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/VaultServiceInternal.kt @@ -15,7 +15,8 @@ interface VaultServiceInternal : VaultService { * indicate whether an update consists entirely of regular or notary change transactions, which may require * different processing logic. */ - fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable, previouslySeenTxns: Iterable = emptyList()) + fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable, previouslySeenTxns: Iterable = emptyList(), + disableSoftLocking: Boolean = false) /** * Same as notifyAll but with a single transaction. diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index a2623b638d..e0fdce29c0 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -69,7 +69,8 @@ import java.security.PublicKey import java.sql.SQLException import java.time.Clock import java.time.Instant -import java.util.* +import java.util.Arrays +import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArraySet import java.util.stream.Stream @@ -80,8 +81,6 @@ import javax.persistence.criteria.CriteriaQuery import javax.persistence.criteria.CriteriaUpdate import javax.persistence.criteria.Predicate import javax.persistence.criteria.Root -import kotlin.collections.ArrayList -import kotlin.collections.LinkedHashSet import kotlin.collections.component1 import kotlin.collections.component2 @@ -284,12 +283,13 @@ class NodeVaultService( internal val publishUpdates get() = mutex.locked { updatesPublisher } /** Groups adjacent transactions into batches to generate separate net updates per transaction type. */ - override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable, previouslySeenTxns: Iterable) { + override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable, previouslySeenTxns: Iterable, + disableSoftLocking: Boolean) { if (statesToRecord == StatesToRecord.NONE || (!txns.any() && !previouslySeenTxns.any())) return val batch = mutableListOf() fun flushBatch(previouslySeen: Boolean) { - val updates = makeUpdates(batch, statesToRecord, previouslySeen) + val updates = makeUpdates(batch, statesToRecord, previouslySeen, disableSoftLocking) processAndNotify(updates) batch.clear() } @@ -308,7 +308,8 @@ class NodeVaultService( processTransactions(txns, false) } - private fun makeUpdates(batch: Iterable, statesToRecord: StatesToRecord, previouslySeen: Boolean): List> { + @Suppress("ComplexMethod", "ThrowsCount") + private fun makeUpdates(batch: Iterable, statesToRecord: StatesToRecord, previouslySeen: Boolean, disableSoftLocking: Boolean): List> { fun withValidDeserialization(list: List, txId: SecureHash): Map { var error: TransactionDeserialisationException? = null @@ -320,13 +321,15 @@ class NodeVaultService( // This will cause a failure as we can't deserialize such states in the context of the `appClassloader`. // For now we ignore these states. // In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy. - if (IGNORE_TRANSACTION_DESERIALIZATION_ERRORS) { + // Disabled if soft locking disabled, as assumes you are in the back chain and that maybe it is less important than top + // level transaction. + if (IGNORE_TRANSACTION_DESERIALIZATION_ERRORS || disableSoftLocking) { log.warnOnce("The current usage of transaction deserialization for the vault is unsafe." + "Ignoring vault updates due to failed deserialized states may lead to severe problems with ledger consistency. ") log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e") } else { log.error("Could not deserialize state $idx from transaction $txId. Cause: $e") - if(error == null) error = e + if (error == null) error = e } null }