mirror of
https://github.com/corda/corda.git
synced 2024-12-21 22:07:55 +00:00
Merging forward updates from release/os/4.11 to release/os/4.12 - 2023-10-02
This commit is contained in:
commit
ac9b10ef81
@ -14,36 +14,17 @@ import net.corda.core.utilities.ProgressTracker
|
|||||||
@StartableByRPC
|
@StartableByRPC
|
||||||
@InitiatingFlow
|
@InitiatingFlow
|
||||||
class LedgerRecoveryFlow(
|
class LedgerRecoveryFlow(
|
||||||
private val recoveryPeers: Collection<Party>,
|
private val parameters: LedgerRecoveryParameters,
|
||||||
private val timeWindow: RecoveryTimeWindow? = null,
|
override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic<Long>() {
|
||||||
private val useAllNetworkNodes: Boolean = false,
|
|
||||||
private val transactionRole: TransactionRole = TransactionRole.ALL,
|
|
||||||
private val dryRun: Boolean = false,
|
|
||||||
private val optimisticInitiatorRecovery: Boolean = false,
|
|
||||||
override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic<Map<SecureHash, RecoveryResult>>() {
|
|
||||||
|
|
||||||
@CordaInternal
|
@CordaInternal
|
||||||
data class ExtraConstructorArgs(val recoveryPeers: Collection<Party>,
|
data class ExtraConstructorArgs(val parameters: LedgerRecoveryParameters)
|
||||||
val timeWindow: RecoveryTimeWindow? = null,
|
|
||||||
val useAllNetworkNodes: Boolean,
|
|
||||||
val transactionRole: TransactionRole,
|
|
||||||
val dryRun: Boolean,
|
|
||||||
val optimisticInitiatorRecovery: Boolean)
|
|
||||||
@CordaInternal
|
@CordaInternal
|
||||||
fun getExtraConstructorArgs() = ExtraConstructorArgs(recoveryPeers, timeWindow, useAllNetworkNodes, transactionRole, dryRun, optimisticInitiatorRecovery)
|
fun getExtraConstructorArgs() = ExtraConstructorArgs(parameters)
|
||||||
|
|
||||||
// unused constructors added to facilitate Node Shell command invocation
|
|
||||||
constructor(recoveryPeer: Party, timeWindow: RecoveryTimeWindow?) : this(setOf(recoveryPeer), timeWindow, false, TransactionRole.ALL, false, false)
|
|
||||||
constructor(recoveryPeer: Party, timeWindow: RecoveryTimeWindow?, dryRun: Boolean) : this(setOf(recoveryPeer), timeWindow, false, TransactionRole.ALL, dryRun, false)
|
|
||||||
|
|
||||||
constructor(timeWindow: RecoveryTimeWindow?, dryRun: Boolean) : this(emptySet(), timeWindow, false, TransactionRole.ALL, dryRun, false)
|
|
||||||
constructor(timeWindow: RecoveryTimeWindow?, dryRun: Boolean, optimisticInitiatorRecovery: Boolean) : this(emptySet(), timeWindow, false, TransactionRole.ALL, dryRun, optimisticInitiatorRecovery)
|
|
||||||
constructor(recoveryPeers: Collection<Party>, timeWindow: RecoveryTimeWindow?, dryRun: Boolean) : this(recoveryPeers, timeWindow, false, TransactionRole.ALL, dryRun, false)
|
|
||||||
constructor(recoveryPeers: Collection<Party>, timeWindow: RecoveryTimeWindow?, dryRun: Boolean, optimisticInitiatorRecovery: Boolean) : this(recoveryPeers, timeWindow, false, TransactionRole.ALL, dryRun, optimisticInitiatorRecovery)
|
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
@Throws(LedgerRecoveryException::class)
|
@Throws(LedgerRecoveryException::class)
|
||||||
override fun call(): Map<SecureHash, RecoveryResult> {
|
override fun call(): Long {
|
||||||
throw NotImplementedError("Enterprise only feature")
|
throw NotImplementedError("Enterprise only feature")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -59,6 +40,18 @@ class ReceiveLedgerRecoveryFlow constructor(private val otherSideSession: FlowSe
|
|||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
class LedgerRecoveryException(message: String) : FlowException("Ledger recovery failed: $message")
|
class LedgerRecoveryException(message: String) : FlowException("Ledger recovery failed: $message")
|
||||||
|
|
||||||
|
data class LedgerRecoveryParameters(
|
||||||
|
val recoveryPeers: Collection<Party>,
|
||||||
|
val timeWindow: RecoveryTimeWindow? = null,
|
||||||
|
val useAllNetworkNodes: Boolean = false,
|
||||||
|
val transactionRole: TransactionRole = TransactionRole.ALL,
|
||||||
|
val dryRun: Boolean = false,
|
||||||
|
val optimisticInitiatorRecovery: Boolean = false,
|
||||||
|
val useTimeWindowNarrowing: Boolean = false,
|
||||||
|
val verboseLogging: Boolean = true,
|
||||||
|
val recoveryBatchSize: Int = 1000
|
||||||
|
)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This specifies which type of transactions to recover based on the transaction role of the recovering node
|
* This specifies which type of transactions to recover based on the transaction role of the recovering node
|
||||||
*/
|
*/
|
||||||
@ -80,6 +73,3 @@ data class RecoveryResult(
|
|||||||
val synchronisedInitiated: Boolean = false, // only attempted if [optimisticInitiatorRecovery] option set to true and [TransactionRecoveryType.INITIATOR]
|
val synchronisedInitiated: Boolean = false, // only attempted if [optimisticInitiatorRecovery] option set to true and [TransactionRecoveryType.INITIATOR]
|
||||||
val failureCause: String? = null // reason why a transaction failed to synchronise
|
val failureCause: String? = null // reason why a transaction failed to synchronise
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -155,7 +155,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
|||||||
val hashedDistributionList = HashedDistributionList(
|
val hashedDistributionList = HashedDistributionList(
|
||||||
distributionList.senderStatesToRecord,
|
distributionList.senderStatesToRecord,
|
||||||
hashedPeersToStatesToRecord,
|
hashedPeersToStatesToRecord,
|
||||||
HashedDistributionList.PublicHeader(senderRecordingTimestamp)
|
HashedDistributionList.PublicHeader(senderRecordingTimestamp, timeDiscriminator)
|
||||||
)
|
)
|
||||||
hashedDistributionList.encrypt(encryptionService)
|
hashedDistributionList.encrypt(encryptionService)
|
||||||
}
|
}
|
||||||
@ -170,7 +170,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
|||||||
val publicHeader = HashedDistributionList.PublicHeader.unauthenticatedDeserialise(distributionList.opaqueData, encryptionService)
|
val publicHeader = HashedDistributionList.PublicHeader.unauthenticatedDeserialise(distributionList.opaqueData, encryptionService)
|
||||||
database.transaction {
|
database.transaction {
|
||||||
val receiverDistributionRecord = DBReceiverDistributionRecord(
|
val receiverDistributionRecord = DBReceiverDistributionRecord(
|
||||||
Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp),
|
Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp, publicHeader.timeDiscriminator),
|
||||||
txId,
|
txId,
|
||||||
distributionList.opaqueData,
|
distributionList.opaqueData,
|
||||||
distributionList.receiverStatesToRecord
|
distributionList.receiverStatesToRecord
|
||||||
|
@ -38,12 +38,14 @@ data class HashedDistributionList(
|
|||||||
|
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
data class PublicHeader(
|
data class PublicHeader(
|
||||||
val senderRecordedTimestamp: Instant
|
val senderRecordedTimestamp: Instant,
|
||||||
|
val timeDiscriminator: Int
|
||||||
) {
|
) {
|
||||||
fun serialise(): ByteArray {
|
fun serialise(): ByteArray {
|
||||||
val buffer = ByteBuffer.allocate(1 + java.lang.Long.BYTES)
|
val buffer = ByteBuffer.allocate(1 + java.lang.Long.BYTES + Integer.BYTES)
|
||||||
buffer.put(VERSION_TAG.toByte())
|
buffer.put(VERSION_TAG.toByte())
|
||||||
buffer.putLong(senderRecordedTimestamp.toEpochMilli())
|
buffer.putLong(senderRecordedTimestamp.toEpochMilli())
|
||||||
|
buffer.putInt(timeDiscriminator)
|
||||||
return buffer.array()
|
return buffer.array()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,7 +69,8 @@ data class HashedDistributionList(
|
|||||||
val version = buffer.get().toInt()
|
val version = buffer.get().toInt()
|
||||||
require(version == VERSION_TAG) { "Unknown distribution list format $version" }
|
require(version == VERSION_TAG) { "Unknown distribution list format $version" }
|
||||||
val senderRecordedTimestamp = Instant.ofEpochMilli(buffer.getLong())
|
val senderRecordedTimestamp = Instant.ofEpochMilli(buffer.getLong())
|
||||||
return PublicHeader(senderRecordedTimestamp)
|
val timeDiscriminator = buffer.getInt()
|
||||||
|
return PublicHeader(senderRecordedTimestamp, timeDiscriminator)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
throw IllegalArgumentException("Corrupt or not a distribution list header", e)
|
throw IllegalArgumentException("Corrupt or not a distribution list header", e)
|
||||||
}
|
}
|
||||||
|
@ -320,7 +320,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
|||||||
val hashedDistList = HashedDistributionList(
|
val hashedDistList = HashedDistributionList(
|
||||||
ALL_VISIBLE,
|
ALL_VISIBLE,
|
||||||
mapOf(SecureHash.sha256(BOB.name.toString()) to NONE, SecureHash.sha256(CHARLIE_NAME.toString()) to ONLY_RELEVANT),
|
mapOf(SecureHash.sha256(BOB.name.toString()) to NONE, SecureHash.sha256(CHARLIE_NAME.toString()) to ONLY_RELEVANT),
|
||||||
HashedDistributionList.PublicHeader(now())
|
HashedDistributionList.PublicHeader(now(), 1)
|
||||||
)
|
)
|
||||||
val roundtrip = HashedDistributionList.decrypt(hashedDistList.encrypt(encryptionService), encryptionService)
|
val roundtrip = HashedDistributionList.decrypt(hashedDistList.encrypt(encryptionService), encryptionService)
|
||||||
assertThat(roundtrip).isEqualTo(hashedDistList)
|
assertThat(roundtrip).isEqualTo(hashedDistList)
|
||||||
|
Loading…
Reference in New Issue
Block a user