mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
ENT-11050: Support flag to indicate if in transaction back chain resolution, and use it to drop deserialization errors to warn (#7591)
* Support flag to indicate if in transaction back chain resolution, and use it to drop deserialization errors to warn * Fix and rename test that has different outcome now
This commit is contained in:
parent
72e5c4fed2
commit
59ff476a40
@ -4,7 +4,6 @@ import co.paralleluniverse.strands.Strand
|
||||
import junit.framework.TestCase.assertEquals
|
||||
import junit.framework.TestCase.assertNotNull
|
||||
import junit.framework.TestCase.assertTrue
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.internal.InputStreamAndHash
|
||||
import net.corda.core.internal.deleteRecursively
|
||||
import net.corda.core.internal.div
|
||||
@ -55,13 +54,15 @@ class VaultUpdateDeserializationTest {
|
||||
/*
|
||||
* Transaction sent from A -> B with Notarisation
|
||||
* Test that a deserialization error is raised where the receiver node of a transaction has an incompatible contract jar.
|
||||
* In the case of a notarised transaction, a deserialisation error is thrown in the receiver SignTransactionFlow (before finality)
|
||||
* upon receiving the transaction to be signed and attempting to record its dependencies.
|
||||
* The ledger will not record any transactions, and the flow must be retried by the sender upon installing the correct contract jar
|
||||
* But only on new transactions, and not in the back chain.
|
||||
* In the case of a notarised transaction, a deserialisation error is thrown in the receiver in the second phase of finality
|
||||
* when updating the vault. The sender will not block, and the back chain is successfully recorded
|
||||
* on the receiver even though those states have deserialization errors too. The flow on the receiver is hospitalised.
|
||||
* The flow will be retried by the receiver upon installing the correct contract jar
|
||||
* version at the receiver and re-starting the node.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `Notarised transaction fails completely upon receiver deserialization failure collecting signatures when using incompatible contract jar`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `Notarised transaction fails but back chain succeeds upon receiver deserialization failure when using incompatible contract jar`() {
|
||||
driver(driverParameters(listOf(flowVersion1, contractVersion1))) {
|
||||
val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = ALICE_NAME).getOrThrow()
|
||||
@ -75,20 +76,15 @@ class VaultUpdateDeserializationTest {
|
||||
val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds)
|
||||
val spendableState = stx.coreTransaction.outRef<AttachmentContractV1.State>(0)
|
||||
|
||||
// NOTE: exception is propagated from Receiver
|
||||
try {
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
catch(e: UnexpectedFlowEndException) {
|
||||
println("Bob fails to deserialise transaction upon receipt of transaction for signing.")
|
||||
}
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
assertEquals(0, bob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
assertEquals(1, alice.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
// check transaction records
|
||||
@Suppress("DEPRECATION")
|
||||
assertEquals(1, alice.rpc.internalVerifiedTransactionsSnapshot().size) // issuance only
|
||||
assertEquals(2, alice.rpc.internalVerifiedTransactionsSnapshot().size) // both
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty())
|
||||
assertEquals(1, bob.rpc.internalVerifiedTransactionsSnapshot().size) // issuance only
|
||||
|
||||
// restart Bob with correct contract jar version
|
||||
(bob as OutOfProcess).process.destroyForcibly()
|
||||
@ -97,13 +93,12 @@ class VaultUpdateDeserializationTest {
|
||||
|
||||
val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
// re-run failed flow
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, restartedBob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
// original hospitalized transaction should now have been re-processed with correct contract jar
|
||||
assertEquals(1, waitForVaultUpdate(restartedBob))
|
||||
assertEquals(1, alice.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty())
|
||||
assertEquals(2, restartedBob.rpc.internalVerifiedTransactionsSnapshot().size) // both
|
||||
assertEquals(1, restartedBob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,7 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import java.util.*
|
||||
|
||||
@ -107,7 +108,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
}
|
||||
if (txStatus == TransactionStatus.UNVERIFIED) {
|
||||
tx.verify(flow.serviceHub)
|
||||
flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
|
||||
(flow.serviceHub as ServiceHubInternal).recordTransactions(usedStatesToRecord, listOf(tx), false, disableSoftLocking = true)
|
||||
} else {
|
||||
logger.debug { "No need to record $txId as it's already been verified" }
|
||||
}
|
||||
|
@ -5,8 +5,8 @@ import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.TransactionMetadata
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.flows.TransactionMetadata
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.FlowStateMachineHandle
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
@ -38,7 +38,6 @@ import net.corda.node.services.persistence.AttachmentStorageInternal
|
||||
import net.corda.node.services.statemachine.ExternalEvent
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import java.lang.IllegalStateException
|
||||
import java.security.SignatureException
|
||||
import java.util.ArrayList
|
||||
import java.util.Collections
|
||||
@ -88,6 +87,7 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
|
||||
vaultService: VaultServiceInternal,
|
||||
database: CordaPersistence,
|
||||
disableSoftLocking: Boolean = false,
|
||||
updateFn: (SignedTransaction) -> Boolean = validatedTransactions::addTransaction
|
||||
) {
|
||||
|
||||
@ -147,7 +147,7 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
//
|
||||
// Because the primary use case for recording irrelevant states is observer/regulator nodes, who are unlikely
|
||||
// to make writes to the ledger very often or at all, we choose to punt this issue for the time being.
|
||||
vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }, previouslySeenTxs.map { it.coreTransaction })
|
||||
vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }, previouslySeenTxs.map { it.coreTransaction }, disableSoftLocking)
|
||||
}
|
||||
}
|
||||
|
||||
@ -202,15 +202,14 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
|
||||
@Suppress("NestedBlockDepth")
|
||||
@VisibleForTesting
|
||||
fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>, disableSignatureVerification: Boolean) {
|
||||
fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>, disableSignatureVerification: Boolean, disableSoftLocking: Boolean = false) {
|
||||
txs.forEach {
|
||||
requireSupportedHashType(it)
|
||||
if (it.coreTransaction is WireTransaction) {
|
||||
if (disableSignatureVerification) {
|
||||
log.warnOnce("The current usage of recordTransactions is unsafe." +
|
||||
"Recording transactions without signature verification may lead to severe problems with ledger consistency.")
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
try {
|
||||
it.verifyRequiredSignatures()
|
||||
}
|
||||
@ -226,7 +225,8 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
validatedTransactions,
|
||||
stateMachineRecordedTransactionMapping,
|
||||
vaultService,
|
||||
database
|
||||
database,
|
||||
disableSoftLocking
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,8 @@ interface VaultServiceInternal : VaultService {
|
||||
* indicate whether an update consists entirely of regular or notary change transactions, which may require
|
||||
* different processing logic.
|
||||
*/
|
||||
fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>, previouslySeenTxns: Iterable<CoreTransaction> = emptyList())
|
||||
fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>, previouslySeenTxns: Iterable<CoreTransaction> = emptyList(),
|
||||
disableSoftLocking: Boolean = false)
|
||||
|
||||
/**
|
||||
* Same as notifyAll but with a single transaction.
|
||||
|
@ -69,7 +69,8 @@ import java.security.PublicKey
|
||||
import java.sql.SQLException
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.Arrays
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.CopyOnWriteArraySet
|
||||
import java.util.stream.Stream
|
||||
@ -80,8 +81,6 @@ import javax.persistence.criteria.CriteriaQuery
|
||||
import javax.persistence.criteria.CriteriaUpdate
|
||||
import javax.persistence.criteria.Predicate
|
||||
import javax.persistence.criteria.Root
|
||||
import kotlin.collections.ArrayList
|
||||
import kotlin.collections.LinkedHashSet
|
||||
import kotlin.collections.component1
|
||||
import kotlin.collections.component2
|
||||
|
||||
@ -284,12 +283,13 @@ class NodeVaultService(
|
||||
internal val publishUpdates get() = mutex.locked { updatesPublisher }
|
||||
|
||||
/** Groups adjacent transactions into batches to generate separate net updates per transaction type. */
|
||||
override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>, previouslySeenTxns: Iterable<CoreTransaction>) {
|
||||
override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>, previouslySeenTxns: Iterable<CoreTransaction>,
|
||||
disableSoftLocking: Boolean) {
|
||||
if (statesToRecord == StatesToRecord.NONE || (!txns.any() && !previouslySeenTxns.any())) return
|
||||
val batch = mutableListOf<CoreTransaction>()
|
||||
|
||||
fun flushBatch(previouslySeen: Boolean) {
|
||||
val updates = makeUpdates(batch, statesToRecord, previouslySeen)
|
||||
val updates = makeUpdates(batch, statesToRecord, previouslySeen, disableSoftLocking)
|
||||
processAndNotify(updates)
|
||||
batch.clear()
|
||||
}
|
||||
@ -308,7 +308,8 @@ class NodeVaultService(
|
||||
processTransactions(txns, false)
|
||||
}
|
||||
|
||||
private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord, previouslySeen: Boolean): List<Vault.Update<ContractState>> {
|
||||
@Suppress("ComplexMethod", "ThrowsCount")
|
||||
private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord, previouslySeen: Boolean, disableSoftLocking: Boolean): List<Vault.Update<ContractState>> {
|
||||
|
||||
fun <T> withValidDeserialization(list: List<T>, txId: SecureHash): Map<Int, T> {
|
||||
var error: TransactionDeserialisationException? = null
|
||||
@ -320,13 +321,15 @@ class NodeVaultService(
|
||||
// This will cause a failure as we can't deserialize such states in the context of the `appClassloader`.
|
||||
// For now we ignore these states.
|
||||
// In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy.
|
||||
if (IGNORE_TRANSACTION_DESERIALIZATION_ERRORS) {
|
||||
// Disabled if soft locking disabled, as assumes you are in the back chain and that maybe it is less important than top
|
||||
// level transaction.
|
||||
if (IGNORE_TRANSACTION_DESERIALIZATION_ERRORS || disableSoftLocking) {
|
||||
log.warnOnce("The current usage of transaction deserialization for the vault is unsafe." +
|
||||
"Ignoring vault updates due to failed deserialized states may lead to severe problems with ledger consistency. ")
|
||||
log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e")
|
||||
} else {
|
||||
log.error("Could not deserialize state $idx from transaction $txId. Cause: $e")
|
||||
if(error == null) error = e
|
||||
if (error == null) error = e
|
||||
}
|
||||
null
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user