ENT-10110 Store Ledger Recovery records only if the transaction was locally stored in the first place. (#7564)

This commit is contained in:
Jose Coll 2023-11-08 08:52:25 +00:00 committed by GitHub
parent 3c34caabc3
commit c445c72d3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 0 additions and 253 deletions

View File

@ -4,16 +4,12 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.DistributionList.ReceiverDistributionList import net.corda.core.flows.DistributionList.ReceiverDistributionList
import net.corda.core.flows.DistributionList.SenderDistributionList import net.corda.core.flows.DistributionList.SenderDistributionList
import net.corda.core.flows.DistributionRecordKey import net.corda.core.flows.DistributionRecordKey
import net.corda.core.flows.DistributionRecordType
import net.corda.core.flows.DistributionRecords
import net.corda.core.flows.ReceiverDistributionRecord import net.corda.core.flows.ReceiverDistributionRecord
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.flows.SenderDistributionRecord import net.corda.core.flows.SenderDistributionRecord
import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.TransactionMetadata
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.NamedCacheFactory
import net.corda.core.node.StatesToRecord import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.vault.Sort
import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.OpaqueBytes
import net.corda.node.CordaClock import net.corda.node.CordaClock
import net.corda.node.services.EncryptionService import net.corda.node.services.EncryptionService
@ -32,7 +28,6 @@ import javax.persistence.Entity
import javax.persistence.Id import javax.persistence.Id
import javax.persistence.Lob import javax.persistence.Lob
import javax.persistence.Table import javax.persistence.Table
import javax.persistence.criteria.Predicate
class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
cacheFactory: NamedCacheFactory, cacheFactory: NamedCacheFactory,
@ -214,100 +209,6 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
} }
} }
fun queryDistributionRecords(timeWindow: RecoveryTimeWindow,
recordType: DistributionRecordType = DistributionRecordType.ALL,
excludingTxnIds: Set<SecureHash> = emptySet(),
orderByTimestamp: Sort.Direction? = null
): DistributionRecords {
return when(recordType) {
DistributionRecordType.SENDER ->
DistributionRecords(senderRecords =
querySenderDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp))
DistributionRecordType.RECEIVER ->
DistributionRecords(receiverRecords =
queryReceiverDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp))
DistributionRecordType.ALL ->
DistributionRecords(senderRecords =
querySenderDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp),
receiverRecords =
queryReceiverDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp))
}
}
@Suppress("SpreadOperator")
fun querySenderDistributionRecords(timeWindow: RecoveryTimeWindow,
peers: Set<CordaX500Name> = emptySet(),
excludingTxnIds: Set<SecureHash> = emptySet(),
orderByTimestamp: Sort.Direction? = null
): List<SenderDistributionRecord> {
return database.transaction {
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBSenderDistributionRecord::class.java)
val txnMetadata = criteriaQuery.from(DBSenderDistributionRecord::class.java)
val predicates = mutableListOf<Predicate>()
val compositeKey = txnMetadata.get<PersistentKey>("compositeKey")
predicates.add(criteriaBuilder.greaterThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.fromTime))
predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.untilTime)))
if (excludingTxnIds.isNotEmpty()) {
predicates.add(criteriaBuilder.and(criteriaBuilder.not(compositeKey.get<String>(PersistentKey::txId.name).`in`(
excludingTxnIds.map { it.toString() }))))
}
if (peers.isNotEmpty()) {
val peerPartyIds = peers.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() }
predicates.add(criteriaBuilder.and(compositeKey.get<Long>(PersistentKey::peerPartyId.name).`in`(peerPartyIds)))
}
criteriaQuery.where(*predicates.toTypedArray())
// optionally order by timestamp
orderByTimestamp?.let {
val orderCriteria =
when (orderByTimestamp) {
// when adding column position of 'group by' shift in case columns were removed
Sort.Direction.ASC -> criteriaBuilder.asc(compositeKey.get<Instant>(PersistentKey::timestamp.name))
Sort.Direction.DESC -> criteriaBuilder.desc(compositeKey.get<Instant>(PersistentKey::timestamp.name))
}
criteriaQuery.orderBy(orderCriteria)
}
session.createQuery(criteriaQuery).resultList
}.map { it.toSenderDistributionRecord() }
}
@Suppress("SpreadOperator")
fun queryReceiverDistributionRecords(timeWindow: RecoveryTimeWindow,
initiators: Set<CordaX500Name> = emptySet(),
excludingTxnIds: Set<SecureHash> = emptySet(),
orderByTimestamp: Sort.Direction? = null
): List<ReceiverDistributionRecord> {
return database.transaction {
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBReceiverDistributionRecord::class.java)
val txnMetadata = criteriaQuery.from(DBReceiverDistributionRecord::class.java)
val predicates = mutableListOf<Predicate>()
val compositeKey = txnMetadata.get<PersistentKey>("compositeKey")
val timestamp = compositeKey.get<Instant>(PersistentKey::timestamp.name)
predicates.add(criteriaBuilder.greaterThanOrEqualTo(timestamp, timeWindow.fromTime))
predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(timestamp, timeWindow.untilTime)))
if (excludingTxnIds.isNotEmpty()) {
val txId = compositeKey.get<String>(PersistentKey::txId.name)
predicates.add(criteriaBuilder.and(criteriaBuilder.not(txId.`in`(excludingTxnIds.map { it.toString() }))))
}
if (initiators.isNotEmpty()) {
val initiatorPartyIds = initiators.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() }
predicates.add(criteriaBuilder.and(compositeKey.get<String>(PersistentKey::peerPartyId.name).`in`(initiatorPartyIds)))
}
criteriaQuery.where(*predicates.toTypedArray())
// optionally order by timestamp
orderByTimestamp?.let {
val orderCriteria = when (orderByTimestamp) {
// when adding column position of 'group by' shift in case columns were removed
Sort.Direction.ASC -> criteriaBuilder.asc(timestamp)
Sort.Direction.DESC -> criteriaBuilder.desc(timestamp)
}
criteriaQuery.orderBy(orderCriteria)
}
session.createQuery(criteriaQuery).resultList
}.map { it.toReceiverDistributionRecord() }
}
fun decryptHashedDistributionList(encryptedBytes: ByteArray): HashedDistributionList { fun decryptHashedDistributionList(encryptedBytes: ByteArray): HashedDistributionList {
return HashedDistributionList.decrypt(encryptedBytes, encryptionService) return HashedDistributionList.decrypt(encryptedBytes, encryptionService)
} }

View File

@ -8,9 +8,7 @@ import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.sign import net.corda.core.crypto.sign
import net.corda.core.flows.DistributionList.ReceiverDistributionList import net.corda.core.flows.DistributionList.ReceiverDistributionList
import net.corda.core.flows.DistributionList.SenderDistributionList import net.corda.core.flows.DistributionList.SenderDistributionList
import net.corda.core.flows.DistributionRecordType
import net.corda.core.flows.ReceiverDistributionRecord import net.corda.core.flows.ReceiverDistributionRecord
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.flows.SenderDistributionRecord import net.corda.core.flows.SenderDistributionRecord
import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.TransactionMetadata
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
@ -51,9 +49,7 @@ import org.junit.Rule
import org.junit.Test import org.junit.Test
import java.security.KeyPair import java.security.KeyPair
import java.time.Clock import java.time.Clock
import java.time.Duration
import java.time.Instant import java.time.Instant
import java.time.temporal.ChronoUnit
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
import kotlin.test.assertNull import kotlin.test.assertNull
@ -92,156 +88,6 @@ class DBTransactionStorageLedgerRecoveryTests {
return transactionRecovery.clock.instant() return transactionRecovery.clock.instant()
} }
@Test(timeout = 300_000)
fun `query local ledger for transactions with recovery peers within time window`() {
val beforeFirstTxn = now().truncatedTo(ChronoUnit.SECONDS)
val txn = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val timeWindow = RecoveryTimeWindow(fromTime = beforeFirstTxn,
untilTime = beforeFirstTxn.plus(1, ChronoUnit.MINUTES))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow)
assertEquals(1, results.size)
(transactionRecovery.clock as TestClock).advanceBy(Duration.ofSeconds(1))
val afterFirstTxn = now().truncatedTo(ChronoUnit.SECONDS)
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))))
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow).size)
assertEquals(1, transactionRecovery.querySenderDistributionRecords(RecoveryTimeWindow(fromTime = afterFirstTxn,
untilTime = afterFirstTxn.plus(1, ChronoUnit.MINUTES))).size)
}
@Test(timeout = 300_000)
fun `query local ledger for transactions within timeWindow and excluding remoteTransactionIds`() {
val transaction1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val transaction2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction2)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id))
assertEquals(1, results.size)
assertEquals(transaction2.id, results[0].txId)
}
@Test(timeout = 300_000)
fun `query local ledger for transactions within timeWindow and for given peers`() {
val transaction1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val transaction2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction2)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(CHARLIE_NAME to ONLY_RELEVANT))))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME))
assertEquals(1, results.size)
assertEquals(transaction2.id, results[0].txId)
}
@Test(timeout = 300_000)
fun `query local ledger by distribution record type`() {
val transaction1 = newTransaction()
// sender txn
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))))
val transaction2 = newTransaction()
// receiver txn
transactionRecovery.addUnnotarisedTransaction(transaction2)
val encryptedDL = transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id,
TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(transaction2.id, BOB_NAME,
TransactionMetadata(BOB_NAME, ReceiverDistributionList(encryptedDL, ALL_VISIBLE)))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let {
assertEquals(2, it.size)
assertEquals(SecureHash.sha256(BOB_NAME.toString()), it.senderRecords[0].peerPartyId)
assertEquals(ALL_VISIBLE, it.senderRecords[0].senderStatesToRecord)
}
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let {
assertEquals(1, it.size)
assertEquals(SecureHash.sha256(BOB_NAME.toString()), it.receiverRecords[0].peerPartyId)
assertEquals(ALL_VISIBLE, (HashedDistributionList.decrypt(it.receiverRecords[0].encryptedDistributionList.bytes, encryptionService)).peerHashToStatesToRecord.map { it.value }[0])
}
val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL)
assertEquals(3, resultsAll.size)
}
@Test(timeout = 300_000)
fun `query for sender distribution records by peers`() {
val txn1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn1)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))))
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))))
val txn3 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn3)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT, CHARLIE_NAME to ALL_VISIBLE))))
val txn4 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn4)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ONLY_RELEVANT))))
val txn5 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn5)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME, SenderDistributionList(ONLY_RELEVANT, emptyMap())))
assertEquals(5, readSenderDistributionRecordFromDB().size)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(BOB_NAME)).let {
assertEquals(2, it.size)
assertEquals(it[0].senderStatesToRecord, ALL_VISIBLE)
assertEquals(it[1].senderStatesToRecord, ONLY_RELEVANT)
}
assertEquals(1, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(ALICE_NAME)).size)
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)).size)
}
@Test(timeout = 300_000)
fun `query for receiver distribution records by initiator`() {
val txn1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn1)
val encryptedDL1 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn1.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE, CHARLIE_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn1.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL1, ALL_VISIBLE)))
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
val encryptedDL2 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn2.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL2, ONLY_RELEVANT)))
val txn3 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn3)
val encryptedDL3 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn3.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to NONE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn3.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL3, NONE)))
val txn4 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn4)
val encryptedDL4 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn4.id,
TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn4.id, BOB_NAME,
TransactionMetadata(BOB_NAME, ReceiverDistributionList(encryptedDL4, ALL_VISIBLE)))
val txn5 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn5)
val encryptedDL5 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn5.id,
TransactionMetadata(CHARLIE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn5.id, CHARLIE_NAME,
TransactionMetadata(CHARLIE_NAME, ReceiverDistributionList(encryptedDL5, ONLY_RELEVANT)))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(ALICE_NAME)).let {
assertEquals(3, it.size)
assertEquals(HashedDistributionList.decrypt(it[0].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ALL_VISIBLE)
assertEquals(HashedDistributionList.decrypt(it[1].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ONLY_RELEVANT)
assertEquals(HashedDistributionList.decrypt(it[2].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], NONE)
}
assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME)).size)
assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(CHARLIE_NAME)).size)
assertEquals(2, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME, CHARLIE_NAME)).size)
}
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `transaction without peers does not store recovery metadata in database`() { fun `transaction without peers does not store recovery metadata in database`() {
val senderTransaction = newTransaction() val senderTransaction = newTransaction()