diff --git a/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version1/AttachmentContract.kt b/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version1/AttachmentContract.kt new file mode 100644 index 0000000000..f4c3fa965b --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version1/AttachmentContract.kt @@ -0,0 +1,33 @@ +package net.corda.contracts.incompatible.version1 + +import net.corda.core.contracts.Contract +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.TransactionVerificationException +import net.corda.core.contracts.TypeOnlyCommandData +import net.corda.core.crypto.SecureHash +import net.corda.core.identity.AbstractParty +import net.corda.core.serialization.internal.AttachmentsClassLoader +import net.corda.core.transactions.LedgerTransaction + +class AttachmentContract : Contract { + + private val FAIL_CONTRACT_VERIFY = java.lang.Boolean.getBoolean("net.corda.contracts.incompatible.AttachmentContract.fail.verify") + override fun verify(tx: LedgerTransaction) { + if (FAIL_CONTRACT_VERIFY) throw object:TransactionVerificationException(tx.id, "AttachmentContract verify failed.", null) {} + val state = tx.outputsOfType().single() + // we check that at least one has the matching hash, the other will be the contract + require(tx.attachments.any { it.id == state.hash }) {"At least one attachment in transaction must match hash ${state.hash}"} + } + + object Command : TypeOnlyCommandData() + + data class State(val hash: SecureHash.SHA256) : ContractState { + private val FAIL_CONTRACT_STATE = java.lang.Boolean.getBoolean("net.corda.contracts.incompatible.AttachmentContract.fail.state") && (this.javaClass.classLoader !is AttachmentsClassLoader) + init { + if (FAIL_CONTRACT_STATE) throw TransactionVerificationException.TransactionRequiredContractUnspecifiedException(hash,"AttachmentContract state initialisation failed.") + } + override val participants: List = emptyList() + } +} + +const val ATTACHMENT_PROGRAM_ID = "net.corda.contracts.incompatible.version1.AttachmentContract" diff --git a/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version2/AttachmentContract.kt b/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version2/AttachmentContract.kt new file mode 100644 index 0000000000..00bb7e2371 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version2/AttachmentContract.kt @@ -0,0 +1,25 @@ +package net.corda.contracts.incompatible.version2 + +import net.corda.core.contracts.Contract +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.TypeOnlyCommandData +import net.corda.core.crypto.SecureHash +import net.corda.core.identity.AbstractParty +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.utilities.OpaqueBytes + +class AttachmentContract : Contract { + override fun verify(tx: LedgerTransaction) { + val state = tx.outputsOfType().single() + // we check that at least one has the matching hash, the other will be the contract + require(tx.attachments.any { it.id == SecureHash.SHA256(state.opaqueBytes.bytes) }) {"At least one attachment in transaction must match hash ${state.opaqueBytes}"} + } + + object Command : TypeOnlyCommandData() + + data class State(val opaqueBytes: OpaqueBytes) : ContractState { + override val participants: List = emptyList() + } +} + +const val ATTACHMENT_PROGRAM_ID = "net.corda.contracts.incompatible.version2.AttachmentContract" diff --git a/node/src/integration-test/kotlin/net/corda/flows/incompatible/version1/AttachmentFlow.kt b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version1/AttachmentFlow.kt new file mode 100644 index 0000000000..6a4ccd9d04 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version1/AttachmentFlow.kt @@ -0,0 +1,94 @@ +package net.corda.flows.incompatible.version1 + +import co.paralleluniverse.fibers.Suspendable +import net.corda.contracts.incompatible.version1.ATTACHMENT_PROGRAM_ID +import net.corda.contracts.incompatible.version1.AttachmentContract +import net.corda.core.contracts.Command +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.TransactionState +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.CollectSignaturesFlow +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveFinalityFlow +import net.corda.core.flows.SignTransactionFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.node.StatesToRecord +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker +import net.corda.core.utilities.unwrap + +@InitiatingFlow +@StartableByRPC +class AttachmentFlow(private val otherSide: Party, + private val notary: Party, + private val attachId: SecureHash.SHA256, + private val notariseInputState: StateAndRef? = null) : FlowLogic() { + + object SIGNING : ProgressTracker.Step("Signing transaction") + + override val progressTracker: ProgressTracker = ProgressTracker(SIGNING) + + @Suspendable + override fun call(): SignedTransaction { + val session = initiateFlow(otherSide) + val notarise = notariseInputState != null + session.send(notarise) // inform peer whether to sign for notarisation + + // Create a trivial transaction with an output that describes the attachment, and the attachment itself + val ptx = TransactionBuilder(notary) + .addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID) + .addAttachment(attachId) + if (notarise) { + ptx.addInputState(notariseInputState!!) + ptx.addCommand(AttachmentContract.Command, ourIdentity.owningKey, otherSide.owningKey) + } + else + ptx.addCommand(AttachmentContract.Command, ourIdentity.owningKey) + + progressTracker.currentStep = SIGNING + + val stx = serviceHub.signInitialTransaction(ptx) + val ftx = if (notarise) { + subFlow(CollectSignaturesFlow(stx, listOf(session))) + } else stx + + return subFlow(FinalityFlow(ftx, setOf(session), statesToRecord = StatesToRecord.ALL_VISIBLE)) + } +} + +@InitiatedBy(AttachmentFlow::class) +class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val notarise = otherSide.receive().unwrap { it } + if (notarise) { + val stx = subFlow(object : SignTransactionFlow(otherSide) { + override fun checkTransaction(stx: SignedTransaction) { + } + }) + subFlow(ReceiveFinalityFlow(otherSide, stx.id, statesToRecord = StatesToRecord.ALL_VISIBLE)) + } else { + subFlow(ReceiveFinalityFlow(otherSide, statesToRecord = StatesToRecord.ALL_VISIBLE)) + } + } +} + +@StartableByRPC +class AttachmentIssueFlow(private val attachId: SecureHash.SHA256, + private val notary: Party): FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val builder = TransactionBuilder(notary) + builder.addAttachment(attachId) + builder.addOutputState(TransactionState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID, notary)) + builder.addCommand(Command(AttachmentContract.Command, listOf(ourIdentity.owningKey))) + val tx = serviceHub.signInitialTransaction(builder, ourIdentity.owningKey) + return subFlow(FinalityFlow(tx, emptySet(), statesToRecord = StatesToRecord.ALL_VISIBLE)) + } +} diff --git a/node/src/integration-test/kotlin/net/corda/flows/incompatible/version2/AttachmentFlow.kt b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version2/AttachmentFlow.kt new file mode 100644 index 0000000000..2a1faadef7 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version2/AttachmentFlow.kt @@ -0,0 +1,58 @@ +package net.corda.flows.incompatible.version2 + +import co.paralleluniverse.fibers.Suspendable +import net.corda.contracts.incompatible.version2.ATTACHMENT_PROGRAM_ID +import net.corda.contracts.incompatible.version2.AttachmentContract +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveTransactionFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.node.StatesToRecord +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker + +@InitiatingFlow +@StartableByRPC +class AttachmentFlow(private val otherSide: Party, + private val notary: Party, + private val attachId: SecureHash.SHA256) : FlowLogic() { + + object SIGNING : ProgressTracker.Step("Signing transaction") + + override val progressTracker: ProgressTracker = ProgressTracker(SIGNING) + + @Suspendable + override fun call(): SignedTransaction { + // Create a trivial transaction with an output that describes the attachment, and the attachment itself + val ptx = TransactionBuilder(notary) + .addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID) + .addCommand(AttachmentContract.Command, ourIdentity.owningKey) + .addAttachment(attachId) + + progressTracker.currentStep = SIGNING + + val stx = serviceHub.signInitialTransaction(ptx) + + // Send the transaction to the other recipient + return subFlow(FinalityFlow(stx, initiateFlow(otherSide))) + } +} + +@InitiatedBy(AttachmentFlow::class) +class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + // purposely prevent transaction verification and recording in ReceiveTransactionFlow + val stx = subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = false, statesToRecord = StatesToRecord.ALL_VISIBLE)) + logger.info("StoreAttachmentFlow: successfully received fully signed tx. Sending it to the vault for processing.") + + serviceHub.recordTransactions(StatesToRecord.ALL_VISIBLE, setOf(stx)) + logger.info("StoreAttachmentFlow: successfully recorded received transaction locally.") + } +} diff --git a/node/src/integration-test/kotlin/net/corda/flows/incompatible/version3/AttachmentFlow.kt b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version3/AttachmentFlow.kt new file mode 100644 index 0000000000..082f4da8aa --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version3/AttachmentFlow.kt @@ -0,0 +1,56 @@ +package net.corda.flows.incompatible.version3 + +import co.paralleluniverse.fibers.Suspendable +import net.corda.contracts.incompatible.version1.ATTACHMENT_PROGRAM_ID +import net.corda.contracts.incompatible.version1.AttachmentContract +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveTransactionFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.node.StatesToRecord +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker + +@InitiatingFlow +@StartableByRPC +class AttachmentFlow(private val otherSide: Party, + private val notary: Party, + private val attachId: SecureHash.SHA256) : FlowLogic() { + + object SIGNING : ProgressTracker.Step("Signing transaction") + + override val progressTracker: ProgressTracker = ProgressTracker(SIGNING) + + @Suspendable + override fun call(): SignedTransaction { + // Create a trivial transaction with an output that describes the attachment, and the attachment itself + val ptx = TransactionBuilder(notary) + .addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID) + .addCommand(AttachmentContract.Command, ourIdentity.owningKey) + .addAttachment(attachId) + + progressTracker.currentStep = SIGNING + + val stx = serviceHub.signInitialTransaction(ptx) + + // Send the transaction to the other recipient + return subFlow(FinalityFlow(stx, initiateFlow(otherSide))) + } +} + +@InitiatedBy(AttachmentFlow::class) +class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + // purposely enable transaction verification and recording in ReceiveTransactionFlow + subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = true, statesToRecord = StatesToRecord.ALL_VISIBLE)) + logger.info("StoreAttachmentFlow: successfully received fully signed tx. Sending it to the vault for processing.") + } +} + diff --git a/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt b/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt new file mode 100644 index 0000000000..9b0bc3f0e9 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt @@ -0,0 +1,249 @@ +package net.corda.node + +import co.paralleluniverse.strands.Strand +import junit.framework.TestCase.assertEquals +import junit.framework.TestCase.assertNotNull +import junit.framework.TestCase.assertTrue +import net.corda.core.flows.UnexpectedFlowEndException +import net.corda.core.internal.InputStreamAndHash +import net.corda.core.internal.deleteRecursively +import net.corda.core.internal.div +import net.corda.core.messaging.startFlow +import net.corda.core.messaging.vaultQueryBy +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.flows.incompatible.version1.AttachmentIssueFlow +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.NodeParameters +import net.corda.testing.driver.OutOfProcess +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.incrementalPortAllocation +import net.corda.testing.flows.waitForAllFlowsToComplete +import net.corda.testing.node.NotarySpec +import net.corda.testing.node.TestCordapp +import net.corda.testing.node.internal.cordappWithPackages +import org.junit.Test +import java.util.concurrent.TimeoutException +import net.corda.contracts.incompatible.version1.AttachmentContract as AttachmentContractV1 +import net.corda.flows.incompatible.version1.AttachmentFlow as AttachmentFlowV1 + +class VaultUpdateDeserializationTest { + companion object { + // uses ReceiveFinalityFlow + val flowVersion1 = cordappWithPackages("net.corda.flows.incompatible.version1") + // single state field of type SecureHash.SHA256 with system property driven run-time behaviour: + // -force contract verify failure: -Dnet.corda.contracts.incompatible.AttachmentContract.fail.verify=true + // -force contract state init failure: -Dnet.corda.contracts.incompatible.AttachmentContract.fail.state=true + val contractVersion1 = cordappWithPackages("net.corda.contracts.incompatible.version1") + + fun driverParameters(cordapps: List): DriverParameters { + return DriverParameters( + portAllocation = incrementalPortAllocation(), + inMemoryDB = false, + startNodesInProcess = false, + notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME)), + cordappsForAllNodes = cordapps + ) + } + } + + /* + * Transaction sent from A -> B with Notarisation + * Test that a deserialization error is raised where the receiver node of a transaction has an incompatible contract jar. + * In the case of a notarised transaction, a deserialisation error is thrown in the receiver SignTransactionFlow (before finality) + * upon receiving the transaction to be signed and attempting to record its dependencies. + * The ledger will not record any transactions, and the flow must be retried by the sender upon installing the correct contract jar + * version at the receiver and re-starting the node. + */ + @Test(timeout=300_000) + fun `Notarised transaction fails completely upon receiver deserialization failure collecting signatures when using incompatible contract jar`() { + driver(driverParameters(listOf(flowVersion1, contractVersion1))) { + val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = ALICE_NAME).getOrThrow() + val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1), + systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true")), + providedName = BOB_NAME).getOrThrow() + + val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0) + alice.rpc.uploadAttachment(inputStream) + + val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds) + val spendableState = stx.coreTransaction.outRef(0) + + // NOTE: exception is propagated from Receiver + try { + alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds) + } + catch(e: UnexpectedFlowEndException) { + println("Bob fails to deserialise transaction upon receipt of transaction for signing.") + } + assertEquals(0, bob.rpc.vaultQueryBy().states.size) + assertEquals(1, alice.rpc.vaultQueryBy().states.size) + // check transaction records + @Suppress("DEPRECATION") + assertEquals(1, alice.rpc.internalVerifiedTransactionsSnapshot().size) // issuance only + @Suppress("DEPRECATION") + assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty()) + + // restart Bob with correct contract jar version + (bob as OutOfProcess).process.destroyForcibly() + bob.stop() + (baseDirectory(BOB_NAME) / "cordapps").deleteRecursively() + + val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = BOB_NAME).getOrThrow() + // re-run failed flow + alice.rpc.startFlow(::AttachmentFlowV1, restartedBob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds) + + assertEquals(1, waitForVaultUpdate(restartedBob)) + assertEquals(1, alice.rpc.vaultQueryBy().states.size) + @Suppress("DEPRECATION") + assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty()) + } + } + + /* + * Transaction sent from A -> B with Notarisation + * + * Test original deserialization failure behaviour by setting a new configurable java system property. + * The ledger will enter an inconsistent state from which is cannot auto-recover. + */ + @Test(timeout=300_000) + fun `Notarised transaction when using incompatible contract jar and overriden system property`() { + driver(driverParameters(listOf(flowVersion1, contractVersion1))) { + val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = ALICE_NAME).getOrThrow() + val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1), + systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true", + "net.corda.vaultupdate.ignore.transaction.deserialization.errors" to "true", + "net.corda.recordtransaction.signature.verification.disabled" to "true")), + providedName = BOB_NAME).getOrThrow() + + val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0) + alice.rpc.uploadAttachment(inputStream) + + val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds) + val spendableState = stx.coreTransaction.outRef(0) + + // Flow completes successfully (deserialisation error on Receiver node is ignored) + alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds) + + // sender node correctly updated + @Suppress("DEPRECATION") + assertEquals(2, alice.rpc.internalVerifiedTransactionsSnapshot().size) + assertEquals(1, alice.rpc.vaultQueryBy().states.size) + + // receiver node has transaction but vault not updated! + @Suppress("DEPRECATION") + assertEquals(2, bob.rpc.internalVerifiedTransactionsSnapshot().size) + assertEquals(0, bob.rpc.vaultQueryBy().states.size) + } + } + + /* + * Transaction sent from A -> B without Notarisation + * Test that a deserialization error is raised where the receiver node of a finality flow has an incompatible contract jar. + * The ledger will be temporarily inconsistent until the correct contract jar version is installed and the receiver node is re-started. + */ + @Test(timeout=300_000) + fun `un-notarised transaction is hospitalized at receiver upon deserialization failure in vault update when using incompatible contract jar`() { + driver(driverParameters(emptyList())) { + val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = ALICE_NAME).getOrThrow() + val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1), + systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true")), + providedName = BOB_NAME).getOrThrow() + + val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0) + alice.rpc.uploadAttachment(inputStream) + + // ISSUE: exception is not propagating from Receiver + try { + alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, null).returnValue.getOrThrow(30.seconds) + } + catch(e: TimeoutException) { + println("Alice: Timeout awaiting flow completion.") + } + assertEquals(0, bob.rpc.vaultQueryBy().states.size) + // check transaction records + @Suppress("DEPRECATION") + assertTrue(alice.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty()) + @Suppress("DEPRECATION") + assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty()) + + // restart Bob with correct contract jar version + (bob as OutOfProcess).process.destroyForcibly() + bob.stop() + (baseDirectory(BOB_NAME) / "cordapps").deleteRecursively() + + val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = BOB_NAME).getOrThrow() + // original hospitalized transaction should now have been re-processed with correct contract jar + assertEquals(1, waitForVaultUpdate(restartedBob)) + @Suppress("DEPRECATION") + assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty()) + } + } + + /* + * Transaction sent from A -> B without Notarisation + * Test original deserialization failure behaviour by setting a new configurable java system property. + * The ledger will enter an inconsistent state from which is cannot auto-recover. + */ + @Test(timeout = 300_000) + fun `un-notarised transaction ignores deserialization failure in vault update when using incompatible contract jar and overriden system property`() { + driver(driverParameters(emptyList())) { + val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = ALICE_NAME).getOrThrow() + val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1), + systemProperties = mapOf( + "net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true", + "net.corda.vaultupdate.ignore.transaction.deserialization.errors" to "true", + "net.corda.recordtransaction.signature.verification.disabled" to "true")), + providedName = BOB_NAME).getOrThrow() + + val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0) + alice.rpc.uploadAttachment(inputStream) + + // Note: TransactionDeserialisationException is swallowed on the receiver node (without updating the vault). + val stx = alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, null).returnValue.getOrThrow(30.seconds) + println("Alice txId: ${stx.id}") + + waitForAllFlowsToComplete(bob) + val txId = bob.rpc.stateMachineRecordedTransactionMappingSnapshot().single().transactionId + println("Bob txId: $txId") + + assertEquals(0, bob.rpc.vaultQueryBy().states.size) + + // restart Bob with correct contract jar version + (bob as OutOfProcess).process.destroyForcibly() + bob.stop() + (baseDirectory(BOB_NAME) / "cordapps").deleteRecursively() + + val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = BOB_NAME).getOrThrow() + // transaction recorded + @Suppress("DEPRECATION") + assertNotNull(restartedBob.rpc.internalFindVerifiedTransaction(txId)) + // but vault states not updated + assertEquals(0, restartedBob.rpc.vaultQueryBy().states.size) + } + } + + private fun waitForVaultUpdate(nodeHandle: NodeHandle, maxIterations: Int = 5, iterationDelay: Long = 500): Int { + repeat((0..maxIterations).count()) { + val count = nodeHandle.rpc.vaultQueryBy().states + if (count.isNotEmpty()) { + return count.size + } + Strand.sleep(iterationDelay) + } + return 0 + } +} + diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index a444b11cc4..a2623b638d 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -21,6 +21,7 @@ import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.tee import net.corda.core.internal.uncheckedCast +import net.corda.core.internal.warnOnce import net.corda.core.messaging.DataFeed import net.corda.core.node.StatesToRecord import net.corda.core.node.services.KeyManagementService @@ -107,6 +108,8 @@ class NodeVaultService( const val DEFAULT_SOFT_LOCKING_SQL_IN_CLAUSE_SIZE = 16 + private val IGNORE_TRANSACTION_DESERIALIZATION_ERRORS = java.lang.Boolean.getBoolean("net.corda.vaultupdate.ignore.transaction.deserialization.errors") + /** * Establish whether a given state is relevant to a node, given the node's public keys. * @@ -307,18 +310,29 @@ class NodeVaultService( private fun makeUpdates(batch: Iterable, statesToRecord: StatesToRecord, previouslySeen: Boolean): List> { - fun withValidDeserialization(list: List, txId: SecureHash): Map = (0 until list.size).mapNotNull { idx -> - try { - idx to list[idx] - } catch (e: TransactionDeserialisationException) { - // When resolving transaction dependencies we might encounter contracts we haven't installed locally. - // This will cause a failure as we can't deserialize such states in the context of the `appClassloader`. - // For now we ignore these states. - // In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy. - log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e") - null - } - }.toMap() + fun withValidDeserialization(list: List, txId: SecureHash): Map { + var error: TransactionDeserialisationException? = null + val map = (0 until list.size).mapNotNull { idx -> + try { + idx to list[idx] + } catch (e: TransactionDeserialisationException) { + // When resolving transaction dependencies we might encounter contracts we haven't installed locally. + // This will cause a failure as we can't deserialize such states in the context of the `appClassloader`. + // For now we ignore these states. + // In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy. + if (IGNORE_TRANSACTION_DESERIALIZATION_ERRORS) { + log.warnOnce("The current usage of transaction deserialization for the vault is unsafe." + + "Ignoring vault updates due to failed deserialized states may lead to severe problems with ledger consistency. ") + log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e") + } else { + log.error("Could not deserialize state $idx from transaction $txId. Cause: $e") + if(error == null) error = e + } + null + } + }.toMap() + return error?.let { throw it } ?: map + } // Returns only output states that can be deserialised successfully. fun WireTransaction.deserializableOutputStates(): Map> = withValidDeserialization(this.outputs, this.id) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt index b1604b94df..12b55fc146 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt @@ -36,7 +36,8 @@ data class NodeParameters( val additionalCordapps: Collection = emptySet(), val flowOverrides: Map>, Class>> = emptyMap(), val logLevelOverride: String? = null, - val rpcAddress: NetworkHostAndPort? = null + val rpcAddress: NetworkHostAndPort? = null, + val systemProperties: Map = emptyMap() ) { /** * Create a new node parameters object with default values. Each parameter can be specified with its wither method which returns a copy @@ -127,4 +128,97 @@ data class NodeParameters( flowOverrides = flowOverrides, logLevelOverride = logLevelOverride) + constructor( + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String, + additionalCordapps: Collection = emptySet(), + flowOverrides: Map>, Class>>, + logLevelOverride: String? = null + ) : this( + providedName, + rpcUsers, + verifierType, + customOverrides, + startInSameProcess, + maximumHeapSize, + additionalCordapps, + flowOverrides, + logLevelOverride, + rpcAddress = null) + + @Suppress("LongParameterList") + fun copy( + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String, + additionalCordapps: Collection = emptySet(), + flowOverrides: Map>, Class>>, + logLevelOverride: String? = null + ) = this.copy( + providedName = providedName, + rpcUsers = rpcUsers, + verifierType = verifierType, + customOverrides = customOverrides, + startInSameProcess = startInSameProcess, + maximumHeapSize = maximumHeapSize, + additionalCordapps = additionalCordapps, + flowOverrides = flowOverrides, + logLevelOverride = logLevelOverride, + rpcAddress = rpcAddress) + + constructor( + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String, + additionalCordapps: Collection = emptySet(), + flowOverrides: Map>, Class>>, + logLevelOverride: String? = null, + rpcAddress: NetworkHostAndPort? = null + ) : this( + providedName, + rpcUsers, + verifierType, + customOverrides, + startInSameProcess, + maximumHeapSize, + additionalCordapps, + flowOverrides, + logLevelOverride, + rpcAddress, + systemProperties = emptyMap()) + + @Suppress("LongParameterList") + fun copy( + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String, + additionalCordapps: Collection = emptySet(), + flowOverrides: Map>, Class>>, + logLevelOverride: String? = null, + rpcAddress: NetworkHostAndPort? = null + ) = this.copy( + providedName = providedName, + rpcUsers = rpcUsers, + verifierType = verifierType, + customOverrides = customOverrides, + startInSameProcess = startInSameProcess, + maximumHeapSize = maximumHeapSize, + additionalCordapps = additionalCordapps, + flowOverrides = flowOverrides, + logLevelOverride = logLevelOverride, + rpcAddress = rpcAddress, + systemProperties = systemProperties) } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index b4b7d673e8..6ce7a2e419 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -755,7 +755,7 @@ class DriverDSLImpl( debugPort, bytemanJarPath, bytemanPort, - systemProperties, + systemProperties + parameters.systemProperties, parameters.maximumHeapSize, parameters.logLevelOverride, identifier,