ENT-9924 (Follow-up) Update recording of transaction flow recovery metadata into Send/Receive transaction flows. (#7382)

This commit is contained in:
Jose Coll 2023-06-05 16:59:06 +01:00 committed by GitHub
parent 2c775bcc41
commit f791adf442
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 182 additions and 115 deletions

View File

@ -400,13 +400,6 @@ class FinalityFlowTests : WithFinality {
assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT,
CHARLIE_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE), this?.peersToStatesToRecord)
}
getReceiverRecoveryData(stx.id, charlieNode.database).apply {
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.statesToRecord)
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.senderStatesToRecord)
// note: Charlie assertion here is using actually default StatesToRecord.ONLY_RELEVANT
assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT,
CHARLIE_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT), this?.peersToStatesToRecord)
}
// exercise the new FinalityFlow observerSessions constructor parameter
val stx3 = aliceNode.startFlowAndRunNetwork(CashPaymentWithObserversFlow(

View File

@ -62,7 +62,7 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
val payload = otherSideSession.receive<Any>().unwrap { it }
val stx =
if (payload is SignedTransactionWithDistributionList) {
recordTransactionMetadata(payload.stx, payload.distributionList)
(serviceHub as ServiceHubCoreInternal).recordReceiverTransactionRecoveryMetadata(payload.stx.id, otherSideSession.counterparty.name, ourIdentity.name, statesToRecord, payload.distributionList)
payload.stx
} else payload as SignedTransaction
stx.pushToLoggingContext()
@ -87,21 +87,6 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
return stx
}
@Suspendable
private fun recordTransactionMetadata(stx: SignedTransaction, distributionList: DistributionList?) {
distributionList?.let {
val txnMetadata = TransactionMetadata(otherSideSession.counterparty.name,
DistributionList(distributionList.senderStatesToRecord,
distributionList.peersToStatesToRecord.map { (peer, peerStatesToRecord) ->
if (peer == ourIdentity.name)
peer to statesToRecord // use actual value
else
peer to peerStatesToRecord // use hinted value
}.toMap()))
(serviceHub as ServiceHubCoreInternal).recordTransactionRecoveryMetadata(stx.id, txnMetadata, ourIdentity.name)
}
}
/**
* Hook to perform extra checks on the received transaction just before it's recorded. The transaction has already
* been resolved and verified at this point.

View File

@ -142,9 +142,8 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any,
val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
val toTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY
if (txnMetadata != null && toTwoPhaseFinalityNode && useTwoPhaseFinality && payload is SignedTransaction) {
payload = SignedTransactionWithDistributionList(payload, txnMetadata.distributionList)
if (txnMetadata.persist)
(serviceHub as ServiceHubCoreInternal).recordTransactionRecoveryMetadata(payload.stx.id, txnMetadata.copy(initiator = ourIdentity.name), ourIdentity.name)
val encryptedDistributionList = (serviceHub as ServiceHubCoreInternal).recordSenderTransactionRecoveryMetadata(payload.id, txnMetadata.copy(initiator = ourIdentity.name))
payload = SignedTransactionWithDistributionList(payload, encryptedDistributionList!!)
}
// This loop will receive [FetchDataFlow.Request] continuously until the `otherSideSession` has all the data they need
@ -274,5 +273,5 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any,
@CordaSerializable
data class SignedTransactionWithDistributionList(
val stx: SignedTransaction,
val distributionList: DistributionList
val distributionList: ByteArray
)

View File

@ -65,13 +65,24 @@ interface ServiceHubCoreInternal : ServiceHub {
fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord)
/**
* Records [TransactionMetadata] for a given txnId.
* Records Sender [TransactionMetadata] for a given txnId.
*
* @param txnId The SecureHash of a transaction.
* @param txnMetadata The recovery metadata associated with a transaction.
* @param caller The CordaX500Name of the party calling this operation.
* @return encrypted distribution list (hashed peers -> StatesToRecord values).
*/
fun recordTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata, caller: CordaX500Name)
fun recordSenderTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata): ByteArray?
/**
* Records Received [TransactionMetadata] for a given txnId.
*
* @param txnId The SecureHash of a transaction.
* @param sender The sender of the transaction.
* @param receiver The receiver of the transaction.
* @param receiverStatesToRecord The StatesToRecord value of the receiver.
* @param encryptedDistributionList encrypted distribution list (hashed peers -> StatesToRecord values)
*/
fun recordReceiverTransactionRecoveryMetadata(txnId: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray)
}
interface TransactionsResolver {

View File

@ -20,11 +20,13 @@ import net.corda.testing.node.internal.cordappWithPackages
import org.assertj.core.api.Assertions.assertThat
import org.junit.BeforeClass
import org.junit.ClassRule
import org.junit.Ignore
import org.junit.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
@Suppress("FunctionName")
@Ignore
class DeterministicContractWithSerializationWhitelistTest {
companion object {
val logger = loggerFor<DeterministicContractWithSerializationWhitelistTest>()

View File

@ -195,8 +195,11 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) =
recordTransactions(statesToRecord, txs, SIGNATURE_VERIFICATION_DISABLED)
override fun recordTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata, caller: CordaX500Name) =
validatedTransactions.addTransactionRecoveryMetadata(txnId, txnMetadata, caller)
override fun recordSenderTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata) =
validatedTransactions.addSenderTransactionRecoveryMetadata(txnId, txnMetadata)
override fun recordReceiverTransactionRecoveryMetadata(txnId: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray) =
validatedTransactions.addReceiverTransactionRecoveryMetadata(txnId, sender, receiver, receiverStatesToRecord, encryptedDistributionList)
@Suppress("NestedBlockDepth")
@VisibleForTesting
@ -367,13 +370,24 @@ interface WritableTransactionStorage : TransactionStorage {
fun addUnnotarisedTransaction(transaction: SignedTransaction): Boolean
/**
* Record transaction recovery metadata for a given transaction id.
* Records Sender [TransactionMetadata] for a given txnId.
*
* @param id The SecureHash of the transaction to be recorded.
* @param metadata transaction recovery metadata.
* @param caller The CordaX500Name of the party calling this operation.
* @param id The SecureHash of a transaction.
* @param metadata The recovery metadata associated with a transaction.
* @return encrypted distribution list (hashed peers -> StatesToRecord values).
*/
fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name)
fun addSenderTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata): ByteArray?
/**
* Records Received [TransactionMetadata] for a given txnId.
*
* @param id The SecureHash of a transaction.
* @param sender The sender of the transaction.
* @param receiver The receiver of the transaction.
* @param receiverStatesToRecord The StatesToRecord value of the receiver.
* @param encryptedDistributionList encrypted distribution list (hashed peers -> StatesToRecord values)
*/
fun addReceiverTransactionRecoveryMetadata(id: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray)
/**
* Removes an un-notarised transaction (with a status of *MISSING_TRANSACTION_SIG*) from the data store.

View File

@ -11,6 +11,7 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.node.StatesToRecord
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
@ -214,7 +215,9 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac
false
}
override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) { }
override fun addSenderTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata): ByteArray? { return null }
override fun addReceiverTransactionRecoveryMetadata(id: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray) { }
override fun finalizeTransaction(transaction: SignedTransaction) =
addTransaction(transaction) {

View File

@ -8,17 +8,17 @@ import net.corda.core.internal.NamedCacheFactory
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.CordaClock
import net.corda.node.services.network.PersistentPartyInfoCache
import net.corda.nodeapi.internal.cryptoservice.CryptoService
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.serialization.internal.CordaSerializationEncoding
import org.hibernate.annotations.Immutable
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.Serializable
import java.lang.IllegalStateException
import java.time.Instant
import java.util.concurrent.atomic.AtomicLong
import javax.persistence.Column
@ -87,37 +87,34 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
@Column(name = "sender_party_id", nullable = true)
val senderPartyId: Long,
/** Encrypted information for use by Sender (eg. partyId's of flow peers) **/
/** Encrypted recovery information for sole use by Sender **/
@Lob
@Column(name = "distribution_list", nullable = false)
val distributionList: ByteArray,
/** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
@Column(name = "receiver_states_to_record", nullable = false)
val receiverStatesToRecord: StatesToRecord,
/** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
@Column(name = "sender_states_to_record", nullable = false)
val senderStatesToRecord: StatesToRecord
val receiverStatesToRecord: StatesToRecord
) {
constructor(key: Key, txId: SecureHash, initiatorPartyId: Long, peersToStatesToRecord: Map<Long, StatesToRecord>, senderStatesToRecord: StatesToRecord, receiverStatesToRecord: StatesToRecord, cryptoService: CryptoService) :
constructor(key: Key, txId: SecureHash, initiatorPartyId: Long, encryptedDistributionList: ByteArray, receiverStatesToRecord: StatesToRecord) :
this(PersistentKey(key),
txId = txId.toString(),
senderPartyId = initiatorPartyId,
distributionList = cryptoService.encrypt(peersToStatesToRecord.serialize(context = contextToUse().withEncoding(CordaSerializationEncoding.SNAPPY)).bytes),
receiverStatesToRecord = receiverStatesToRecord,
senderStatesToRecord = senderStatesToRecord
distributionList = encryptedDistributionList,
receiverStatesToRecord = receiverStatesToRecord
)
fun toReceiverDistributionRecord(cryptoService: CryptoService) =
ReceiverDistributionRecord(
fun toReceiverDistributionRecord(cryptoService: CryptoService): ReceiverDistributionRecord {
val hashedDL = HashedDistributionList.deserialize(cryptoService.decrypt(this.distributionList))
return ReceiverDistributionRecord(
SecureHash.parse(this.txId),
this.senderPartyId,
cryptoService.decrypt(this.distributionList).deserialize(context = contextToUse()),
hashedDL.peerHashToStatesToRecord,
this.receiverStatesToRecord,
this.senderStatesToRecord,
hashedDL.senderStatesToRecord,
this.compositeKey.timestamp
)
}
}
@Entity
@ -142,10 +139,9 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
}
}
@Suppress("IMPLICIT_CAST_TO_ANY")
override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) {
database.transaction {
if (caller == metadata.initiator) {
override fun addSenderTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata): ByteArray {
return database.transaction {
if (metadata.persist)
metadata.distributionList.peersToStatesToRecord.map { (peer, _) ->
val senderDistributionRecord = DBSenderDistributionRecord(PersistentKey(Key(clock.instant())),
id.toString(),
@ -153,19 +149,22 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
metadata.distributionList.senderStatesToRecord)
session.save(senderDistributionRecord)
}
} else {
val receiverStatesToRecord = metadata.distributionList.peersToStatesToRecord[caller] ?: throw IllegalStateException("Missing peer $caller in distribution list of Receiver recovery metadata")
val receiverDistributionRecord =
DBReceiverDistributionRecord(Key(clock.instant()),
id,
partyInfoCache.getPartyIdByCordaX500Name(metadata.initiator),
metadata.distributionList.peersToStatesToRecord.map { (peer, statesToRecord) ->
partyInfoCache.getPartyIdByCordaX500Name(peer) to statesToRecord }.toMap(),
metadata.distributionList.senderStatesToRecord,
receiverStatesToRecord,
cryptoService)
session.save(receiverDistributionRecord)
}
val hashedPeersToStatesToRecord = metadata.distributionList.peersToStatesToRecord.map { (peer, statesToRecord) ->
partyInfoCache.getPartyIdByCordaX500Name(peer) to statesToRecord }.toMap()
val hashedDistributionList = HashedDistributionList(metadata.distributionList.senderStatesToRecord, hashedPeersToStatesToRecord)
cryptoService.encrypt(hashedDistributionList.serialize())
}
}
override fun addReceiverTransactionRecoveryMetadata(id: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray) {
database.transaction {
val receiverDistributionRecord =
DBReceiverDistributionRecord(Key(clock.instant()),
id,
partyInfoCache.getPartyIdByCordaX500Name(sender),
encryptedDistributionList,
receiverStatesToRecord)
session.save(receiverDistributionRecord)
}
}
@ -285,7 +284,7 @@ private fun CryptoService.decrypt(bytes: ByteArray): ByteArray {
}
// TO DO: https://r3-cev.atlassian.net/browse/ENT-9876
private fun CryptoService.encrypt(bytes: ByteArray): ByteArray {
fun CryptoService.encrypt(bytes: ByteArray): ByteArray {
return bytes
}
@ -319,5 +318,42 @@ enum class DistributionRecordType {
SENDER, RECEIVER, ALL
}
@CordaSerializable
data class HashedDistributionList(
val senderStatesToRecord: StatesToRecord,
val peerHashToStatesToRecord: Map<Long, StatesToRecord>
) {
fun serialize(): ByteArray {
val baos = ByteArrayOutputStream()
val out = DataOutputStream(baos)
out.use {
out.writeByte(SERIALIZER_VERSION_ID)
out.writeByte(senderStatesToRecord.ordinal)
out.writeInt(peerHashToStatesToRecord.size)
for(entry in peerHashToStatesToRecord) {
out.writeLong(entry.key)
out.writeByte(entry.value.ordinal)
}
out.flush()
return baos.toByteArray()
}
}
companion object {
const val SERIALIZER_VERSION_ID = 1
fun deserialize(bytes: ByteArray): HashedDistributionList {
val input = DataInputStream(ByteArrayInputStream(bytes))
input.use {
assert(input.readByte().toInt() == SERIALIZER_VERSION_ID) { "Serialization version conflict." }
val senderStatesToRecord = StatesToRecord.values()[input.readByte().toInt()]
val numPeerHashToStatesToRecords = input.readInt()
val peerHashToStatesToRecord = mutableMapOf<Long, StatesToRecord>()
repeat (numPeerHashToStatesToRecords) {
peerHashToStatesToRecord[input.readLong()] = StatesToRecord.values()[input.readByte().toInt()]
}
return HashedDistributionList(senderStatesToRecord, peerHashToStatesToRecord)
}
}
}
}

View File

@ -60,9 +60,6 @@
<column name="distribution_list" type="BLOB">
<constraints nullable="false"/>
</column>
<column name="sender_states_to_record" type="INT">
<constraints nullable="false"/>
</column>
<column name="receiver_states_to_record" type="INT">
<constraints nullable="false"/>
</column>

View File

@ -31,6 +31,7 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.internal.rootCause
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.Vault
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
@ -809,9 +810,15 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
return true
}
override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) {
override fun addSenderTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata): ByteArray? {
return database.transaction {
delegate.addSenderTransactionRecoveryMetadata(id, metadata)
}
}
override fun addReceiverTransactionRecoveryMetadata(id: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray) {
database.transaction {
delegate.addTransactionRecoveryMetadata(id, metadata, caller)
delegate.addReceiverTransactionRecoveryMetadata(id, sender, receiver, receiverStatesToRecord, encryptedDistributionList)
}
}

View File

@ -7,12 +7,12 @@ import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.sign
import net.corda.core.flows.DistributionList
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.flows.TransactionMetadata
import net.corda.core.node.NodeInfo
import net.corda.core.node.StatesToRecord.ALL_VISIBLE
import net.corda.core.node.StatesToRecord.ONLY_RELEVANT
import net.corda.core.node.StatesToRecord.NONE
import net.corda.core.node.StatesToRecord.ONLY_RELEVANT
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.NetworkHostAndPort
@ -84,7 +84,7 @@ class DBTransactionStorageLedgerRecoveryTests {
val beforeFirstTxn = now()
val txn = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn)
transactionRecovery.addTransactionRecoveryMetadata(txn.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val timeWindow = RecoveryTimeWindow(fromTime = beforeFirstTxn,
untilTime = beforeFirstTxn.plus(1, ChronoUnit.MINUTES))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow)
@ -93,7 +93,7 @@ class DBTransactionStorageLedgerRecoveryTests {
val afterFirstTxn = now()
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))))
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow).size)
assertEquals(1, transactionRecovery.querySenderDistributionRecords(RecoveryTimeWindow(fromTime = afterFirstTxn)).size)
}
@ -102,10 +102,10 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `query local ledger for transactions within timeWindow and excluding remoteTransactionIds`() {
val transaction1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val transaction2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction2)
transactionRecovery.addTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))))
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id))
assertEquals(1, results.size)
@ -116,11 +116,12 @@ class DBTransactionStorageLedgerRecoveryTests {
val transaction1 = newTransaction()
// sender txn
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))))
val transaction2 = newTransaction()
// receiver txn
transactionRecovery.addUnnotarisedTransaction(transaction2)
transactionRecovery.addTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(BOB_NAME, DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))), ALICE_NAME)
transactionRecovery.addReceiverTransactionRecoveryMetadata(transaction2.id, BOB_NAME, ALICE_NAME, ALL_VISIBLE,
DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE)).toWire())
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let {
assertEquals(1, it.size)
@ -140,19 +141,19 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `query for sender distribution records by peers`() {
val txn1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn1)
transactionRecovery.addTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))))
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))))
val txn3 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn3)
transactionRecovery.addTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT, CHARLIE_NAME to ALL_VISIBLE))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT, CHARLIE_NAME to ALL_VISIBLE))))
val txn4 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn4)
transactionRecovery.addTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME, DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ONLY_RELEVANT))), BOB_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME, DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ONLY_RELEVANT))))
val txn5 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn5)
transactionRecovery.addTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME, DistributionList(ONLY_RELEVANT, emptyMap())), CHARLIE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME, DistributionList(ONLY_RELEVANT, emptyMap())))
assertEquals(5, readSenderDistributionRecordFromDB().size)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
@ -169,24 +170,24 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `query for receiver distribution records by initiator`() {
val txn1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn1)
transactionRecovery.addTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE, CHARLIE_NAME to ALL_VISIBLE))), BOB_NAME)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn1.id, ALICE_NAME, BOB_NAME, ALL_VISIBLE,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE, CHARLIE_NAME to ALL_VISIBLE)).toWire())
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))), BOB_NAME)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn2.id, ALICE_NAME, BOB_NAME, ONLY_RELEVANT,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT)).toWire())
val txn3 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn3)
transactionRecovery.addTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME,
DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to NONE))), CHARLIE_NAME)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn3.id, ALICE_NAME, CHARLIE_NAME, NONE,
DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to NONE)).toWire())
val txn4 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn4)
transactionRecovery.addTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME,
DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))), ALICE_NAME)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn4.id, BOB_NAME, ALICE_NAME, ONLY_RELEVANT,
DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE)).toWire())
val txn5 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn5)
transactionRecovery.addTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))), BOB_NAME)
transactionRecovery.addReceiverTransactionRecoveryMetadata(txn5.id, CHARLIE_NAME, BOB_NAME, ONLY_RELEVANT,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT)).toWire())
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(ALICE_NAME)).let {
@ -204,7 +205,7 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `transaction without peers does not store recovery metadata in database`() {
val senderTransaction = newTransaction()
transactionRecovery.addUnnotarisedTransaction(senderTransaction)
transactionRecovery.addTransactionRecoveryMetadata(senderTransaction.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, emptyMap())), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(senderTransaction.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, emptyMap())))
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
assertEquals(0, readSenderDistributionRecordFromDB(senderTransaction.id).size)
}
@ -213,8 +214,8 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `create un-notarised transaction with flow metadata and validate status in db`() {
val senderTransaction = newTransaction()
transactionRecovery.addUnnotarisedTransaction(senderTransaction)
transactionRecovery.addTransactionRecoveryMetadata(senderTransaction.id,
TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(senderTransaction.id,
TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))))
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
readSenderDistributionRecordFromDB(senderTransaction.id).let {
assertEquals(1, it.size)
@ -224,8 +225,8 @@ class DBTransactionStorageLedgerRecoveryTests {
val receiverTransaction = newTransaction()
transactionRecovery.addUnnotarisedTransaction(receiverTransaction)
transactionRecovery.addTransactionRecoveryMetadata(receiverTransaction.id,
TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE))), BOB_NAME)
transactionRecovery.addReceiverTransactionRecoveryMetadata(receiverTransaction.id, ALICE_NAME, BOB_NAME, ALL_VISIBLE,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE)).toWire())
assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status)
readReceiverDistributionRecordFromDB(receiverTransaction.id).let {
assertEquals(ALL_VISIBLE, it.statesToRecord)
@ -239,8 +240,8 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `finalize transaction with recovery metadata`() {
val transaction = newTransaction(notarySig = false)
transactionRecovery.finalizeTransaction(transaction)
transactionRecovery.addTransactionRecoveryMetadata(transaction.id,
TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ALL_VISIBLE))), ALICE_NAME)
transactionRecovery.addSenderTransactionRecoveryMetadata(transaction.id,
TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ALL_VISIBLE))))
assertEquals(VERIFIED, readTransactionFromDB(transaction.id).status)
readSenderDistributionRecordFromDB(transaction.id).apply {
assertEquals(1, this.size)
@ -252,8 +253,8 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `remove un-notarised transaction and associated recovery metadata`() {
val senderTransaction = newTransaction(notarySig = false)
transactionRecovery.addUnnotarisedTransaction(senderTransaction)
transactionRecovery.addTransactionRecoveryMetadata(senderTransaction.id, TransactionMetadata(ALICE.name,
DistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT, CHARLIE_NAME to ONLY_RELEVANT))), BOB.name)
transactionRecovery.addReceiverTransactionRecoveryMetadata(senderTransaction.id, ALICE.name, BOB.name, ONLY_RELEVANT,
DistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT, CHARLIE_NAME to ONLY_RELEVANT)).toWire())
assertNull(transactionRecovery.getTransaction(senderTransaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
@ -264,8 +265,8 @@ class DBTransactionStorageLedgerRecoveryTests {
val receiverTransaction = newTransaction(notarySig = false)
transactionRecovery.addUnnotarisedTransaction(receiverTransaction)
transactionRecovery.addTransactionRecoveryMetadata(receiverTransaction.id, TransactionMetadata(ALICE.name,
DistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT))), ALICE.name)
transactionRecovery.addReceiverTransactionRecoveryMetadata(receiverTransaction.id, ALICE.name, BOB.name, ONLY_RELEVANT,
DistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT)).toWire())
assertNull(transactionRecovery.getTransaction(receiverTransaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status)
@ -275,6 +276,13 @@ class DBTransactionStorageLedgerRecoveryTests {
assertNull(transactionRecovery.getTransactionInternal(receiverTransaction.id))
}
@Test(timeout = 300_000)
fun `test lightweight serialization and deserialization of hashed distribution list payload`() {
val dl = HashedDistributionList(ALL_VISIBLE,
mapOf(BOB.name.hashCode().toLong() to NONE, CHARLIE_NAME.hashCode().toLong() to ONLY_RELEVANT))
assertEquals(dl, dl.serialize().let { HashedDistributionList.deserialize(it) })
}
private fun readTransactionFromDB(id: SecureHash): DBTransactionStorage.DBTransaction {
val fromDb = database.transaction {
session.createQuery(
@ -361,4 +369,11 @@ class DBTransactionStorageLedgerRecoveryTests {
private fun notarySig(txId: SecureHash) =
DUMMY_NOTARY.keyPair.sign(SignableData(txId, SignatureMetadata(1, Crypto.findSignatureScheme(DUMMY_NOTARY.publicKey).schemeNumberID)))
private fun DistributionList.toWire(cryptoService: CryptoService = MockCryptoService(emptyMap())): ByteArray {
val hashedPeersToStatesToRecord = this.peersToStatesToRecord.map { (peer, statesToRecord) ->
partyInfoCache.getPartyIdByCordaX500Name(peer) to statesToRecord }.toMap()
val hashedDistributionList = HashedDistributionList(this.senderStatesToRecord, hashedPeersToStatesToRecord)
return cryptoService.encrypt(hashedDistributionList.serialize())
}
}

View File

@ -12,6 +12,7 @@ import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.TransactionStatus
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.StatesToRecord
import net.corda.testing.node.MockServices
import rx.Observable
import rx.subjects.PublishSubject
@ -60,7 +61,9 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.IN_FLIGHT)) == null
}
override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) { }
override fun addSenderTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata): ByteArray? { return null }
override fun addReceiverTransactionRecoveryMetadata(id: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray) { }
override fun removeUnnotarisedTransaction(id: SecureHash): Boolean {
return txns.remove(id) != null

View File

@ -148,7 +148,9 @@ data class TestTransactionDSLInterpreter private constructor(
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord) {}
override fun recordTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata, caller: CordaX500Name) {}
override fun recordSenderTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata): ByteArray? { return null }
override fun recordReceiverTransactionRecoveryMetadata(txnId: SecureHash, sender: CordaX500Name, receiver: CordaX500Name, receiverStatesToRecord: StatesToRecord, encryptedDistributionList: ByteArray) {}
}
private fun copy(): TestTransactionDSLInterpreter =