Incorporating PR review feedback.

This commit is contained in:
Jose Coll 2023-08-18 17:22:42 +01:00
parent 06e43eb9e2
commit 4a6e99556b
11 changed files with 77 additions and 66 deletions

View File

@ -358,7 +358,7 @@ class FinalityFlowTests : WithFinality {
assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId)
}
getReceiverRecoveryData(stx.id, bobNode, aliceNode).apply {
assertEquals(StatesToRecord.ALL_VISIBLE, this?.statesToRecord)
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.statesToRecord)
assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this?.initiatorPartyId)
assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE), this?.peersToStatesToRecord)
}

View File

@ -86,7 +86,8 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
open fun resolvePayload(payload: Any): SignedTransaction {
return if (payload is SignedTransactionWithDistributionList) {
if (checkSufficientSignatures || deferredAck) {
(serviceHub as ServiceHubCoreInternal).recordReceiverTransactionRecoveryMetadata(payload.stx.id, otherSideSession.counterparty.name, ourIdentity.name, statesToRecord, payload.distributionList)
(serviceHub as ServiceHubCoreInternal).recordReceiverTransactionRecoveryMetadata(payload.stx.id, otherSideSession.counterparty.name,
TransactionMetadata(otherSideSession.counterparty.name, DistributionList.ReceiverDistributionList(payload.distributionList, statesToRecord)))
payload.stx
} else payload.stx
} else payload as SignedTransaction

View File

@ -76,15 +76,11 @@ interface ServiceHubCoreInternal : ServiceHub {
*
* @param txnId The SecureHash of a transaction.
* @param sender The sender of the transaction.
* @param receiver The receiver of the transaction.
* @param receiverStatesToRecord The StatesToRecord value of the receiver.
* @param encryptedDistributionList encrypted distribution list (hashed peers -> StatesToRecord values)
* @param txnMetadata The recovery metadata associated with a transaction.
*/
fun recordReceiverTransactionRecoveryMetadata(txnId: SecureHash,
sender: CordaX500Name,
receiver: CordaX500Name,
receiverStatesToRecord: StatesToRecord,
encryptedDistributionList: ByteArray)
txnMetadata: TransactionMetadata)
}
interface TransactionsResolver {

View File

@ -198,8 +198,8 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
override fun recordSenderTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata) =
validatedTransactions.addSenderTransactionRecoveryMetadata(txnId, txnMetadata)
override fun recordReceiverTransactionRecoveryMetadata(txnId: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray) =
validatedTransactions.addReceiverTransactionRecoveryMetadata(txnId, sender, receiver, receiverStatesToRecord, encryptedDistributionList)
override fun recordReceiverTransactionRecoveryMetadata(txnId: SecureHash, sender: CordaX500Name, txnMetadata: TransactionMetadata) =
validatedTransactions.addReceiverTransactionRecoveryMetadata(txnId, sender, txnMetadata)
@Suppress("NestedBlockDepth")
@VisibleForTesting
@ -383,15 +383,11 @@ interface WritableTransactionStorage : TransactionStorage {
*
* @param txId The SecureHash of a transaction.
* @param sender The sender of the transaction.
* @param receiver The receiver of the transaction.
* @param receiverStatesToRecord The StatesToRecord value of the receiver.
* @param encryptedDistributionList encrypted distribution list (hashed peers -> StatesToRecord values)
* @param metadata The recovery metadata associated with a transaction.
*/
fun addReceiverTransactionRecoveryMetadata(txId: SecureHash,
sender: CordaX500Name,
receiver: CordaX500Name,
receiverStatesToRecord: StatesToRecord,
encryptedDistributionList: ByteArray)
metadata: TransactionMetadata)
/**
* Removes an un-notarised transaction (with a status of *MISSING_TRANSACTION_SIG*) from the data store.

View File

@ -219,9 +219,8 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac
override fun addReceiverTransactionRecoveryMetadata(txId: SecureHash,
sender: CordaX500Name,
receiver: CordaX500Name,
receiverStatesToRecord: StatesToRecord,
encryptedDistributionList: ByteArray) { }
metadata: TransactionMetadata
) { }
override fun finalizeTransaction(transaction: SignedTransaction) =
addTransaction(transaction) {

View File

@ -1,11 +1,13 @@
package net.corda.node.services.persistence
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.DistributionList.ReceiverDistributionList
import net.corda.core.flows.DistributionList.SenderDistributionList
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.flows.TransactionMetadata
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.VisibleForTesting
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable
@ -26,7 +28,6 @@ import javax.persistence.Id
import javax.persistence.Lob
import javax.persistence.Table
import javax.persistence.criteria.Predicate
import kotlin.streams.toList
class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
cacheFactory: NamedCacheFactory,
@ -98,7 +99,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
distributionList = encryptedDistributionList,
receiverStatesToRecord = receiverStatesToRecord
)
@VisibleForTesting
fun toReceiverDistributionRecord(encryptionService: EncryptionService): ReceiverDistributionRecord {
val hashedDL = HashedDistributionList.decrypt(this.distributionList, encryptionService)
return ReceiverDistributionRecord(
@ -163,18 +164,22 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
override fun addReceiverTransactionRecoveryMetadata(txId: SecureHash,
sender: CordaX500Name,
receiver: CordaX500Name,
receiverStatesToRecord: StatesToRecord,
encryptedDistributionList: ByteArray) {
val publicHeader = HashedDistributionList.PublicHeader.unauthenticatedDeserialise(encryptedDistributionList, encryptionService)
database.transaction {
val receiverDistributionRecord = DBReceiverDistributionRecord(
Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp),
txId,
encryptedDistributionList,
receiverStatesToRecord
)
session.save(receiverDistributionRecord)
metadata: TransactionMetadata) {
when (metadata.distributionList) {
is ReceiverDistributionList -> {
val distributionList = metadata.distributionList as ReceiverDistributionList
val publicHeader = HashedDistributionList.PublicHeader.unauthenticatedDeserialise(distributionList.opaqueData, encryptionService)
database.transaction {
val receiverDistributionRecord = DBReceiverDistributionRecord(
Key(partyInfoCache.getPartyIdByCordaX500Name(sender), publicHeader.senderRecordedTimestamp),
txId,
distributionList.opaqueData,
distributionList.receiverStatesToRecord
)
session.save(receiverDistributionRecord)
}
}
else -> throw IllegalStateException("Expecting ReceiverDistributionList")
}
}
@ -247,7 +252,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
}
criteriaQuery.orderBy(orderCriteria)
}
session.createQuery(criteriaQuery).stream().toList()
session.createQuery(criteriaQuery).resultList
}
}
@ -284,7 +289,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
}
criteriaQuery.orderBy(orderCriteria)
}
session.createQuery(criteriaQuery).stream().toList()
session.createQuery(criteriaQuery).resultList
}
}
}

View File

@ -818,11 +818,9 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
override fun addReceiverTransactionRecoveryMetadata(txId: SecureHash,
sender: CordaX500Name,
receiver: CordaX500Name,
receiverStatesToRecord: StatesToRecord,
encryptedDistributionList: ByteArray) {
metadata: TransactionMetadata) {
database.transaction {
delegate.addReceiverTransactionRecoveryMetadata(txId, sender, receiver, receiverStatesToRecord, encryptedDistributionList)
delegate.addReceiverTransactionRecoveryMetadata(txId, sender, metadata)
}
}

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.sign
import net.corda.core.flows.DistributionList.ReceiverDistributionList
import net.corda.core.flows.DistributionList.SenderDistributionList
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.flows.TransactionMetadata
@ -137,11 +138,13 @@ class DBTransactionStorageLedgerRecoveryTests {
val transaction2 = newTransaction()
// receiver txn
transactionRecovery.addUnnotarisedTransaction(transaction2)
transactionRecovery.addReceiverTransactionRecoveryMetadata(transaction2.id, BOB_NAME, ALICE_NAME, ALL_VISIBLE,
SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE)).toWire())
val encryptedDL = transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id,
TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(transaction2.id, BOB_NAME,
TransactionMetadata(BOB_NAME, ReceiverDistributionList(encryptedDL, ALL_VISIBLE)))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let {
assertEquals(1, it.size)
assertEquals(2, it.size)
assertEquals(BOB_NAME.hashCode().toLong(), it.senderRecords[0].compositeKey.peerPartyId)
assertEquals(ALL_VISIBLE, it.senderRecords[0].statesToRecord)
}
@ -151,7 +154,7 @@ class DBTransactionStorageLedgerRecoveryTests {
assertEquals(ALL_VISIBLE, (HashedDistributionList.decrypt(it.receiverRecords[0].distributionList, encryptionService)).peerHashToStatesToRecord.map { it.value }[0])
}
val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL)
assertEquals(2, resultsAll.size)
assertEquals(3, resultsAll.size)
}
@Test(timeout = 300_000)
@ -187,24 +190,34 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `query for receiver distribution records by initiator`() {
val txn1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn1)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn1.id, ALICE_NAME, BOB_NAME, ALL_VISIBLE,
SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE, CHARLIE_NAME to ALL_VISIBLE)).toWire())
val encryptedDL1 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn1.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE, CHARLIE_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn1.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL1, ALL_VISIBLE)))
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn2.id, ALICE_NAME, BOB_NAME, ONLY_RELEVANT,
SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT)).toWire())
val encryptedDL2 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn2.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL2, ONLY_RELEVANT)))
val txn3 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn3)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn3.id, ALICE_NAME, CHARLIE_NAME, NONE,
SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to NONE)).toWire())
val encryptedDL3 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn3.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to NONE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn3.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL3, NONE)))
val txn4 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn4)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn4.id, BOB_NAME, ALICE_NAME, ONLY_RELEVANT,
SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE)).toWire())
val encryptedDL4 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn4.id,
TransactionMetadata(BOB_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn4.id, BOB_NAME,
TransactionMetadata(BOB_NAME, ReceiverDistributionList(encryptedDL4, ALL_VISIBLE)))
val txn5 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn5)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn5.id, CHARLIE_NAME, BOB_NAME, ONLY_RELEVANT,
SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT)).toWire())
val encryptedDL5 = transactionRecovery.addSenderTransactionRecoveryMetadata(txn5.id,
TransactionMetadata(CHARLIE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn5.id, CHARLIE_NAME,
TransactionMetadata(CHARLIE_NAME, ReceiverDistributionList(encryptedDL5, ONLY_RELEVANT)))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(ALICE_NAME)).let {
@ -242,8 +255,10 @@ class DBTransactionStorageLedgerRecoveryTests {
val receiverTransaction = newTransaction()
transactionRecovery.addUnnotarisedTransaction(receiverTransaction)
transactionRecovery.addReceiverTransactionRecoveryMetadata(receiverTransaction.id, ALICE_NAME, BOB_NAME, ALL_VISIBLE,
SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE)).toWire())
val encryptedDL = transactionRecovery.addSenderTransactionRecoveryMetadata(receiverTransaction.id,
TransactionMetadata(ALICE_NAME, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(receiverTransaction.id, ALICE_NAME,
TransactionMetadata(ALICE_NAME, ReceiverDistributionList(encryptedDL, ALL_VISIBLE)))
assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status)
readReceiverDistributionRecordFromDB(receiverTransaction.id).let {
assertEquals(ONLY_RELEVANT, it.statesToRecord)
@ -270,8 +285,10 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `remove un-notarised transaction and associated recovery metadata`() {
val senderTransaction = newTransaction(notarySig = false)
transactionRecovery.addUnnotarisedTransaction(senderTransaction)
transactionRecovery.addReceiverTransactionRecoveryMetadata(senderTransaction.id, ALICE.name, BOB.name, ONLY_RELEVANT,
SenderDistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT, CHARLIE_NAME to ONLY_RELEVANT)).toWire())
val encryptedDL1 = transactionRecovery.addSenderTransactionRecoveryMetadata(senderTransaction.id,
TransactionMetadata(ALICE.name, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT, CHARLIE_NAME to ONLY_RELEVANT))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(senderTransaction.id, BOB.name,
TransactionMetadata(ALICE.name, ReceiverDistributionList(encryptedDL1, ONLY_RELEVANT)))
assertNull(transactionRecovery.getTransaction(senderTransaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
@ -282,8 +299,10 @@ class DBTransactionStorageLedgerRecoveryTests {
val receiverTransaction = newTransaction(notarySig = false)
transactionRecovery.addUnnotarisedTransaction(receiverTransaction)
transactionRecovery.addReceiverTransactionRecoveryMetadata(receiverTransaction.id, ALICE.name, BOB.name, ONLY_RELEVANT,
SenderDistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT)).toWire())
val encryptedDL2 = transactionRecovery.addSenderTransactionRecoveryMetadata(receiverTransaction.id,
TransactionMetadata(ALICE.name, SenderDistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT))))
transactionRecovery.addReceiverTransactionRecoveryMetadata(receiverTransaction.id, BOB.name,
TransactionMetadata(ALICE.name, ReceiverDistributionList(encryptedDL2, ONLY_RELEVANT)))
assertNull(transactionRecovery.getTransaction(receiverTransaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status)

View File

@ -42,9 +42,9 @@ class CashIssueWithObserversFlow(private val amount: Amount<Currency>,
}
@Suspendable
private fun finalise(tx: SignedTransaction, sessions: Collection<FlowSession>, message: String): SignedTransaction {
private fun finalise(tx: SignedTransaction, observerSessions: Collection<FlowSession>, message: String): SignedTransaction {
try {
return subFlow(FinalityFlow(tx, sessions))
return subFlow(FinalityFlow(tx, sessions = emptySet(), observerSessions = observerSessions))
} catch (e: NotaryException) {
throw CashException(message, e)
}

View File

@ -12,7 +12,6 @@ import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.TransactionStatus
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.StatesToRecord
import net.corda.testing.node.MockServices
import rx.Observable
import rx.subjects.PublishSubject
@ -65,9 +64,7 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
override fun addReceiverTransactionRecoveryMetadata(txId: SecureHash,
sender: CordaX500Name,
receiver: CordaX500Name,
receiverStatesToRecord: StatesToRecord,
encryptedDistributionList: ByteArray) { }
metadata: TransactionMetadata) { }
override fun removeUnnotarisedTransaction(id: SecureHash): Boolean {
return txns.remove(id) != null

View File

@ -150,7 +150,7 @@ data class TestTransactionDSLInterpreter private constructor(
override fun recordSenderTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata): ByteArray? { return null }
override fun recordReceiverTransactionRecoveryMetadata(txnId: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray) {}
override fun recordReceiverTransactionRecoveryMetadata(txnId: SecureHash, sender: CordaX500Name, txnMetadata: TransactionMetadata) {}
}
private fun copy(): TestTransactionDSLInterpreter =