diff --git a/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt b/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt index b8cd728224..cf91603133 100644 --- a/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt @@ -15,7 +15,7 @@ import net.corda.core.utilities.ProgressTracker @InitiatingFlow class LedgerRecoveryFlow( private val parameters: LedgerRecoveryParameters, - override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic() { + override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic() { @CordaInternal data class ExtraConstructorArgs(val parameters: LedgerRecoveryParameters) @@ -24,7 +24,7 @@ class LedgerRecoveryFlow( @Suspendable @Throws(LedgerRecoveryException::class) - override fun call(): Long { + override fun call(): LedgerRecoveryResult { throw NotImplementedError("Enterprise only feature") } } @@ -40,6 +40,7 @@ class ReceiveLedgerRecoveryFlow constructor(private val otherSideSession: FlowSe @CordaSerializable class LedgerRecoveryException(message: String) : FlowException("Ledger recovery failed: $message") +@CordaSerializable data class LedgerRecoveryParameters( val recoveryPeers: Collection, val timeWindow: RecoveryTimeWindow? = null, @@ -47,11 +48,18 @@ data class LedgerRecoveryParameters( val transactionRole: TransactionRole = TransactionRole.ALL, val dryRun: Boolean = false, val optimisticInitiatorRecovery: Boolean = false, - val useTimeWindowNarrowing: Boolean = false, + val useTimeWindowNarrowing: Boolean = true, val verboseLogging: Boolean = true, val recoveryBatchSize: Int = 1000 ) +@CordaSerializable +data class LedgerRecoveryResult( + val totalRecoveredRecords: Long, + val totalRecoveredTransactions: Long, + val totalErrors: Long +) + /** * This specifies which type of transactions to recover based on the transaction role of the recovering node */ diff --git a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt index 3323d2d743..d10bbe0ebb 100644 --- a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt @@ -132,7 +132,7 @@ open class DataVendingFlow(val otherSessions: Set, val payload: Any protected open fun isFinality(): Boolean = false - @Suppress("ComplexCondition", "ComplexMethod", "LongMethod") + @Suppress("ComplexCondition", "ComplexMethod", "LongMethod", "TooGenericExceptionThrown") @Suspendable override fun call(): Void? { val networkMaxMessageSize = serviceHub.networkParameters.maxMessageSize @@ -151,11 +151,14 @@ open class DataVendingFlow(val otherSessions: Set, val payload: Any is NotarisationPayload -> TransactionAuthorisationFilter().addAuthorised(getInputTransactions(payload.signedTransaction)) is SignedTransaction -> TransactionAuthorisationFilter().addAuthorised(getInputTransactions(payload)) is RetrieveAnyTransactionPayload -> TransactionAuthorisationFilter(acceptAll = true) - is List<*> -> TransactionAuthorisationFilter().addAuthorised(payload.flatMap { stateAndRef -> - if (stateAndRef is StateAndRef<*>) { - getInputTransactions(serviceHub.validatedTransactions.getTransaction(stateAndRef.ref.txhash)!!) + stateAndRef.ref.txhash + is List<*> -> TransactionAuthorisationFilter().addAuthorised(payload.flatMap { someObject -> + if (someObject is StateAndRef<*>) { + getInputTransactions(serviceHub.validatedTransactions.getTransaction(someObject.ref.txhash)!!) + someObject.ref.txhash + } + else if (someObject is NamedByHash) { + setOf(someObject.id) } else { - throw Exception("Unknown payload type: ${stateAndRef!!::class.java} ?") + throw Exception("Unknown payload type: ${someObject!!::class.java} ?") } }.toSet()) else -> throw Exception("Unknown payload type: ${payload::class.java} ?") diff --git a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt index 882ca901fe..053015fef9 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt @@ -26,6 +26,9 @@ class ResolveTransactionsFlow private constructor( constructor(txHashes: Set, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE) : this(null, txHashes, otherSide, statesToRecord) + constructor(txHashes: Set, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean) + : this(null, txHashes, otherSide, statesToRecord, deferredAck) + /** * Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does * *not* validate or store the [SignedTransaction] itself. diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt index a85084b288..84ea4869ca 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt @@ -78,7 +78,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, @CordaSerializable @Entity @Table(name = "${NODE_DATABASE_PREFIX}receiver_distr_recs") - class DBReceiverDistributionRecord( + data class DBReceiverDistributionRecord( @EmbeddedId var compositeKey: PersistentKey,