diff --git a/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt b/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt index 41a869e2ff..b8cd728224 100644 --- a/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt @@ -14,36 +14,17 @@ import net.corda.core.utilities.ProgressTracker @StartableByRPC @InitiatingFlow class LedgerRecoveryFlow( - private val recoveryPeers: Collection, - private val timeWindow: RecoveryTimeWindow? = null, - private val useAllNetworkNodes: Boolean = false, - private val transactionRole: TransactionRole = TransactionRole.ALL, - private val dryRun: Boolean = false, - private val optimisticInitiatorRecovery: Boolean = false, - override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic>() { + private val parameters: LedgerRecoveryParameters, + override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic() { @CordaInternal - data class ExtraConstructorArgs(val recoveryPeers: Collection, - val timeWindow: RecoveryTimeWindow? = null, - val useAllNetworkNodes: Boolean, - val transactionRole: TransactionRole, - val dryRun: Boolean, - val optimisticInitiatorRecovery: Boolean) + data class ExtraConstructorArgs(val parameters: LedgerRecoveryParameters) @CordaInternal - fun getExtraConstructorArgs() = ExtraConstructorArgs(recoveryPeers, timeWindow, useAllNetworkNodes, transactionRole, dryRun, optimisticInitiatorRecovery) - - // unused constructors added to facilitate Node Shell command invocation - constructor(recoveryPeer: Party, timeWindow: RecoveryTimeWindow?) : this(setOf(recoveryPeer), timeWindow, false, TransactionRole.ALL, false, false) - constructor(recoveryPeer: Party, timeWindow: RecoveryTimeWindow?, dryRun: Boolean) : this(setOf(recoveryPeer), timeWindow, false, TransactionRole.ALL, dryRun, false) - - constructor(timeWindow: RecoveryTimeWindow?, dryRun: Boolean) : this(emptySet(), timeWindow, false, TransactionRole.ALL, dryRun, false) - constructor(timeWindow: RecoveryTimeWindow?, dryRun: Boolean, optimisticInitiatorRecovery: Boolean) : this(emptySet(), timeWindow, false, TransactionRole.ALL, dryRun, optimisticInitiatorRecovery) - constructor(recoveryPeers: Collection, timeWindow: RecoveryTimeWindow?, dryRun: Boolean) : this(recoveryPeers, timeWindow, false, TransactionRole.ALL, dryRun, false) - constructor(recoveryPeers: Collection, timeWindow: RecoveryTimeWindow?, dryRun: Boolean, optimisticInitiatorRecovery: Boolean) : this(recoveryPeers, timeWindow, false, TransactionRole.ALL, dryRun, optimisticInitiatorRecovery) + fun getExtraConstructorArgs() = ExtraConstructorArgs(parameters) @Suspendable @Throws(LedgerRecoveryException::class) - override fun call(): Map { + override fun call(): Long { throw NotImplementedError("Enterprise only feature") } } @@ -59,6 +40,18 @@ class ReceiveLedgerRecoveryFlow constructor(private val otherSideSession: FlowSe @CordaSerializable class LedgerRecoveryException(message: String) : FlowException("Ledger recovery failed: $message") +data class LedgerRecoveryParameters( + val recoveryPeers: Collection, + val timeWindow: RecoveryTimeWindow? = null, + val useAllNetworkNodes: Boolean = false, + val transactionRole: TransactionRole = TransactionRole.ALL, + val dryRun: Boolean = false, + val optimisticInitiatorRecovery: Boolean = false, + val useTimeWindowNarrowing: Boolean = false, + val verboseLogging: Boolean = true, + val recoveryBatchSize: Int = 1000 +) + /** * This specifies which type of transactions to recover based on the transaction role of the recovering node */ @@ -80,6 +73,3 @@ data class RecoveryResult( val synchronisedInitiated: Boolean = false, // only attempted if [optimisticInitiatorRecovery] option set to true and [TransactionRecoveryType.INITIATOR] val failureCause: String? = null // reason why a transaction failed to synchronise ) - - - diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt index aafd27503e..a85084b288 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt @@ -155,7 +155,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, val hashedDistributionList = HashedDistributionList( distributionList.senderStatesToRecord, hashedPeersToStatesToRecord, - HashedDistributionList.PublicHeader(senderRecordingTimestamp) + HashedDistributionList.PublicHeader(senderRecordingTimestamp, timeDiscriminator) ) hashedDistributionList.encrypt(encryptionService) } @@ -170,7 +170,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, val publicHeader = HashedDistributionList.PublicHeader.unauthenticatedDeserialise(distributionList.opaqueData, encryptionService) database.transaction { val receiverDistributionRecord = DBReceiverDistributionRecord( - Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp), + Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp, publicHeader.timeDiscriminator), txId, distributionList.opaqueData, distributionList.receiverStatesToRecord diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/HashedDistributionList.kt b/node/src/main/kotlin/net/corda/node/services/persistence/HashedDistributionList.kt index 4f284f11b0..5fee0f24b2 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/HashedDistributionList.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/HashedDistributionList.kt @@ -38,12 +38,14 @@ data class HashedDistributionList( @CordaSerializable data class PublicHeader( - val senderRecordedTimestamp: Instant + val senderRecordedTimestamp: Instant, + val timeDiscriminator: Int ) { fun serialise(): ByteArray { - val buffer = ByteBuffer.allocate(1 + java.lang.Long.BYTES) + val buffer = ByteBuffer.allocate(1 + java.lang.Long.BYTES + Integer.BYTES) buffer.put(VERSION_TAG.toByte()) buffer.putLong(senderRecordedTimestamp.toEpochMilli()) + buffer.putInt(timeDiscriminator) return buffer.array() } @@ -67,7 +69,8 @@ data class HashedDistributionList( val version = buffer.get().toInt() require(version == VERSION_TAG) { "Unknown distribution list format $version" } val senderRecordedTimestamp = Instant.ofEpochMilli(buffer.getLong()) - return PublicHeader(senderRecordedTimestamp) + val timeDiscriminator = buffer.getInt() + return PublicHeader(senderRecordedTimestamp, timeDiscriminator) } catch (e: Exception) { throw IllegalArgumentException("Corrupt or not a distribution list header", e) } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt index debc39d102..85e086ad61 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt @@ -320,7 +320,7 @@ class DBTransactionStorageLedgerRecoveryTests { val hashedDistList = HashedDistributionList( ALL_VISIBLE, mapOf(SecureHash.sha256(BOB.name.toString()) to NONE, SecureHash.sha256(CHARLIE_NAME.toString()) to ONLY_RELEVANT), - HashedDistributionList.PublicHeader(now()) + HashedDistributionList.PublicHeader(now(), 1) ) val roundtrip = HashedDistributionList.decrypt(hashedDistList.encrypt(encryptionService), encryptionService) assertThat(roundtrip).isEqualTo(hashedDistList)