mirror of
https://github.com/corda/corda.git
synced 2024-12-20 05:28:21 +00:00
ENT-6875 Deserialisation exception on output state can cause ledger inconsistency (#7476)
This commit is contained in:
parent
a90e0b4cfb
commit
b16c93e76b
@ -0,0 +1,33 @@
|
||||
package net.corda.contracts.incompatible.version1
|
||||
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.contracts.TypeOnlyCommandData
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.serialization.internal.AttachmentsClassLoader
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
|
||||
class AttachmentContract : Contract {
|
||||
|
||||
private val FAIL_CONTRACT_VERIFY = java.lang.Boolean.getBoolean("net.corda.contracts.incompatible.AttachmentContract.fail.verify")
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
if (FAIL_CONTRACT_VERIFY) throw object:TransactionVerificationException(tx.id, "AttachmentContract verify failed.", null) {}
|
||||
val state = tx.outputsOfType<State>().single()
|
||||
// we check that at least one has the matching hash, the other will be the contract
|
||||
require(tx.attachments.any { it.id == state.hash }) {"At least one attachment in transaction must match hash ${state.hash}"}
|
||||
}
|
||||
|
||||
object Command : TypeOnlyCommandData()
|
||||
|
||||
data class State(val hash: SecureHash.SHA256) : ContractState {
|
||||
private val FAIL_CONTRACT_STATE = java.lang.Boolean.getBoolean("net.corda.contracts.incompatible.AttachmentContract.fail.state") && (this.javaClass.classLoader !is AttachmentsClassLoader)
|
||||
init {
|
||||
if (FAIL_CONTRACT_STATE) throw TransactionVerificationException.TransactionRequiredContractUnspecifiedException(hash,"AttachmentContract state initialisation failed.")
|
||||
}
|
||||
override val participants: List<AbstractParty> = emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
const val ATTACHMENT_PROGRAM_ID = "net.corda.contracts.incompatible.version1.AttachmentContract"
|
@ -0,0 +1,25 @@
|
||||
package net.corda.contracts.incompatible.version2
|
||||
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.TypeOnlyCommandData
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
|
||||
class AttachmentContract : Contract {
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
val state = tx.outputsOfType<State>().single()
|
||||
// we check that at least one has the matching hash, the other will be the contract
|
||||
require(tx.attachments.any { it.id == SecureHash.SHA256(state.opaqueBytes.bytes) }) {"At least one attachment in transaction must match hash ${state.opaqueBytes}"}
|
||||
}
|
||||
|
||||
object Command : TypeOnlyCommandData()
|
||||
|
||||
data class State(val opaqueBytes: OpaqueBytes) : ContractState {
|
||||
override val participants: List<AbstractParty> = emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
const val ATTACHMENT_PROGRAM_ID = "net.corda.contracts.incompatible.version2.AttachmentContract"
|
@ -0,0 +1,94 @@
|
||||
package net.corda.flows.incompatible.version1
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.incompatible.version1.ATTACHMENT_PROGRAM_ID
|
||||
import net.corda.contracts.incompatible.version1.AttachmentContract
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.CollectSignaturesFlow
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.ReceiveFinalityFlow
|
||||
import net.corda.core.flows.SignTransactionFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.unwrap
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class AttachmentFlow(private val otherSide: Party,
|
||||
private val notary: Party,
|
||||
private val attachId: SecureHash.SHA256,
|
||||
private val notariseInputState: StateAndRef<AttachmentContract.State>? = null) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
object SIGNING : ProgressTracker.Step("Signing transaction")
|
||||
|
||||
override val progressTracker: ProgressTracker = ProgressTracker(SIGNING)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val session = initiateFlow(otherSide)
|
||||
val notarise = notariseInputState != null
|
||||
session.send(notarise) // inform peer whether to sign for notarisation
|
||||
|
||||
// Create a trivial transaction with an output that describes the attachment, and the attachment itself
|
||||
val ptx = TransactionBuilder(notary)
|
||||
.addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID)
|
||||
.addAttachment(attachId)
|
||||
if (notarise) {
|
||||
ptx.addInputState(notariseInputState!!)
|
||||
ptx.addCommand(AttachmentContract.Command, ourIdentity.owningKey, otherSide.owningKey)
|
||||
}
|
||||
else
|
||||
ptx.addCommand(AttachmentContract.Command, ourIdentity.owningKey)
|
||||
|
||||
progressTracker.currentStep = SIGNING
|
||||
|
||||
val stx = serviceHub.signInitialTransaction(ptx)
|
||||
val ftx = if (notarise) {
|
||||
subFlow(CollectSignaturesFlow(stx, listOf(session)))
|
||||
} else stx
|
||||
|
||||
return subFlow(FinalityFlow(ftx, setOf(session), statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(AttachmentFlow::class)
|
||||
class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val notarise = otherSide.receive<Boolean>().unwrap { it }
|
||||
if (notarise) {
|
||||
val stx = subFlow(object : SignTransactionFlow(otherSide) {
|
||||
override fun checkTransaction(stx: SignedTransaction) {
|
||||
}
|
||||
})
|
||||
subFlow(ReceiveFinalityFlow(otherSide, stx.id, statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
} else {
|
||||
subFlow(ReceiveFinalityFlow(otherSide, statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class AttachmentIssueFlow(private val attachId: SecureHash.SHA256,
|
||||
private val notary: Party): FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val builder = TransactionBuilder(notary)
|
||||
builder.addAttachment(attachId)
|
||||
builder.addOutputState(TransactionState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID, notary))
|
||||
builder.addCommand(Command(AttachmentContract.Command, listOf(ourIdentity.owningKey)))
|
||||
val tx = serviceHub.signInitialTransaction(builder, ourIdentity.owningKey)
|
||||
return subFlow(FinalityFlow(tx, emptySet<FlowSession>(), statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
package net.corda.flows.incompatible.version2
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.incompatible.version2.ATTACHMENT_PROGRAM_ID
|
||||
import net.corda.contracts.incompatible.version2.AttachmentContract
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.ReceiveTransactionFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class AttachmentFlow(private val otherSide: Party,
|
||||
private val notary: Party,
|
||||
private val attachId: SecureHash.SHA256) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
object SIGNING : ProgressTracker.Step("Signing transaction")
|
||||
|
||||
override val progressTracker: ProgressTracker = ProgressTracker(SIGNING)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
// Create a trivial transaction with an output that describes the attachment, and the attachment itself
|
||||
val ptx = TransactionBuilder(notary)
|
||||
.addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID)
|
||||
.addCommand(AttachmentContract.Command, ourIdentity.owningKey)
|
||||
.addAttachment(attachId)
|
||||
|
||||
progressTracker.currentStep = SIGNING
|
||||
|
||||
val stx = serviceHub.signInitialTransaction(ptx)
|
||||
|
||||
// Send the transaction to the other recipient
|
||||
return subFlow(FinalityFlow(stx, initiateFlow(otherSide)))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(AttachmentFlow::class)
|
||||
class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// purposely prevent transaction verification and recording in ReceiveTransactionFlow
|
||||
val stx = subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = false, statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
logger.info("StoreAttachmentFlow: successfully received fully signed tx. Sending it to the vault for processing.")
|
||||
|
||||
serviceHub.recordTransactions(StatesToRecord.ALL_VISIBLE, setOf(stx))
|
||||
logger.info("StoreAttachmentFlow: successfully recorded received transaction locally.")
|
||||
}
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
package net.corda.flows.incompatible.version3
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.incompatible.version1.ATTACHMENT_PROGRAM_ID
|
||||
import net.corda.contracts.incompatible.version1.AttachmentContract
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.ReceiveTransactionFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class AttachmentFlow(private val otherSide: Party,
|
||||
private val notary: Party,
|
||||
private val attachId: SecureHash.SHA256) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
object SIGNING : ProgressTracker.Step("Signing transaction")
|
||||
|
||||
override val progressTracker: ProgressTracker = ProgressTracker(SIGNING)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
// Create a trivial transaction with an output that describes the attachment, and the attachment itself
|
||||
val ptx = TransactionBuilder(notary)
|
||||
.addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID)
|
||||
.addCommand(AttachmentContract.Command, ourIdentity.owningKey)
|
||||
.addAttachment(attachId)
|
||||
|
||||
progressTracker.currentStep = SIGNING
|
||||
|
||||
val stx = serviceHub.signInitialTransaction(ptx)
|
||||
|
||||
// Send the transaction to the other recipient
|
||||
return subFlow(FinalityFlow(stx, initiateFlow(otherSide)))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(AttachmentFlow::class)
|
||||
class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// purposely enable transaction verification and recording in ReceiveTransactionFlow
|
||||
subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = true, statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
logger.info("StoreAttachmentFlow: successfully received fully signed tx. Sending it to the vault for processing.")
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,249 @@
|
||||
package net.corda.node
|
||||
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import junit.framework.TestCase.assertEquals
|
||||
import junit.framework.TestCase.assertNotNull
|
||||
import junit.framework.TestCase.assertTrue
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.internal.InputStreamAndHash
|
||||
import net.corda.core.internal.deleteRecursively
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.messaging.vaultQueryBy
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.flows.incompatible.version1.AttachmentIssueFlow
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.driver.OutOfProcess
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||
import net.corda.testing.flows.waitForAllFlowsToComplete
|
||||
import net.corda.testing.node.NotarySpec
|
||||
import net.corda.testing.node.TestCordapp
|
||||
import net.corda.testing.node.internal.cordappWithPackages
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.TimeoutException
|
||||
import net.corda.contracts.incompatible.version1.AttachmentContract as AttachmentContractV1
|
||||
import net.corda.flows.incompatible.version1.AttachmentFlow as AttachmentFlowV1
|
||||
|
||||
class VaultUpdateDeserializationTest {
|
||||
companion object {
|
||||
// uses ReceiveFinalityFlow
|
||||
val flowVersion1 = cordappWithPackages("net.corda.flows.incompatible.version1")
|
||||
// single state field of type SecureHash.SHA256 with system property driven run-time behaviour:
|
||||
// -force contract verify failure: -Dnet.corda.contracts.incompatible.AttachmentContract.fail.verify=true
|
||||
// -force contract state init failure: -Dnet.corda.contracts.incompatible.AttachmentContract.fail.state=true
|
||||
val contractVersion1 = cordappWithPackages("net.corda.contracts.incompatible.version1")
|
||||
|
||||
fun driverParameters(cordapps: List<TestCordapp>): DriverParameters {
|
||||
return DriverParameters(
|
||||
portAllocation = incrementalPortAllocation(),
|
||||
inMemoryDB = false,
|
||||
startNodesInProcess = false,
|
||||
notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME)),
|
||||
cordappsForAllNodes = cordapps
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Transaction sent from A -> B with Notarisation
|
||||
* Test that a deserialization error is raised where the receiver node of a transaction has an incompatible contract jar.
|
||||
* In the case of a notarised transaction, a deserialisation error is thrown in the receiver SignTransactionFlow (before finality)
|
||||
* upon receiving the transaction to be signed and attempting to record its dependencies.
|
||||
* The ledger will not record any transactions, and the flow must be retried by the sender upon installing the correct contract jar
|
||||
* version at the receiver and re-starting the node.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `Notarised transaction fails completely upon receiver deserialization failure collecting signatures when using incompatible contract jar`() {
|
||||
driver(driverParameters(listOf(flowVersion1, contractVersion1))) {
|
||||
val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1),
|
||||
systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true")),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
|
||||
val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0)
|
||||
alice.rpc.uploadAttachment(inputStream)
|
||||
|
||||
val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds)
|
||||
val spendableState = stx.coreTransaction.outRef<AttachmentContractV1.State>(0)
|
||||
|
||||
// NOTE: exception is propagated from Receiver
|
||||
try {
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
catch(e: UnexpectedFlowEndException) {
|
||||
println("Bob fails to deserialise transaction upon receipt of transaction for signing.")
|
||||
}
|
||||
assertEquals(0, bob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
assertEquals(1, alice.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
// check transaction records
|
||||
@Suppress("DEPRECATION")
|
||||
assertEquals(1, alice.rpc.internalVerifiedTransactionsSnapshot().size) // issuance only
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty())
|
||||
|
||||
// restart Bob with correct contract jar version
|
||||
(bob as OutOfProcess).process.destroyForcibly()
|
||||
bob.stop()
|
||||
(baseDirectory(BOB_NAME) / "cordapps").deleteRecursively()
|
||||
|
||||
val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
// re-run failed flow
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, restartedBob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
assertEquals(1, waitForVaultUpdate(restartedBob))
|
||||
assertEquals(1, alice.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty())
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Transaction sent from A -> B with Notarisation
|
||||
*
|
||||
* Test original deserialization failure behaviour by setting a new configurable java system property.
|
||||
* The ledger will enter an inconsistent state from which is cannot auto-recover.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `Notarised transaction when using incompatible contract jar and overriden system property`() {
|
||||
driver(driverParameters(listOf(flowVersion1, contractVersion1))) {
|
||||
val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1),
|
||||
systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true",
|
||||
"net.corda.vaultupdate.ignore.transaction.deserialization.errors" to "true",
|
||||
"net.corda.recordtransaction.signature.verification.disabled" to "true")),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
|
||||
val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0)
|
||||
alice.rpc.uploadAttachment(inputStream)
|
||||
|
||||
val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds)
|
||||
val spendableState = stx.coreTransaction.outRef<AttachmentContractV1.State>(0)
|
||||
|
||||
// Flow completes successfully (deserialisation error on Receiver node is ignored)
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
// sender node correctly updated
|
||||
@Suppress("DEPRECATION")
|
||||
assertEquals(2, alice.rpc.internalVerifiedTransactionsSnapshot().size)
|
||||
assertEquals(1, alice.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
|
||||
// receiver node has transaction but vault not updated!
|
||||
@Suppress("DEPRECATION")
|
||||
assertEquals(2, bob.rpc.internalVerifiedTransactionsSnapshot().size)
|
||||
assertEquals(0, bob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Transaction sent from A -> B without Notarisation
|
||||
* Test that a deserialization error is raised where the receiver node of a finality flow has an incompatible contract jar.
|
||||
* The ledger will be temporarily inconsistent until the correct contract jar version is installed and the receiver node is re-started.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `un-notarised transaction is hospitalized at receiver upon deserialization failure in vault update when using incompatible contract jar`() {
|
||||
driver(driverParameters(emptyList())) {
|
||||
val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1),
|
||||
systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true")),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
|
||||
val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0)
|
||||
alice.rpc.uploadAttachment(inputStream)
|
||||
|
||||
// ISSUE: exception is not propagating from Receiver
|
||||
try {
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, null).returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
catch(e: TimeoutException) {
|
||||
println("Alice: Timeout awaiting flow completion.")
|
||||
}
|
||||
assertEquals(0, bob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
// check transaction records
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(alice.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty())
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty())
|
||||
|
||||
// restart Bob with correct contract jar version
|
||||
(bob as OutOfProcess).process.destroyForcibly()
|
||||
bob.stop()
|
||||
(baseDirectory(BOB_NAME) / "cordapps").deleteRecursively()
|
||||
|
||||
val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
// original hospitalized transaction should now have been re-processed with correct contract jar
|
||||
assertEquals(1, waitForVaultUpdate(restartedBob))
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty())
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Transaction sent from A -> B without Notarisation
|
||||
* Test original deserialization failure behaviour by setting a new configurable java system property.
|
||||
* The ledger will enter an inconsistent state from which is cannot auto-recover.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `un-notarised transaction ignores deserialization failure in vault update when using incompatible contract jar and overriden system property`() {
|
||||
driver(driverParameters(emptyList())) {
|
||||
val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1),
|
||||
systemProperties = mapOf(
|
||||
"net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true",
|
||||
"net.corda.vaultupdate.ignore.transaction.deserialization.errors" to "true",
|
||||
"net.corda.recordtransaction.signature.verification.disabled" to "true")),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
|
||||
val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0)
|
||||
alice.rpc.uploadAttachment(inputStream)
|
||||
|
||||
// Note: TransactionDeserialisationException is swallowed on the receiver node (without updating the vault).
|
||||
val stx = alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, null).returnValue.getOrThrow(30.seconds)
|
||||
println("Alice txId: ${stx.id}")
|
||||
|
||||
waitForAllFlowsToComplete(bob)
|
||||
val txId = bob.rpc.stateMachineRecordedTransactionMappingSnapshot().single().transactionId
|
||||
println("Bob txId: $txId")
|
||||
|
||||
assertEquals(0, bob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
|
||||
// restart Bob with correct contract jar version
|
||||
(bob as OutOfProcess).process.destroyForcibly()
|
||||
bob.stop()
|
||||
(baseDirectory(BOB_NAME) / "cordapps").deleteRecursively()
|
||||
|
||||
val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
// transaction recorded
|
||||
@Suppress("DEPRECATION")
|
||||
assertNotNull(restartedBob.rpc.internalFindVerifiedTransaction(txId))
|
||||
// but vault states not updated
|
||||
assertEquals(0, restartedBob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
}
|
||||
}
|
||||
|
||||
private fun waitForVaultUpdate(nodeHandle: NodeHandle, maxIterations: Int = 5, iterationDelay: Long = 500): Int {
|
||||
repeat((0..maxIterations).count()) {
|
||||
val count = nodeHandle.rpc.vaultQueryBy<AttachmentContractV1.State>().states
|
||||
if (count.isNotEmpty()) {
|
||||
return count.size
|
||||
}
|
||||
Strand.sleep(iterationDelay)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.tee
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.internal.warnOnce
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
@ -107,6 +108,8 @@ class NodeVaultService(
|
||||
|
||||
const val DEFAULT_SOFT_LOCKING_SQL_IN_CLAUSE_SIZE = 16
|
||||
|
||||
private val IGNORE_TRANSACTION_DESERIALIZATION_ERRORS = java.lang.Boolean.getBoolean("net.corda.vaultupdate.ignore.transaction.deserialization.errors")
|
||||
|
||||
/**
|
||||
* Establish whether a given state is relevant to a node, given the node's public keys.
|
||||
*
|
||||
@ -307,18 +310,29 @@ class NodeVaultService(
|
||||
|
||||
private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord, previouslySeen: Boolean): List<Vault.Update<ContractState>> {
|
||||
|
||||
fun <T> withValidDeserialization(list: List<T>, txId: SecureHash): Map<Int, T> = (0 until list.size).mapNotNull { idx ->
|
||||
try {
|
||||
idx to list[idx]
|
||||
} catch (e: TransactionDeserialisationException) {
|
||||
// When resolving transaction dependencies we might encounter contracts we haven't installed locally.
|
||||
// This will cause a failure as we can't deserialize such states in the context of the `appClassloader`.
|
||||
// For now we ignore these states.
|
||||
// In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy.
|
||||
log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e")
|
||||
null
|
||||
}
|
||||
}.toMap()
|
||||
fun <T> withValidDeserialization(list: List<T>, txId: SecureHash): Map<Int, T> {
|
||||
var error: TransactionDeserialisationException? = null
|
||||
val map = (0 until list.size).mapNotNull { idx ->
|
||||
try {
|
||||
idx to list[idx]
|
||||
} catch (e: TransactionDeserialisationException) {
|
||||
// When resolving transaction dependencies we might encounter contracts we haven't installed locally.
|
||||
// This will cause a failure as we can't deserialize such states in the context of the `appClassloader`.
|
||||
// For now we ignore these states.
|
||||
// In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy.
|
||||
if (IGNORE_TRANSACTION_DESERIALIZATION_ERRORS) {
|
||||
log.warnOnce("The current usage of transaction deserialization for the vault is unsafe." +
|
||||
"Ignoring vault updates due to failed deserialized states may lead to severe problems with ledger consistency. ")
|
||||
log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e")
|
||||
} else {
|
||||
log.error("Could not deserialize state $idx from transaction $txId. Cause: $e")
|
||||
if(error == null) error = e
|
||||
}
|
||||
null
|
||||
}
|
||||
}.toMap()
|
||||
return error?.let { throw it } ?: map
|
||||
}
|
||||
|
||||
// Returns only output states that can be deserialised successfully.
|
||||
fun WireTransaction.deserializableOutputStates(): Map<Int, TransactionState<ContractState>> = withValidDeserialization(this.outputs, this.id)
|
||||
|
@ -36,7 +36,8 @@ data class NodeParameters(
|
||||
val additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
val flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>> = emptyMap(),
|
||||
val logLevelOverride: String? = null,
|
||||
val rpcAddress: NetworkHostAndPort? = null
|
||||
val rpcAddress: NetworkHostAndPort? = null,
|
||||
val systemProperties: Map<String, String> = emptyMap()
|
||||
) {
|
||||
/**
|
||||
* Create a new node parameters object with default values. Each parameter can be specified with its wither method which returns a copy
|
||||
@ -127,4 +128,97 @@ data class NodeParameters(
|
||||
flowOverrides = flowOverrides,
|
||||
logLevelOverride = logLevelOverride)
|
||||
|
||||
constructor(
|
||||
providedName: CordaX500Name?,
|
||||
rpcUsers: List<User>,
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String,
|
||||
additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>>,
|
||||
logLevelOverride: String? = null
|
||||
) : this(
|
||||
providedName,
|
||||
rpcUsers,
|
||||
verifierType,
|
||||
customOverrides,
|
||||
startInSameProcess,
|
||||
maximumHeapSize,
|
||||
additionalCordapps,
|
||||
flowOverrides,
|
||||
logLevelOverride,
|
||||
rpcAddress = null)
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
fun copy(
|
||||
providedName: CordaX500Name?,
|
||||
rpcUsers: List<User>,
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String,
|
||||
additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>>,
|
||||
logLevelOverride: String? = null
|
||||
) = this.copy(
|
||||
providedName = providedName,
|
||||
rpcUsers = rpcUsers,
|
||||
verifierType = verifierType,
|
||||
customOverrides = customOverrides,
|
||||
startInSameProcess = startInSameProcess,
|
||||
maximumHeapSize = maximumHeapSize,
|
||||
additionalCordapps = additionalCordapps,
|
||||
flowOverrides = flowOverrides,
|
||||
logLevelOverride = logLevelOverride,
|
||||
rpcAddress = rpcAddress)
|
||||
|
||||
constructor(
|
||||
providedName: CordaX500Name?,
|
||||
rpcUsers: List<User>,
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String,
|
||||
additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>>,
|
||||
logLevelOverride: String? = null,
|
||||
rpcAddress: NetworkHostAndPort? = null
|
||||
) : this(
|
||||
providedName,
|
||||
rpcUsers,
|
||||
verifierType,
|
||||
customOverrides,
|
||||
startInSameProcess,
|
||||
maximumHeapSize,
|
||||
additionalCordapps,
|
||||
flowOverrides,
|
||||
logLevelOverride,
|
||||
rpcAddress,
|
||||
systemProperties = emptyMap())
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
fun copy(
|
||||
providedName: CordaX500Name?,
|
||||
rpcUsers: List<User>,
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String,
|
||||
additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>>,
|
||||
logLevelOverride: String? = null,
|
||||
rpcAddress: NetworkHostAndPort? = null
|
||||
) = this.copy(
|
||||
providedName = providedName,
|
||||
rpcUsers = rpcUsers,
|
||||
verifierType = verifierType,
|
||||
customOverrides = customOverrides,
|
||||
startInSameProcess = startInSameProcess,
|
||||
maximumHeapSize = maximumHeapSize,
|
||||
additionalCordapps = additionalCordapps,
|
||||
flowOverrides = flowOverrides,
|
||||
logLevelOverride = logLevelOverride,
|
||||
rpcAddress = rpcAddress,
|
||||
systemProperties = systemProperties)
|
||||
}
|
||||
|
@ -755,7 +755,7 @@ class DriverDSLImpl(
|
||||
debugPort,
|
||||
bytemanJarPath,
|
||||
bytemanPort,
|
||||
systemProperties,
|
||||
systemProperties + parameters.systemProperties,
|
||||
parameters.maximumHeapSize,
|
||||
parameters.logLevelOverride,
|
||||
identifier,
|
||||
|
Loading…
Reference in New Issue
Block a user