From a66042ec097f61791eb567067d6d98be948c5b26 Mon Sep 17 00:00:00 2001 From: Jose Coll Date: Tue, 10 Oct 2023 17:41:49 +0100 Subject: [PATCH] ENT-10110 Ledger Recovery database table tweaks (#7528) --- .../coretests/flows/FinalityFlowTests.kt | 16 ++--- .../DBTransactionStorageLedgerRecovery.kt | 58 ++++++++++--------- .../migration/node-core.changelog-v25.xml | 5 +- ...DBTransactionStorageLedgerRecoveryTests.kt | 21 +++---- 4 files changed, 53 insertions(+), 47 deletions(-) diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt index bfbb93c96b..0c2c265efb 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt @@ -123,7 +123,6 @@ class FinalityFlowTests : WithFinality { fun `allow use of the old API if the CorDapp target version is 3`() { val oldBob = createBob(cordapps = listOf(tokenOldCordapp())) val stx = aliceNode.issuesCashTo(oldBob) - @Suppress("DEPRECATION") aliceNode.startFlowAndRunNetwork(OldFinalityInvoker(stx)).resultFuture.getOrThrow() assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull } @@ -239,7 +238,7 @@ class FinalityFlowTests : WithFinality { private fun assertTxnRemovedFromDatabase(node: TestStartedNode, stxId: SecureHash) { val fromDb = node.database.transaction { session.createQuery( - "from ${DBTransactionStorage.DBTransaction::class.java.name} where txId = :transactionId", + "from ${DBTransactionStorage.DBTransaction::class.java.name} where tx_id = :transactionId", DBTransactionStorage.DBTransaction::class.java ).setParameter("transactionId", stxId.toString()).resultList } @@ -354,7 +353,8 @@ class FinalityFlowTests : WithFinality { val sdrs = getSenderRecoveryData(stx.id, aliceNode.database).apply { assertEquals(1, this.size) - assertEquals(StatesToRecord.ALL_VISIBLE, this[0].statesToRecord) + assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].senderStatesToRecord) + assertEquals(StatesToRecord.ALL_VISIBLE, this[0].receiverStatesToRecord) assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId) } val rdr = getReceiverRecoveryData(stx.id, bobNode).apply { @@ -387,9 +387,9 @@ class FinalityFlowTests : WithFinality { val sdrs = getSenderRecoveryData(stx.id, aliceNode.database).apply { assertEquals(2, this.size) - assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord) + assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].senderStatesToRecord) assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId) - assertEquals(StatesToRecord.ALL_VISIBLE, this[1].statesToRecord) + assertEquals(StatesToRecord.ALL_VISIBLE, this[1].receiverStatesToRecord) assertEquals(SecureHash.sha256(CHARLIE_NAME.toString()), this[1].peerPartyId) } val rdr = getReceiverRecoveryData(stx.id, bobNode).apply { @@ -451,7 +451,7 @@ class FinalityFlowTests : WithFinality { val sdr = getSenderRecoveryData(stx.id, aliceNode.database).apply { assertEquals(1, this.size) - assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord) + assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].senderStatesToRecord) assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId) } val rdr = getReceiverRecoveryData(stx.id, bobNode).apply { @@ -467,7 +467,7 @@ class FinalityFlowTests : WithFinality { private fun getSenderRecoveryData(id: SecureHash, database: CordaPersistence): List { val fromDb = database.transaction { session.createQuery( - "from ${DBSenderDistributionRecord::class.java.name} where txId = :transactionId", + "from ${DBSenderDistributionRecord::class.java.name} where transaction_id = :transactionId", DBSenderDistributionRecord::class.java ).setParameter("transactionId", id.toString()).resultList } @@ -477,7 +477,7 @@ class FinalityFlowTests : WithFinality { private fun getReceiverRecoveryData(txId: SecureHash, receiver: TestStartedNode): ReceiverDistributionRecord? { return receiver.database.transaction { session.createQuery( - "from ${DBReceiverDistributionRecord::class.java.name} where txId = :transactionId", + "from ${DBReceiverDistributionRecord::class.java.name} where transaction_id = :transactionId", DBReceiverDistributionRecord::class.java ).setParameter("transactionId", txId.toString()).resultList }.singleOrNull()?.toReceiverDistributionRecord() diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt index 84ea4869ca..84482acbd6 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt @@ -38,7 +38,9 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, @Embeddable @Immutable data class PersistentKey( - /** PartyId of flow peer **/ + @Column(name = "transaction_id", length = 144, nullable = false) + var txId: String, + @Column(name = "peer_party_id", length = 144, nullable = false) var peerPartyId: String, @@ -49,7 +51,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, var timestampDiscriminator: Int ) : Serializable { - constructor(key: Key) : this(key.partyId.toString(), key.timestamp, key.timestampDiscriminator) + constructor(key: Key) : this(key.txId.toString(), key.partyId.toString(), key.timestamp, key.timestampDiscriminator) } @CordaSerializable @@ -59,18 +61,20 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, @EmbeddedId var compositeKey: PersistentKey, - @Column(name = "transaction_id", length = 144, nullable = false) - var txId: String, + /** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */ + @Column(name = "sender_states_to_record", nullable = false) + var senderStatesToRecord: StatesToRecord, /** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */ - @Column(name = "states_to_record", nullable = false) - var statesToRecord: StatesToRecord + @Column(name = "receiver_states_to_record", nullable = false) + var receiverStatesToRecord: StatesToRecord ) { fun toSenderDistributionRecord() = SenderDistributionRecord( - SecureHash.parse(this.txId), + SecureHash.parse(this.compositeKey.txId), SecureHash.parse(this.compositeKey.peerPartyId), - this.statesToRecord, + this.senderStatesToRecord, + this.receiverStatesToRecord, this.compositeKey.timestamp ) } @@ -82,9 +86,6 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, @EmbeddedId var compositeKey: PersistentKey, - @Column(name = "transaction_id", length = 144, nullable = false) - var txId: String, - /** Encrypted recovery information for sole use by Sender **/ @Lob @Column(name = "distribution_list", nullable = false) @@ -94,16 +95,15 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, @Column(name = "receiver_states_to_record", nullable = false) val receiverStatesToRecord: StatesToRecord ) { - constructor(key: Key, txId: SecureHash, encryptedDistributionList: ByteArray, receiverStatesToRecord: StatesToRecord) : + constructor(key: Key, encryptedDistributionList: ByteArray, receiverStatesToRecord: StatesToRecord) : this(PersistentKey(key), - txId = txId.toString(), distributionList = encryptedDistributionList, receiverStatesToRecord = receiverStatesToRecord ) @VisibleForTesting fun toReceiverDistributionRecord(): ReceiverDistributionRecord { return ReceiverDistributionRecord( - SecureHash.parse(this.txId), + SecureHash.parse(this.compositeKey.txId), SecureHash.parse(this.compositeKey.peerPartyId), OpaqueBytes(this.distributionList), this.compositeKey.timestamp @@ -124,14 +124,12 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, val partyName: String ) - data class TimestampKey(val timestamp: Instant, val timestampDiscriminator: Int) - class Key( + val txId: SecureHash, val partyId: SecureHash, val timestamp: Instant, val timestampDiscriminator: Int = nextDiscriminatorNumber.andIncrement ) { - constructor(key: TimestampKey, partyId: SecureHash): this(partyId, key.timestamp, key.timestampDiscriminator) companion object { val nextDiscriminatorNumber = AtomicInteger() } @@ -144,8 +142,10 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, val distributionList = metadata.distributionList as? SenderDistributionList ?: throw IllegalStateException("Expecting SenderDistributionList") distributionList.peersToStatesToRecord.map { (peerCordaX500Name, peerStatesToRecord) -> val senderDistributionRecord = DBSenderDistributionRecord( - PersistentKey(Key(TimestampKey(senderRecordingTimestamp, timeDiscriminator), partyInfoCache.getPartyIdByCordaX500Name(peerCordaX500Name))), - txId.toString(), + PersistentKey(Key(txId, + partyInfoCache.getPartyIdByCordaX500Name(peerCordaX500Name), + senderRecordingTimestamp, timeDiscriminator)), + distributionList.senderStatesToRecord, peerStatesToRecord) session.save(senderDistributionRecord) } @@ -170,8 +170,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, val publicHeader = HashedDistributionList.PublicHeader.unauthenticatedDeserialise(distributionList.opaqueData, encryptionService) database.transaction { val receiverDistributionRecord = DBReceiverDistributionRecord( - Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp, publicHeader.timeDiscriminator), - txId, + Key(txId, partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp, publicHeader.timeDiscriminator), distributionList.opaqueData, distributionList.receiverStatesToRecord ) @@ -187,12 +186,14 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, super.removeUnnotarisedTransaction(id) val criteriaBuilder = session.criteriaBuilder val deleteSenderDistributionRecords = criteriaBuilder.createCriteriaDelete(DBSenderDistributionRecord::class.java) - val root = deleteSenderDistributionRecords.from(DBSenderDistributionRecord::class.java) - deleteSenderDistributionRecords.where(criteriaBuilder.equal(root.get(DBSenderDistributionRecord::txId.name), id.toString())) + val rootSender = deleteSenderDistributionRecords.from(DBSenderDistributionRecord::class.java) + val compositeKeySender = rootSender.get("compositeKey") + deleteSenderDistributionRecords.where(criteriaBuilder.equal(compositeKeySender.get(PersistentKey::txId.name), id.toString())) val deletedSenderDistributionRecords = session.createQuery(deleteSenderDistributionRecords).executeUpdate() != 0 val deleteReceiverDistributionRecords = criteriaBuilder.createCriteriaDelete(DBReceiverDistributionRecord::class.java) - val rootReceiverDistributionRecord = deleteReceiverDistributionRecords.from(DBReceiverDistributionRecord::class.java) - deleteReceiverDistributionRecords.where(criteriaBuilder.equal(rootReceiverDistributionRecord.get(DBReceiverDistributionRecord::txId.name), id.toString())) + val rootReceiver = deleteReceiverDistributionRecords.from(DBReceiverDistributionRecord::class.java) + val compositeKeyReceiver = rootReceiver.get("compositeKey") + deleteReceiverDistributionRecords.where(criteriaBuilder.equal(compositeKeyReceiver.get(PersistentKey::txId.name), id.toString())) val deletedReceiverDistributionRecords = session.createQuery(deleteReceiverDistributionRecords).executeUpdate() != 0 deletedSenderDistributionRecords || deletedReceiverDistributionRecords } @@ -233,7 +234,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, predicates.add(criteriaBuilder.greaterThanOrEqualTo(compositeKey.get(PersistentKey::timestamp.name), timeWindow.fromTime)) predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(compositeKey.get(PersistentKey::timestamp.name), timeWindow.untilTime))) if (excludingTxnIds.isNotEmpty()) { - predicates.add(criteriaBuilder.and(criteriaBuilder.not(txnMetadata.get(DBSenderDistributionRecord::txId.name).`in`( + predicates.add(criteriaBuilder.and(criteriaBuilder.not(compositeKey.get(PersistentKey::txId.name).`in`( excludingTxnIds.map { it.toString() })))) } if (peers.isNotEmpty()) { @@ -271,7 +272,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, predicates.add(criteriaBuilder.greaterThanOrEqualTo(timestamp, timeWindow.fromTime)) predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(timestamp, timeWindow.untilTime))) if (excludingTxnIds.isNotEmpty()) { - val txId = txnMetadata.get(DBSenderDistributionRecord::txId.name) + val txId = compositeKey.get(PersistentKey::txId.name) predicates.add(criteriaBuilder.and(criteriaBuilder.not(txId.`in`(excludingTxnIds.map { it.toString() })))) } if (initiators.isNotEmpty()) { @@ -320,7 +321,8 @@ abstract class DistributionRecord { data class SenderDistributionRecord( override val txId: SecureHash, val peerPartyId: SecureHash, // CordaX500Name hashCode() - val statesToRecord: StatesToRecord, + val senderStatesToRecord: StatesToRecord, + val receiverStatesToRecord: StatesToRecord, override val timestamp: Instant ) : DistributionRecord() diff --git a/node/src/main/resources/migration/node-core.changelog-v25.xml b/node/src/main/resources/migration/node-core.changelog-v25.xml index 0e32e3f0c4..bc31223fe7 100644 --- a/node/src/main/resources/migration/node-core.changelog-v25.xml +++ b/node/src/main/resources/migration/node-core.changelog-v25.xml @@ -24,7 +24,10 @@ - + + + + diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt index 85e086ad61..60a48a073c 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt @@ -114,7 +114,7 @@ class DBTransactionStorageLedgerRecoveryTests { val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id)) assertEquals(1, results.size) - assertEquals(transaction2.id.toString(), results[0].txId) + assertEquals(transaction2.id.toString(), results[0].compositeKey.txId) } @Test(timeout = 300_000) @@ -128,7 +128,7 @@ class DBTransactionStorageLedgerRecoveryTests { val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) val results = transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)) assertEquals(1, results.size) - assertEquals(transaction2.id.toString(), results[0].txId) + assertEquals(transaction2.id.toString(), results[0].compositeKey.txId) } @Test(timeout = 300_000) @@ -148,7 +148,7 @@ class DBTransactionStorageLedgerRecoveryTests { transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let { assertEquals(2, it.size) assertEquals(SecureHash.sha256(BOB_NAME.toString()).toString(), it.senderRecords[0].compositeKey.peerPartyId) - assertEquals(ALL_VISIBLE, it.senderRecords[0].statesToRecord) + assertEquals(ALL_VISIBLE, it.senderRecords[0].senderStatesToRecord) } transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let { assertEquals(1, it.size) @@ -181,8 +181,8 @@ class DBTransactionStorageLedgerRecoveryTests { val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(BOB_NAME)).let { assertEquals(2, it.size) - assertEquals(it[0].statesToRecord, ALL_VISIBLE) - assertEquals(it[1].statesToRecord, ONLY_RELEVANT) + assertEquals(it[0].senderStatesToRecord, ALL_VISIBLE) + assertEquals(it[1].senderStatesToRecord, ONLY_RELEVANT) } assertEquals(1, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(ALICE_NAME)).size) assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)).size) @@ -251,7 +251,7 @@ class DBTransactionStorageLedgerRecoveryTests { assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status) readSenderDistributionRecordFromDB(senderTransaction.id).let { assertEquals(1, it.size) - assertEquals(ALL_VISIBLE, it[0].statesToRecord) + assertEquals(ALL_VISIBLE, it[0].senderStatesToRecord) assertEquals(BOB_NAME, partyInfoCache.getCordaX500NameByPartyId(it[0].peerPartyId)) } @@ -280,7 +280,8 @@ class DBTransactionStorageLedgerRecoveryTests { assertEquals(VERIFIED, readTransactionFromDB(transaction.id).status) readSenderDistributionRecordFromDB(transaction.id).apply { assertEquals(1, this.size) - assertEquals(ALL_VISIBLE, this[0].statesToRecord) + assertEquals(ONLY_RELEVANT, this[0].senderStatesToRecord) + assertEquals(ALL_VISIBLE, this[0].receiverStatesToRecord) } } @@ -329,7 +330,7 @@ class DBTransactionStorageLedgerRecoveryTests { private fun readTransactionFromDB(txId: SecureHash): DBTransactionStorage.DBTransaction { val fromDb = database.transaction { session.createQuery( - "from ${DBTransactionStorage.DBTransaction::class.java.name} where txId = :transactionId", + "from ${DBTransactionStorage.DBTransaction::class.java.name} where tx_id = :transactionId", DBTransactionStorage.DBTransaction::class.java ).setParameter("transactionId", txId.toString()).resultList } @@ -341,7 +342,7 @@ class DBTransactionStorageLedgerRecoveryTests { return database.transaction { if (txId != null) session.createQuery( - "from ${DBSenderDistributionRecord::class.java.name} where txId = :transactionId", + "from ${DBSenderDistributionRecord::class.java.name} where transaction_id = :transactionId", DBSenderDistributionRecord::class.java ).setParameter("transactionId", txId.toString()).resultList.map { it.toSenderDistributionRecord() } else @@ -355,7 +356,7 @@ class DBTransactionStorageLedgerRecoveryTests { private fun readReceiverDistributionRecordFromDB(txId: SecureHash): ReceiverDistributionRecord { val fromDb = database.transaction { session.createQuery( - "from ${DBReceiverDistributionRecord::class.java.name} where txId = :transactionId", + "from ${DBReceiverDistributionRecord::class.java.name} where transaction_id = :transactionId", DBReceiverDistributionRecord::class.java ).setParameter("transactionId", txId.toString()).resultList }