diff --git a/.ci/dev/forward-merge/Jenkinsfile b/.ci/dev/forward-merge/Jenkinsfile index ccbd4dca13..91ac0e1580 100644 --- a/.ci/dev/forward-merge/Jenkinsfile +++ b/.ci/dev/forward-merge/Jenkinsfile @@ -1,8 +1,31 @@ @Library('corda-shared-build-pipeline-steps@5.1') _ -forwardMerger( - targetBranch: 'release/os/4.12', - originBranch: 'release/os/4.11', - slackChannel: '#build-team-automated-notifications' -) +/* + * Forward merge any changes in current branch to the branch with following version. + * + * Please note, the branches names are intentionally separated as variables, to minimised conflicts + * during automated merges for this file. + * + * These variables should be updated when a new version is cut + */ +/** + * the branch name of origin branch, it should match the current branch + * and it acts as a fail-safe inside {@code forwardMerger} pipeline + */ +String originBranch = 'release/os/4.11' + +/** + * the branch name of target branch, it should be the branch with the next version + * after the one in current branch. + */ +String targetBranch = 'release/os/4.12' + +/** + * Forward merge any changes between #originBranch and #targetBranch + */ +forwardMerger( + targetBranch: targetBranch, + originBranch: originBranch, + slackChannel: '#c4-forward-merge-bot-notifications', +) diff --git a/constants.properties b/constants.properties index 55e9e68db7..6c78be4da5 100644 --- a/constants.properties +++ b/constants.properties @@ -52,7 +52,7 @@ artemisVersion=2.19.1 # TODO Upgrade Jackson only when corda is using kotlin 1.3.10 jacksonVersion=2.13.5 jacksonKotlinVersion=2.9.7 -jettyVersion=9.4.19.v20190610 +jettyVersion=9.4.52.v20230823 jerseyVersion=2.25 servletVersion=4.0.1 assertjVersion=3.12.2 diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt index 421e1581cf..bfbb93c96b 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt @@ -355,14 +355,14 @@ class FinalityFlowTests : WithFinality { val sdrs = getSenderRecoveryData(stx.id, aliceNode.database).apply { assertEquals(1, this.size) assertEquals(StatesToRecord.ALL_VISIBLE, this[0].statesToRecord) - assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId) + assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId) } val rdr = getReceiverRecoveryData(stx.id, bobNode).apply { assertNotNull(this) val hashedDL = HashedDistributionList.decrypt(this!!.encryptedDistributionList.bytes, aliceNode.internals.encryptionService) assertEquals(StatesToRecord.ONLY_RELEVANT, hashedDL.senderStatesToRecord) - assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this.initiatorPartyId) - assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE), hashedDL.peerHashToStatesToRecord) + assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.initiatorPartyId) + assertEquals(mapOf(SecureHash.sha256(BOB_NAME.toString()) to StatesToRecord.ALL_VISIBLE), hashedDL.peerHashToStatesToRecord) } validateSenderAndReceiverTimestamps(sdrs, rdr!!) } @@ -388,19 +388,19 @@ class FinalityFlowTests : WithFinality { val sdrs = getSenderRecoveryData(stx.id, aliceNode.database).apply { assertEquals(2, this.size) assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord) - assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId) + assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId) assertEquals(StatesToRecord.ALL_VISIBLE, this[1].statesToRecord) - assertEquals(CHARLIE_NAME.hashCode().toLong(), this[1].peerPartyId) + assertEquals(SecureHash.sha256(CHARLIE_NAME.toString()), this[1].peerPartyId) } val rdr = getReceiverRecoveryData(stx.id, bobNode).apply { assertNotNull(this) val hashedDL = HashedDistributionList.decrypt(this!!.encryptedDistributionList.bytes, aliceNode.internals.encryptionService) assertEquals(StatesToRecord.ONLY_RELEVANT, hashedDL.senderStatesToRecord) - assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this.initiatorPartyId) + assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.initiatorPartyId) // note: Charlie assertion here is using the hinted StatesToRecord value passed to it from Alice - assertEquals(mapOf( - BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT, - CHARLIE_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE + assertEquals(mapOf( + SecureHash.sha256(BOB_NAME.toString()) to StatesToRecord.ONLY_RELEVANT, + SecureHash.sha256(CHARLIE_NAME.toString()) to StatesToRecord.ALL_VISIBLE ), hashedDL.peerHashToStatesToRecord) } validateSenderAndReceiverTimestamps(sdrs, rdr!!) @@ -452,14 +452,14 @@ class FinalityFlowTests : WithFinality { val sdr = getSenderRecoveryData(stx.id, aliceNode.database).apply { assertEquals(1, this.size) assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord) - assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId) + assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId) } val rdr = getReceiverRecoveryData(stx.id, bobNode).apply { assertNotNull(this) val hashedDL = HashedDistributionList.decrypt(this!!.encryptedDistributionList.bytes, aliceNode.internals.encryptionService) assertEquals(StatesToRecord.ONLY_RELEVANT, hashedDL.senderStatesToRecord) - assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this.initiatorPartyId) - assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT), hashedDL.peerHashToStatesToRecord) + assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.initiatorPartyId) + assertEquals(mapOf(SecureHash.sha256(BOB_NAME.toString()) to StatesToRecord.ONLY_RELEVANT), hashedDL.peerHashToStatesToRecord) } validateSenderAndReceiverTimestamps(sdr, rdr!!) } diff --git a/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt b/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt index f9d7353ed4..cf91603133 100644 --- a/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt @@ -14,36 +14,17 @@ import net.corda.core.utilities.ProgressTracker @StartableByRPC @InitiatingFlow class LedgerRecoveryFlow( - private val recoveryPeers: Collection, - private val timeWindow: RecoveryTimeWindow, - private val useAllNetworkNodes: Boolean = false, - private val transactionRole: TransactionRole = TransactionRole.ALL, - private val dryRun: Boolean = false, - private val optimisticInitiatorRecovery: Boolean = false, - override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic>() { + private val parameters: LedgerRecoveryParameters, + override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic() { @CordaInternal - data class ExtraConstructorArgs(val recoveryPeers: Collection, - val timeWindow: RecoveryTimeWindow, - val useAllNetworkNodes: Boolean, - val transactionRole: TransactionRole, - val dryRun: Boolean, - val optimisticInitiatorRecovery: Boolean) + data class ExtraConstructorArgs(val parameters: LedgerRecoveryParameters) @CordaInternal - fun getExtraConstructorArgs() = ExtraConstructorArgs(recoveryPeers, timeWindow, useAllNetworkNodes, transactionRole, dryRun, optimisticInitiatorRecovery) - - // unused constructors added to facilitate Node Shell command invocation - constructor(recoveryPeer: Party, timeWindow: RecoveryTimeWindow) : this(setOf(recoveryPeer), timeWindow, false, TransactionRole.ALL, false, false) - constructor(recoveryPeer: Party, timeWindow: RecoveryTimeWindow, dryRun: Boolean) : this(setOf(recoveryPeer), timeWindow, false, TransactionRole.ALL, dryRun, false) - - constructor(timeWindow: RecoveryTimeWindow, dryRun: Boolean) : this(emptySet(), timeWindow, false, TransactionRole.ALL, dryRun, false) - constructor(timeWindow: RecoveryTimeWindow, dryRun: Boolean, optimisticInitiatorRecovery: Boolean) : this(emptySet(), timeWindow, false, TransactionRole.ALL, dryRun, optimisticInitiatorRecovery) - constructor(recoveryPeers: Collection, timeWindow: RecoveryTimeWindow, dryRun: Boolean) : this(recoveryPeers, timeWindow, false, TransactionRole.ALL, dryRun, false) - constructor(recoveryPeers: Collection, timeWindow: RecoveryTimeWindow, dryRun: Boolean, optimisticInitiatorRecovery: Boolean) : this(recoveryPeers, timeWindow, false, TransactionRole.ALL, dryRun, optimisticInitiatorRecovery) + fun getExtraConstructorArgs() = ExtraConstructorArgs(parameters) @Suspendable @Throws(LedgerRecoveryException::class) - override fun call(): Map { + override fun call(): LedgerRecoveryResult { throw NotImplementedError("Enterprise only feature") } } @@ -59,6 +40,26 @@ class ReceiveLedgerRecoveryFlow constructor(private val otherSideSession: FlowSe @CordaSerializable class LedgerRecoveryException(message: String) : FlowException("Ledger recovery failed: $message") +@CordaSerializable +data class LedgerRecoveryParameters( + val recoveryPeers: Collection, + val timeWindow: RecoveryTimeWindow? = null, + val useAllNetworkNodes: Boolean = false, + val transactionRole: TransactionRole = TransactionRole.ALL, + val dryRun: Boolean = false, + val optimisticInitiatorRecovery: Boolean = false, + val useTimeWindowNarrowing: Boolean = true, + val verboseLogging: Boolean = true, + val recoveryBatchSize: Int = 1000 +) + +@CordaSerializable +data class LedgerRecoveryResult( + val totalRecoveredRecords: Long, + val totalRecoveredTransactions: Long, + val totalErrors: Long +) + /** * This specifies which type of transactions to recover based on the transaction role of the recovering node */ @@ -80,6 +81,3 @@ data class RecoveryResult( val synchronisedInitiated: Boolean = false, // only attempted if [optimisticInitiatorRecovery] option set to true and [TransactionRecoveryType.INITIATOR] val failureCause: String? = null // reason why a transaction failed to synchronise ) - - - diff --git a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt index 7b45d972af..31bbc4cf3b 100644 --- a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt @@ -78,12 +78,7 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow checkParameterHash(stx.networkParametersHash) subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, deferredAck)) logger.info("Transaction dependencies resolution completed.") - try { - stx.verify(serviceHub, checkSufficientSignatures) - } catch (e: Exception) { - logger.warn("Transaction verification failed.") - throw e - } + verifyTx(stx, checkSufficientSignatures) if (checkSufficientSignatures) { // We should only send a transaction to the vault for processing if we did in fact fully verify it, and // there are no missing signatures. We don't want partly signed stuff in the vault. @@ -97,6 +92,15 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow } } + private fun verifyTx(stx: SignedTransaction, localCheckSufficientSignatures: Boolean) { + try { + stx.verify(serviceHub, localCheckSufficientSignatures) + } catch (e: Exception) { + logger.warn("Transaction verification failed.") + throw e + } + } + private fun isDeferredAck(payload: Any): Boolean { return payload is SignedTransactionWithDistributionList && checkSufficientSignatures && payload.isFinality } @@ -109,7 +113,7 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow checkParameterHash(stx.networkParametersHash) subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, true)) logger.info("Transaction dependencies resolution completed.") - + verifyTx(stx, false) serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) { logger.debug { "Peer recording transaction without notary signature." } (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx) diff --git a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt index 3323d2d743..d10bbe0ebb 100644 --- a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt @@ -132,7 +132,7 @@ open class DataVendingFlow(val otherSessions: Set, val payload: Any protected open fun isFinality(): Boolean = false - @Suppress("ComplexCondition", "ComplexMethod", "LongMethod") + @Suppress("ComplexCondition", "ComplexMethod", "LongMethod", "TooGenericExceptionThrown") @Suspendable override fun call(): Void? { val networkMaxMessageSize = serviceHub.networkParameters.maxMessageSize @@ -151,11 +151,14 @@ open class DataVendingFlow(val otherSessions: Set, val payload: Any is NotarisationPayload -> TransactionAuthorisationFilter().addAuthorised(getInputTransactions(payload.signedTransaction)) is SignedTransaction -> TransactionAuthorisationFilter().addAuthorised(getInputTransactions(payload)) is RetrieveAnyTransactionPayload -> TransactionAuthorisationFilter(acceptAll = true) - is List<*> -> TransactionAuthorisationFilter().addAuthorised(payload.flatMap { stateAndRef -> - if (stateAndRef is StateAndRef<*>) { - getInputTransactions(serviceHub.validatedTransactions.getTransaction(stateAndRef.ref.txhash)!!) + stateAndRef.ref.txhash + is List<*> -> TransactionAuthorisationFilter().addAuthorised(payload.flatMap { someObject -> + if (someObject is StateAndRef<*>) { + getInputTransactions(serviceHub.validatedTransactions.getTransaction(someObject.ref.txhash)!!) + someObject.ref.txhash + } + else if (someObject is NamedByHash) { + setOf(someObject.id) } else { - throw Exception("Unknown payload type: ${stateAndRef!!::class.java} ?") + throw Exception("Unknown payload type: ${someObject!!::class.java} ?") } }.toSet()) else -> throw Exception("Unknown payload type: ${payload::class.java} ?") diff --git a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt index 882ca901fe..053015fef9 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt @@ -26,6 +26,9 @@ class ResolveTransactionsFlow private constructor( constructor(txHashes: Set, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE) : this(null, txHashes, otherSide, statesToRecord) + constructor(txHashes: Set, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean) + : this(null, txHashes, otherSide, statesToRecord, deferredAck) + /** * Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does * *not* validate or store the [SignedTransaction] itself. diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/flows/FinalityFlowErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/flows/FinalityFlowErrorHandlingTest.kt index 9617c21fb3..dc0133f575 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/flows/FinalityFlowErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/flows/FinalityFlowErrorHandlingTest.kt @@ -74,7 +74,7 @@ class FinalityFlowErrorHandlingTest : StateMachineErrorHandlingTest() { alice.rpc.startFlow(::GetFlowTransaction, txId).returnValue.getOrThrow().apply { assertEquals("V", this.first) // "V" -> VERIFIED - assertEquals(CHARLIE_NAME.hashCode().toLong(), this.second) // peer + assertEquals(SecureHash.sha256(CHARLIE_NAME.toString()).toString(), this.second) // peer } } } @@ -83,9 +83,9 @@ class FinalityFlowErrorHandlingTest : StateMachineErrorHandlingTest() { // Internal use for testing only!! @StartableByRPC -class GetFlowTransaction(private val txId: SecureHash) : FlowLogic>() { +class GetFlowTransaction(private val txId: SecureHash) : FlowLogic>() { @Suspendable - override fun call(): Pair { + override fun call(): Pair { val transactionStatus = serviceHub.jdbcSession().prepareStatement("select * from node_transactions where tx_id = ?") .apply { setString(1, txId.toString()) } .use { ps -> @@ -94,12 +94,12 @@ class GetFlowTransaction(private val txId: SecureHash) : FlowLogic ps.executeQuery().use { rs -> rs.next() - rs.getLong(4) // receiverPartyId + rs.getString(4) // receiverPartyId } } return Pair(transactionStatus, receiverPartyId) diff --git a/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version1/AttachmentContract.kt b/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version1/AttachmentContract.kt new file mode 100644 index 0000000000..f4c3fa965b --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version1/AttachmentContract.kt @@ -0,0 +1,33 @@ +package net.corda.contracts.incompatible.version1 + +import net.corda.core.contracts.Contract +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.TransactionVerificationException +import net.corda.core.contracts.TypeOnlyCommandData +import net.corda.core.crypto.SecureHash +import net.corda.core.identity.AbstractParty +import net.corda.core.serialization.internal.AttachmentsClassLoader +import net.corda.core.transactions.LedgerTransaction + +class AttachmentContract : Contract { + + private val FAIL_CONTRACT_VERIFY = java.lang.Boolean.getBoolean("net.corda.contracts.incompatible.AttachmentContract.fail.verify") + override fun verify(tx: LedgerTransaction) { + if (FAIL_CONTRACT_VERIFY) throw object:TransactionVerificationException(tx.id, "AttachmentContract verify failed.", null) {} + val state = tx.outputsOfType().single() + // we check that at least one has the matching hash, the other will be the contract + require(tx.attachments.any { it.id == state.hash }) {"At least one attachment in transaction must match hash ${state.hash}"} + } + + object Command : TypeOnlyCommandData() + + data class State(val hash: SecureHash.SHA256) : ContractState { + private val FAIL_CONTRACT_STATE = java.lang.Boolean.getBoolean("net.corda.contracts.incompatible.AttachmentContract.fail.state") && (this.javaClass.classLoader !is AttachmentsClassLoader) + init { + if (FAIL_CONTRACT_STATE) throw TransactionVerificationException.TransactionRequiredContractUnspecifiedException(hash,"AttachmentContract state initialisation failed.") + } + override val participants: List = emptyList() + } +} + +const val ATTACHMENT_PROGRAM_ID = "net.corda.contracts.incompatible.version1.AttachmentContract" diff --git a/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version2/AttachmentContract.kt b/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version2/AttachmentContract.kt new file mode 100644 index 0000000000..00bb7e2371 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/contracts/incompatible/version2/AttachmentContract.kt @@ -0,0 +1,25 @@ +package net.corda.contracts.incompatible.version2 + +import net.corda.core.contracts.Contract +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.TypeOnlyCommandData +import net.corda.core.crypto.SecureHash +import net.corda.core.identity.AbstractParty +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.utilities.OpaqueBytes + +class AttachmentContract : Contract { + override fun verify(tx: LedgerTransaction) { + val state = tx.outputsOfType().single() + // we check that at least one has the matching hash, the other will be the contract + require(tx.attachments.any { it.id == SecureHash.SHA256(state.opaqueBytes.bytes) }) {"At least one attachment in transaction must match hash ${state.opaqueBytes}"} + } + + object Command : TypeOnlyCommandData() + + data class State(val opaqueBytes: OpaqueBytes) : ContractState { + override val participants: List = emptyList() + } +} + +const val ATTACHMENT_PROGRAM_ID = "net.corda.contracts.incompatible.version2.AttachmentContract" diff --git a/node/src/integration-test/kotlin/net/corda/flows/incompatible/version1/AttachmentFlow.kt b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version1/AttachmentFlow.kt new file mode 100644 index 0000000000..6a4ccd9d04 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version1/AttachmentFlow.kt @@ -0,0 +1,94 @@ +package net.corda.flows.incompatible.version1 + +import co.paralleluniverse.fibers.Suspendable +import net.corda.contracts.incompatible.version1.ATTACHMENT_PROGRAM_ID +import net.corda.contracts.incompatible.version1.AttachmentContract +import net.corda.core.contracts.Command +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.TransactionState +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.CollectSignaturesFlow +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveFinalityFlow +import net.corda.core.flows.SignTransactionFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.node.StatesToRecord +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker +import net.corda.core.utilities.unwrap + +@InitiatingFlow +@StartableByRPC +class AttachmentFlow(private val otherSide: Party, + private val notary: Party, + private val attachId: SecureHash.SHA256, + private val notariseInputState: StateAndRef? = null) : FlowLogic() { + + object SIGNING : ProgressTracker.Step("Signing transaction") + + override val progressTracker: ProgressTracker = ProgressTracker(SIGNING) + + @Suspendable + override fun call(): SignedTransaction { + val session = initiateFlow(otherSide) + val notarise = notariseInputState != null + session.send(notarise) // inform peer whether to sign for notarisation + + // Create a trivial transaction with an output that describes the attachment, and the attachment itself + val ptx = TransactionBuilder(notary) + .addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID) + .addAttachment(attachId) + if (notarise) { + ptx.addInputState(notariseInputState!!) + ptx.addCommand(AttachmentContract.Command, ourIdentity.owningKey, otherSide.owningKey) + } + else + ptx.addCommand(AttachmentContract.Command, ourIdentity.owningKey) + + progressTracker.currentStep = SIGNING + + val stx = serviceHub.signInitialTransaction(ptx) + val ftx = if (notarise) { + subFlow(CollectSignaturesFlow(stx, listOf(session))) + } else stx + + return subFlow(FinalityFlow(ftx, setOf(session), statesToRecord = StatesToRecord.ALL_VISIBLE)) + } +} + +@InitiatedBy(AttachmentFlow::class) +class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val notarise = otherSide.receive().unwrap { it } + if (notarise) { + val stx = subFlow(object : SignTransactionFlow(otherSide) { + override fun checkTransaction(stx: SignedTransaction) { + } + }) + subFlow(ReceiveFinalityFlow(otherSide, stx.id, statesToRecord = StatesToRecord.ALL_VISIBLE)) + } else { + subFlow(ReceiveFinalityFlow(otherSide, statesToRecord = StatesToRecord.ALL_VISIBLE)) + } + } +} + +@StartableByRPC +class AttachmentIssueFlow(private val attachId: SecureHash.SHA256, + private val notary: Party): FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val builder = TransactionBuilder(notary) + builder.addAttachment(attachId) + builder.addOutputState(TransactionState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID, notary)) + builder.addCommand(Command(AttachmentContract.Command, listOf(ourIdentity.owningKey))) + val tx = serviceHub.signInitialTransaction(builder, ourIdentity.owningKey) + return subFlow(FinalityFlow(tx, emptySet(), statesToRecord = StatesToRecord.ALL_VISIBLE)) + } +} diff --git a/node/src/integration-test/kotlin/net/corda/flows/incompatible/version2/AttachmentFlow.kt b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version2/AttachmentFlow.kt new file mode 100644 index 0000000000..2a1faadef7 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version2/AttachmentFlow.kt @@ -0,0 +1,58 @@ +package net.corda.flows.incompatible.version2 + +import co.paralleluniverse.fibers.Suspendable +import net.corda.contracts.incompatible.version2.ATTACHMENT_PROGRAM_ID +import net.corda.contracts.incompatible.version2.AttachmentContract +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveTransactionFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.node.StatesToRecord +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker + +@InitiatingFlow +@StartableByRPC +class AttachmentFlow(private val otherSide: Party, + private val notary: Party, + private val attachId: SecureHash.SHA256) : FlowLogic() { + + object SIGNING : ProgressTracker.Step("Signing transaction") + + override val progressTracker: ProgressTracker = ProgressTracker(SIGNING) + + @Suspendable + override fun call(): SignedTransaction { + // Create a trivial transaction with an output that describes the attachment, and the attachment itself + val ptx = TransactionBuilder(notary) + .addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID) + .addCommand(AttachmentContract.Command, ourIdentity.owningKey) + .addAttachment(attachId) + + progressTracker.currentStep = SIGNING + + val stx = serviceHub.signInitialTransaction(ptx) + + // Send the transaction to the other recipient + return subFlow(FinalityFlow(stx, initiateFlow(otherSide))) + } +} + +@InitiatedBy(AttachmentFlow::class) +class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + // purposely prevent transaction verification and recording in ReceiveTransactionFlow + val stx = subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = false, statesToRecord = StatesToRecord.ALL_VISIBLE)) + logger.info("StoreAttachmentFlow: successfully received fully signed tx. Sending it to the vault for processing.") + + serviceHub.recordTransactions(StatesToRecord.ALL_VISIBLE, setOf(stx)) + logger.info("StoreAttachmentFlow: successfully recorded received transaction locally.") + } +} diff --git a/node/src/integration-test/kotlin/net/corda/flows/incompatible/version3/AttachmentFlow.kt b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version3/AttachmentFlow.kt new file mode 100644 index 0000000000..082f4da8aa --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/flows/incompatible/version3/AttachmentFlow.kt @@ -0,0 +1,56 @@ +package net.corda.flows.incompatible.version3 + +import co.paralleluniverse.fibers.Suspendable +import net.corda.contracts.incompatible.version1.ATTACHMENT_PROGRAM_ID +import net.corda.contracts.incompatible.version1.AttachmentContract +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveTransactionFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.node.StatesToRecord +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker + +@InitiatingFlow +@StartableByRPC +class AttachmentFlow(private val otherSide: Party, + private val notary: Party, + private val attachId: SecureHash.SHA256) : FlowLogic() { + + object SIGNING : ProgressTracker.Step("Signing transaction") + + override val progressTracker: ProgressTracker = ProgressTracker(SIGNING) + + @Suspendable + override fun call(): SignedTransaction { + // Create a trivial transaction with an output that describes the attachment, and the attachment itself + val ptx = TransactionBuilder(notary) + .addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID) + .addCommand(AttachmentContract.Command, ourIdentity.owningKey) + .addAttachment(attachId) + + progressTracker.currentStep = SIGNING + + val stx = serviceHub.signInitialTransaction(ptx) + + // Send the transaction to the other recipient + return subFlow(FinalityFlow(stx, initiateFlow(otherSide))) + } +} + +@InitiatedBy(AttachmentFlow::class) +class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + // purposely enable transaction verification and recording in ReceiveTransactionFlow + subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = true, statesToRecord = StatesToRecord.ALL_VISIBLE)) + logger.info("StoreAttachmentFlow: successfully received fully signed tx. Sending it to the vault for processing.") + } +} + diff --git a/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt b/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt new file mode 100644 index 0000000000..9b0bc3f0e9 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/VaultUpdateDeserializationTest.kt @@ -0,0 +1,249 @@ +package net.corda.node + +import co.paralleluniverse.strands.Strand +import junit.framework.TestCase.assertEquals +import junit.framework.TestCase.assertNotNull +import junit.framework.TestCase.assertTrue +import net.corda.core.flows.UnexpectedFlowEndException +import net.corda.core.internal.InputStreamAndHash +import net.corda.core.internal.deleteRecursively +import net.corda.core.internal.div +import net.corda.core.messaging.startFlow +import net.corda.core.messaging.vaultQueryBy +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.flows.incompatible.version1.AttachmentIssueFlow +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.NodeParameters +import net.corda.testing.driver.OutOfProcess +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.incrementalPortAllocation +import net.corda.testing.flows.waitForAllFlowsToComplete +import net.corda.testing.node.NotarySpec +import net.corda.testing.node.TestCordapp +import net.corda.testing.node.internal.cordappWithPackages +import org.junit.Test +import java.util.concurrent.TimeoutException +import net.corda.contracts.incompatible.version1.AttachmentContract as AttachmentContractV1 +import net.corda.flows.incompatible.version1.AttachmentFlow as AttachmentFlowV1 + +class VaultUpdateDeserializationTest { + companion object { + // uses ReceiveFinalityFlow + val flowVersion1 = cordappWithPackages("net.corda.flows.incompatible.version1") + // single state field of type SecureHash.SHA256 with system property driven run-time behaviour: + // -force contract verify failure: -Dnet.corda.contracts.incompatible.AttachmentContract.fail.verify=true + // -force contract state init failure: -Dnet.corda.contracts.incompatible.AttachmentContract.fail.state=true + val contractVersion1 = cordappWithPackages("net.corda.contracts.incompatible.version1") + + fun driverParameters(cordapps: List): DriverParameters { + return DriverParameters( + portAllocation = incrementalPortAllocation(), + inMemoryDB = false, + startNodesInProcess = false, + notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME)), + cordappsForAllNodes = cordapps + ) + } + } + + /* + * Transaction sent from A -> B with Notarisation + * Test that a deserialization error is raised where the receiver node of a transaction has an incompatible contract jar. + * In the case of a notarised transaction, a deserialisation error is thrown in the receiver SignTransactionFlow (before finality) + * upon receiving the transaction to be signed and attempting to record its dependencies. + * The ledger will not record any transactions, and the flow must be retried by the sender upon installing the correct contract jar + * version at the receiver and re-starting the node. + */ + @Test(timeout=300_000) + fun `Notarised transaction fails completely upon receiver deserialization failure collecting signatures when using incompatible contract jar`() { + driver(driverParameters(listOf(flowVersion1, contractVersion1))) { + val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = ALICE_NAME).getOrThrow() + val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1), + systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true")), + providedName = BOB_NAME).getOrThrow() + + val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0) + alice.rpc.uploadAttachment(inputStream) + + val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds) + val spendableState = stx.coreTransaction.outRef(0) + + // NOTE: exception is propagated from Receiver + try { + alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds) + } + catch(e: UnexpectedFlowEndException) { + println("Bob fails to deserialise transaction upon receipt of transaction for signing.") + } + assertEquals(0, bob.rpc.vaultQueryBy().states.size) + assertEquals(1, alice.rpc.vaultQueryBy().states.size) + // check transaction records + @Suppress("DEPRECATION") + assertEquals(1, alice.rpc.internalVerifiedTransactionsSnapshot().size) // issuance only + @Suppress("DEPRECATION") + assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty()) + + // restart Bob with correct contract jar version + (bob as OutOfProcess).process.destroyForcibly() + bob.stop() + (baseDirectory(BOB_NAME) / "cordapps").deleteRecursively() + + val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = BOB_NAME).getOrThrow() + // re-run failed flow + alice.rpc.startFlow(::AttachmentFlowV1, restartedBob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds) + + assertEquals(1, waitForVaultUpdate(restartedBob)) + assertEquals(1, alice.rpc.vaultQueryBy().states.size) + @Suppress("DEPRECATION") + assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty()) + } + } + + /* + * Transaction sent from A -> B with Notarisation + * + * Test original deserialization failure behaviour by setting a new configurable java system property. + * The ledger will enter an inconsistent state from which is cannot auto-recover. + */ + @Test(timeout=300_000) + fun `Notarised transaction when using incompatible contract jar and overriden system property`() { + driver(driverParameters(listOf(flowVersion1, contractVersion1))) { + val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = ALICE_NAME).getOrThrow() + val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1), + systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true", + "net.corda.vaultupdate.ignore.transaction.deserialization.errors" to "true", + "net.corda.recordtransaction.signature.verification.disabled" to "true")), + providedName = BOB_NAME).getOrThrow() + + val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0) + alice.rpc.uploadAttachment(inputStream) + + val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds) + val spendableState = stx.coreTransaction.outRef(0) + + // Flow completes successfully (deserialisation error on Receiver node is ignored) + alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds) + + // sender node correctly updated + @Suppress("DEPRECATION") + assertEquals(2, alice.rpc.internalVerifiedTransactionsSnapshot().size) + assertEquals(1, alice.rpc.vaultQueryBy().states.size) + + // receiver node has transaction but vault not updated! + @Suppress("DEPRECATION") + assertEquals(2, bob.rpc.internalVerifiedTransactionsSnapshot().size) + assertEquals(0, bob.rpc.vaultQueryBy().states.size) + } + } + + /* + * Transaction sent from A -> B without Notarisation + * Test that a deserialization error is raised where the receiver node of a finality flow has an incompatible contract jar. + * The ledger will be temporarily inconsistent until the correct contract jar version is installed and the receiver node is re-started. + */ + @Test(timeout=300_000) + fun `un-notarised transaction is hospitalized at receiver upon deserialization failure in vault update when using incompatible contract jar`() { + driver(driverParameters(emptyList())) { + val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = ALICE_NAME).getOrThrow() + val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1), + systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true")), + providedName = BOB_NAME).getOrThrow() + + val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0) + alice.rpc.uploadAttachment(inputStream) + + // ISSUE: exception is not propagating from Receiver + try { + alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, null).returnValue.getOrThrow(30.seconds) + } + catch(e: TimeoutException) { + println("Alice: Timeout awaiting flow completion.") + } + assertEquals(0, bob.rpc.vaultQueryBy().states.size) + // check transaction records + @Suppress("DEPRECATION") + assertTrue(alice.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty()) + @Suppress("DEPRECATION") + assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty()) + + // restart Bob with correct contract jar version + (bob as OutOfProcess).process.destroyForcibly() + bob.stop() + (baseDirectory(BOB_NAME) / "cordapps").deleteRecursively() + + val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = BOB_NAME).getOrThrow() + // original hospitalized transaction should now have been re-processed with correct contract jar + assertEquals(1, waitForVaultUpdate(restartedBob)) + @Suppress("DEPRECATION") + assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty()) + } + } + + /* + * Transaction sent from A -> B without Notarisation + * Test original deserialization failure behaviour by setting a new configurable java system property. + * The ledger will enter an inconsistent state from which is cannot auto-recover. + */ + @Test(timeout = 300_000) + fun `un-notarised transaction ignores deserialization failure in vault update when using incompatible contract jar and overriden system property`() { + driver(driverParameters(emptyList())) { + val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = ALICE_NAME).getOrThrow() + val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1), + systemProperties = mapOf( + "net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true", + "net.corda.vaultupdate.ignore.transaction.deserialization.errors" to "true", + "net.corda.recordtransaction.signature.verification.disabled" to "true")), + providedName = BOB_NAME).getOrThrow() + + val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0) + alice.rpc.uploadAttachment(inputStream) + + // Note: TransactionDeserialisationException is swallowed on the receiver node (without updating the vault). + val stx = alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, null).returnValue.getOrThrow(30.seconds) + println("Alice txId: ${stx.id}") + + waitForAllFlowsToComplete(bob) + val txId = bob.rpc.stateMachineRecordedTransactionMappingSnapshot().single().transactionId + println("Bob txId: $txId") + + assertEquals(0, bob.rpc.vaultQueryBy().states.size) + + // restart Bob with correct contract jar version + (bob as OutOfProcess).process.destroyForcibly() + bob.stop() + (baseDirectory(BOB_NAME) / "cordapps").deleteRecursively() + + val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)), + providedName = BOB_NAME).getOrThrow() + // transaction recorded + @Suppress("DEPRECATION") + assertNotNull(restartedBob.rpc.internalFindVerifiedTransaction(txId)) + // but vault states not updated + assertEquals(0, restartedBob.rpc.vaultQueryBy().states.size) + } + } + + private fun waitForVaultUpdate(nodeHandle: NodeHandle, maxIterations: Int = 5, iterationDelay: Long = 500): Int { + repeat((0..maxIterations).count()) { + val count = nodeHandle.rpc.vaultQueryBy().states + if (count.isNotEmpty()) { + return count.size + } + Strand.sleep(iterationDelay) + } + return 0 + } +} + diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentPartyInfoCacheTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentPartyInfoCacheTest.kt index 42931cebc0..bb8e4ec9fc 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentPartyInfoCacheTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentPartyInfoCacheTest.kt @@ -1,5 +1,6 @@ package net.corda.node.services.network +import net.corda.core.crypto.SecureHash import net.corda.core.node.NodeInfo import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.services.identity.InMemoryIdentityService @@ -35,9 +36,9 @@ class PersistentPartyInfoCacheTest { createNodeInfo(listOf(CHARLIE)))) val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database) partyInfoCache.start() - assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(ALICE.name.hashCode().toLong()) - assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(BOB.name.hashCode().toLong()) - assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(CHARLIE.name.hashCode().toLong()) + assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(SecureHash.sha256(ALICE.name.toString())) + assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(SecureHash.sha256(BOB.name.toString())) + assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(SecureHash.sha256(CHARLIE.name.toString())) } @Test(timeout=300_000) @@ -50,9 +51,9 @@ class PersistentPartyInfoCacheTest { // clear network map cache & bootstrap another PersistentInfoCache charlieNetMapCache.clearNetworkMapCache() val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database) - assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(ALICE.name.hashCode().toLong()) - assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(BOB.name.hashCode().toLong()) - assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(CHARLIE.name.hashCode().toLong()) + assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(SecureHash.sha256(ALICE.name.toString())) + assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(SecureHash.sha256(BOB.name.toString())) + assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(SecureHash.sha256(CHARLIE.name.toString())) } @Test(timeout=300_000) @@ -63,9 +64,9 @@ class PersistentPartyInfoCacheTest { createNodeInfo(listOf(CHARLIE)))) val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database) partyInfoCache.start() - assertThat(partyInfoCache.getCordaX500NameByPartyId(ALICE.name.hashCode().toLong())).isEqualTo(ALICE.name) - assertThat(partyInfoCache.getCordaX500NameByPartyId(BOB.name.hashCode().toLong())).isEqualTo(BOB.name) - assertThat(partyInfoCache.getCordaX500NameByPartyId(CHARLIE.name.hashCode().toLong())).isEqualTo(CHARLIE.name) + assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(ALICE.name.toString()))).isEqualTo(ALICE.name) + assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(BOB.name.toString()))).isEqualTo(BOB.name) + assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(CHARLIE.name.toString()))).isEqualTo(CHARLIE.name) } @Test(timeout=300_000) @@ -78,9 +79,9 @@ class PersistentPartyInfoCacheTest { // clear network map cache & bootstrap another PersistentInfoCache charlieNetMapCache.clearNetworkMapCache() val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database) - assertThat(partyInfoCache.getCordaX500NameByPartyId(ALICE.name.hashCode().toLong())).isEqualTo(ALICE.name) - assertThat(partyInfoCache.getCordaX500NameByPartyId(BOB.name.hashCode().toLong())).isEqualTo(BOB.name) - assertThat(partyInfoCache.getCordaX500NameByPartyId(CHARLIE.name.hashCode().toLong())).isEqualTo(CHARLIE.name) + assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(ALICE.name.toString()))).isEqualTo(ALICE.name) + assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(BOB.name.toString()))).isEqualTo(BOB.name) + assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(CHARLIE.name.toString()))).isEqualTo(CHARLIE.name) } private fun createNodeInfo(identities: List, diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentPartyInfoCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentPartyInfoCache.kt index e9ba12a3a6..6cee01c45a 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentPartyInfoCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentPartyInfoCache.kt @@ -1,5 +1,6 @@ package net.corda.node.services.network +import net.corda.core.crypto.SecureHash import net.corda.core.identity.CordaX500Name import net.corda.core.internal.NamedCacheFactory import net.corda.core.node.services.NetworkMapCache @@ -14,13 +15,13 @@ class PersistentPartyInfoCache(private val networkMapCache: PersistentNetworkMap private val database: CordaPersistence) { // probably better off using a BiMap here: https://www.baeldung.com/guava-bimap - private val cordaX500NameToPartyIdCache = NonInvalidatingCache( + private val cordaX500NameToPartyIdCache = NonInvalidatingCache( cacheFactory = cacheFactory, name = "RecoveryPartyInfoCache_byCordaX500Name") { key -> database.transaction { queryByCordaX500Name(session, key) } } - private val partyIdToCordaX500NameCache = NonInvalidatingCache( + private val partyIdToCordaX500NameCache = NonInvalidatingCache( cacheFactory = cacheFactory, name = "RecoveryPartyInfoCache_byPartyId") { key -> database.transaction { queryByPartyId(session, key) } @@ -32,48 +33,48 @@ class PersistentPartyInfoCache(private val networkMapCache: PersistentNetworkMap val (snapshot, updates) = networkMapCache.track() snapshot.map { entry -> entry.legalIdentities.map { party -> - add(party.name.hashCode().toLong(), party.name) + add(SecureHash.sha256(party.name.toString()), party.name) } } trackNetworkMapUpdates = updates trackNetworkMapUpdates.cache().forEach { nodeInfo -> nodeInfo.node.legalIdentities.map { party -> - add(party.name.hashCode().toLong(), party.name) + add(SecureHash.sha256(party.name.toString()), party.name) } } } - fun getPartyIdByCordaX500Name(name: CordaX500Name): Long = cordaX500NameToPartyIdCache[name] ?: throw IllegalStateException("Missing cache entry for $name") + fun getPartyIdByCordaX500Name(name: CordaX500Name): SecureHash = cordaX500NameToPartyIdCache[name] ?: throw IllegalStateException("Missing cache entry for $name") - fun getCordaX500NameByPartyId(partyId: Long): CordaX500Name = partyIdToCordaX500NameCache[partyId] ?: throw IllegalStateException("Missing cache entry for $partyId") + fun getCordaX500NameByPartyId(partyId: SecureHash): CordaX500Name = partyIdToCordaX500NameCache[partyId] ?: throw IllegalStateException("Missing cache entry for $partyId") - private fun add(partyHashCode: Long, partyName: CordaX500Name) { + private fun add(partyHashCode: SecureHash, partyName: CordaX500Name) { partyIdToCordaX500NameCache.cache.put(partyHashCode, partyName) cordaX500NameToPartyIdCache.cache.put(partyName, partyHashCode) updateInfoDB(partyHashCode, partyName) } - private fun updateInfoDB(partyHashCode: Long, partyName: CordaX500Name) { + private fun updateInfoDB(partyHashCode: SecureHash, partyName: CordaX500Name) { database.transaction { if (queryByPartyId(session, partyHashCode) == null) { - session.save(DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo(partyHashCode, partyName.toString())) + session.save(DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo(partyHashCode.toString(), partyName.toString())) } } } - private fun queryByCordaX500Name(session: Session, key: CordaX500Name): Long? { + private fun queryByCordaX500Name(session: Session, key: CordaX500Name): SecureHash? { val query = session.createQuery( "FROM ${DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java.name} WHERE partyName = :partyName", DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java) query.setParameter("partyName", key.toString()) - return query.resultList.singleOrNull()?.partyId + return query.resultList.singleOrNull()?.let { SecureHash.parse(it.partyId) } } - private fun queryByPartyId(session: Session, key: Long): CordaX500Name? { + private fun queryByPartyId(session: Session, key: SecureHash): CordaX500Name? { val query = session.createQuery( "FROM ${DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java.name} WHERE partyId = :partyId", DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java) - query.setParameter("partyId", key) + query.setParameter("partyId", key.toString()) return query.resultList.singleOrNull()?.partyName?.let { CordaX500Name.parse(it) } } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt index 69df1fe9b3..84ea4869ca 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt @@ -39,8 +39,8 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, @Immutable data class PersistentKey( /** PartyId of flow peer **/ - @Column(name = "peer_party_id", nullable = false) - var peerPartyId: Long, + @Column(name = "peer_party_id", length = 144, nullable = false) + var peerPartyId: String, @Column(name = "timestamp", nullable = false) var timestamp: Instant, @@ -49,12 +49,12 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, var timestampDiscriminator: Int ) : Serializable { - constructor(key: Key) : this(key.partyId, key.timestamp, key.timestampDiscriminator) + constructor(key: Key) : this(key.partyId.toString(), key.timestamp, key.timestampDiscriminator) } @CordaSerializable @Entity - @Table(name = "${NODE_DATABASE_PREFIX}sender_distribution_records") + @Table(name = "${NODE_DATABASE_PREFIX}sender_distr_recs") data class DBSenderDistributionRecord( @EmbeddedId var compositeKey: PersistentKey, @@ -69,7 +69,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, fun toSenderDistributionRecord() = SenderDistributionRecord( SecureHash.parse(this.txId), - this.compositeKey.peerPartyId, + SecureHash.parse(this.compositeKey.peerPartyId), this.statesToRecord, this.compositeKey.timestamp ) @@ -77,8 +77,8 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, @CordaSerializable @Entity - @Table(name = "${NODE_DATABASE_PREFIX}receiver_distribution_records") - class DBReceiverDistributionRecord( + @Table(name = "${NODE_DATABASE_PREFIX}receiver_distr_recs") + data class DBReceiverDistributionRecord( @EmbeddedId var compositeKey: PersistentKey, @@ -104,7 +104,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, fun toReceiverDistributionRecord(): ReceiverDistributionRecord { return ReceiverDistributionRecord( SecureHash.parse(this.txId), - this.compositeKey.peerPartyId, + SecureHash.parse(this.compositeKey.peerPartyId), OpaqueBytes(this.distributionList), this.compositeKey.timestamp ) @@ -116,8 +116,8 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, data class DBRecoveryPartyInfo( @Id /** CordaX500Name hashCode() **/ - @Column(name = "party_id", nullable = false) - var partyId: Long, + @Column(name = "party_id", length = 144, nullable = false) + var partyId: String, /** CordaX500Name of party **/ @Column(name = "party_name", nullable = false) @@ -127,11 +127,11 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, data class TimestampKey(val timestamp: Instant, val timestampDiscriminator: Int) class Key( - val partyId: Long, + val partyId: SecureHash, val timestamp: Instant, val timestampDiscriminator: Int = nextDiscriminatorNumber.andIncrement ) { - constructor(key: TimestampKey, partyId: Long): this(partyId, key.timestamp, key.timestampDiscriminator) + constructor(key: TimestampKey, partyId: SecureHash): this(partyId, key.timestamp, key.timestampDiscriminator) companion object { val nextDiscriminatorNumber = AtomicInteger() } @@ -155,7 +155,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, val hashedDistributionList = HashedDistributionList( distributionList.senderStatesToRecord, hashedPeersToStatesToRecord, - HashedDistributionList.PublicHeader(senderRecordingTimestamp) + HashedDistributionList.PublicHeader(senderRecordingTimestamp, timeDiscriminator) ) hashedDistributionList.encrypt(encryptionService) } @@ -170,7 +170,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, val publicHeader = HashedDistributionList.PublicHeader.unauthenticatedDeserialise(distributionList.opaqueData, encryptionService) database.transaction { val receiverDistributionRecord = DBReceiverDistributionRecord( - Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp), + Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp, publicHeader.timeDiscriminator), txId, distributionList.opaqueData, distributionList.receiverStatesToRecord @@ -237,7 +237,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, excludingTxnIds.map { it.toString() })))) } if (peers.isNotEmpty()) { - val peerPartyIds = peers.map { partyInfoCache.getPartyIdByCordaX500Name(it) } + val peerPartyIds = peers.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() } predicates.add(criteriaBuilder.and(compositeKey.get(PersistentKey::peerPartyId.name).`in`(peerPartyIds))) } criteriaQuery.where(*predicates.toTypedArray()) @@ -275,7 +275,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, predicates.add(criteriaBuilder.and(criteriaBuilder.not(txId.`in`(excludingTxnIds.map { it.toString() })))) } if (initiators.isNotEmpty()) { - val initiatorPartyIds = initiators.map(partyInfoCache::getPartyIdByCordaX500Name) + val initiatorPartyIds = initiators.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() } predicates.add(criteriaBuilder.and(compositeKey.get(PersistentKey::peerPartyId.name).`in`(initiatorPartyIds))) } criteriaQuery.where(*predicates.toTypedArray()) @@ -319,7 +319,7 @@ abstract class DistributionRecord { @CordaSerializable data class SenderDistributionRecord( override val txId: SecureHash, - val peerPartyId: Long, // CordaX500Name hashCode() + val peerPartyId: SecureHash, // CordaX500Name hashCode() val statesToRecord: StatesToRecord, override val timestamp: Instant ) : DistributionRecord() @@ -327,7 +327,7 @@ data class SenderDistributionRecord( @CordaSerializable data class ReceiverDistributionRecord( override val txId: SecureHash, - val initiatorPartyId: Long, // CordaX500Name hashCode() + val initiatorPartyId: SecureHash, // CordaX500Name hashCode() val encryptedDistributionList: OpaqueBytes, override val timestamp: Instant ) : DistributionRecord() diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/HashedDistributionList.kt b/node/src/main/kotlin/net/corda/node/services/persistence/HashedDistributionList.kt index 910a00ce74..5fee0f24b2 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/HashedDistributionList.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/HashedDistributionList.kt @@ -1,5 +1,6 @@ package net.corda.node.services.persistence +import net.corda.core.crypto.SecureHash import net.corda.core.node.StatesToRecord import net.corda.core.serialization.CordaSerializable import net.corda.node.services.EncryptionService @@ -13,7 +14,7 @@ import java.time.Instant @CordaSerializable data class HashedDistributionList( val senderStatesToRecord: StatesToRecord, - val peerHashToStatesToRecord: Map, + val peerHashToStatesToRecord: Map, val publicHeader: PublicHeader ) { /** @@ -28,7 +29,7 @@ data class HashedDistributionList( out.writeByte(senderStatesToRecord.ordinal) out.writeInt(peerHashToStatesToRecord.size) for (entry in peerHashToStatesToRecord) { - out.writeLong(entry.key) + entry.key.writeTo(out) out.writeByte(entry.value.ordinal) } return encryptionService.encrypt(baos.toByteArray(), publicHeader.serialise()) @@ -37,12 +38,14 @@ data class HashedDistributionList( @CordaSerializable data class PublicHeader( - val senderRecordedTimestamp: Instant + val senderRecordedTimestamp: Instant, + val timeDiscriminator: Int ) { fun serialise(): ByteArray { - val buffer = ByteBuffer.allocate(1 + java.lang.Long.BYTES) + val buffer = ByteBuffer.allocate(1 + java.lang.Long.BYTES + Integer.BYTES) buffer.put(VERSION_TAG.toByte()) buffer.putLong(senderRecordedTimestamp.toEpochMilli()) + buffer.putInt(timeDiscriminator) return buffer.array() } @@ -66,7 +69,8 @@ data class HashedDistributionList( val version = buffer.get().toInt() require(version == VERSION_TAG) { "Unknown distribution list format $version" } val senderRecordedTimestamp = Instant.ofEpochMilli(buffer.getLong()) - return PublicHeader(senderRecordedTimestamp) + val timeDiscriminator = buffer.getInt() + return PublicHeader(senderRecordedTimestamp, timeDiscriminator) } catch (e: Exception) { throw IllegalArgumentException("Corrupt or not a distribution list header", e) } @@ -78,6 +82,7 @@ data class HashedDistributionList( // The version tag is serialised in the header, even though it is separate from the encrypted main body of the distribution list. // This is because the header and the dist list are cryptographically coupled and we want to avoid declaring the version field twice. private const val VERSION_TAG = 1 + private const val SECURE_HASH_LENGTH = 32 private val statesToRecordValues = StatesToRecord.values() // Cache the enum values since .values() returns a new array each time. /** @@ -91,9 +96,11 @@ data class HashedDistributionList( try { val senderStatesToRecord = statesToRecordValues[input.readByte().toInt()] val numPeerHashToStatesToRecords = input.readInt() - val peerHashToStatesToRecord = mutableMapOf() + val peerHashToStatesToRecord = mutableMapOf() repeat(numPeerHashToStatesToRecords) { - peerHashToStatesToRecord[input.readLong()] = statesToRecordValues[input.readByte().toInt()] + val secureHashBytes = ByteArray(SECURE_HASH_LENGTH) + input.readFully(secureHashBytes) + peerHashToStatesToRecord[SecureHash.createSHA256(secureHashBytes)] = statesToRecordValues[input.readByte().toInt()] } return HashedDistributionList(senderStatesToRecord, peerHashToStatesToRecord, publicHeader) } catch (e: Exception) { diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index ffcd38b108..a2623b638d 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -21,6 +21,7 @@ import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.tee import net.corda.core.internal.uncheckedCast +import net.corda.core.internal.warnOnce import net.corda.core.messaging.DataFeed import net.corda.core.node.StatesToRecord import net.corda.core.node.services.KeyManagementService @@ -107,6 +108,8 @@ class NodeVaultService( const val DEFAULT_SOFT_LOCKING_SQL_IN_CLAUSE_SIZE = 16 + private val IGNORE_TRANSACTION_DESERIALIZATION_ERRORS = java.lang.Boolean.getBoolean("net.corda.vaultupdate.ignore.transaction.deserialization.errors") + /** * Establish whether a given state is relevant to a node, given the node's public keys. * @@ -307,18 +310,29 @@ class NodeVaultService( private fun makeUpdates(batch: Iterable, statesToRecord: StatesToRecord, previouslySeen: Boolean): List> { - fun withValidDeserialization(list: List, txId: SecureHash): Map = (0 until list.size).mapNotNull { idx -> - try { - idx to list[idx] - } catch (e: TransactionDeserialisationException) { - // When resolving transaction dependencies we might encounter contracts we haven't installed locally. - // This will cause a failure as we can't deserialize such states in the context of the `appClassloader`. - // For now we ignore these states. - // In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy. - log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e") - null - } - }.toMap() + fun withValidDeserialization(list: List, txId: SecureHash): Map { + var error: TransactionDeserialisationException? = null + val map = (0 until list.size).mapNotNull { idx -> + try { + idx to list[idx] + } catch (e: TransactionDeserialisationException) { + // When resolving transaction dependencies we might encounter contracts we haven't installed locally. + // This will cause a failure as we can't deserialize such states in the context of the `appClassloader`. + // For now we ignore these states. + // In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy. + if (IGNORE_TRANSACTION_DESERIALIZATION_ERRORS) { + log.warnOnce("The current usage of transaction deserialization for the vault is unsafe." + + "Ignoring vault updates due to failed deserialized states may lead to severe problems with ledger consistency. ") + log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e") + } else { + log.error("Could not deserialize state $idx from transaction $txId. Cause: $e") + if(error == null) error = e + } + null + } + }.toMap() + return error?.let { throw it } ?: map + } // Returns only output states that can be deserialised successfully. fun WireTransaction.deserializableOutputStates(): Map> = withValidDeserialization(this.outputs, this.id) @@ -787,7 +801,7 @@ class NodeVaultService( private fun queryTotalStateCount(criteria: QueryCriteria, contractStateType: Class): Long { val (criteriaQuery, criteriaParser) = buildCriteriaQuery(criteria, contractStateType, null) - criteriaQuery.select(criteriaBuilder.countDistinct(criteriaParser.vaultStates)) + criteriaQuery.select(criteriaBuilder.count(criteriaParser.vaultStates)) val query = getSession().createQuery(criteriaQuery) return query.singleResult } diff --git a/node/src/main/resources/migration/node-core.changelog-v25.xml b/node/src/main/resources/migration/node-core.changelog-v25.xml index 9ea40bada9..0e32e3f0c4 100644 --- a/node/src/main/resources/migration/node-core.changelog-v25.xml +++ b/node/src/main/resources/migration/node-core.changelog-v25.xml @@ -11,7 +11,7 @@ - + @@ -21,7 +21,7 @@ - + @@ -30,13 +30,13 @@ - - + + - + @@ -46,7 +46,7 @@ - + @@ -58,14 +58,14 @@ - - + + - + @@ -79,14 +79,14 @@ - - diff --git a/node/src/main/resources/migration/node-core.changelog-v26.xml b/node/src/main/resources/migration/node-core.changelog-v26.xml index b0d4925c7a..f2bdef4833 100644 --- a/node/src/main/resources/migration/node-core.changelog-v26.xml +++ b/node/src/main/resources/migration/node-core.changelog-v26.xml @@ -19,10 +19,4 @@ - - - - - - diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt index 3c81be2ab8..85e086ad61 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt @@ -147,12 +147,12 @@ class DBTransactionStorageLedgerRecoveryTests { val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let { assertEquals(2, it.size) - assertEquals(BOB_NAME.hashCode().toLong(), it.senderRecords[0].compositeKey.peerPartyId) + assertEquals(SecureHash.sha256(BOB_NAME.toString()).toString(), it.senderRecords[0].compositeKey.peerPartyId) assertEquals(ALL_VISIBLE, it.senderRecords[0].statesToRecord) } transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let { assertEquals(1, it.size) - assertEquals(BOB_NAME.hashCode().toLong(), it.receiverRecords[0].compositeKey.peerPartyId) + assertEquals(SecureHash.sha256(BOB_NAME.toString()).toString(), it.receiverRecords[0].compositeKey.peerPartyId) assertEquals(ALL_VISIBLE, (HashedDistributionList.decrypt(it.receiverRecords[0].distributionList, encryptionService)).peerHashToStatesToRecord.map { it.value }[0]) } val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL) @@ -319,8 +319,8 @@ class DBTransactionStorageLedgerRecoveryTests { fun `test lightweight serialization and deserialization of hashed distribution list payload`() { val hashedDistList = HashedDistributionList( ALL_VISIBLE, - mapOf(BOB.name.hashCode().toLong() to NONE, CHARLIE_NAME.hashCode().toLong() to ONLY_RELEVANT), - HashedDistributionList.PublicHeader(now()) + mapOf(SecureHash.sha256(BOB.name.toString()) to NONE, SecureHash.sha256(CHARLIE_NAME.toString()) to ONLY_RELEVANT), + HashedDistributionList.PublicHeader(now(), 1) ) val roundtrip = HashedDistributionList.decrypt(hashedDistList.encrypt(encryptionService), encryptionService) assertThat(roundtrip).isEqualTo(hashedDistList) diff --git a/testing/node-driver/src/main/java/net/corda/testing/driver/SharedMemoryIncremental.java b/testing/node-driver/src/main/java/net/corda/testing/driver/SharedMemoryIncremental.java index 7d97f33140..88e6e726d7 100644 --- a/testing/node-driver/src/main/java/net/corda/testing/driver/SharedMemoryIncremental.java +++ b/testing/node-driver/src/main/java/net/corda/testing/driver/SharedMemoryIncremental.java @@ -76,8 +76,7 @@ public class SharedMemoryIncremental extends PortAllocation { newValue = (oldValue + 1); } boolean reserveSuccess = UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue); - boolean portAvailable = isLocalPortAvailable(newValue); - loopSuccess = reserveSuccess && portAvailable; + loopSuccess = reserveSuccess && isLocalPortAvailable(newValue); } while (!loopSuccess); return (int) newValue; diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt index b1604b94df..12b55fc146 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt @@ -36,7 +36,8 @@ data class NodeParameters( val additionalCordapps: Collection = emptySet(), val flowOverrides: Map>, Class>> = emptyMap(), val logLevelOverride: String? = null, - val rpcAddress: NetworkHostAndPort? = null + val rpcAddress: NetworkHostAndPort? = null, + val systemProperties: Map = emptyMap() ) { /** * Create a new node parameters object with default values. Each parameter can be specified with its wither method which returns a copy @@ -127,4 +128,97 @@ data class NodeParameters( flowOverrides = flowOverrides, logLevelOverride = logLevelOverride) + constructor( + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String, + additionalCordapps: Collection = emptySet(), + flowOverrides: Map>, Class>>, + logLevelOverride: String? = null + ) : this( + providedName, + rpcUsers, + verifierType, + customOverrides, + startInSameProcess, + maximumHeapSize, + additionalCordapps, + flowOverrides, + logLevelOverride, + rpcAddress = null) + + @Suppress("LongParameterList") + fun copy( + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String, + additionalCordapps: Collection = emptySet(), + flowOverrides: Map>, Class>>, + logLevelOverride: String? = null + ) = this.copy( + providedName = providedName, + rpcUsers = rpcUsers, + verifierType = verifierType, + customOverrides = customOverrides, + startInSameProcess = startInSameProcess, + maximumHeapSize = maximumHeapSize, + additionalCordapps = additionalCordapps, + flowOverrides = flowOverrides, + logLevelOverride = logLevelOverride, + rpcAddress = rpcAddress) + + constructor( + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String, + additionalCordapps: Collection = emptySet(), + flowOverrides: Map>, Class>>, + logLevelOverride: String? = null, + rpcAddress: NetworkHostAndPort? = null + ) : this( + providedName, + rpcUsers, + verifierType, + customOverrides, + startInSameProcess, + maximumHeapSize, + additionalCordapps, + flowOverrides, + logLevelOverride, + rpcAddress, + systemProperties = emptyMap()) + + @Suppress("LongParameterList") + fun copy( + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String, + additionalCordapps: Collection = emptySet(), + flowOverrides: Map>, Class>>, + logLevelOverride: String? = null, + rpcAddress: NetworkHostAndPort? = null + ) = this.copy( + providedName = providedName, + rpcUsers = rpcUsers, + verifierType = verifierType, + customOverrides = customOverrides, + startInSameProcess = startInSameProcess, + maximumHeapSize = maximumHeapSize, + additionalCordapps = additionalCordapps, + flowOverrides = flowOverrides, + logLevelOverride = logLevelOverride, + rpcAddress = rpcAddress, + systemProperties = systemProperties) } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index b4b7d673e8..6ce7a2e419 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -755,7 +755,7 @@ class DriverDSLImpl( debugPort, bytemanJarPath, bytemanPort, - systemProperties, + systemProperties + parameters.systemProperties, parameters.maximumHeapSize, parameters.logLevelOverride, identifier,