From 198133492185f019b2f79dc22ab751b568bf6146 Mon Sep 17 00:00:00 2001 From: Jose Coll Date: Fri, 13 Oct 2023 11:26:07 +0100 Subject: [PATCH 1/2] ENT-10110 Clean-up. (#7530) --- .../net/corda/core/flows/LedgerRecoverFlow.kt | 35 ------------------- 1 file changed, 35 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt b/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt index cf91603133..32c0ff3866 100644 --- a/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/LedgerRecoverFlow.kt @@ -2,8 +2,6 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.CordaInternal -import net.corda.core.crypto.SecureHash -import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.ProgressTracker @@ -12,7 +10,6 @@ import net.corda.core.utilities.ProgressTracker * Ledger Recovery Flow (available in Enterprise only). */ @StartableByRPC -@InitiatingFlow class LedgerRecoveryFlow( private val parameters: LedgerRecoveryParameters, override val progressTracker: ProgressTracker = ProgressTracker()) : FlowLogic() { @@ -29,14 +26,6 @@ class LedgerRecoveryFlow( } } -@InitiatedBy(LedgerRecoveryFlow::class) -class ReceiveLedgerRecoveryFlow constructor(private val otherSideSession: FlowSession) : FlowLogic() { - @Suspendable - override fun call() { - throw NotImplementedError("Enterprise only feature") - } -} - @CordaSerializable class LedgerRecoveryException(message: String) : FlowException("Ledger recovery failed: $message") @@ -45,9 +34,7 @@ data class LedgerRecoveryParameters( val recoveryPeers: Collection, val timeWindow: RecoveryTimeWindow? = null, val useAllNetworkNodes: Boolean = false, - val transactionRole: TransactionRole = TransactionRole.ALL, val dryRun: Boolean = false, - val optimisticInitiatorRecovery: Boolean = false, val useTimeWindowNarrowing: Boolean = true, val verboseLogging: Boolean = true, val recoveryBatchSize: Int = 1000 @@ -59,25 +46,3 @@ data class LedgerRecoveryResult( val totalRecoveredTransactions: Long, val totalErrors: Long ) - -/** - * This specifies which type of transactions to recover based on the transaction role of the recovering node - */ -@CordaSerializable -enum class TransactionRole { - ALL, - INITIATOR, // only recover transactions that I initiated - PEER, // only recover transactions where I am a participant on a transaction - OBSERVER, // only recover transactions where I am an observer (but not participant) to a transaction - PEER_AND_OBSERVER // recovery transactions where I am either participant or observer -} - -@CordaSerializable -data class RecoveryResult( - val transactionId: SecureHash, - val recoveryPeer: CordaX500Name, - val transactionRole: TransactionRole, // what role did I play in this transaction - val synchronised: Boolean, // whether the transaction was successfully synchronised (will always be false when dryRun option specified) - val synchronisedInitiated: Boolean = false, // only attempted if [optimisticInitiatorRecovery] option set to true and [TransactionRecoveryType.INITIATOR] - val failureCause: String? = null // reason why a transaction failed to synchronise -) From 6a2bad8077811da334f3fc66ba305f20044edafc Mon Sep 17 00:00:00 2001 From: Jose Coll Date: Tue, 17 Oct 2023 07:03:49 +0100 Subject: [PATCH 2/2] ENT-10110 Back-port changes from ENT + additional clean-up (#7532) --- .ci/api-current.txt | 10 ++ .../coretests/flows/FinalityFlowTests.kt | 10 +- .../net/corda/core/flows/FlowTransaction.kt | 78 --------- .../net/corda/core/flows/RecoveryTypes.kt | 150 ++++++++++++++++++ .../DBTransactionStorageLedgerRecovery.kt | 91 ++++------- ...DBTransactionStorageLedgerRecoveryTests.kt | 41 +++-- 6 files changed, 223 insertions(+), 157 deletions(-) delete mode 100644 core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt create mode 100644 core/src/main/kotlin/net/corda/core/flows/RecoveryTypes.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 1c01aec0a6..a7f5449ff7 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -2580,6 +2580,16 @@ public static final class net.corda.core.flows.DistributionList$SenderDistributi public int hashCode() @NotNull public String toString() +@CordaSerializable +public abstract class net.corda.core.flows.DistributionRecord extends java.lang.Object implements net.corda.core.contracts.NamedByHash + public () + @NotNull + public abstract net.corda.core.crypto.SecureHash getPeerPartyId() + @NotNull + public abstract java.time.Instant getTimestamp() + public abstract int getTimestampDiscriminator() + @NotNull + public abstract net.corda.core.crypto.SecureHash getTxId() ## @InitiatingFlow public final class net.corda.core.flows.FinalityFlow extends net.corda.core.flows.FlowLogic diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt index 0c2c265efb..8048c99dbd 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt @@ -21,7 +21,9 @@ import net.corda.core.flows.NotaryException import net.corda.core.flows.NotarySigCheck import net.corda.core.flows.ReceiveFinalityFlow import net.corda.core.flows.ReceiveTransactionFlow +import net.corda.core.flows.ReceiverDistributionRecord import net.corda.core.flows.SendTransactionFlow +import net.corda.core.flows.SenderDistributionRecord import net.corda.core.flows.StartableByRPC import net.corda.core.flows.TransactionStatus import net.corda.core.flows.UnexpectedFlowEndException @@ -53,8 +55,6 @@ import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord import net.corda.node.services.persistence.DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord import net.corda.node.services.persistence.HashedDistributionList -import net.corda.node.services.persistence.ReceiverDistributionRecord -import net.corda.node.services.persistence.SenderDistributionRecord import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.testing.contracts.DummyContract import net.corda.testing.core.ALICE_NAME @@ -361,7 +361,7 @@ class FinalityFlowTests : WithFinality { assertNotNull(this) val hashedDL = HashedDistributionList.decrypt(this!!.encryptedDistributionList.bytes, aliceNode.internals.encryptionService) assertEquals(StatesToRecord.ONLY_RELEVANT, hashedDL.senderStatesToRecord) - assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.initiatorPartyId) + assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.peerPartyId) assertEquals(mapOf(SecureHash.sha256(BOB_NAME.toString()) to StatesToRecord.ALL_VISIBLE), hashedDL.peerHashToStatesToRecord) } validateSenderAndReceiverTimestamps(sdrs, rdr!!) @@ -396,7 +396,7 @@ class FinalityFlowTests : WithFinality { assertNotNull(this) val hashedDL = HashedDistributionList.decrypt(this!!.encryptedDistributionList.bytes, aliceNode.internals.encryptionService) assertEquals(StatesToRecord.ONLY_RELEVANT, hashedDL.senderStatesToRecord) - assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.initiatorPartyId) + assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.peerPartyId) // note: Charlie assertion here is using the hinted StatesToRecord value passed to it from Alice assertEquals(mapOf( SecureHash.sha256(BOB_NAME.toString()) to StatesToRecord.ONLY_RELEVANT, @@ -458,7 +458,7 @@ class FinalityFlowTests : WithFinality { assertNotNull(this) val hashedDL = HashedDistributionList.decrypt(this!!.encryptedDistributionList.bytes, aliceNode.internals.encryptionService) assertEquals(StatesToRecord.ONLY_RELEVANT, hashedDL.senderStatesToRecord) - assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.initiatorPartyId) + assertEquals(SecureHash.sha256(aliceNode.info.singleIdentity().name.toString()), this.peerPartyId) assertEquals(mapOf(SecureHash.sha256(BOB_NAME.toString()) to StatesToRecord.ONLY_RELEVANT), hashedDL.peerHashToStatesToRecord) } validateSenderAndReceiverTimestamps(sdr, rdr!!) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt b/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt deleted file mode 100644 index b213c6dbd0..0000000000 --- a/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt +++ /dev/null @@ -1,78 +0,0 @@ -package net.corda.core.flows - -import net.corda.core.identity.CordaX500Name -import net.corda.core.node.StatesToRecord -import net.corda.core.serialization.CordaSerializable -import java.time.Instant - -/** - * Flow data object representing key information required for recovery. - */ - -@CordaSerializable -data class FlowTransactionInfo( - val stateMachineRunId: StateMachineRunId, - val txId: String, - val status: TransactionStatus, - val timestamp: Instant, - val metadata: TransactionMetadata? -) { - fun isInitiator(myCordaX500Name: CordaX500Name) = - this.metadata?.initiator == myCordaX500Name -} - -@CordaSerializable -data class TransactionMetadata( - val initiator: CordaX500Name, - val distributionList: DistributionList -) - -@CordaSerializable -sealed class DistributionList { - - @CordaSerializable - data class SenderDistributionList( - val senderStatesToRecord: StatesToRecord, - val peersToStatesToRecord: Map - ) : DistributionList() - - @CordaSerializable - data class ReceiverDistributionList( - val opaqueData: ByteArray, // decipherable only by sender - val receiverStatesToRecord: StatesToRecord // inferred or actual - ) : DistributionList() -} - -@CordaSerializable -enum class TransactionStatus { - UNVERIFIED, - VERIFIED, - IN_FLIGHT; -} - -@CordaSerializable -data class RecoveryTimeWindow(val fromTime: Instant, val untilTime: Instant = Instant.now()) { - - init { - if (untilTime < fromTime) { - throw IllegalArgumentException("$fromTime must be before $untilTime") - } - } - - companion object { - @JvmStatic - fun between(fromTime: Instant, untilTime: Instant): RecoveryTimeWindow { - return RecoveryTimeWindow(fromTime, untilTime) - } - - @JvmStatic - fun fromOnly(fromTime: Instant): RecoveryTimeWindow { - return RecoveryTimeWindow(fromTime = fromTime) - } - - @JvmStatic - fun untilOnly(untilTime: Instant): RecoveryTimeWindow { - return RecoveryTimeWindow(fromTime = Instant.EPOCH, untilTime = untilTime) - } - } -} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/flows/RecoveryTypes.kt b/core/src/main/kotlin/net/corda/core/flows/RecoveryTypes.kt new file mode 100644 index 0000000000..2fbdb9c1fe --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/flows/RecoveryTypes.kt @@ -0,0 +1,150 @@ +package net.corda.core.flows + +import net.corda.core.contracts.NamedByHash +import net.corda.core.crypto.SecureHash +import net.corda.core.identity.CordaX500Name +import net.corda.core.node.StatesToRecord +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.OpaqueBytes +import java.time.Instant +import java.time.temporal.ChronoUnit + +/** + * Transaction recovery type information. + */ + +@CordaSerializable +data class FlowTransactionInfo( + val stateMachineRunId: StateMachineRunId, + val txId: String, + val status: TransactionStatus, + val timestamp: Instant, + val metadata: TransactionMetadata? +) { + fun isInitiator(myCordaX500Name: CordaX500Name) = + this.metadata?.initiator == myCordaX500Name +} + +@CordaSerializable +data class TransactionMetadata( + val initiator: CordaX500Name, + val distributionList: DistributionList +) + +@CordaSerializable +sealed class DistributionList { + + @CordaSerializable + data class SenderDistributionList( + val senderStatesToRecord: StatesToRecord, + val peersToStatesToRecord: Map + ) : DistributionList() + + @CordaSerializable + data class ReceiverDistributionList( + val opaqueData: ByteArray, // decipherable only by sender + val receiverStatesToRecord: StatesToRecord // inferred or actual + ) : DistributionList() +} + +@CordaSerializable +enum class TransactionStatus { + UNVERIFIED, + VERIFIED, + IN_FLIGHT; +} + +@CordaSerializable +class DistributionRecords( + val senderRecords: List = emptyList(), + val receiverRecords: List = emptyList() +) { + val size = senderRecords.size + receiverRecords.size +} + +@CordaSerializable +abstract class DistributionRecord : NamedByHash { + abstract val txId: SecureHash + abstract val peerPartyId: SecureHash + abstract val timestamp: Instant + abstract val timestampDiscriminator: Int +} + +@CordaSerializable +data class SenderDistributionRecord( + override val txId: SecureHash, + override val peerPartyId: SecureHash, + override val timestamp: Instant, + override val timestampDiscriminator: Int, + val senderStatesToRecord: StatesToRecord, + val receiverStatesToRecord: StatesToRecord +) : DistributionRecord() { + override val id: SecureHash + get() = this.txId +} + +@CordaSerializable +data class ReceiverDistributionRecord( + override val txId: SecureHash, + override val peerPartyId: SecureHash, + override val timestamp: Instant, + override val timestampDiscriminator: Int, + val encryptedDistributionList: OpaqueBytes, + val receiverStatesToRecord: StatesToRecord +) : DistributionRecord() { + override val id: SecureHash + get() = this.txId +} + +@CordaSerializable +enum class DistributionRecordType { + SENDER, RECEIVER, ALL +} +@CordaSerializable +data class DistributionRecordKey( + val txnId: SecureHash, + val timestamp: Instant, + val timestampDiscriminator: Int +) + +@CordaSerializable +data class RecoveryTimeWindow(val fromTime: Instant, val untilTime: Instant = Instant.now()) { + + init { + if (untilTime < fromTime) { + throw IllegalArgumentException("$fromTime must be before $untilTime") + } + } + + companion object { + @JvmStatic + fun between(fromTime: Instant, untilTime: Instant): RecoveryTimeWindow { + return RecoveryTimeWindow(fromTime, untilTime) + } + + @JvmStatic + fun fromOnly(fromTime: Instant): RecoveryTimeWindow { + return RecoveryTimeWindow(fromTime = fromTime) + } + + @JvmStatic + fun untilOnly(untilTime: Instant): RecoveryTimeWindow { + return RecoveryTimeWindow(fromTime = Instant.EPOCH, untilTime = untilTime) + } + } +} + +@CordaSerializable +data class ComparableRecoveryTimeWindow( + val fromTime: Instant, + val fromTimestampDiscriminator: Int, + val untilTime: Instant, + val untilTimestampDiscriminator: Int +) { + companion object { + fun from(timeWindow: RecoveryTimeWindow) = + ComparableRecoveryTimeWindow( + timeWindow.fromTime.truncatedTo(ChronoUnit.SECONDS), 0, + timeWindow.untilTime.truncatedTo(ChronoUnit.SECONDS), Int.MAX_VALUE) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt index 84482acbd6..c68942bd8e 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt @@ -3,14 +3,17 @@ package net.corda.node.services.persistence import net.corda.core.crypto.SecureHash import net.corda.core.flows.DistributionList.ReceiverDistributionList import net.corda.core.flows.DistributionList.SenderDistributionList +import net.corda.core.flows.DistributionRecordKey +import net.corda.core.flows.DistributionRecordType +import net.corda.core.flows.DistributionRecords +import net.corda.core.flows.ReceiverDistributionRecord import net.corda.core.flows.RecoveryTimeWindow +import net.corda.core.flows.SenderDistributionRecord import net.corda.core.flows.TransactionMetadata import net.corda.core.identity.CordaX500Name import net.corda.core.internal.NamedCacheFactory -import net.corda.core.internal.VisibleForTesting import net.corda.core.node.StatesToRecord import net.corda.core.node.services.vault.Sort -import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.OpaqueBytes import net.corda.node.CordaClock import net.corda.node.services.EncryptionService @@ -20,6 +23,7 @@ import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import org.hibernate.annotations.Immutable import java.io.Serializable import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.concurrent.atomic.AtomicInteger import javax.persistence.Column import javax.persistence.Embeddable @@ -54,7 +58,6 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, constructor(key: Key) : this(key.txId.toString(), key.partyId.toString(), key.timestamp, key.timestampDiscriminator) } - @CordaSerializable @Entity @Table(name = "${NODE_DATABASE_PREFIX}sender_distr_recs") data class DBSenderDistributionRecord( @@ -69,17 +72,22 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, @Column(name = "receiver_states_to_record", nullable = false) var receiverStatesToRecord: StatesToRecord ) { + fun key() = DistributionRecordKey( + SecureHash.parse(this.compositeKey.txId), + this.compositeKey.timestamp, + this.compositeKey.timestampDiscriminator) + fun toSenderDistributionRecord() = SenderDistributionRecord( SecureHash.parse(this.compositeKey.txId), SecureHash.parse(this.compositeKey.peerPartyId), + this.compositeKey.timestamp, + this.compositeKey.timestampDiscriminator, this.senderStatesToRecord, - this.receiverStatesToRecord, - this.compositeKey.timestamp + this.receiverStatesToRecord ) } - @CordaSerializable @Entity @Table(name = "${NODE_DATABASE_PREFIX}receiver_distr_recs") data class DBReceiverDistributionRecord( @@ -100,13 +108,20 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, distributionList = encryptedDistributionList, receiverStatesToRecord = receiverStatesToRecord ) - @VisibleForTesting + + fun key() = DistributionRecordKey( + SecureHash.parse(this.compositeKey.txId), + this.compositeKey.timestamp, + this.compositeKey.timestampDiscriminator) + fun toReceiverDistributionRecord(): ReceiverDistributionRecord { return ReceiverDistributionRecord( SecureHash.parse(this.compositeKey.txId), SecureHash.parse(this.compositeKey.peerPartyId), + this.compositeKey.timestamp, + this.compositeKey.timestampDiscriminator, OpaqueBytes(this.distributionList), - this.compositeKey.timestamp + this.receiverStatesToRecord ) } } @@ -137,7 +152,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, override fun addSenderTransactionRecoveryMetadata(txId: SecureHash, metadata: TransactionMetadata): ByteArray { return database.transaction { - val senderRecordingTimestamp = clock.instant() + val senderRecordingTimestamp = clock.instant().truncatedTo(ChronoUnit.SECONDS) val timeDiscriminator = Key.nextDiscriminatorNumber.andIncrement val distributionList = metadata.distributionList as? SenderDistributionList ?: throw IllegalStateException("Expecting SenderDistributionList") distributionList.peersToStatesToRecord.map { (peerCordaX500Name, peerStatesToRecord) -> @@ -174,7 +189,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, distributionList.opaqueData, distributionList.receiverStatesToRecord ) - session.save(receiverDistributionRecord) + session.saveOrUpdate(receiverDistributionRecord) } } else -> throw IllegalStateException("Expecting ReceiverDistributionList") @@ -200,9 +215,9 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, } fun queryDistributionRecords(timeWindow: RecoveryTimeWindow, - recordType: DistributionRecordType = DistributionRecordType.ALL, - excludingTxnIds: Set = emptySet(), - orderByTimestamp: Sort.Direction? = null + recordType: DistributionRecordType = DistributionRecordType.ALL, + excludingTxnIds: Set = emptySet(), + orderByTimestamp: Sort.Direction? = null ): DistributionRecords { return when(recordType) { DistributionRecordType.SENDER -> @@ -224,7 +239,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, peers: Set = emptySet(), excludingTxnIds: Set = emptySet(), orderByTimestamp: Sort.Direction? = null - ): List { + ): List { return database.transaction { val criteriaBuilder = session.criteriaBuilder val criteriaQuery = criteriaBuilder.createQuery(DBSenderDistributionRecord::class.java) @@ -253,7 +268,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, criteriaQuery.orderBy(orderCriteria) } session.createQuery(criteriaQuery).resultList - } + }.map { it.toSenderDistributionRecord() } } @Suppress("SpreadOperator") @@ -261,7 +276,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, initiators: Set = emptySet(), excludingTxnIds: Set = emptySet(), orderByTimestamp: Sort.Direction? = null - ): List { + ): List { return database.transaction { val criteriaBuilder = session.criteriaBuilder val criteriaQuery = criteriaBuilder.createQuery(DBReceiverDistributionRecord::class.java) @@ -277,7 +292,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, } if (initiators.isNotEmpty()) { val initiatorPartyIds = initiators.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() } - predicates.add(criteriaBuilder.and(compositeKey.get(PersistentKey::peerPartyId.name).`in`(initiatorPartyIds))) + predicates.add(criteriaBuilder.and(compositeKey.get(PersistentKey::peerPartyId.name).`in`(initiatorPartyIds))) } criteriaQuery.where(*predicates.toTypedArray()) // optionally order by timestamp @@ -290,7 +305,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, criteriaQuery.orderBy(orderCriteria) } session.createQuery(criteriaQuery).resultList - } + }.map { it.toReceiverDistributionRecord() } } fun decryptHashedDistributionList(encryptedBytes: ByteArray): HashedDistributionList { @@ -298,43 +313,3 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, } } - -@CordaSerializable -class DistributionRecords( - val senderRecords: List = emptyList(), - val receiverRecords: List = emptyList() -) { - init { - require(senderRecords.isNotEmpty() || receiverRecords.isNotEmpty()) { "Must set senderRecords or receiverRecords or both." } - } - - val size = senderRecords.size + receiverRecords.size -} - -@CordaSerializable -abstract class DistributionRecord { - abstract val txId: SecureHash - abstract val timestamp: Instant -} - -@CordaSerializable -data class SenderDistributionRecord( - override val txId: SecureHash, - val peerPartyId: SecureHash, // CordaX500Name hashCode() - val senderStatesToRecord: StatesToRecord, - val receiverStatesToRecord: StatesToRecord, - override val timestamp: Instant -) : DistributionRecord() - -@CordaSerializable -data class ReceiverDistributionRecord( - override val txId: SecureHash, - val initiatorPartyId: SecureHash, // CordaX500Name hashCode() - val encryptedDistributionList: OpaqueBytes, - override val timestamp: Instant -) : DistributionRecord() - -@CordaSerializable -enum class DistributionRecordType { - SENDER, RECEIVER, ALL -} diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt index 60a48a073c..c4a98b1093 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt @@ -8,7 +8,10 @@ import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.sign import net.corda.core.flows.DistributionList.ReceiverDistributionList import net.corda.core.flows.DistributionList.SenderDistributionList +import net.corda.core.flows.DistributionRecordType +import net.corda.core.flows.ReceiverDistributionRecord import net.corda.core.flows.RecoveryTimeWindow +import net.corda.core.flows.SenderDistributionRecord import net.corda.core.flows.TransactionMetadata import net.corda.core.node.NodeInfo import net.corda.core.node.StatesToRecord.ALL_VISIBLE @@ -18,7 +21,6 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.CordaClock -import net.corda.node.SimpleClock import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.network.PersistentPartyInfoCache @@ -40,6 +42,7 @@ import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.createWireTransaction import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.testing.node.TestClock import net.corda.testing.node.internal.MockEncryptionService import org.assertj.core.api.Assertions.assertThat import org.junit.After @@ -48,7 +51,8 @@ import org.junit.Rule import org.junit.Test import java.security.KeyPair import java.time.Clock -import java.time.Instant.now +import java.time.Duration +import java.time.Instant import java.time.temporal.ChronoUnit import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -84,9 +88,13 @@ class DBTransactionStorageLedgerRecoveryTests { database.close() } + fun now(): Instant { + return transactionRecovery.clock.instant() + } + @Test(timeout = 300_000) fun `query local ledger for transactions with recovery peers within time window`() { - val beforeFirstTxn = now() + val beforeFirstTxn = now().truncatedTo(ChronoUnit.SECONDS) val txn = newTransaction() transactionRecovery.addUnnotarisedTransaction(txn) transactionRecovery.addSenderTransactionRecoveryMetadata(txn.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT)))) @@ -94,13 +102,14 @@ class DBTransactionStorageLedgerRecoveryTests { untilTime = beforeFirstTxn.plus(1, ChronoUnit.MINUTES)) val results = transactionRecovery.querySenderDistributionRecords(timeWindow) assertEquals(1, results.size) - - val afterFirstTxn = now() + (transactionRecovery.clock as TestClock).advanceBy(Duration.ofSeconds(1)) + val afterFirstTxn = now().truncatedTo(ChronoUnit.SECONDS) val txn2 = newTransaction() transactionRecovery.addUnnotarisedTransaction(txn2) transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT)))) assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow).size) - assertEquals(1, transactionRecovery.querySenderDistributionRecords(RecoveryTimeWindow(fromTime = afterFirstTxn)).size) + assertEquals(1, transactionRecovery.querySenderDistributionRecords(RecoveryTimeWindow(fromTime = afterFirstTxn, + untilTime = afterFirstTxn.plus(1, ChronoUnit.MINUTES))).size) } @Test(timeout = 300_000) @@ -114,7 +123,7 @@ class DBTransactionStorageLedgerRecoveryTests { val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id)) assertEquals(1, results.size) - assertEquals(transaction2.id.toString(), results[0].compositeKey.txId) + assertEquals(transaction2.id, results[0].txId) } @Test(timeout = 300_000) @@ -128,7 +137,7 @@ class DBTransactionStorageLedgerRecoveryTests { val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) val results = transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)) assertEquals(1, results.size) - assertEquals(transaction2.id.toString(), results[0].compositeKey.txId) + assertEquals(transaction2.id, results[0].txId) } @Test(timeout = 300_000) @@ -147,13 +156,13 @@ class DBTransactionStorageLedgerRecoveryTests { val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let { assertEquals(2, it.size) - assertEquals(SecureHash.sha256(BOB_NAME.toString()).toString(), it.senderRecords[0].compositeKey.peerPartyId) + assertEquals(SecureHash.sha256(BOB_NAME.toString()), it.senderRecords[0].peerPartyId) assertEquals(ALL_VISIBLE, it.senderRecords[0].senderStatesToRecord) } transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let { assertEquals(1, it.size) - assertEquals(SecureHash.sha256(BOB_NAME.toString()).toString(), it.receiverRecords[0].compositeKey.peerPartyId) - assertEquals(ALL_VISIBLE, (HashedDistributionList.decrypt(it.receiverRecords[0].distributionList, encryptionService)).peerHashToStatesToRecord.map { it.value }[0]) + assertEquals(SecureHash.sha256(BOB_NAME.toString()), it.receiverRecords[0].peerPartyId) + assertEquals(ALL_VISIBLE, (HashedDistributionList.decrypt(it.receiverRecords[0].encryptedDistributionList.bytes, encryptionService)).peerHashToStatesToRecord.map { it.value }[0]) } val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL) assertEquals(3, resultsAll.size) @@ -224,9 +233,9 @@ class DBTransactionStorageLedgerRecoveryTests { val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(ALICE_NAME)).let { assertEquals(3, it.size) - assertEquals(HashedDistributionList.decrypt(it[0].distributionList, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ALL_VISIBLE) - assertEquals(HashedDistributionList.decrypt(it[1].distributionList, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ONLY_RELEVANT) - assertEquals(HashedDistributionList.decrypt(it[2].distributionList, encryptionService).peerHashToStatesToRecord.map { it.value }[0], NONE) + assertEquals(HashedDistributionList.decrypt(it[0].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ALL_VISIBLE) + assertEquals(HashedDistributionList.decrypt(it[1].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ONLY_RELEVANT) + assertEquals(HashedDistributionList.decrypt(it[2].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], NONE) } assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME)).size) assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(CHARLIE_NAME)).size) @@ -266,7 +275,7 @@ class DBTransactionStorageLedgerRecoveryTests { val distList = transactionRecovery.decryptHashedDistributionList(record.encryptedDistributionList.bytes) assertEquals(ONLY_RELEVANT, distList.senderStatesToRecord) assertEquals(ALL_VISIBLE, distList.peerHashToStatesToRecord.values.first()) - assertEquals(ALICE_NAME, partyInfoCache.getCordaX500NameByPartyId(record.initiatorPartyId)) + assertEquals(ALICE_NAME, partyInfoCache.getCordaX500NameByPartyId(record.peerPartyId)) assertEquals(setOf(BOB_NAME), distList.peerHashToStatesToRecord.map { (peer) -> partyInfoCache.getCordaX500NameByPartyId(peer) }.toSet() ) } } @@ -364,7 +373,7 @@ class DBTransactionStorageLedgerRecoveryTests { return fromDb[0].toReceiverDistributionRecord() } - private fun newTransactionRecovery(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) { + private fun newTransactionRecovery(cacheSizeBytesOverride: Long? = null, clock: CordaClock = TestClock(Clock.systemUTC())) { val networkMapCache = PersistentNetworkMapCache(TestingNamedCacheFactory(), database, InMemoryIdentityService(trustRoot = DEV_ROOT_CA.certificate)) val alice = createNodeInfo(listOf(ALICE)) val bob = createNodeInfo(listOf(BOB))