mirror of
https://github.com/corda/corda.git
synced 2024-12-21 22:07:55 +00:00
[CORDA-1395] [CORDA-1378]: Control the max number of transaction dependencies. (#3047)
This commit is contained in:
parent
dc66c961cb
commit
d7ef385cc7
@ -85,6 +85,7 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
|
||||
for (hash in toFetch) {
|
||||
// We skip the validation here (with unwrap { it }) because we will do it below in validateFetchResponse.
|
||||
// The only thing checked is the object type. It is a protocol violation to send results out of order.
|
||||
// TODO We need to page here after large messages will work.
|
||||
maybeItems += otherSideSession.sendAndReceive<List<W>>(Request.Data(NonEmptySet.of(hash), dataType)).unwrap { it }
|
||||
}
|
||||
// Check for a buggy/malicious peer answering with something that we didn't ask for.
|
||||
|
@ -12,6 +12,8 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.exactAdd
|
||||
import java.util.*
|
||||
import kotlin.collections.ArrayList
|
||||
import kotlin.math.min
|
||||
|
||||
// TODO: This code is currently unit tested by TwoPartyTradeFlowTests, it should have its own tests.
|
||||
/**
|
||||
@ -20,8 +22,12 @@ import java.util.*
|
||||
*
|
||||
* @return a list of verified [SignedTransaction] objects, in a depth-first order.
|
||||
*/
|
||||
class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
|
||||
private val otherSide: FlowSession) : FlowLogic<List<SignedTransaction>>() {
|
||||
class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
|
||||
private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
|
||||
// Need it ordered in terms of iteration. Needs to be a variable for the check-pointing logic to work.
|
||||
private val txHashes = txHashesArg.toList()
|
||||
|
||||
/**
|
||||
* Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does
|
||||
* *not* validate or store the [SignedTransaction] itself.
|
||||
@ -35,6 +41,8 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
|
||||
companion object {
|
||||
private fun dependencyIDs(stx: SignedTransaction) = stx.inputs.map { it.txhash }.toSet()
|
||||
|
||||
private const val RESOLUTION_PAGE_SIZE = 100
|
||||
|
||||
/**
|
||||
* Topologically sorts the given transactions such that dependencies are listed before dependers. */
|
||||
@JvmStatic
|
||||
@ -83,10 +91,16 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
|
||||
|
||||
@Suspendable
|
||||
@Throws(FetchDataFlow.HashNotFound::class)
|
||||
override fun call(): List<SignedTransaction> {
|
||||
override fun call() {
|
||||
val newTxns = ArrayList<SignedTransaction>(txHashes.size)
|
||||
// Start fetching data.
|
||||
val newTxns = downloadDependencies(txHashes)
|
||||
fetchMissingAttachments(signedTransaction?.let { newTxns + it } ?: newTxns)
|
||||
for (pageNumber in 0..(txHashes.size - 1) / RESOLUTION_PAGE_SIZE) {
|
||||
val page = page(pageNumber, RESOLUTION_PAGE_SIZE)
|
||||
|
||||
newTxns += downloadDependencies(page)
|
||||
val txsWithMissingAttachments = if (pageNumber == 0) signedTransaction?.let { newTxns + it } ?: newTxns else newTxns
|
||||
fetchMissingAttachments(txsWithMissingAttachments)
|
||||
}
|
||||
otherSide.send(FetchDataFlow.Request.End)
|
||||
// Finish fetching data.
|
||||
|
||||
@ -99,13 +113,17 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
|
||||
it.verify(serviceHub)
|
||||
serviceHub.recordTransactions(StatesToRecord.NONE, listOf(it))
|
||||
}
|
||||
}
|
||||
|
||||
return signedTransaction?.let {
|
||||
result + it
|
||||
} ?: result
|
||||
private fun page(pageNumber: Int, pageSize: Int): Set<SecureHash> {
|
||||
val offset = pageNumber * pageSize
|
||||
val limit = min(offset + pageSize, txHashes.size)
|
||||
// call toSet() is needed because sub-lists are not checkpoint-friendly.
|
||||
return txHashes.subList(offset, limit).toSet()
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
// TODO use paging here (we literally get the entire dependencies graph in memory)
|
||||
private fun downloadDependencies(depsToCheck: Set<SecureHash>): List<SignedTransaction> {
|
||||
// Maintain a work queue of all hashes to load/download, initialised with our starting set. Then do a breadth
|
||||
// first traversal across the dependency graph.
|
||||
@ -132,13 +150,14 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
|
||||
while (nextRequests.isNotEmpty()) {
|
||||
// Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the
|
||||
// graph we're traversing.
|
||||
val notAlreadyFetched = nextRequests.filterNot { it in resultQ }.toSet()
|
||||
val notAlreadyFetched: Set<SecureHash> = nextRequests - resultQ.keys
|
||||
nextRequests.clear()
|
||||
|
||||
if (notAlreadyFetched.isEmpty()) // Done early.
|
||||
break
|
||||
|
||||
// Request the standalone transaction data (which may refer to things we don't yet have).
|
||||
// TODO use paging here
|
||||
val downloads: List<SignedTransaction> = subFlow(FetchTransactionsFlow(notAlreadyFetched, otherSide)).downloaded
|
||||
|
||||
for (stx in downloads)
|
||||
|
@ -41,6 +41,7 @@ data class ContractUpgradeWireTransaction(
|
||||
|
||||
init {
|
||||
check(inputs.isNotEmpty()) { "A contract upgrade transaction must have inputs" }
|
||||
checkBaseInvariants()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -45,6 +45,7 @@ data class NotaryChangeWireTransaction(
|
||||
init {
|
||||
check(inputs.isNotEmpty()) { "A notary change transaction must have inputs" }
|
||||
check(notary != newNotary) { "The old and new notaries must be different – $newNotary" }
|
||||
checkBaseInvariants()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -58,8 +58,7 @@ class ResolveTransactionsFlowTest {
|
||||
val p = TestFlow(setOf(stx2.id), megaCorp)
|
||||
val future = miniCorpNode.startFlow(p)
|
||||
mockNet.runNetwork()
|
||||
val results = future.getOrThrow()
|
||||
assertEquals(listOf(stx1.id, stx2.id), results.map { it.id })
|
||||
future.getOrThrow()
|
||||
miniCorpNode.transaction {
|
||||
assertEquals(stx1, miniCorpNode.services.validatedTransactions.getTransaction(stx1.id))
|
||||
assertEquals(stx2, miniCorpNode.services.validatedTransactions.getTransaction(stx2.id))
|
||||
@ -189,16 +188,16 @@ class ResolveTransactionsFlowTest {
|
||||
// DOCEND 2
|
||||
|
||||
@InitiatingFlow
|
||||
private class TestFlow(val otherSide: Party, private val resolveTransactionsFlowFactory: (FlowSession) -> ResolveTransactionsFlow, private val txCountLimit: Int? = null) : FlowLogic<List<SignedTransaction>>() {
|
||||
private class TestFlow(val otherSide: Party, private val resolveTransactionsFlowFactory: (FlowSession) -> ResolveTransactionsFlow, private val txCountLimit: Int? = null) : FlowLogic<Unit>() {
|
||||
constructor(txHashes: Set<SecureHash>, otherSide: Party, txCountLimit: Int? = null) : this(otherSide, { ResolveTransactionsFlow(txHashes, it) }, txCountLimit = txCountLimit)
|
||||
constructor(stx: SignedTransaction, otherSide: Party) : this(otherSide, { ResolveTransactionsFlow(stx, it) })
|
||||
|
||||
@Suspendable
|
||||
override fun call(): List<SignedTransaction> {
|
||||
override fun call() {
|
||||
val session = initiateFlow(otherSide)
|
||||
val resolveTransactionsFlow = resolveTransactionsFlowFactory(session)
|
||||
txCountLimit?.let { resolveTransactionsFlow.transactionCountLimit = it }
|
||||
return subFlow(resolveTransactionsFlow)
|
||||
subFlow(resolveTransactionsFlow)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@ release, see :doc:`upgrade-notes`.
|
||||
|
||||
Unreleased
|
||||
==========
|
||||
* Fixed an error thrown by NodeVaultService upon recording a transaction with a number of inputs greater than the default page size.
|
||||
|
||||
* Fixed incorrect computation of ``totalStates`` from ``otherResults`` in ``NodeVaultService``.
|
||||
|
||||
|
@ -188,9 +188,18 @@ class NodeVaultService(
|
||||
}
|
||||
|
||||
private fun loadStates(refs: Collection<StateRef>): Collection<StateAndRef<ContractState>> {
|
||||
return if (refs.isNotEmpty())
|
||||
queryBy<ContractState>(QueryCriteria.VaultQueryCriteria(stateRefs = refs.toList())).states
|
||||
else emptySet()
|
||||
val states = mutableListOf<StateAndRef<ContractState>>()
|
||||
if (refs.isNotEmpty()) {
|
||||
val refsList = refs.toList()
|
||||
val pageSize = PageSpecification().pageSize
|
||||
(0..(refsList.size - 1) / pageSize).forEach {
|
||||
val offset = it * pageSize
|
||||
val limit = minOf(offset + pageSize, refsList.size)
|
||||
val page = queryBy<ContractState>(QueryCriteria.VaultQueryCriteria(stateRefs = refsList.subList(offset, limit))).states
|
||||
states.addAll(page)
|
||||
}
|
||||
}
|
||||
return states
|
||||
}
|
||||
|
||||
private fun processAndNotify(updates: List<Vault.Update<ContractState>>) {
|
||||
@ -507,4 +516,4 @@ class NodeVaultService(
|
||||
}
|
||||
return myInterfaces
|
||||
}
|
||||
}
|
||||
}
|
@ -63,7 +63,8 @@ class NotaryServiceTests {
|
||||
}
|
||||
|
||||
private fun generateTransaction(node: StartedNode<InternalMockNetwork.MockNode>, party: Party, notary: Party): SignedTransaction {
|
||||
val inputs = (1..10_005).map { StateRef(SecureHash.randomSHA256(), 0) }
|
||||
val txHash = SecureHash.randomSHA256()
|
||||
val inputs = (1..10_005).map { StateRef(txHash, it) }
|
||||
val tx = NotaryChangeTransactionBuilder(inputs, notary, party).build()
|
||||
|
||||
return node.services.run {
|
||||
|
@ -1,18 +1,58 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.FungibleAsset
|
||||
import net.corda.core.contracts.LinearState
|
||||
import net.corda.core.contracts.PartyAndReference
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignatureMetadata
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.packageName
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.core.node.services.vault.QueryCriteria.*
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.VaultQueryException
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.queryBy
|
||||
import net.corda.core.node.services.trackBy
|
||||
import net.corda.core.node.services.vault.BinaryComparisonOperator
|
||||
import net.corda.core.node.services.vault.ColumnPredicate
|
||||
import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
|
||||
import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE
|
||||
import net.corda.core.node.services.vault.MAX_PAGE_SIZE
|
||||
import net.corda.core.node.services.vault.PageSpecification
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.node.services.vault.QueryCriteria.FungibleAssetQueryCriteria
|
||||
import net.corda.core.node.services.vault.QueryCriteria.LinearStateQueryCriteria
|
||||
import net.corda.core.node.services.vault.QueryCriteria.SoftLockingCondition
|
||||
import net.corda.core.node.services.vault.QueryCriteria.SoftLockingType
|
||||
import net.corda.core.node.services.vault.QueryCriteria.TimeCondition
|
||||
import net.corda.core.node.services.vault.QueryCriteria.TimeInstantType
|
||||
import net.corda.core.node.services.vault.QueryCriteria.VaultCustomQueryCriteria
|
||||
import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria
|
||||
import net.corda.core.node.services.vault.Sort
|
||||
import net.corda.core.node.services.vault.SortAttribute
|
||||
import net.corda.core.node.services.vault.builder
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.finance.*
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.toHexString
|
||||
import net.corda.finance.AMOUNT
|
||||
import net.corda.finance.CHF
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.GBP
|
||||
import net.corda.finance.POUNDS
|
||||
import net.corda.finance.SWISS_FRANCS
|
||||
import net.corda.finance.USD
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.finance.contracts.CommercialPaper
|
||||
import net.corda.finance.contracts.Commodity
|
||||
import net.corda.finance.contracts.DealState
|
||||
@ -27,7 +67,18 @@ import net.corda.node.internal.configureDatabase
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.BOC_NAME
|
||||
import net.corda.testing.core.CHARLIE_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.expect
|
||||
import net.corda.testing.core.expectEvents
|
||||
import net.corda.testing.core.sequence
|
||||
import net.corda.testing.core.singleIdentityAndCert
|
||||
import net.corda.testing.internal.TEST_TX_TIME
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.vault.DUMMY_LINEAR_CONTRACT_PROGRAM_ID
|
||||
@ -38,6 +89,7 @@ import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices
|
||||
import net.corda.testing.node.makeTestIdentityService
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatCode
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Ignore
|
||||
import org.junit.Rule
|
||||
@ -2235,6 +2287,24 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
|
||||
assertThat(exitStates).hasSize(0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `record a transaction with number of inputs greater than vault page size`() {
|
||||
val notary = dummyNotary
|
||||
val issuerKey = notary.keyPair
|
||||
val signatureMetadata = SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(issuerKey.public).schemeNumberID)
|
||||
val states = database.transaction {
|
||||
vaultFiller.fillWithSomeTestLinearStates(PageSpecification().pageSize + 1).states
|
||||
}
|
||||
|
||||
database.transaction {
|
||||
val statesExitingTx = TransactionBuilder(notary.party).withItems(*states.toList().toTypedArray()).addCommand(dummyCommand())
|
||||
val signedStatesExitingTx = services.signInitialTransaction(statesExitingTx).withAdditionalSignature(issuerKey, signatureMetadata)
|
||||
|
||||
assertThatCode { services.recordTransactions(signedStatesExitingTx) }.doesNotThrowAnyException()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* USE CASE demonstrations (outside of mainline Corda)
|
||||
*
|
||||
|
Loading…
Reference in New Issue
Block a user