mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
ENT-10110 Ledger Recovery database table tweaks (#7528)
This commit is contained in:
parent
6153990ef5
commit
a66042ec09
@ -123,7 +123,6 @@ class FinalityFlowTests : WithFinality {
|
||||
fun `allow use of the old API if the CorDapp target version is 3`() {
|
||||
val oldBob = createBob(cordapps = listOf(tokenOldCordapp()))
|
||||
val stx = aliceNode.issuesCashTo(oldBob)
|
||||
@Suppress("DEPRECATION")
|
||||
aliceNode.startFlowAndRunNetwork(OldFinalityInvoker(stx)).resultFuture.getOrThrow()
|
||||
assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
}
|
||||
@ -239,7 +238,7 @@ class FinalityFlowTests : WithFinality {
|
||||
private fun assertTxnRemovedFromDatabase(node: TestStartedNode, stxId: SecureHash) {
|
||||
val fromDb = node.database.transaction {
|
||||
session.createQuery(
|
||||
"from ${DBTransactionStorage.DBTransaction::class.java.name} where txId = :transactionId",
|
||||
"from ${DBTransactionStorage.DBTransaction::class.java.name} where tx_id = :transactionId",
|
||||
DBTransactionStorage.DBTransaction::class.java
|
||||
).setParameter("transactionId", stxId.toString()).resultList
|
||||
}
|
||||
@ -354,7 +353,8 @@ class FinalityFlowTests : WithFinality {
|
||||
|
||||
val sdrs = getSenderRecoveryData(stx.id, aliceNode.database).apply {
|
||||
assertEquals(1, this.size)
|
||||
assertEquals(StatesToRecord.ALL_VISIBLE, this[0].statesToRecord)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].senderStatesToRecord)
|
||||
assertEquals(StatesToRecord.ALL_VISIBLE, this[0].receiverStatesToRecord)
|
||||
assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId)
|
||||
}
|
||||
val rdr = getReceiverRecoveryData(stx.id, bobNode).apply {
|
||||
@ -387,9 +387,9 @@ class FinalityFlowTests : WithFinality {
|
||||
|
||||
val sdrs = getSenderRecoveryData(stx.id, aliceNode.database).apply {
|
||||
assertEquals(2, this.size)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].senderStatesToRecord)
|
||||
assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId)
|
||||
assertEquals(StatesToRecord.ALL_VISIBLE, this[1].statesToRecord)
|
||||
assertEquals(StatesToRecord.ALL_VISIBLE, this[1].receiverStatesToRecord)
|
||||
assertEquals(SecureHash.sha256(CHARLIE_NAME.toString()), this[1].peerPartyId)
|
||||
}
|
||||
val rdr = getReceiverRecoveryData(stx.id, bobNode).apply {
|
||||
@ -451,7 +451,7 @@ class FinalityFlowTests : WithFinality {
|
||||
|
||||
val sdr = getSenderRecoveryData(stx.id, aliceNode.database).apply {
|
||||
assertEquals(1, this.size)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord)
|
||||
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].senderStatesToRecord)
|
||||
assertEquals(SecureHash.sha256(BOB_NAME.toString()), this[0].peerPartyId)
|
||||
}
|
||||
val rdr = getReceiverRecoveryData(stx.id, bobNode).apply {
|
||||
@ -467,7 +467,7 @@ class FinalityFlowTests : WithFinality {
|
||||
private fun getSenderRecoveryData(id: SecureHash, database: CordaPersistence): List<SenderDistributionRecord> {
|
||||
val fromDb = database.transaction {
|
||||
session.createQuery(
|
||||
"from ${DBSenderDistributionRecord::class.java.name} where txId = :transactionId",
|
||||
"from ${DBSenderDistributionRecord::class.java.name} where transaction_id = :transactionId",
|
||||
DBSenderDistributionRecord::class.java
|
||||
).setParameter("transactionId", id.toString()).resultList
|
||||
}
|
||||
@ -477,7 +477,7 @@ class FinalityFlowTests : WithFinality {
|
||||
private fun getReceiverRecoveryData(txId: SecureHash, receiver: TestStartedNode): ReceiverDistributionRecord? {
|
||||
return receiver.database.transaction {
|
||||
session.createQuery(
|
||||
"from ${DBReceiverDistributionRecord::class.java.name} where txId = :transactionId",
|
||||
"from ${DBReceiverDistributionRecord::class.java.name} where transaction_id = :transactionId",
|
||||
DBReceiverDistributionRecord::class.java
|
||||
).setParameter("transactionId", txId.toString()).resultList
|
||||
}.singleOrNull()?.toReceiverDistributionRecord()
|
||||
|
@ -38,7 +38,9 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
@Embeddable
|
||||
@Immutable
|
||||
data class PersistentKey(
|
||||
/** PartyId of flow peer **/
|
||||
@Column(name = "transaction_id", length = 144, nullable = false)
|
||||
var txId: String,
|
||||
|
||||
@Column(name = "peer_party_id", length = 144, nullable = false)
|
||||
var peerPartyId: String,
|
||||
|
||||
@ -49,7 +51,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
var timestampDiscriminator: Int
|
||||
|
||||
) : Serializable {
|
||||
constructor(key: Key) : this(key.partyId.toString(), key.timestamp, key.timestampDiscriminator)
|
||||
constructor(key: Key) : this(key.txId.toString(), key.partyId.toString(), key.timestamp, key.timestampDiscriminator)
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
@ -59,18 +61,20 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
@EmbeddedId
|
||||
var compositeKey: PersistentKey,
|
||||
|
||||
@Column(name = "transaction_id", length = 144, nullable = false)
|
||||
var txId: String,
|
||||
/** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
|
||||
@Column(name = "sender_states_to_record", nullable = false)
|
||||
var senderStatesToRecord: StatesToRecord,
|
||||
|
||||
/** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
|
||||
@Column(name = "states_to_record", nullable = false)
|
||||
var statesToRecord: StatesToRecord
|
||||
@Column(name = "receiver_states_to_record", nullable = false)
|
||||
var receiverStatesToRecord: StatesToRecord
|
||||
) {
|
||||
fun toSenderDistributionRecord() =
|
||||
SenderDistributionRecord(
|
||||
SecureHash.parse(this.txId),
|
||||
SecureHash.parse(this.compositeKey.txId),
|
||||
SecureHash.parse(this.compositeKey.peerPartyId),
|
||||
this.statesToRecord,
|
||||
this.senderStatesToRecord,
|
||||
this.receiverStatesToRecord,
|
||||
this.compositeKey.timestamp
|
||||
)
|
||||
}
|
||||
@ -82,9 +86,6 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
@EmbeddedId
|
||||
var compositeKey: PersistentKey,
|
||||
|
||||
@Column(name = "transaction_id", length = 144, nullable = false)
|
||||
var txId: String,
|
||||
|
||||
/** Encrypted recovery information for sole use by Sender **/
|
||||
@Lob
|
||||
@Column(name = "distribution_list", nullable = false)
|
||||
@ -94,16 +95,15 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
@Column(name = "receiver_states_to_record", nullable = false)
|
||||
val receiverStatesToRecord: StatesToRecord
|
||||
) {
|
||||
constructor(key: Key, txId: SecureHash, encryptedDistributionList: ByteArray, receiverStatesToRecord: StatesToRecord) :
|
||||
constructor(key: Key, encryptedDistributionList: ByteArray, receiverStatesToRecord: StatesToRecord) :
|
||||
this(PersistentKey(key),
|
||||
txId = txId.toString(),
|
||||
distributionList = encryptedDistributionList,
|
||||
receiverStatesToRecord = receiverStatesToRecord
|
||||
)
|
||||
@VisibleForTesting
|
||||
fun toReceiverDistributionRecord(): ReceiverDistributionRecord {
|
||||
return ReceiverDistributionRecord(
|
||||
SecureHash.parse(this.txId),
|
||||
SecureHash.parse(this.compositeKey.txId),
|
||||
SecureHash.parse(this.compositeKey.peerPartyId),
|
||||
OpaqueBytes(this.distributionList),
|
||||
this.compositeKey.timestamp
|
||||
@ -124,14 +124,12 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
val partyName: String
|
||||
)
|
||||
|
||||
data class TimestampKey(val timestamp: Instant, val timestampDiscriminator: Int)
|
||||
|
||||
class Key(
|
||||
val txId: SecureHash,
|
||||
val partyId: SecureHash,
|
||||
val timestamp: Instant,
|
||||
val timestampDiscriminator: Int = nextDiscriminatorNumber.andIncrement
|
||||
) {
|
||||
constructor(key: TimestampKey, partyId: SecureHash): this(partyId, key.timestamp, key.timestampDiscriminator)
|
||||
companion object {
|
||||
val nextDiscriminatorNumber = AtomicInteger()
|
||||
}
|
||||
@ -144,8 +142,10 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
val distributionList = metadata.distributionList as? SenderDistributionList ?: throw IllegalStateException("Expecting SenderDistributionList")
|
||||
distributionList.peersToStatesToRecord.map { (peerCordaX500Name, peerStatesToRecord) ->
|
||||
val senderDistributionRecord = DBSenderDistributionRecord(
|
||||
PersistentKey(Key(TimestampKey(senderRecordingTimestamp, timeDiscriminator), partyInfoCache.getPartyIdByCordaX500Name(peerCordaX500Name))),
|
||||
txId.toString(),
|
||||
PersistentKey(Key(txId,
|
||||
partyInfoCache.getPartyIdByCordaX500Name(peerCordaX500Name),
|
||||
senderRecordingTimestamp, timeDiscriminator)),
|
||||
distributionList.senderStatesToRecord,
|
||||
peerStatesToRecord)
|
||||
session.save(senderDistributionRecord)
|
||||
}
|
||||
@ -170,8 +170,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
val publicHeader = HashedDistributionList.PublicHeader.unauthenticatedDeserialise(distributionList.opaqueData, encryptionService)
|
||||
database.transaction {
|
||||
val receiverDistributionRecord = DBReceiverDistributionRecord(
|
||||
Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp, publicHeader.timeDiscriminator),
|
||||
txId,
|
||||
Key(txId, partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp, publicHeader.timeDiscriminator),
|
||||
distributionList.opaqueData,
|
||||
distributionList.receiverStatesToRecord
|
||||
)
|
||||
@ -187,12 +186,14 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
super.removeUnnotarisedTransaction(id)
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
val deleteSenderDistributionRecords = criteriaBuilder.createCriteriaDelete(DBSenderDistributionRecord::class.java)
|
||||
val root = deleteSenderDistributionRecords.from(DBSenderDistributionRecord::class.java)
|
||||
deleteSenderDistributionRecords.where(criteriaBuilder.equal(root.get<String>(DBSenderDistributionRecord::txId.name), id.toString()))
|
||||
val rootSender = deleteSenderDistributionRecords.from(DBSenderDistributionRecord::class.java)
|
||||
val compositeKeySender = rootSender.get<PersistentKey>("compositeKey")
|
||||
deleteSenderDistributionRecords.where(criteriaBuilder.equal(compositeKeySender.get<String>(PersistentKey::txId.name), id.toString()))
|
||||
val deletedSenderDistributionRecords = session.createQuery(deleteSenderDistributionRecords).executeUpdate() != 0
|
||||
val deleteReceiverDistributionRecords = criteriaBuilder.createCriteriaDelete(DBReceiverDistributionRecord::class.java)
|
||||
val rootReceiverDistributionRecord = deleteReceiverDistributionRecords.from(DBReceiverDistributionRecord::class.java)
|
||||
deleteReceiverDistributionRecords.where(criteriaBuilder.equal(rootReceiverDistributionRecord.get<String>(DBReceiverDistributionRecord::txId.name), id.toString()))
|
||||
val rootReceiver = deleteReceiverDistributionRecords.from(DBReceiverDistributionRecord::class.java)
|
||||
val compositeKeyReceiver = rootReceiver.get<PersistentKey>("compositeKey")
|
||||
deleteReceiverDistributionRecords.where(criteriaBuilder.equal(compositeKeyReceiver.get<String>(PersistentKey::txId.name), id.toString()))
|
||||
val deletedReceiverDistributionRecords = session.createQuery(deleteReceiverDistributionRecords).executeUpdate() != 0
|
||||
deletedSenderDistributionRecords || deletedReceiverDistributionRecords
|
||||
}
|
||||
@ -233,7 +234,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
predicates.add(criteriaBuilder.greaterThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.fromTime))
|
||||
predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(compositeKey.get<Instant>(PersistentKey::timestamp.name), timeWindow.untilTime)))
|
||||
if (excludingTxnIds.isNotEmpty()) {
|
||||
predicates.add(criteriaBuilder.and(criteriaBuilder.not(txnMetadata.get<String>(DBSenderDistributionRecord::txId.name).`in`(
|
||||
predicates.add(criteriaBuilder.and(criteriaBuilder.not(compositeKey.get<String>(PersistentKey::txId.name).`in`(
|
||||
excludingTxnIds.map { it.toString() }))))
|
||||
}
|
||||
if (peers.isNotEmpty()) {
|
||||
@ -271,7 +272,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
|
||||
predicates.add(criteriaBuilder.greaterThanOrEqualTo(timestamp, timeWindow.fromTime))
|
||||
predicates.add(criteriaBuilder.and(criteriaBuilder.lessThanOrEqualTo(timestamp, timeWindow.untilTime)))
|
||||
if (excludingTxnIds.isNotEmpty()) {
|
||||
val txId = txnMetadata.get<String>(DBSenderDistributionRecord::txId.name)
|
||||
val txId = compositeKey.get<String>(PersistentKey::txId.name)
|
||||
predicates.add(criteriaBuilder.and(criteriaBuilder.not(txId.`in`(excludingTxnIds.map { it.toString() }))))
|
||||
}
|
||||
if (initiators.isNotEmpty()) {
|
||||
@ -320,7 +321,8 @@ abstract class DistributionRecord {
|
||||
data class SenderDistributionRecord(
|
||||
override val txId: SecureHash,
|
||||
val peerPartyId: SecureHash, // CordaX500Name hashCode()
|
||||
val statesToRecord: StatesToRecord,
|
||||
val senderStatesToRecord: StatesToRecord,
|
||||
val receiverStatesToRecord: StatesToRecord,
|
||||
override val timestamp: Instant
|
||||
) : DistributionRecord()
|
||||
|
||||
|
@ -24,7 +24,10 @@
|
||||
<column name="peer_party_id" type="NVARCHAR(144)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="states_to_record" type="INT">
|
||||
<column name="sender_states_to_record" type="INT">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="receiver_states_to_record" type="INT">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
</createTable>
|
||||
|
@ -114,7 +114,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
|
||||
val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id))
|
||||
assertEquals(1, results.size)
|
||||
assertEquals(transaction2.id.toString(), results[0].txId)
|
||||
assertEquals(transaction2.id.toString(), results[0].compositeKey.txId)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
@ -128,7 +128,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
|
||||
val results = transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME))
|
||||
assertEquals(1, results.size)
|
||||
assertEquals(transaction2.id.toString(), results[0].txId)
|
||||
assertEquals(transaction2.id.toString(), results[0].compositeKey.txId)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
@ -148,7 +148,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let {
|
||||
assertEquals(2, it.size)
|
||||
assertEquals(SecureHash.sha256(BOB_NAME.toString()).toString(), it.senderRecords[0].compositeKey.peerPartyId)
|
||||
assertEquals(ALL_VISIBLE, it.senderRecords[0].statesToRecord)
|
||||
assertEquals(ALL_VISIBLE, it.senderRecords[0].senderStatesToRecord)
|
||||
}
|
||||
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let {
|
||||
assertEquals(1, it.size)
|
||||
@ -181,8 +181,8 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
|
||||
transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(BOB_NAME)).let {
|
||||
assertEquals(2, it.size)
|
||||
assertEquals(it[0].statesToRecord, ALL_VISIBLE)
|
||||
assertEquals(it[1].statesToRecord, ONLY_RELEVANT)
|
||||
assertEquals(it[0].senderStatesToRecord, ALL_VISIBLE)
|
||||
assertEquals(it[1].senderStatesToRecord, ONLY_RELEVANT)
|
||||
}
|
||||
assertEquals(1, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(ALICE_NAME)).size)
|
||||
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)).size)
|
||||
@ -251,7 +251,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
|
||||
readSenderDistributionRecordFromDB(senderTransaction.id).let {
|
||||
assertEquals(1, it.size)
|
||||
assertEquals(ALL_VISIBLE, it[0].statesToRecord)
|
||||
assertEquals(ALL_VISIBLE, it[0].senderStatesToRecord)
|
||||
assertEquals(BOB_NAME, partyInfoCache.getCordaX500NameByPartyId(it[0].peerPartyId))
|
||||
}
|
||||
|
||||
@ -280,7 +280,8 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
assertEquals(VERIFIED, readTransactionFromDB(transaction.id).status)
|
||||
readSenderDistributionRecordFromDB(transaction.id).apply {
|
||||
assertEquals(1, this.size)
|
||||
assertEquals(ALL_VISIBLE, this[0].statesToRecord)
|
||||
assertEquals(ONLY_RELEVANT, this[0].senderStatesToRecord)
|
||||
assertEquals(ALL_VISIBLE, this[0].receiverStatesToRecord)
|
||||
}
|
||||
}
|
||||
|
||||
@ -329,7 +330,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
private fun readTransactionFromDB(txId: SecureHash): DBTransactionStorage.DBTransaction {
|
||||
val fromDb = database.transaction {
|
||||
session.createQuery(
|
||||
"from ${DBTransactionStorage.DBTransaction::class.java.name} where txId = :transactionId",
|
||||
"from ${DBTransactionStorage.DBTransaction::class.java.name} where tx_id = :transactionId",
|
||||
DBTransactionStorage.DBTransaction::class.java
|
||||
).setParameter("transactionId", txId.toString()).resultList
|
||||
}
|
||||
@ -341,7 +342,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
return database.transaction {
|
||||
if (txId != null)
|
||||
session.createQuery(
|
||||
"from ${DBSenderDistributionRecord::class.java.name} where txId = :transactionId",
|
||||
"from ${DBSenderDistributionRecord::class.java.name} where transaction_id = :transactionId",
|
||||
DBSenderDistributionRecord::class.java
|
||||
).setParameter("transactionId", txId.toString()).resultList.map { it.toSenderDistributionRecord() }
|
||||
else
|
||||
@ -355,7 +356,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
private fun readReceiverDistributionRecordFromDB(txId: SecureHash): ReceiverDistributionRecord {
|
||||
val fromDb = database.transaction {
|
||||
session.createQuery(
|
||||
"from ${DBReceiverDistributionRecord::class.java.name} where txId = :transactionId",
|
||||
"from ${DBReceiverDistributionRecord::class.java.name} where transaction_id = :transactionId",
|
||||
DBReceiverDistributionRecord::class.java
|
||||
).setParameter("transactionId", txId.toString()).resultList
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user