Merge pull request #7575 from corda/merge-release/os/4.11-release/os/4.12-2023-11-14-36

ENT-11130: Merging forward updates from release/os/4.11 to release/os/4.12 - 2023-11-14
This commit is contained in:
Adel El-Beik 2023-12-07 17:54:21 +00:00 committed by GitHub
commit 6c4ab9a440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 12 additions and 256 deletions

View File

@ -37,6 +37,7 @@ import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY
import org.hibernate.annotations.Type
import rx.Observable
import rx.subjects.PublishSubject
import java.time.Instant
@ -77,6 +78,7 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac
val timestamp: Instant,
@Column(name = "signatures")
@Type(type = "corda-blob")
val signatures: ByteArray?
)

View File

@ -4,16 +4,12 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.DistributionList.ReceiverDistributionList
import net.corda.core.flows.DistributionList.SenderDistributionList
import net.corda.core.flows.DistributionRecordKey
import net.corda.core.flows.DistributionRecordType
import net.corda.core.flows.DistributionRecords
import net.corda.core.flows.ReceiverDistributionRecord
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.flows.SenderDistributionRecord
import net.corda.core.flows.TransactionMetadata
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.vault.Sort
import net.corda.core.utilities.OpaqueBytes
import net.corda.node.CordaClock
import net.corda.node.services.EncryptionService
@ -32,7 +28,6 @@ import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob
import javax.persistence.Table
import javax.persistence.criteria.Predicate
class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
cacheFactory: NamedCacheFactory,
@ -214,100 +209,6 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
}
}
fun queryDistributionRecords(timeWindow: RecoveryTimeWindow,
recordType: DistributionRecordType = DistributionRecordType.ALL,
excludingTxnIds: Set<SecureHash> = emptySet(),
orderByTimestamp: Sort.Direction? = null
): DistributionRecords {
return when(recordType) {
DistributionRecordType.SENDER ->
DistributionRecords(senderRecords =
querySenderDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp))
DistributionRecordType.RECEIVER ->
DistributionRecords(receiverRecords =
queryReceiverDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp))
DistributionRecordType.ALL ->
DistributionRecords(senderRecords =
querySenderDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp),
receiverRecords =
queryReceiverDistributionRecords(timeWindow, excludingTxnIds = excludingTxnIds, orderByTimestamp = orderByTimestamp))
}
}
@Suppress("SpreadOperator")
fun querySenderDistributionRecords(timeWindow: RecoveryTimeWindow,
peers: Set<CordaX500Name> = emptySet(),
excludingTxnIds: Set<SecureHash> = emptySet(),
orderByTimestamp: Sort.Direction? = null
): List<SenderDistributionRecord> {
return database.transaction {
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBSenderDistributionRecord::class.java)
val txnMetadata = criteriaQuery.from(DBSenderDistributionRecord::class.java)
val predicates = mutableListOf<Predicate>()
val compositeKey = txnMetadata.get<PersistentKey>("compositeKey")
predicates.add(criteriaBuilder.greaterThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.fromTime))
predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.untilTime)))
if (excludingTxnIds.isNotEmpty()) {
predicates.add(criteriaBuilder.and(criteriaBuilder.not(compositeKey.get<String>(PersistentKey::txId.name).`in`(
excludingTxnIds.map { it.toString() }))))
}
if (peers.isNotEmpty()) {
val peerPartyIds = peers.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() }
predicates.add(criteriaBuilder.and(compositeKey.get<Long>(PersistentKey::peerPartyId.name).`in`(peerPartyIds)))
}
criteriaQuery.where(*predicates.toTypedArray())
// optionally order by timestamp
orderByTimestamp?.let {
val orderCriteria =
when (orderByTimestamp) {
// when adding column position of 'group by' shift in case columns were removed
Sort.Direction.ASC -> criteriaBuilder.asc(compositeKey.get<Instant>(PersistentKey::timestamp.name))
Sort.Direction.DESC -> criteriaBuilder.desc(compositeKey.get<Instant>(PersistentKey::timestamp.name))
}
criteriaQuery.orderBy(orderCriteria)
}
session.createQuery(criteriaQuery).resultList
}.map { it.toSenderDistributionRecord() }
}
@Suppress("SpreadOperator")
fun queryReceiverDistributionRecords(timeWindow: RecoveryTimeWindow,
initiators: Set<CordaX500Name> = emptySet(),
excludingTxnIds: Set<SecureHash> = emptySet(),
orderByTimestamp: Sort.Direction? = null
): List<ReceiverDistributionRecord> {
return database.transaction {
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBReceiverDistributionRecord::class.java)
val txnMetadata = criteriaQuery.from(DBReceiverDistributionRecord::class.java)
val predicates = mutableListOf<Predicate>()
val compositeKey = txnMetadata.get<PersistentKey>("compositeKey")
val timestamp = compositeKey.get<Instant>(PersistentKey::timestamp.name)
predicates.add(criteriaBuilder.greaterThanOrEqualTo(timestamp, timeWindow.fromTime))
predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(timestamp, timeWindow.untilTime)))
if (excludingTxnIds.isNotEmpty()) {
val txId = compositeKey.get<String>(PersistentKey::txId.name)
predicates.add(criteriaBuilder.and(criteriaBuilder.not(txId.`in`(excludingTxnIds.map { it.toString() }))))
}
if (initiators.isNotEmpty()) {
val initiatorPartyIds = initiators.map { partyInfoCache.getPartyIdByCordaX500Name(it).toString() }
predicates.add(criteriaBuilder.and(compositeKey.get<String>(PersistentKey::peerPartyId.name).`in`(initiatorPartyIds)))
}
criteriaQuery.where(*predicates.toTypedArray())
// optionally order by timestamp
orderByTimestamp?.let {
val orderCriteria = when (orderByTimestamp) {
// when adding column position of 'group by' shift in case columns were removed
Sort.Direction.ASC -> criteriaBuilder.asc(timestamp)
Sort.Direction.DESC -> criteriaBuilder.desc(timestamp)
}
criteriaQuery.orderBy(orderCriteria)
}
session.createQuery(criteriaQuery).resultList
}.map { it.toReceiverDistributionRecord() }
}
fun decryptHashedDistributionList(encryptedBytes: ByteArray): HashedDistributionList {
return HashedDistributionList.decrypt(encryptedBytes, encryptionService)
}

View File

@ -4,9 +4,16 @@
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_signatures_column">
<changeSet author="R3.Corda" id="add_signatures_column" dbms="postgresql">
<addColumn tableName="node_transactions">
<column name="signatures" type="VARBINARY(33554432)"/>
<column name="signatures" type="varbinary(33554432)"/>
</addColumn>
</changeSet>
<changeSet author="R3.Corda" id="add_signatures_column" dbms="!postgresql">
<addColumn tableName="node_transactions">
<column name="signatures" type="blob"/>
</addColumn>
</changeSet>
</databaseChangeLog>

View File

@ -39,7 +39,7 @@
</changeSet>
<changeSet author="R3.Corda" id="node_sender_distr_recs_add_indexes">
<createIndex indexName="node_sender_distr_recs_keyinfo_idx" tableName="node_sender_distr_recs">
<createIndex indexName="node_sender_distr_recs_idx1" tableName="node_sender_distr_recs">
<column name="transaction_id"/>
<column name="timestamp"/>
<column name="timestamp_discriminator"/>

View File

@ -8,9 +8,7 @@ import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.sign
import net.corda.core.flows.DistributionList.ReceiverDistributionList
import net.corda.core.flows.DistributionList.SenderDistributionList
import net.corda.core.flows.DistributionRecordType
import net.corda.core.flows.ReceiverDistributionRecord
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.flows.SenderDistributionRecord
import net.corda.core.flows.TransactionMetadata
import net.corda.core.node.NodeInfo
@ -52,9 +50,7 @@ import org.junit.Rule
import org.junit.Test
import java.security.KeyPair
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
@ -93,156 +89,6 @@ class DBTransactionStorageLedgerRecoveryTests {
return transactionRecovery.clock.instant()
}
@Test(timeout = 300_000)
fun `query local ledger for transactions with recovery peers within time window`() {
val beforeFirstTxn = now().truncatedTo(ChronoUnit.SECONDS)
val txn = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val timeWindow = RecoveryTimeWindow(fromTime = beforeFirstTxn,
untilTime = beforeFirstTxn.plus(1, ChronoUnit.MINUTES))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow)
assertEquals(1, results.size)
(transactionRecovery.clock as TestClock).advanceBy(Duration.ofSeconds(1))
val afterFirstTxn = now().truncatedTo(ChronoUnit.SECONDS)
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))))
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow).size)
assertEquals(1, transactionRecovery.querySenderDistributionRecords(RecoveryTimeWindow(fromTime = afterFirstTxn,
untilTime = afterFirstTxn.plus(1, ChronoUnit.MINUTES))).size)
}
@Test(timeout = 300_000)
fun `query local ledger for transactions within timeWindow and excluding remoteTransactionIds`() {
val transaction1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val transaction2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction2)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id))
assertEquals(1, results.size)
assertEquals(transaction2.id, results[0].txId)
}
@Test(timeout = 300_000)
fun `query local ledger for transactions within timeWindow and for given peers`() {
val transaction1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val transaction2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction2)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(CHARLIE_NAME to ONLY_RELEVANT))))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME))
assertEquals(1, results.size)
assertEquals(transaction2.id, results[0].txId)
}
@Test(timeout = 300_000)
fun `query local ledger by distribution record type`() {
val transaction1 = newTransaction()
// sender txn
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))))
val transaction2 = newTransaction()
// receiver txn
transactionRecovery.addUnnotarisedTransaction(transaction2)
val encryptedDL = transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id,
TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(transaction2.id, BOB_NAME,
TransactionMetadata(BOB_NAME, ReceiverDistributionList(encryptedDL, ALL_VISIBLE)))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let {
assertEquals(2, it.size)
assertEquals(SecureHash.sha256(BOB_NAME.toString()), it.senderRecords[0].peerPartyId)
assertEquals(ALL_VISIBLE, it.senderRecords[0].senderStatesToRecord)
}
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let {
assertEquals(1, it.size)
assertEquals(SecureHash.sha256(BOB_NAME.toString()), it.receiverRecords[0].peerPartyId)
assertEquals(ALL_VISIBLE, (HashedDistributionList.decrypt(it.receiverRecords[0].encryptedDistributionList.bytes, encryptionService)).peerHashToStatesToRecord.map { it.value }[0])
}
val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL)
assertEquals(3, resultsAll.size)
}
@Test(timeout = 300_000)
fun `query for sender distribution records by peers`() {
val txn1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn1)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))))
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))))
val txn3 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn3)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT, CHARLIE_NAME to ALL_VISIBLE))))
val txn4 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn4)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ONLY_RELEVANT))))
val txn5 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn5)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME, SenderDistributionList(ONLY_RELEVANT, emptyMap())))
assertEquals(5, readSenderDistributionRecordFromDB().size)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(BOB_NAME)).let {
assertEquals(2, it.size)
assertEquals(it[0].senderStatesToRecord, ALL_VISIBLE)
assertEquals(it[1].senderStatesToRecord, ONLY_RELEVANT)
}
assertEquals(1, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(ALICE_NAME)).size)
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)).size)
}
@Test(timeout = 300_000)
fun `query for receiver distribution records by initiator`() {
val txn1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn1)
val encryptedDL1 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn1.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE, CHARLIE_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn1.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL1, ALL_VISIBLE)))
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
val encryptedDL2 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn2.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL2, ONLY_RELEVANT)))
val txn3 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn3)
val encryptedDL3 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn3.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to NONE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn3.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL3, NONE)))
val txn4 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn4)
val encryptedDL4 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn4.id,
TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn4.id, BOB_NAME,
TransactionMetadata(BOB_NAME, ReceiverDistributionList(encryptedDL4, ALL_VISIBLE)))
val txn5 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn5)
val encryptedDL5 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn5.id,
TransactionMetadata(CHARLIE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn5.id, CHARLIE_NAME,
TransactionMetadata(CHARLIE_NAME, ReceiverDistributionList(encryptedDL5, ONLY_RELEVANT)))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(ALICE_NAME)).let {
assertEquals(3, it.size)
assertEquals(HashedDistributionList.decrypt(it[0].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ALL_VISIBLE)
assertEquals(HashedDistributionList.decrypt(it[1].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], ONLY_RELEVANT)
assertEquals(HashedDistributionList.decrypt(it[2].encryptedDistributionList.bytes, encryptionService).peerHashToStatesToRecord.map { it.value }[0], NONE)
}
assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME)).size)
assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(CHARLIE_NAME)).size)
assertEquals(2, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME, CHARLIE_NAME)).size)
}
@Test(timeout = 300_000)
fun `transaction without peers does not store recovery metadata in database`() {
val senderTransaction = newTransaction()