mirror of
https://github.com/corda/corda.git
synced 2024-12-19 21:17:58 +00:00
Merge branch 'release/os/4.11' into seanb/ES-678/release-orchestrator-test
This commit is contained in:
commit
64e032a744
33
.ci/dev/forward-merge/Jenkinsfile
vendored
33
.ci/dev/forward-merge/Jenkinsfile
vendored
@ -1,8 +1,31 @@
|
||||
@Library('corda-shared-build-pipeline-steps@5.1') _
|
||||
|
||||
forwardMerger(
|
||||
targetBranch: 'release/os/4.12',
|
||||
originBranch: 'release/os/4.11',
|
||||
slackChannel: '#build-team-automated-notifications'
|
||||
)
|
||||
/*
|
||||
* Forward merge any changes in current branch to the branch with following version.
|
||||
*
|
||||
* Please note, the branches names are intentionally separated as variables, to minimised conflicts
|
||||
* during automated merges for this file.
|
||||
*
|
||||
* These variables should be updated when a new version is cut
|
||||
*/
|
||||
|
||||
/**
|
||||
* the branch name of origin branch, it should match the current branch
|
||||
* and it acts as a fail-safe inside {@code forwardMerger} pipeline
|
||||
*/
|
||||
String originBranch = 'release/os/4.11'
|
||||
|
||||
/**
|
||||
* the branch name of target branch, it should be the branch with the next version
|
||||
* after the one in current branch.
|
||||
*/
|
||||
String targetBranch = 'release/os/4.12'
|
||||
|
||||
/**
|
||||
* Forward merge any changes between #originBranch and #targetBranch
|
||||
*/
|
||||
forwardMerger(
|
||||
targetBranch: targetBranch,
|
||||
originBranch: originBranch,
|
||||
slackChannel: '#c4-forward-merge-bot-notifications',
|
||||
)
|
||||
|
@ -52,7 +52,7 @@ artemisVersion=2.19.1
|
||||
# TODO Upgrade Jackson only when corda is using kotlin 1.3.10
|
||||
jacksonVersion=2.13.5
|
||||
jacksonKotlinVersion=2.9.7
|
||||
jettyVersion=9.4.19.v20190610
|
||||
jettyVersion=9.4.52.v20230823
|
||||
jerseyVersion=2.25
|
||||
servletVersion=4.0.1
|
||||
assertjVersion=3.12.2
|
||||
|
@ -355,14 +355,14 @@ class FinalityFlowTests : WithFinality {
|
||||
val sdrs = getSenderRecoveryData(stx.id, aliceNode.database).apply {
|
||||
assertEquals(1, this.size)
|
||||
assertEquals(StatesToRecord.ALL_VISIBLE, this[0].statesToRecord)
|
||||
assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId)
|
||||
assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId)
|
||||
}
|
||||
val rdr = getReceiverRecoveryData(stx.id, bobNode).apply {
|
||||
assertNotNull(this)
|
||||
val hashedDL = HashedDistributionList.decrypt(this!!.encryptedDistributionList.bytes, aliceNode.internals.encryptionService)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, hashedDL.senderStatesToRecord)
|
||||
assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this.initiatorPartyId)
|
||||
assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE), hashedDL.peerHashToStatesToRecord)
|
||||
assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.initiatorPartyId)
|
||||
assertEquals(mapOf<SecureHash, StatesToRecord>(SecureHash.sha256(BOB_NAME.toString()) to StatesToRecord.ALL_VISIBLE), hashedDL.peerHashToStatesToRecord)
|
||||
}
|
||||
validateSenderAndReceiverTimestamps(sdrs, rdr!!)
|
||||
}
|
||||
@ -388,19 +388,19 @@ class FinalityFlowTests : WithFinality {
|
||||
val sdrs = getSenderRecoveryData(stx.id, aliceNode.database).apply {
|
||||
assertEquals(2, this.size)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord)
|
||||
assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId)
|
||||
assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId)
|
||||
assertEquals(StatesToRecord.ALL_VISIBLE, this[1].statesToRecord)
|
||||
assertEquals(CHARLIE_NAME.hashCode().toLong(), this[1].peerPartyId)
|
||||
assertEquals(SecureHash.sha256(CHARLIE_NAME.toString()), this[1].peerPartyId)
|
||||
}
|
||||
val rdr = getReceiverRecoveryData(stx.id, bobNode).apply {
|
||||
assertNotNull(this)
|
||||
val hashedDL = HashedDistributionList.decrypt(this!!.encryptedDistributionList.bytes, aliceNode.internals.encryptionService)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, hashedDL.senderStatesToRecord)
|
||||
assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this.initiatorPartyId)
|
||||
assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.initiatorPartyId)
|
||||
// note: Charlie assertion here is using the hinted StatesToRecord value passed to it from Alice
|
||||
assertEquals(mapOf(
|
||||
BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT,
|
||||
CHARLIE_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE
|
||||
assertEquals(mapOf<SecureHash, StatesToRecord>(
|
||||
SecureHash.sha256(BOB_NAME.toString()) to StatesToRecord.ONLY_RELEVANT,
|
||||
SecureHash.sha256(CHARLIE_NAME.toString()) to StatesToRecord.ALL_VISIBLE
|
||||
), hashedDL.peerHashToStatesToRecord)
|
||||
}
|
||||
validateSenderAndReceiverTimestamps(sdrs, rdr!!)
|
||||
@ -452,14 +452,14 @@ class FinalityFlowTests : WithFinality {
|
||||
val sdr = getSenderRecoveryData(stx.id, aliceNode.database).apply {
|
||||
assertEquals(1, this.size)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord)
|
||||
assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId)
|
||||
assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId)
|
||||
}
|
||||
val rdr = getReceiverRecoveryData(stx.id, bobNode).apply {
|
||||
assertNotNull(this)
|
||||
val hashedDL = HashedDistributionList.decrypt(this!!.encryptedDistributionList.bytes, aliceNode.internals.encryptionService)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, hashedDL.senderStatesToRecord)
|
||||
assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this.initiatorPartyId)
|
||||
assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT), hashedDL.peerHashToStatesToRecord)
|
||||
assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.initiatorPartyId)
|
||||
assertEquals(mapOf<SecureHash, StatesToRecord>(SecureHash.sha256(BOB_NAME.toString()) to StatesToRecord.ONLY_RELEVANT), hashedDL.peerHashToStatesToRecord)
|
||||
}
|
||||
validateSenderAndReceiverTimestamps(sdr, rdr!!)
|
||||
}
|
||||
|
@ -14,36 +14,17 @@ import net.corda.core.utilities.ProgressTracker
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class LedgerRecoveryFlow(
|
||||
private val recoveryPeers: Collection<Party>,
|
||||
private val timeWindow: RecoveryTimeWindow,
|
||||
private val useAllNetworkNodes: Boolean = false,
|
||||
private val transactionRole: TransactionRole = TransactionRole.ALL,
|
||||
private val dryRun: Boolean = false,
|
||||
private val optimisticInitiatorRecovery: Boolean = false,
|
||||
override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic<Map<SecureHash, RecoveryResult>>() {
|
||||
private val parameters: LedgerRecoveryParameters,
|
||||
override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic<LedgerRecoveryResult>() {
|
||||
|
||||
@CordaInternal
|
||||
data class ExtraConstructorArgs(val recoveryPeers: Collection<Party>,
|
||||
val timeWindow: RecoveryTimeWindow,
|
||||
val useAllNetworkNodes: Boolean,
|
||||
val transactionRole: TransactionRole,
|
||||
val dryRun: Boolean,
|
||||
val optimisticInitiatorRecovery: Boolean)
|
||||
data class ExtraConstructorArgs(val parameters: LedgerRecoveryParameters)
|
||||
@CordaInternal
|
||||
fun getExtraConstructorArgs() = ExtraConstructorArgs(recoveryPeers, timeWindow, useAllNetworkNodes, transactionRole, dryRun, optimisticInitiatorRecovery)
|
||||
|
||||
// unused constructors added to facilitate Node Shell command invocation
|
||||
constructor(recoveryPeer: Party, timeWindow: RecoveryTimeWindow) : this(setOf(recoveryPeer), timeWindow, false, TransactionRole.ALL, false, false)
|
||||
constructor(recoveryPeer: Party, timeWindow: RecoveryTimeWindow, dryRun: Boolean) : this(setOf(recoveryPeer), timeWindow, false, TransactionRole.ALL, dryRun, false)
|
||||
|
||||
constructor(timeWindow: RecoveryTimeWindow, dryRun: Boolean) : this(emptySet(), timeWindow, false, TransactionRole.ALL, dryRun, false)
|
||||
constructor(timeWindow: RecoveryTimeWindow, dryRun: Boolean, optimisticInitiatorRecovery: Boolean) : this(emptySet(), timeWindow, false, TransactionRole.ALL, dryRun, optimisticInitiatorRecovery)
|
||||
constructor(recoveryPeers: Collection<Party>, timeWindow: RecoveryTimeWindow, dryRun: Boolean) : this(recoveryPeers, timeWindow, false, TransactionRole.ALL, dryRun, false)
|
||||
constructor(recoveryPeers: Collection<Party>, timeWindow: RecoveryTimeWindow, dryRun: Boolean, optimisticInitiatorRecovery: Boolean) : this(recoveryPeers, timeWindow, false, TransactionRole.ALL, dryRun, optimisticInitiatorRecovery)
|
||||
fun getExtraConstructorArgs() = ExtraConstructorArgs(parameters)
|
||||
|
||||
@Suspendable
|
||||
@Throws(LedgerRecoveryException::class)
|
||||
override fun call(): Map<SecureHash, RecoveryResult> {
|
||||
override fun call(): LedgerRecoveryResult {
|
||||
throw NotImplementedError("Enterprise only feature")
|
||||
}
|
||||
}
|
||||
@ -59,6 +40,26 @@ class ReceiveLedgerRecoveryFlow constructor(private val otherSideSession: FlowSe
|
||||
@CordaSerializable
|
||||
class LedgerRecoveryException(message: String) : FlowException("Ledger recovery failed: $message")
|
||||
|
||||
@CordaSerializable
|
||||
data class LedgerRecoveryParameters(
|
||||
val recoveryPeers: Collection<Party>,
|
||||
val timeWindow: RecoveryTimeWindow? = null,
|
||||
val useAllNetworkNodes: Boolean = false,
|
||||
val transactionRole: TransactionRole = TransactionRole.ALL,
|
||||
val dryRun: Boolean = false,
|
||||
val optimisticInitiatorRecovery: Boolean = false,
|
||||
val useTimeWindowNarrowing: Boolean = true,
|
||||
val verboseLogging: Boolean = true,
|
||||
val recoveryBatchSize: Int = 1000
|
||||
)
|
||||
|
||||
@CordaSerializable
|
||||
data class LedgerRecoveryResult(
|
||||
val totalRecoveredRecords: Long,
|
||||
val totalRecoveredTransactions: Long,
|
||||
val totalErrors: Long
|
||||
)
|
||||
|
||||
/**
|
||||
* This specifies which type of transactions to recover based on the transaction role of the recovering node
|
||||
*/
|
||||
@ -80,6 +81,3 @@ data class RecoveryResult(
|
||||
val synchronisedInitiated: Boolean = false, // only attempted if [optimisticInitiatorRecovery] option set to true and [TransactionRecoveryType.INITIATOR]
|
||||
val failureCause: String? = null // reason why a transaction failed to synchronise
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
@ -78,12 +78,7 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
|
||||
checkParameterHash(stx.networkParametersHash)
|
||||
subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, deferredAck))
|
||||
logger.info("Transaction dependencies resolution completed.")
|
||||
try {
|
||||
stx.verify(serviceHub, checkSufficientSignatures)
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Transaction verification failed.")
|
||||
throw e
|
||||
}
|
||||
verifyTx(stx, checkSufficientSignatures)
|
||||
if (checkSufficientSignatures) {
|
||||
// We should only send a transaction to the vault for processing if we did in fact fully verify it, and
|
||||
// there are no missing signatures. We don't want partly signed stuff in the vault.
|
||||
@ -97,6 +92,15 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
|
||||
}
|
||||
}
|
||||
|
||||
private fun verifyTx(stx: SignedTransaction, localCheckSufficientSignatures: Boolean) {
|
||||
try {
|
||||
stx.verify(serviceHub, localCheckSufficientSignatures)
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Transaction verification failed.")
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
private fun isDeferredAck(payload: Any): Boolean {
|
||||
return payload is SignedTransactionWithDistributionList && checkSufficientSignatures && payload.isFinality
|
||||
}
|
||||
@ -109,7 +113,7 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
|
||||
checkParameterHash(stx.networkParametersHash)
|
||||
subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, true))
|
||||
logger.info("Transaction dependencies resolution completed.")
|
||||
|
||||
verifyTx(stx, false)
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
|
||||
logger.debug { "Peer recording transaction without notary signature." }
|
||||
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx)
|
||||
|
@ -132,7 +132,7 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
|
||||
|
||||
protected open fun isFinality(): Boolean = false
|
||||
|
||||
@Suppress("ComplexCondition", "ComplexMethod", "LongMethod")
|
||||
@Suppress("ComplexCondition", "ComplexMethod", "LongMethod", "TooGenericExceptionThrown")
|
||||
@Suspendable
|
||||
override fun call(): Void? {
|
||||
val networkMaxMessageSize = serviceHub.networkParameters.maxMessageSize
|
||||
@ -151,11 +151,14 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
|
||||
is NotarisationPayload -> TransactionAuthorisationFilter().addAuthorised(getInputTransactions(payload.signedTransaction))
|
||||
is SignedTransaction -> TransactionAuthorisationFilter().addAuthorised(getInputTransactions(payload))
|
||||
is RetrieveAnyTransactionPayload -> TransactionAuthorisationFilter(acceptAll = true)
|
||||
is List<*> -> TransactionAuthorisationFilter().addAuthorised(payload.flatMap { stateAndRef ->
|
||||
if (stateAndRef is StateAndRef<*>) {
|
||||
getInputTransactions(serviceHub.validatedTransactions.getTransaction(stateAndRef.ref.txhash)!!) + stateAndRef.ref.txhash
|
||||
is List<*> -> TransactionAuthorisationFilter().addAuthorised(payload.flatMap { someObject ->
|
||||
if (someObject is StateAndRef<*>) {
|
||||
getInputTransactions(serviceHub.validatedTransactions.getTransaction(someObject.ref.txhash)!!) + someObject.ref.txhash
|
||||
}
|
||||
else if (someObject is NamedByHash) {
|
||||
setOf(someObject.id)
|
||||
} else {
|
||||
throw Exception("Unknown payload type: ${stateAndRef!!::class.java} ?")
|
||||
throw Exception("Unknown payload type: ${someObject!!::class.java} ?")
|
||||
}
|
||||
}.toSet())
|
||||
else -> throw Exception("Unknown payload type: ${payload::class.java} ?")
|
||||
|
@ -26,6 +26,9 @@ class ResolveTransactionsFlow private constructor(
|
||||
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
|
||||
: this(null, txHashes, otherSide, statesToRecord)
|
||||
|
||||
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean)
|
||||
: this(null, txHashes, otherSide, statesToRecord, deferredAck)
|
||||
|
||||
/**
|
||||
* Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does
|
||||
* *not* validate or store the [SignedTransaction] itself.
|
||||
|
@ -74,7 +74,7 @@ class FinalityFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
|
||||
alice.rpc.startFlow(::GetFlowTransaction, txId).returnValue.getOrThrow().apply {
|
||||
assertEquals("V", this.first) // "V" -> VERIFIED
|
||||
assertEquals(CHARLIE_NAME.hashCode().toLong(), this.second) // peer
|
||||
assertEquals(SecureHash.sha256(CHARLIE_NAME.toString()).toString(), this.second) // peer
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -83,9 +83,9 @@ class FinalityFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
|
||||
// Internal use for testing only!!
|
||||
@StartableByRPC
|
||||
class GetFlowTransaction(private val txId: SecureHash) : FlowLogic<Pair<String, Long>>() {
|
||||
class GetFlowTransaction(private val txId: SecureHash) : FlowLogic<Pair<String, String>>() {
|
||||
@Suspendable
|
||||
override fun call(): Pair<String, Long> {
|
||||
override fun call(): Pair<String, String> {
|
||||
val transactionStatus = serviceHub.jdbcSession().prepareStatement("select * from node_transactions where tx_id = ?")
|
||||
.apply { setString(1, txId.toString()) }
|
||||
.use { ps ->
|
||||
@ -94,12 +94,12 @@ class GetFlowTransaction(private val txId: SecureHash) : FlowLogic<Pair<String,
|
||||
rs.getString(4) // TransactionStatus
|
||||
}
|
||||
}
|
||||
val receiverPartyId = serviceHub.jdbcSession().prepareStatement("select * from node_sender_distribution_records where transaction_id = ?")
|
||||
val receiverPartyId = serviceHub.jdbcSession().prepareStatement("select * from node_sender_distr_recs where transaction_id = ?")
|
||||
.apply { setString(1, txId.toString()) }
|
||||
.use { ps ->
|
||||
ps.executeQuery().use { rs ->
|
||||
rs.next()
|
||||
rs.getLong(4) // receiverPartyId
|
||||
rs.getString(4) // receiverPartyId
|
||||
}
|
||||
}
|
||||
return Pair(transactionStatus, receiverPartyId)
|
||||
|
@ -0,0 +1,33 @@
|
||||
package net.corda.contracts.incompatible.version1
|
||||
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.contracts.TypeOnlyCommandData
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.serialization.internal.AttachmentsClassLoader
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
|
||||
class AttachmentContract : Contract {
|
||||
|
||||
private val FAIL_CONTRACT_VERIFY = java.lang.Boolean.getBoolean("net.corda.contracts.incompatible.AttachmentContract.fail.verify")
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
if (FAIL_CONTRACT_VERIFY) throw object:TransactionVerificationException(tx.id, "AttachmentContract verify failed.", null) {}
|
||||
val state = tx.outputsOfType<State>().single()
|
||||
// we check that at least one has the matching hash, the other will be the contract
|
||||
require(tx.attachments.any { it.id == state.hash }) {"At least one attachment in transaction must match hash ${state.hash}"}
|
||||
}
|
||||
|
||||
object Command : TypeOnlyCommandData()
|
||||
|
||||
data class State(val hash: SecureHash.SHA256) : ContractState {
|
||||
private val FAIL_CONTRACT_STATE = java.lang.Boolean.getBoolean("net.corda.contracts.incompatible.AttachmentContract.fail.state") && (this.javaClass.classLoader !is AttachmentsClassLoader)
|
||||
init {
|
||||
if (FAIL_CONTRACT_STATE) throw TransactionVerificationException.TransactionRequiredContractUnspecifiedException(hash,"AttachmentContract state initialisation failed.")
|
||||
}
|
||||
override val participants: List<AbstractParty> = emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
const val ATTACHMENT_PROGRAM_ID = "net.corda.contracts.incompatible.version1.AttachmentContract"
|
@ -0,0 +1,25 @@
|
||||
package net.corda.contracts.incompatible.version2
|
||||
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.TypeOnlyCommandData
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
|
||||
class AttachmentContract : Contract {
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
val state = tx.outputsOfType<State>().single()
|
||||
// we check that at least one has the matching hash, the other will be the contract
|
||||
require(tx.attachments.any { it.id == SecureHash.SHA256(state.opaqueBytes.bytes) }) {"At least one attachment in transaction must match hash ${state.opaqueBytes}"}
|
||||
}
|
||||
|
||||
object Command : TypeOnlyCommandData()
|
||||
|
||||
data class State(val opaqueBytes: OpaqueBytes) : ContractState {
|
||||
override val participants: List<AbstractParty> = emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
const val ATTACHMENT_PROGRAM_ID = "net.corda.contracts.incompatible.version2.AttachmentContract"
|
@ -0,0 +1,94 @@
|
||||
package net.corda.flows.incompatible.version1
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.incompatible.version1.ATTACHMENT_PROGRAM_ID
|
||||
import net.corda.contracts.incompatible.version1.AttachmentContract
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.CollectSignaturesFlow
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.ReceiveFinalityFlow
|
||||
import net.corda.core.flows.SignTransactionFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.unwrap
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class AttachmentFlow(private val otherSide: Party,
|
||||
private val notary: Party,
|
||||
private val attachId: SecureHash.SHA256,
|
||||
private val notariseInputState: StateAndRef<AttachmentContract.State>? = null) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
object SIGNING : ProgressTracker.Step("Signing transaction")
|
||||
|
||||
override val progressTracker: ProgressTracker = ProgressTracker(SIGNING)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val session = initiateFlow(otherSide)
|
||||
val notarise = notariseInputState != null
|
||||
session.send(notarise) // inform peer whether to sign for notarisation
|
||||
|
||||
// Create a trivial transaction with an output that describes the attachment, and the attachment itself
|
||||
val ptx = TransactionBuilder(notary)
|
||||
.addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID)
|
||||
.addAttachment(attachId)
|
||||
if (notarise) {
|
||||
ptx.addInputState(notariseInputState!!)
|
||||
ptx.addCommand(AttachmentContract.Command, ourIdentity.owningKey, otherSide.owningKey)
|
||||
}
|
||||
else
|
||||
ptx.addCommand(AttachmentContract.Command, ourIdentity.owningKey)
|
||||
|
||||
progressTracker.currentStep = SIGNING
|
||||
|
||||
val stx = serviceHub.signInitialTransaction(ptx)
|
||||
val ftx = if (notarise) {
|
||||
subFlow(CollectSignaturesFlow(stx, listOf(session)))
|
||||
} else stx
|
||||
|
||||
return subFlow(FinalityFlow(ftx, setOf(session), statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(AttachmentFlow::class)
|
||||
class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val notarise = otherSide.receive<Boolean>().unwrap { it }
|
||||
if (notarise) {
|
||||
val stx = subFlow(object : SignTransactionFlow(otherSide) {
|
||||
override fun checkTransaction(stx: SignedTransaction) {
|
||||
}
|
||||
})
|
||||
subFlow(ReceiveFinalityFlow(otherSide, stx.id, statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
} else {
|
||||
subFlow(ReceiveFinalityFlow(otherSide, statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class AttachmentIssueFlow(private val attachId: SecureHash.SHA256,
|
||||
private val notary: Party): FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val builder = TransactionBuilder(notary)
|
||||
builder.addAttachment(attachId)
|
||||
builder.addOutputState(TransactionState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID, notary))
|
||||
builder.addCommand(Command(AttachmentContract.Command, listOf(ourIdentity.owningKey)))
|
||||
val tx = serviceHub.signInitialTransaction(builder, ourIdentity.owningKey)
|
||||
return subFlow(FinalityFlow(tx, emptySet<FlowSession>(), statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
package net.corda.flows.incompatible.version2
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.incompatible.version2.ATTACHMENT_PROGRAM_ID
|
||||
import net.corda.contracts.incompatible.version2.AttachmentContract
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.ReceiveTransactionFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class AttachmentFlow(private val otherSide: Party,
|
||||
private val notary: Party,
|
||||
private val attachId: SecureHash.SHA256) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
object SIGNING : ProgressTracker.Step("Signing transaction")
|
||||
|
||||
override val progressTracker: ProgressTracker = ProgressTracker(SIGNING)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
// Create a trivial transaction with an output that describes the attachment, and the attachment itself
|
||||
val ptx = TransactionBuilder(notary)
|
||||
.addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID)
|
||||
.addCommand(AttachmentContract.Command, ourIdentity.owningKey)
|
||||
.addAttachment(attachId)
|
||||
|
||||
progressTracker.currentStep = SIGNING
|
||||
|
||||
val stx = serviceHub.signInitialTransaction(ptx)
|
||||
|
||||
// Send the transaction to the other recipient
|
||||
return subFlow(FinalityFlow(stx, initiateFlow(otherSide)))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(AttachmentFlow::class)
|
||||
class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// purposely prevent transaction verification and recording in ReceiveTransactionFlow
|
||||
val stx = subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = false, statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
logger.info("StoreAttachmentFlow: successfully received fully signed tx. Sending it to the vault for processing.")
|
||||
|
||||
serviceHub.recordTransactions(StatesToRecord.ALL_VISIBLE, setOf(stx))
|
||||
logger.info("StoreAttachmentFlow: successfully recorded received transaction locally.")
|
||||
}
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
package net.corda.flows.incompatible.version3
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.incompatible.version1.ATTACHMENT_PROGRAM_ID
|
||||
import net.corda.contracts.incompatible.version1.AttachmentContract
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.ReceiveTransactionFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class AttachmentFlow(private val otherSide: Party,
|
||||
private val notary: Party,
|
||||
private val attachId: SecureHash.SHA256) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
object SIGNING : ProgressTracker.Step("Signing transaction")
|
||||
|
||||
override val progressTracker: ProgressTracker = ProgressTracker(SIGNING)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
// Create a trivial transaction with an output that describes the attachment, and the attachment itself
|
||||
val ptx = TransactionBuilder(notary)
|
||||
.addOutputState(AttachmentContract.State(attachId), ATTACHMENT_PROGRAM_ID)
|
||||
.addCommand(AttachmentContract.Command, ourIdentity.owningKey)
|
||||
.addAttachment(attachId)
|
||||
|
||||
progressTracker.currentStep = SIGNING
|
||||
|
||||
val stx = serviceHub.signInitialTransaction(ptx)
|
||||
|
||||
// Send the transaction to the other recipient
|
||||
return subFlow(FinalityFlow(stx, initiateFlow(otherSide)))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(AttachmentFlow::class)
|
||||
class StoreAttachmentFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// purposely enable transaction verification and recording in ReceiveTransactionFlow
|
||||
subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = true, statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
logger.info("StoreAttachmentFlow: successfully received fully signed tx. Sending it to the vault for processing.")
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,249 @@
|
||||
package net.corda.node
|
||||
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import junit.framework.TestCase.assertEquals
|
||||
import junit.framework.TestCase.assertNotNull
|
||||
import junit.framework.TestCase.assertTrue
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.internal.InputStreamAndHash
|
||||
import net.corda.core.internal.deleteRecursively
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.messaging.vaultQueryBy
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.flows.incompatible.version1.AttachmentIssueFlow
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.driver.OutOfProcess
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||
import net.corda.testing.flows.waitForAllFlowsToComplete
|
||||
import net.corda.testing.node.NotarySpec
|
||||
import net.corda.testing.node.TestCordapp
|
||||
import net.corda.testing.node.internal.cordappWithPackages
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.TimeoutException
|
||||
import net.corda.contracts.incompatible.version1.AttachmentContract as AttachmentContractV1
|
||||
import net.corda.flows.incompatible.version1.AttachmentFlow as AttachmentFlowV1
|
||||
|
||||
class VaultUpdateDeserializationTest {
|
||||
companion object {
|
||||
// uses ReceiveFinalityFlow
|
||||
val flowVersion1 = cordappWithPackages("net.corda.flows.incompatible.version1")
|
||||
// single state field of type SecureHash.SHA256 with system property driven run-time behaviour:
|
||||
// -force contract verify failure: -Dnet.corda.contracts.incompatible.AttachmentContract.fail.verify=true
|
||||
// -force contract state init failure: -Dnet.corda.contracts.incompatible.AttachmentContract.fail.state=true
|
||||
val contractVersion1 = cordappWithPackages("net.corda.contracts.incompatible.version1")
|
||||
|
||||
fun driverParameters(cordapps: List<TestCordapp>): DriverParameters {
|
||||
return DriverParameters(
|
||||
portAllocation = incrementalPortAllocation(),
|
||||
inMemoryDB = false,
|
||||
startNodesInProcess = false,
|
||||
notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME)),
|
||||
cordappsForAllNodes = cordapps
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Transaction sent from A -> B with Notarisation
|
||||
* Test that a deserialization error is raised where the receiver node of a transaction has an incompatible contract jar.
|
||||
* In the case of a notarised transaction, a deserialisation error is thrown in the receiver SignTransactionFlow (before finality)
|
||||
* upon receiving the transaction to be signed and attempting to record its dependencies.
|
||||
* The ledger will not record any transactions, and the flow must be retried by the sender upon installing the correct contract jar
|
||||
* version at the receiver and re-starting the node.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `Notarised transaction fails completely upon receiver deserialization failure collecting signatures when using incompatible contract jar`() {
|
||||
driver(driverParameters(listOf(flowVersion1, contractVersion1))) {
|
||||
val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1),
|
||||
systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true")),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
|
||||
val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0)
|
||||
alice.rpc.uploadAttachment(inputStream)
|
||||
|
||||
val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds)
|
||||
val spendableState = stx.coreTransaction.outRef<AttachmentContractV1.State>(0)
|
||||
|
||||
// NOTE: exception is propagated from Receiver
|
||||
try {
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
catch(e: UnexpectedFlowEndException) {
|
||||
println("Bob fails to deserialise transaction upon receipt of transaction for signing.")
|
||||
}
|
||||
assertEquals(0, bob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
assertEquals(1, alice.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
// check transaction records
|
||||
@Suppress("DEPRECATION")
|
||||
assertEquals(1, alice.rpc.internalVerifiedTransactionsSnapshot().size) // issuance only
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty())
|
||||
|
||||
// restart Bob with correct contract jar version
|
||||
(bob as OutOfProcess).process.destroyForcibly()
|
||||
bob.stop()
|
||||
(baseDirectory(BOB_NAME) / "cordapps").deleteRecursively()
|
||||
|
||||
val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
// re-run failed flow
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, restartedBob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
assertEquals(1, waitForVaultUpdate(restartedBob))
|
||||
assertEquals(1, alice.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty())
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Transaction sent from A -> B with Notarisation
|
||||
*
|
||||
* Test original deserialization failure behaviour by setting a new configurable java system property.
|
||||
* The ledger will enter an inconsistent state from which is cannot auto-recover.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `Notarised transaction when using incompatible contract jar and overriden system property`() {
|
||||
driver(driverParameters(listOf(flowVersion1, contractVersion1))) {
|
||||
val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1),
|
||||
systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true",
|
||||
"net.corda.vaultupdate.ignore.transaction.deserialization.errors" to "true",
|
||||
"net.corda.recordtransaction.signature.verification.disabled" to "true")),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
|
||||
val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0)
|
||||
alice.rpc.uploadAttachment(inputStream)
|
||||
|
||||
val stx = alice.rpc.startFlow(::AttachmentIssueFlow, hash, defaultNotaryIdentity).returnValue.getOrThrow(30.seconds)
|
||||
val spendableState = stx.coreTransaction.outRef<AttachmentContractV1.State>(0)
|
||||
|
||||
// Flow completes successfully (deserialisation error on Receiver node is ignored)
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, spendableState).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
// sender node correctly updated
|
||||
@Suppress("DEPRECATION")
|
||||
assertEquals(2, alice.rpc.internalVerifiedTransactionsSnapshot().size)
|
||||
assertEquals(1, alice.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
|
||||
// receiver node has transaction but vault not updated!
|
||||
@Suppress("DEPRECATION")
|
||||
assertEquals(2, bob.rpc.internalVerifiedTransactionsSnapshot().size)
|
||||
assertEquals(0, bob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Transaction sent from A -> B without Notarisation
|
||||
* Test that a deserialization error is raised where the receiver node of a finality flow has an incompatible contract jar.
|
||||
* The ledger will be temporarily inconsistent until the correct contract jar version is installed and the receiver node is re-started.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `un-notarised transaction is hospitalized at receiver upon deserialization failure in vault update when using incompatible contract jar`() {
|
||||
driver(driverParameters(emptyList())) {
|
||||
val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1),
|
||||
systemProperties = mapOf("net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true")),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
|
||||
val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0)
|
||||
alice.rpc.uploadAttachment(inputStream)
|
||||
|
||||
// ISSUE: exception is not propagating from Receiver
|
||||
try {
|
||||
alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, null).returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
catch(e: TimeoutException) {
|
||||
println("Alice: Timeout awaiting flow completion.")
|
||||
}
|
||||
assertEquals(0, bob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
// check transaction records
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(alice.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty())
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(bob.rpc.internalVerifiedTransactionsSnapshot().isEmpty())
|
||||
|
||||
// restart Bob with correct contract jar version
|
||||
(bob as OutOfProcess).process.destroyForcibly()
|
||||
bob.stop()
|
||||
(baseDirectory(BOB_NAME) / "cordapps").deleteRecursively()
|
||||
|
||||
val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
// original hospitalized transaction should now have been re-processed with correct contract jar
|
||||
assertEquals(1, waitForVaultUpdate(restartedBob))
|
||||
@Suppress("DEPRECATION")
|
||||
assertTrue(restartedBob.rpc.internalVerifiedTransactionsSnapshot().isNotEmpty())
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Transaction sent from A -> B without Notarisation
|
||||
* Test original deserialization failure behaviour by setting a new configurable java system property.
|
||||
* The ledger will enter an inconsistent state from which is cannot auto-recover.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `un-notarised transaction ignores deserialization failure in vault update when using incompatible contract jar and overriden system property`() {
|
||||
driver(driverParameters(emptyList())) {
|
||||
val alice = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1),
|
||||
systemProperties = mapOf(
|
||||
"net.corda.contracts.incompatible.AttachmentContract.fail.state" to "true",
|
||||
"net.corda.vaultupdate.ignore.transaction.deserialization.errors" to "true",
|
||||
"net.corda.recordtransaction.signature.verification.disabled" to "true")),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
|
||||
val (inputStream, hash) = InputStreamAndHash.createInMemoryTestZip(1024, 0)
|
||||
alice.rpc.uploadAttachment(inputStream)
|
||||
|
||||
// Note: TransactionDeserialisationException is swallowed on the receiver node (without updating the vault).
|
||||
val stx = alice.rpc.startFlow(::AttachmentFlowV1, bob.nodeInfo.singleIdentity(), defaultNotaryIdentity, hash, null).returnValue.getOrThrow(30.seconds)
|
||||
println("Alice txId: ${stx.id}")
|
||||
|
||||
waitForAllFlowsToComplete(bob)
|
||||
val txId = bob.rpc.stateMachineRecordedTransactionMappingSnapshot().single().transactionId
|
||||
println("Bob txId: $txId")
|
||||
|
||||
assertEquals(0, bob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
|
||||
// restart Bob with correct contract jar version
|
||||
(bob as OutOfProcess).process.destroyForcibly()
|
||||
bob.stop()
|
||||
(baseDirectory(BOB_NAME) / "cordapps").deleteRecursively()
|
||||
|
||||
val restartedBob = startNode(NodeParameters(additionalCordapps = listOf(flowVersion1, contractVersion1)),
|
||||
providedName = BOB_NAME).getOrThrow()
|
||||
// transaction recorded
|
||||
@Suppress("DEPRECATION")
|
||||
assertNotNull(restartedBob.rpc.internalFindVerifiedTransaction(txId))
|
||||
// but vault states not updated
|
||||
assertEquals(0, restartedBob.rpc.vaultQueryBy<AttachmentContractV1.State>().states.size)
|
||||
}
|
||||
}
|
||||
|
||||
private fun waitForVaultUpdate(nodeHandle: NodeHandle, maxIterations: Int = 5, iterationDelay: Long = 500): Int {
|
||||
repeat((0..maxIterations).count()) {
|
||||
val count = nodeHandle.rpc.vaultQueryBy<AttachmentContractV1.State>().states
|
||||
if (count.isNotEmpty()) {
|
||||
return count.size
|
||||
}
|
||||
Strand.sleep(iterationDelay)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.identity.InMemoryIdentityService
|
||||
@ -35,9 +36,9 @@ class PersistentPartyInfoCacheTest {
|
||||
createNodeInfo(listOf(CHARLIE))))
|
||||
val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database)
|
||||
partyInfoCache.start()
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(ALICE.name.hashCode().toLong())
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(BOB.name.hashCode().toLong())
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(CHARLIE.name.hashCode().toLong())
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(SecureHash.sha256(ALICE.name.toString()))
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(SecureHash.sha256(BOB.name.toString()))
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(SecureHash.sha256(CHARLIE.name.toString()))
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@ -50,9 +51,9 @@ class PersistentPartyInfoCacheTest {
|
||||
// clear network map cache & bootstrap another PersistentInfoCache
|
||||
charlieNetMapCache.clearNetworkMapCache()
|
||||
val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database)
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(ALICE.name.hashCode().toLong())
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(BOB.name.hashCode().toLong())
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(CHARLIE.name.hashCode().toLong())
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(ALICE.name)).isEqualTo(SecureHash.sha256(ALICE.name.toString()))
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(BOB.name)).isEqualTo(SecureHash.sha256(BOB.name.toString()))
|
||||
assertThat(partyInfoCache.getPartyIdByCordaX500Name(CHARLIE.name)).isEqualTo(SecureHash.sha256(CHARLIE.name.toString()))
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@ -63,9 +64,9 @@ class PersistentPartyInfoCacheTest {
|
||||
createNodeInfo(listOf(CHARLIE))))
|
||||
val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database)
|
||||
partyInfoCache.start()
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(ALICE.name.hashCode().toLong())).isEqualTo(ALICE.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(BOB.name.hashCode().toLong())).isEqualTo(BOB.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(CHARLIE.name.hashCode().toLong())).isEqualTo(CHARLIE.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(ALICE.name.toString()))).isEqualTo(ALICE.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(BOB.name.toString()))).isEqualTo(BOB.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(CHARLIE.name.toString()))).isEqualTo(CHARLIE.name)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@ -78,9 +79,9 @@ class PersistentPartyInfoCacheTest {
|
||||
// clear network map cache & bootstrap another PersistentInfoCache
|
||||
charlieNetMapCache.clearNetworkMapCache()
|
||||
val partyInfoCache = PersistentPartyInfoCache(charlieNetMapCache, TestingNamedCacheFactory(), database)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(ALICE.name.hashCode().toLong())).isEqualTo(ALICE.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(BOB.name.hashCode().toLong())).isEqualTo(BOB.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(CHARLIE.name.hashCode().toLong())).isEqualTo(CHARLIE.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(ALICE.name.toString()))).isEqualTo(ALICE.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(BOB.name.toString()))).isEqualTo(BOB.name)
|
||||
assertThat(partyInfoCache.getCordaX500NameByPartyId(SecureHash.sha256(CHARLIE.name.toString()))).isEqualTo(CHARLIE.name)
|
||||
}
|
||||
|
||||
private fun createNodeInfo(identities: List<TestIdentity>,
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
@ -14,13 +15,13 @@ class PersistentPartyInfoCache(private val networkMapCache: PersistentNetworkMap
|
||||
private val database: CordaPersistence) {
|
||||
|
||||
// probably better off using a BiMap here: https://www.baeldung.com/guava-bimap
|
||||
private val cordaX500NameToPartyIdCache = NonInvalidatingCache<CordaX500Name, Long?>(
|
||||
private val cordaX500NameToPartyIdCache = NonInvalidatingCache<CordaX500Name, SecureHash?>(
|
||||
cacheFactory = cacheFactory,
|
||||
name = "RecoveryPartyInfoCache_byCordaX500Name") { key ->
|
||||
database.transaction { queryByCordaX500Name(session, key) }
|
||||
}
|
||||
|
||||
private val partyIdToCordaX500NameCache = NonInvalidatingCache<Long, CordaX500Name?>(
|
||||
private val partyIdToCordaX500NameCache = NonInvalidatingCache<SecureHash, CordaX500Name?>(
|
||||
cacheFactory = cacheFactory,
|
||||
name = "RecoveryPartyInfoCache_byPartyId") { key ->
|
||||
database.transaction { queryByPartyId(session, key) }
|
||||
@ -32,48 +33,48 @@ class PersistentPartyInfoCache(private val networkMapCache: PersistentNetworkMap
|
||||
val (snapshot, updates) = networkMapCache.track()
|
||||
snapshot.map { entry ->
|
||||
entry.legalIdentities.map { party ->
|
||||
add(party.name.hashCode().toLong(), party.name)
|
||||
add(SecureHash.sha256(party.name.toString()), party.name)
|
||||
}
|
||||
}
|
||||
trackNetworkMapUpdates = updates
|
||||
trackNetworkMapUpdates.cache().forEach { nodeInfo ->
|
||||
nodeInfo.node.legalIdentities.map { party ->
|
||||
add(party.name.hashCode().toLong(), party.name)
|
||||
add(SecureHash.sha256(party.name.toString()), party.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun getPartyIdByCordaX500Name(name: CordaX500Name): Long = cordaX500NameToPartyIdCache[name] ?: throw IllegalStateException("Missing cache entry for $name")
|
||||
fun getPartyIdByCordaX500Name(name: CordaX500Name): SecureHash = cordaX500NameToPartyIdCache[name] ?: throw IllegalStateException("Missing cache entry for $name")
|
||||
|
||||
fun getCordaX500NameByPartyId(partyId: Long): CordaX500Name = partyIdToCordaX500NameCache[partyId] ?: throw IllegalStateException("Missing cache entry for $partyId")
|
||||
fun getCordaX500NameByPartyId(partyId: SecureHash): CordaX500Name = partyIdToCordaX500NameCache[partyId] ?: throw IllegalStateException("Missing cache entry for $partyId")
|
||||
|
||||
private fun add(partyHashCode: Long, partyName: CordaX500Name) {
|
||||
private fun add(partyHashCode: SecureHash, partyName: CordaX500Name) {
|
||||
partyIdToCordaX500NameCache.cache.put(partyHashCode, partyName)
|
||||
cordaX500NameToPartyIdCache.cache.put(partyName, partyHashCode)
|
||||
updateInfoDB(partyHashCode, partyName)
|
||||
}
|
||||
|
||||
private fun updateInfoDB(partyHashCode: Long, partyName: CordaX500Name) {
|
||||
private fun updateInfoDB(partyHashCode: SecureHash, partyName: CordaX500Name) {
|
||||
database.transaction {
|
||||
if (queryByPartyId(session, partyHashCode) == null) {
|
||||
session.save(DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo(partyHashCode, partyName.toString()))
|
||||
session.save(DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo(partyHashCode.toString(), partyName.toString()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun queryByCordaX500Name(session: Session, key: CordaX500Name): Long? {
|
||||
private fun queryByCordaX500Name(session: Session, key: CordaX500Name): SecureHash? {
|
||||
val query = session.createQuery(
|
||||
"FROM ${DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java.name} WHERE partyName = :partyName",
|
||||
DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java)
|
||||
query.setParameter("partyName", key.toString())
|
||||
return query.resultList.singleOrNull()?.partyId
|
||||
return query.resultList.singleOrNull()?.let { SecureHash.parse(it.partyId) }
|
||||
}
|
||||
|
||||
private fun queryByPartyId(session: Session, key: Long): CordaX500Name? {
|
||||
private fun queryByPartyId(session: Session, key: SecureHash): CordaX500Name? {
|
||||
val query = session.createQuery(
|
||||
"FROM ${DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java.name} WHERE partyId = :partyId",
|
||||
DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo::class.java)
|
||||
query.setParameter("partyId", key)
|
||||
query.setParameter("partyId", key.toString())
|
||||
return query.resultList.singleOrNull()?.partyName?.let { CordaX500Name.parse(it) }
|
||||
}
|
||||
}
|
@ -39,8 +39,8 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
@Immutable
|
||||
data class PersistentKey(
|
||||
/** PartyId of flow peer **/
|
||||
@Column(name = "peer_party_id", nullable = false)
|
||||
var peerPartyId: Long,
|
||||
@Column(name = "peer_party_id", length = 144, nullable = false)
|
||||
var peerPartyId: String,
|
||||
|
||||
@Column(name = "timestamp", nullable = false)
|
||||
var timestamp: Instant,
|
||||
@ -49,12 +49,12 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
var timestampDiscriminator: Int
|
||||
|
||||
) : Serializable {
|
||||
constructor(key: Key) : this(key.partyId, key.timestamp, key.timestampDiscriminator)
|
||||
constructor(key: Key) : this(key.partyId.toString(), key.timestamp, key.timestampDiscriminator)
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}sender_distribution_records")
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}sender_distr_recs")
|
||||
data class DBSenderDistributionRecord(
|
||||
@EmbeddedId
|
||||
var compositeKey: PersistentKey,
|
||||
@ -69,7 +69,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
fun toSenderDistributionRecord() =
|
||||
SenderDistributionRecord(
|
||||
SecureHash.parse(this.txId),
|
||||
this.compositeKey.peerPartyId,
|
||||
SecureHash.parse(this.compositeKey.peerPartyId),
|
||||
this.statesToRecord,
|
||||
this.compositeKey.timestamp
|
||||
)
|
||||
@ -77,8 +77,8 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
|
||||
@CordaSerializable
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}receiver_distribution_records")
|
||||
class DBReceiverDistributionRecord(
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}receiver_distr_recs")
|
||||
data class DBReceiverDistributionRecord(
|
||||
@EmbeddedId
|
||||
var compositeKey: PersistentKey,
|
||||
|
||||
@ -104,7 +104,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
fun toReceiverDistributionRecord(): ReceiverDistributionRecord {
|
||||
return ReceiverDistributionRecord(
|
||||
SecureHash.parse(this.txId),
|
||||
this.compositeKey.peerPartyId,
|
||||
SecureHash.parse(this.compositeKey.peerPartyId),
|
||||
OpaqueBytes(this.distributionList),
|
||||
this.compositeKey.timestamp
|
||||
)
|
||||
@ -116,8 +116,8 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
data class DBRecoveryPartyInfo(
|
||||
@Id
|
||||
/** CordaX500Name hashCode() **/
|
||||
@Column(name = "party_id", nullable = false)
|
||||
var partyId: Long,
|
||||
@Column(name = "party_id", length = 144, nullable = false)
|
||||
var partyId: String,
|
||||
|
||||
/** CordaX500Name of party **/
|
||||
@Column(name = "party_name", nullable = false)
|
||||
@ -127,11 +127,11 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
data class TimestampKey(val timestamp: Instant, val timestampDiscriminator: Int)
|
||||
|
||||
class Key(
|
||||
val partyId: Long,
|
||||
val partyId: SecureHash,
|
||||
val timestamp: Instant,
|
||||
val timestampDiscriminator: Int = nextDiscriminatorNumber.andIncrement
|
||||
) {
|
||||
constructor(key: TimestampKey, partyId: Long): this(partyId, key.timestamp, key.timestampDiscriminator)
|
||||
constructor(key: TimestampKey, partyId: SecureHash): this(partyId, key.timestamp, key.timestampDiscriminator)
|
||||
companion object {
|
||||
val nextDiscriminatorNumber = AtomicInteger()
|
||||
}
|
||||
@ -155,7 +155,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
val hashedDistributionList = HashedDistributionList(
|
||||
distributionList.senderStatesToRecord,
|
||||
hashedPeersToStatesToRecord,
|
||||
HashedDistributionList.PublicHeader(senderRecordingTimestamp)
|
||||
HashedDistributionList.PublicHeader(senderRecordingTimestamp, timeDiscriminator)
|
||||
)
|
||||
hashedDistributionList.encrypt(encryptionService)
|
||||
}
|
||||
@ -170,7 +170,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
val publicHeader = HashedDistributionList.PublicHeader.unauthenticatedDeserialise(distributionList.opaqueData, encryptionService)
|
||||
database.transaction {
|
||||
val receiverDistributionRecord = DBReceiverDistributionRecord(
|
||||
Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp),
|
||||
Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp, publicHeader.timeDiscriminator),
|
||||
txId,
|
||||
distributionList.opaqueData,
|
||||
distributionList.receiverStatesToRecord
|
||||
@ -237,7 +237,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
excludingTxnIds.map { it.toString() }))))
|
||||
}
|
||||
if (peers.isNotEmpty()) {
|
||||
val peerPartyIds = peers.map { partyInfoCache.getPartyIdByCordaX500Name(it) }
|
||||
val peerPartyIds = peers.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() }
|
||||
predicates.add(criteriaBuilder.and(compositeKey.get<Long>(PersistentKey::peerPartyId.name).`in`(peerPartyIds)))
|
||||
}
|
||||
criteriaQuery.where(*predicates.toTypedArray())
|
||||
@ -275,7 +275,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
predicates.add(criteriaBuilder.and(criteriaBuilder.not(txId.`in`(excludingTxnIds.map { it.toString() }))))
|
||||
}
|
||||
if (initiators.isNotEmpty()) {
|
||||
val initiatorPartyIds = initiators.map(partyInfoCache::getPartyIdByCordaX500Name)
|
||||
val initiatorPartyIds = initiators.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() }
|
||||
predicates.add(criteriaBuilder.and(compositeKey.get<Long>(PersistentKey::peerPartyId.name).`in`(initiatorPartyIds)))
|
||||
}
|
||||
criteriaQuery.where(*predicates.toTypedArray())
|
||||
@ -319,7 +319,7 @@ abstract class DistributionRecord {
|
||||
@CordaSerializable
|
||||
data class SenderDistributionRecord(
|
||||
override val txId: SecureHash,
|
||||
val peerPartyId: Long, // CordaX500Name hashCode()
|
||||
val peerPartyId: SecureHash, // CordaX500Name hashCode()
|
||||
val statesToRecord: StatesToRecord,
|
||||
override val timestamp: Instant
|
||||
) : DistributionRecord()
|
||||
@ -327,7 +327,7 @@ data class SenderDistributionRecord(
|
||||
@CordaSerializable
|
||||
data class ReceiverDistributionRecord(
|
||||
override val txId: SecureHash,
|
||||
val initiatorPartyId: Long, // CordaX500Name hashCode()
|
||||
val initiatorPartyId: SecureHash, // CordaX500Name hashCode()
|
||||
val encryptedDistributionList: OpaqueBytes,
|
||||
override val timestamp: Instant
|
||||
) : DistributionRecord()
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.node.services.EncryptionService
|
||||
@ -13,7 +14,7 @@ import java.time.Instant
|
||||
@CordaSerializable
|
||||
data class HashedDistributionList(
|
||||
val senderStatesToRecord: StatesToRecord,
|
||||
val peerHashToStatesToRecord: Map<Long, StatesToRecord>,
|
||||
val peerHashToStatesToRecord: Map<SecureHash, StatesToRecord>,
|
||||
val publicHeader: PublicHeader
|
||||
) {
|
||||
/**
|
||||
@ -28,7 +29,7 @@ data class HashedDistributionList(
|
||||
out.writeByte(senderStatesToRecord.ordinal)
|
||||
out.writeInt(peerHashToStatesToRecord.size)
|
||||
for (entry in peerHashToStatesToRecord) {
|
||||
out.writeLong(entry.key)
|
||||
entry.key.writeTo(out)
|
||||
out.writeByte(entry.value.ordinal)
|
||||
}
|
||||
return encryptionService.encrypt(baos.toByteArray(), publicHeader.serialise())
|
||||
@ -37,12 +38,14 @@ data class HashedDistributionList(
|
||||
|
||||
@CordaSerializable
|
||||
data class PublicHeader(
|
||||
val senderRecordedTimestamp: Instant
|
||||
val senderRecordedTimestamp: Instant,
|
||||
val timeDiscriminator: Int
|
||||
) {
|
||||
fun serialise(): ByteArray {
|
||||
val buffer = ByteBuffer.allocate(1 + java.lang.Long.BYTES)
|
||||
val buffer = ByteBuffer.allocate(1 + java.lang.Long.BYTES + Integer.BYTES)
|
||||
buffer.put(VERSION_TAG.toByte())
|
||||
buffer.putLong(senderRecordedTimestamp.toEpochMilli())
|
||||
buffer.putInt(timeDiscriminator)
|
||||
return buffer.array()
|
||||
}
|
||||
|
||||
@ -66,7 +69,8 @@ data class HashedDistributionList(
|
||||
val version = buffer.get().toInt()
|
||||
require(version == VERSION_TAG) { "Unknown distribution list format $version" }
|
||||
val senderRecordedTimestamp = Instant.ofEpochMilli(buffer.getLong())
|
||||
return PublicHeader(senderRecordedTimestamp)
|
||||
val timeDiscriminator = buffer.getInt()
|
||||
return PublicHeader(senderRecordedTimestamp, timeDiscriminator)
|
||||
} catch (e: Exception) {
|
||||
throw IllegalArgumentException("Corrupt or not a distribution list header", e)
|
||||
}
|
||||
@ -78,6 +82,7 @@ data class HashedDistributionList(
|
||||
// The version tag is serialised in the header, even though it is separate from the encrypted main body of the distribution list.
|
||||
// This is because the header and the dist list are cryptographically coupled and we want to avoid declaring the version field twice.
|
||||
private const val VERSION_TAG = 1
|
||||
private const val SECURE_HASH_LENGTH = 32
|
||||
private val statesToRecordValues = StatesToRecord.values() // Cache the enum values since .values() returns a new array each time.
|
||||
|
||||
/**
|
||||
@ -91,9 +96,11 @@ data class HashedDistributionList(
|
||||
try {
|
||||
val senderStatesToRecord = statesToRecordValues[input.readByte().toInt()]
|
||||
val numPeerHashToStatesToRecords = input.readInt()
|
||||
val peerHashToStatesToRecord = mutableMapOf<Long, StatesToRecord>()
|
||||
val peerHashToStatesToRecord = mutableMapOf<SecureHash, StatesToRecord>()
|
||||
repeat(numPeerHashToStatesToRecords) {
|
||||
peerHashToStatesToRecord[input.readLong()] = statesToRecordValues[input.readByte().toInt()]
|
||||
val secureHashBytes = ByteArray(SECURE_HASH_LENGTH)
|
||||
input.readFully(secureHashBytes)
|
||||
peerHashToStatesToRecord[SecureHash.createSHA256(secureHashBytes)] = statesToRecordValues[input.readByte().toInt()]
|
||||
}
|
||||
return HashedDistributionList(senderStatesToRecord, peerHashToStatesToRecord, publicHeader)
|
||||
} catch (e: Exception) {
|
||||
|
@ -21,6 +21,7 @@ import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.tee
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.internal.warnOnce
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
@ -107,6 +108,8 @@ class NodeVaultService(
|
||||
|
||||
const val DEFAULT_SOFT_LOCKING_SQL_IN_CLAUSE_SIZE = 16
|
||||
|
||||
private val IGNORE_TRANSACTION_DESERIALIZATION_ERRORS = java.lang.Boolean.getBoolean("net.corda.vaultupdate.ignore.transaction.deserialization.errors")
|
||||
|
||||
/**
|
||||
* Establish whether a given state is relevant to a node, given the node's public keys.
|
||||
*
|
||||
@ -307,18 +310,29 @@ class NodeVaultService(
|
||||
|
||||
private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord, previouslySeen: Boolean): List<Vault.Update<ContractState>> {
|
||||
|
||||
fun <T> withValidDeserialization(list: List<T>, txId: SecureHash): Map<Int, T> = (0 until list.size).mapNotNull { idx ->
|
||||
try {
|
||||
idx to list[idx]
|
||||
} catch (e: TransactionDeserialisationException) {
|
||||
// When resolving transaction dependencies we might encounter contracts we haven't installed locally.
|
||||
// This will cause a failure as we can't deserialize such states in the context of the `appClassloader`.
|
||||
// For now we ignore these states.
|
||||
// In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy.
|
||||
log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e")
|
||||
null
|
||||
}
|
||||
}.toMap()
|
||||
fun <T> withValidDeserialization(list: List<T>, txId: SecureHash): Map<Int, T> {
|
||||
var error: TransactionDeserialisationException? = null
|
||||
val map = (0 until list.size).mapNotNull { idx ->
|
||||
try {
|
||||
idx to list[idx]
|
||||
} catch (e: TransactionDeserialisationException) {
|
||||
// When resolving transaction dependencies we might encounter contracts we haven't installed locally.
|
||||
// This will cause a failure as we can't deserialize such states in the context of the `appClassloader`.
|
||||
// For now we ignore these states.
|
||||
// In the future we will use the AttachmentsClassloader to correctly deserialize and asses the relevancy.
|
||||
if (IGNORE_TRANSACTION_DESERIALIZATION_ERRORS) {
|
||||
log.warnOnce("The current usage of transaction deserialization for the vault is unsafe." +
|
||||
"Ignoring vault updates due to failed deserialized states may lead to severe problems with ledger consistency. ")
|
||||
log.warn("Could not deserialize state $idx from transaction $txId. Cause: $e")
|
||||
} else {
|
||||
log.error("Could not deserialize state $idx from transaction $txId. Cause: $e")
|
||||
if(error == null) error = e
|
||||
}
|
||||
null
|
||||
}
|
||||
}.toMap()
|
||||
return error?.let { throw it } ?: map
|
||||
}
|
||||
|
||||
// Returns only output states that can be deserialised successfully.
|
||||
fun WireTransaction.deserializableOutputStates(): Map<Int, TransactionState<ContractState>> = withValidDeserialization(this.outputs, this.id)
|
||||
@ -787,7 +801,7 @@ class NodeVaultService(
|
||||
|
||||
private fun <T : ContractState> queryTotalStateCount(criteria: QueryCriteria, contractStateType: Class<out T>): Long {
|
||||
val (criteriaQuery, criteriaParser) = buildCriteriaQuery<Long>(criteria, contractStateType, null)
|
||||
criteriaQuery.select(criteriaBuilder.countDistinct(criteriaParser.vaultStates))
|
||||
criteriaQuery.select(criteriaBuilder.count(criteriaParser.vaultStates))
|
||||
val query = getSession().createQuery(criteriaQuery)
|
||||
return query.singleResult
|
||||
}
|
||||
|
@ -11,7 +11,7 @@
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="create_sender_distribution_records_table">
|
||||
<createTable tableName="node_sender_distribution_records">
|
||||
<createTable tableName="node_sender_distr_recs">
|
||||
<column name="timestamp" type="TIMESTAMP">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
@ -21,7 +21,7 @@
|
||||
<column name="transaction_id" type="NVARCHAR(144)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="peer_party_id" type="BIGINT">
|
||||
<column name="peer_party_id" type="NVARCHAR(144)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="states_to_record" type="INT">
|
||||
@ -30,13 +30,13 @@
|
||||
</createTable>
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="node_sender_distribution_records_pkey">
|
||||
<addPrimaryKey columnNames="peer_party_id, timestamp, timestamp_discriminator" constraintName="node_sender_distribution_records_pkey"
|
||||
tableName="node_sender_distribution_records"/>
|
||||
<changeSet author="R3.Corda" id="node_sender_distr_recs_pkey">
|
||||
<addPrimaryKey columnNames="peer_party_id, timestamp, timestamp_discriminator" constraintName="node_sender_distr_recs_pkey"
|
||||
tableName="node_sender_distr_recs"/>
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="create_receiver_distribution_records_table">
|
||||
<createTable tableName="node_receiver_distribution_records">
|
||||
<createTable tableName="node_receiver_distr_recs">
|
||||
<column name="timestamp" type="TIMESTAMP">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
@ -46,7 +46,7 @@
|
||||
<column name="transaction_id" type="NVARCHAR(144)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="peer_party_id" type="BIGINT">
|
||||
<column name="peer_party_id" type="NVARCHAR(144)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="distribution_list" type="BLOB">
|
||||
@ -58,14 +58,14 @@
|
||||
</createTable>
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="node_receiver_distribution_records_pkey">
|
||||
<addPrimaryKey columnNames="peer_party_id, timestamp, timestamp_discriminator" constraintName="node_receiver_distribution_records_pkey"
|
||||
tableName="node_receiver_distribution_records"/>
|
||||
<changeSet author="R3.Corda" id="node_receiver_distr_recs_pkey">
|
||||
<addPrimaryKey columnNames="peer_party_id, timestamp, timestamp_discriminator" constraintName="node_receiver_distr_recs_pkey"
|
||||
tableName="node_receiver_distr_recs"/>
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="create_recovery_party_info_table">
|
||||
<createTable tableName="node_recovery_party_info">
|
||||
<column name="party_id" type="BIGINT">
|
||||
<column name="party_id" type="NVARCHAR(144)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="party_name" type="NVARCHAR(255)">
|
||||
@ -79,14 +79,14 @@
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="FK__sender_distribution_records__peer_party_id">
|
||||
<addForeignKeyConstraint baseColumnNames="peer_party_id" baseTableName="node_sender_distribution_records"
|
||||
constraintName="FK__sender_distribution_records__peer_party_id"
|
||||
<addForeignKeyConstraint baseColumnNames="peer_party_id" baseTableName="node_sender_distr_recs"
|
||||
constraintName="FK__send_distr__peer_party_id"
|
||||
referencedColumnNames="party_id" referencedTableName="node_recovery_party_info"/>
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="FK__receiver_distribution_records__peer_party_id">
|
||||
<addForeignKeyConstraint baseColumnNames="peer_party_id" baseTableName="node_receiver_distribution_records"
|
||||
constraintName="FK__receiver_distribution_records__peer_party_id"
|
||||
<addForeignKeyConstraint baseColumnNames="peer_party_id" baseTableName="node_receiver_distr_recs"
|
||||
constraintName="FK__recv_distr__peer_party_id"
|
||||
referencedColumnNames="party_id" referencedTableName="node_recovery_party_info"/>
|
||||
</changeSet>
|
||||
|
||||
|
@ -19,10 +19,4 @@
|
||||
<addPrimaryKey constraintName="node_aes_encryption_keys_pkey" tableName="node_aes_encryption_keys" columnNames="key_id"/>
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="node_aes_encryption_keys_idx">
|
||||
<createIndex indexName="node_aes_encryption_keys_idx" tableName="node_aes_encryption_keys">
|
||||
<column name="key_id"/>
|
||||
</createIndex>
|
||||
</changeSet>
|
||||
|
||||
</databaseChangeLog>
|
||||
|
@ -147,12 +147,12 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
|
||||
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let {
|
||||
assertEquals(2, it.size)
|
||||
assertEquals(BOB_NAME.hashCode().toLong(), it.senderRecords[0].compositeKey.peerPartyId)
|
||||
assertEquals(SecureHash.sha256(BOB_NAME.toString()).toString(), it.senderRecords[0].compositeKey.peerPartyId)
|
||||
assertEquals(ALL_VISIBLE, it.senderRecords[0].statesToRecord)
|
||||
}
|
||||
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let {
|
||||
assertEquals(1, it.size)
|
||||
assertEquals(BOB_NAME.hashCode().toLong(), it.receiverRecords[0].compositeKey.peerPartyId)
|
||||
assertEquals(SecureHash.sha256(BOB_NAME.toString()).toString(), it.receiverRecords[0].compositeKey.peerPartyId)
|
||||
assertEquals(ALL_VISIBLE, (HashedDistributionList.decrypt(it.receiverRecords[0].distributionList, encryptionService)).peerHashToStatesToRecord.map { it.value }[0])
|
||||
}
|
||||
val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL)
|
||||
@ -319,8 +319,8 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
fun `test lightweight serialization and deserialization of hashed distribution list payload`() {
|
||||
val hashedDistList = HashedDistributionList(
|
||||
ALL_VISIBLE,
|
||||
mapOf(BOB.name.hashCode().toLong() to NONE, CHARLIE_NAME.hashCode().toLong() to ONLY_RELEVANT),
|
||||
HashedDistributionList.PublicHeader(now())
|
||||
mapOf(SecureHash.sha256(BOB.name.toString()) to NONE, SecureHash.sha256(CHARLIE_NAME.toString()) to ONLY_RELEVANT),
|
||||
HashedDistributionList.PublicHeader(now(), 1)
|
||||
)
|
||||
val roundtrip = HashedDistributionList.decrypt(hashedDistList.encrypt(encryptionService), encryptionService)
|
||||
assertThat(roundtrip).isEqualTo(hashedDistList)
|
||||
|
@ -76,8 +76,7 @@ public class SharedMemoryIncremental extends PortAllocation {
|
||||
newValue = (oldValue + 1);
|
||||
}
|
||||
boolean reserveSuccess = UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue);
|
||||
boolean portAvailable = isLocalPortAvailable(newValue);
|
||||
loopSuccess = reserveSuccess && portAvailable;
|
||||
loopSuccess = reserveSuccess && isLocalPortAvailable(newValue);
|
||||
} while (!loopSuccess);
|
||||
|
||||
return (int) newValue;
|
||||
|
@ -36,7 +36,8 @@ data class NodeParameters(
|
||||
val additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
val flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>> = emptyMap(),
|
||||
val logLevelOverride: String? = null,
|
||||
val rpcAddress: NetworkHostAndPort? = null
|
||||
val rpcAddress: NetworkHostAndPort? = null,
|
||||
val systemProperties: Map<String, String> = emptyMap()
|
||||
) {
|
||||
/**
|
||||
* Create a new node parameters object with default values. Each parameter can be specified with its wither method which returns a copy
|
||||
@ -127,4 +128,97 @@ data class NodeParameters(
|
||||
flowOverrides = flowOverrides,
|
||||
logLevelOverride = logLevelOverride)
|
||||
|
||||
constructor(
|
||||
providedName: CordaX500Name?,
|
||||
rpcUsers: List<User>,
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String,
|
||||
additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>>,
|
||||
logLevelOverride: String? = null
|
||||
) : this(
|
||||
providedName,
|
||||
rpcUsers,
|
||||
verifierType,
|
||||
customOverrides,
|
||||
startInSameProcess,
|
||||
maximumHeapSize,
|
||||
additionalCordapps,
|
||||
flowOverrides,
|
||||
logLevelOverride,
|
||||
rpcAddress = null)
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
fun copy(
|
||||
providedName: CordaX500Name?,
|
||||
rpcUsers: List<User>,
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String,
|
||||
additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>>,
|
||||
logLevelOverride: String? = null
|
||||
) = this.copy(
|
||||
providedName = providedName,
|
||||
rpcUsers = rpcUsers,
|
||||
verifierType = verifierType,
|
||||
customOverrides = customOverrides,
|
||||
startInSameProcess = startInSameProcess,
|
||||
maximumHeapSize = maximumHeapSize,
|
||||
additionalCordapps = additionalCordapps,
|
||||
flowOverrides = flowOverrides,
|
||||
logLevelOverride = logLevelOverride,
|
||||
rpcAddress = rpcAddress)
|
||||
|
||||
constructor(
|
||||
providedName: CordaX500Name?,
|
||||
rpcUsers: List<User>,
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String,
|
||||
additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>>,
|
||||
logLevelOverride: String? = null,
|
||||
rpcAddress: NetworkHostAndPort? = null
|
||||
) : this(
|
||||
providedName,
|
||||
rpcUsers,
|
||||
verifierType,
|
||||
customOverrides,
|
||||
startInSameProcess,
|
||||
maximumHeapSize,
|
||||
additionalCordapps,
|
||||
flowOverrides,
|
||||
logLevelOverride,
|
||||
rpcAddress,
|
||||
systemProperties = emptyMap())
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
fun copy(
|
||||
providedName: CordaX500Name?,
|
||||
rpcUsers: List<User>,
|
||||
verifierType: VerifierType,
|
||||
customOverrides: Map<String, Any?>,
|
||||
startInSameProcess: Boolean?,
|
||||
maximumHeapSize: String,
|
||||
additionalCordapps: Collection<TestCordapp> = emptySet(),
|
||||
flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>>,
|
||||
logLevelOverride: String? = null,
|
||||
rpcAddress: NetworkHostAndPort? = null
|
||||
) = this.copy(
|
||||
providedName = providedName,
|
||||
rpcUsers = rpcUsers,
|
||||
verifierType = verifierType,
|
||||
customOverrides = customOverrides,
|
||||
startInSameProcess = startInSameProcess,
|
||||
maximumHeapSize = maximumHeapSize,
|
||||
additionalCordapps = additionalCordapps,
|
||||
flowOverrides = flowOverrides,
|
||||
logLevelOverride = logLevelOverride,
|
||||
rpcAddress = rpcAddress,
|
||||
systemProperties = systemProperties)
|
||||
}
|
||||
|
@ -755,7 +755,7 @@ class DriverDSLImpl(
|
||||
debugPort,
|
||||
bytemanJarPath,
|
||||
bytemanPort,
|
||||
systemProperties,
|
||||
systemProperties + parameters.systemProperties,
|
||||
parameters.maximumHeapSize,
|
||||
parameters.logLevelOverride,
|
||||
identifier,
|
||||
|
Loading…
Reference in New Issue
Block a user