diff --git a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt index 867a719949..c7a6cfd112 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt @@ -85,6 +85,7 @@ sealed class FetchDataFlow( for (hash in toFetch) { // We skip the validation here (with unwrap { it }) because we will do it below in validateFetchResponse. // The only thing checked is the object type. It is a protocol violation to send results out of order. + // TODO We need to page here after large messages will work. maybeItems += otherSideSession.sendAndReceive>(Request.Data(NonEmptySet.of(hash), dataType)).unwrap { it } } // Check for a buggy/malicious peer answering with something that we didn't ask for. diff --git a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt index 5ceb17d094..d9a6f2c715 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt @@ -12,6 +12,8 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.exactAdd import java.util.* +import kotlin.collections.ArrayList +import kotlin.math.min // TODO: This code is currently unit tested by TwoPartyTradeFlowTests, it should have its own tests. /** @@ -20,8 +22,12 @@ import java.util.* * * @return a list of verified [SignedTransaction] objects, in a depth-first order. */ -class ResolveTransactionsFlow(private val txHashes: Set, - private val otherSide: FlowSession) : FlowLogic>() { +class ResolveTransactionsFlow(txHashesArg: Set, + private val otherSide: FlowSession) : FlowLogic() { + + // Need it ordered in terms of iteration. Needs to be a variable for the check-pointing logic to work. + private val txHashes = txHashesArg.toList() + /** * Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does * *not* validate or store the [SignedTransaction] itself. @@ -35,6 +41,8 @@ class ResolveTransactionsFlow(private val txHashes: Set, companion object { private fun dependencyIDs(stx: SignedTransaction) = stx.inputs.map { it.txhash }.toSet() + private const val RESOLUTION_PAGE_SIZE = 100 + /** * Topologically sorts the given transactions such that dependencies are listed before dependers. */ @JvmStatic @@ -83,10 +91,16 @@ class ResolveTransactionsFlow(private val txHashes: Set, @Suspendable @Throws(FetchDataFlow.HashNotFound::class) - override fun call(): List { + override fun call() { + val newTxns = ArrayList(txHashes.size) // Start fetching data. - val newTxns = downloadDependencies(txHashes) - fetchMissingAttachments(signedTransaction?.let { newTxns + it } ?: newTxns) + for (pageNumber in 0..(txHashes.size - 1) / RESOLUTION_PAGE_SIZE) { + val page = page(pageNumber, RESOLUTION_PAGE_SIZE) + + newTxns += downloadDependencies(page) + val txsWithMissingAttachments = if (pageNumber == 0) signedTransaction?.let { newTxns + it } ?: newTxns else newTxns + fetchMissingAttachments(txsWithMissingAttachments) + } otherSide.send(FetchDataFlow.Request.End) // Finish fetching data. @@ -99,13 +113,17 @@ class ResolveTransactionsFlow(private val txHashes: Set, it.verify(serviceHub) serviceHub.recordTransactions(StatesToRecord.NONE, listOf(it)) } + } - return signedTransaction?.let { - result + it - } ?: result + private fun page(pageNumber: Int, pageSize: Int): Set { + val offset = pageNumber * pageSize + val limit = min(offset + pageSize, txHashes.size) + // call toSet() is needed because sub-lists are not checkpoint-friendly. + return txHashes.subList(offset, limit).toSet() } @Suspendable + // TODO use paging here (we literally get the entire dependencies graph in memory) private fun downloadDependencies(depsToCheck: Set): List { // Maintain a work queue of all hashes to load/download, initialised with our starting set. Then do a breadth // first traversal across the dependency graph. @@ -132,13 +150,14 @@ class ResolveTransactionsFlow(private val txHashes: Set, while (nextRequests.isNotEmpty()) { // Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the // graph we're traversing. - val notAlreadyFetched = nextRequests.filterNot { it in resultQ }.toSet() + val notAlreadyFetched: Set = nextRequests - resultQ.keys nextRequests.clear() if (notAlreadyFetched.isEmpty()) // Done early. break // Request the standalone transaction data (which may refer to things we don't yet have). + // TODO use paging here val downloads: List = subFlow(FetchTransactionsFlow(notAlreadyFetched, otherSide)).downloaded for (stx in downloads) diff --git a/core/src/main/kotlin/net/corda/core/transactions/ContractUpgradeTransactions.kt b/core/src/main/kotlin/net/corda/core/transactions/ContractUpgradeTransactions.kt index e85b50f82e..d44cee239d 100644 --- a/core/src/main/kotlin/net/corda/core/transactions/ContractUpgradeTransactions.kt +++ b/core/src/main/kotlin/net/corda/core/transactions/ContractUpgradeTransactions.kt @@ -41,6 +41,7 @@ data class ContractUpgradeWireTransaction( init { check(inputs.isNotEmpty()) { "A contract upgrade transaction must have inputs" } + checkBaseInvariants() } /** diff --git a/core/src/main/kotlin/net/corda/core/transactions/NotaryChangeTransactions.kt b/core/src/main/kotlin/net/corda/core/transactions/NotaryChangeTransactions.kt index 0704ec0db2..fbee73d680 100644 --- a/core/src/main/kotlin/net/corda/core/transactions/NotaryChangeTransactions.kt +++ b/core/src/main/kotlin/net/corda/core/transactions/NotaryChangeTransactions.kt @@ -45,6 +45,7 @@ data class NotaryChangeWireTransaction( init { check(inputs.isNotEmpty()) { "A notary change transaction must have inputs" } check(notary != newNotary) { "The old and new notaries must be different – $newNotary" } + checkBaseInvariants() } /** diff --git a/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt b/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt index 8b762ecb9a..b652959156 100644 --- a/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/internal/ResolveTransactionsFlowTest.kt @@ -58,8 +58,7 @@ class ResolveTransactionsFlowTest { val p = TestFlow(setOf(stx2.id), megaCorp) val future = miniCorpNode.startFlow(p) mockNet.runNetwork() - val results = future.getOrThrow() - assertEquals(listOf(stx1.id, stx2.id), results.map { it.id }) + future.getOrThrow() miniCorpNode.transaction { assertEquals(stx1, miniCorpNode.services.validatedTransactions.getTransaction(stx1.id)) assertEquals(stx2, miniCorpNode.services.validatedTransactions.getTransaction(stx2.id)) @@ -189,16 +188,16 @@ class ResolveTransactionsFlowTest { // DOCEND 2 @InitiatingFlow - private class TestFlow(val otherSide: Party, private val resolveTransactionsFlowFactory: (FlowSession) -> ResolveTransactionsFlow, private val txCountLimit: Int? = null) : FlowLogic>() { + private class TestFlow(val otherSide: Party, private val resolveTransactionsFlowFactory: (FlowSession) -> ResolveTransactionsFlow, private val txCountLimit: Int? = null) : FlowLogic() { constructor(txHashes: Set, otherSide: Party, txCountLimit: Int? = null) : this(otherSide, { ResolveTransactionsFlow(txHashes, it) }, txCountLimit = txCountLimit) constructor(stx: SignedTransaction, otherSide: Party) : this(otherSide, { ResolveTransactionsFlow(stx, it) }) @Suspendable - override fun call(): List { + override fun call() { val session = initiateFlow(otherSide) val resolveTransactionsFlow = resolveTransactionsFlowFactory(session) txCountLimit?.let { resolveTransactionsFlow.transactionCountLimit = it } - return subFlow(resolveTransactionsFlow) + subFlow(resolveTransactionsFlow) } } diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index ae292a8b71..4deb351720 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,6 +6,7 @@ release, see :doc:`upgrade-notes`. Unreleased ========== +* Fixed an error thrown by NodeVaultService upon recording a transaction with a number of inputs greater than the default page size. * Fixed incorrect computation of ``totalStates`` from ``otherResults`` in ``NodeVaultService``. diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 038d25b562..817d0d7a40 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -188,9 +188,18 @@ class NodeVaultService( } private fun loadStates(refs: Collection): Collection> { - return if (refs.isNotEmpty()) - queryBy(QueryCriteria.VaultQueryCriteria(stateRefs = refs.toList())).states - else emptySet() + val states = mutableListOf>() + if (refs.isNotEmpty()) { + val refsList = refs.toList() + val pageSize = PageSpecification().pageSize + (0..(refsList.size - 1) / pageSize).forEach { + val offset = it * pageSize + val limit = minOf(offset + pageSize, refsList.size) + val page = queryBy(QueryCriteria.VaultQueryCriteria(stateRefs = refsList.subList(offset, limit))).states + states.addAll(page) + } + } + return states } private fun processAndNotify(updates: List>) { @@ -507,4 +516,4 @@ class NodeVaultService( } return myInterfaces } -} +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt index cc8f47a2a0..a09a027cee 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt @@ -63,7 +63,8 @@ class NotaryServiceTests { } private fun generateTransaction(node: StartedNode, party: Party, notary: Party): SignedTransaction { - val inputs = (1..10_005).map { StateRef(SecureHash.randomSHA256(), 0) } + val txHash = SecureHash.randomSHA256() + val inputs = (1..10_005).map { StateRef(txHash, it) } val tx = NotaryChangeTransactionBuilder(inputs, notary, party).build() return node.services.run { diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt index 3271385473..d08c63eaff 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt @@ -1,18 +1,58 @@ package net.corda.node.services.vault -import net.corda.core.contracts.* +import net.corda.core.contracts.Amount +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.FungibleAsset +import net.corda.core.contracts.LinearState +import net.corda.core.contracts.PartyAndReference +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.generateKeyPair import net.corda.core.crypto.toStringShort import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.packageName -import net.corda.core.node.services.* -import net.corda.core.node.services.vault.* -import net.corda.core.node.services.vault.QueryCriteria.* +import net.corda.core.node.services.IdentityService +import net.corda.core.node.services.Vault +import net.corda.core.node.services.VaultQueryException +import net.corda.core.node.services.VaultService +import net.corda.core.node.services.queryBy +import net.corda.core.node.services.trackBy +import net.corda.core.node.services.vault.BinaryComparisonOperator +import net.corda.core.node.services.vault.ColumnPredicate +import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM +import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE +import net.corda.core.node.services.vault.MAX_PAGE_SIZE +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.node.services.vault.QueryCriteria.FungibleAssetQueryCriteria +import net.corda.core.node.services.vault.QueryCriteria.LinearStateQueryCriteria +import net.corda.core.node.services.vault.QueryCriteria.SoftLockingCondition +import net.corda.core.node.services.vault.QueryCriteria.SoftLockingType +import net.corda.core.node.services.vault.QueryCriteria.TimeCondition +import net.corda.core.node.services.vault.QueryCriteria.TimeInstantType +import net.corda.core.node.services.vault.QueryCriteria.VaultCustomQueryCriteria +import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria +import net.corda.core.node.services.vault.Sort +import net.corda.core.node.services.vault.SortAttribute +import net.corda.core.node.services.vault.builder import net.corda.core.transactions.TransactionBuilder -import net.corda.core.utilities.* -import net.corda.finance.* +import net.corda.core.utilities.NonEmptySet +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.days +import net.corda.core.utilities.seconds +import net.corda.core.utilities.toHexString +import net.corda.finance.AMOUNT +import net.corda.finance.CHF +import net.corda.finance.DOLLARS +import net.corda.finance.GBP +import net.corda.finance.POUNDS +import net.corda.finance.SWISS_FRANCS +import net.corda.finance.USD +import net.corda.finance.`issued by` import net.corda.finance.contracts.CommercialPaper import net.corda.finance.contracts.Commodity import net.corda.finance.contracts.DealState @@ -27,7 +67,18 @@ import net.corda.node.internal.configureDatabase import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseTransaction -import net.corda.testing.core.* +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.BOC_NAME +import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.TestIdentity +import net.corda.testing.core.dummyCommand +import net.corda.testing.core.expect +import net.corda.testing.core.expectEvents +import net.corda.testing.core.sequence +import net.corda.testing.core.singleIdentityAndCert import net.corda.testing.internal.TEST_TX_TIME import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.vault.DUMMY_LINEAR_CONTRACT_PROGRAM_ID @@ -38,6 +89,7 @@ import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices import net.corda.testing.node.makeTestIdentityService import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatCode import org.junit.ClassRule import org.junit.Ignore import org.junit.Rule @@ -2235,6 +2287,24 @@ abstract class VaultQueryTestsBase : VaultQueryParties { assertThat(exitStates).hasSize(0) } } + + @Test + fun `record a transaction with number of inputs greater than vault page size`() { + val notary = dummyNotary + val issuerKey = notary.keyPair + val signatureMetadata = SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(issuerKey.public).schemeNumberID) + val states = database.transaction { + vaultFiller.fillWithSomeTestLinearStates(PageSpecification().pageSize + 1).states + } + + database.transaction { + val statesExitingTx = TransactionBuilder(notary.party).withItems(*states.toList().toTypedArray()).addCommand(dummyCommand()) + val signedStatesExitingTx = services.signInitialTransaction(statesExitingTx).withAdditionalSignature(issuerKey, signatureMetadata) + + assertThatCode { services.recordTransactions(signedStatesExitingTx) }.doesNotThrowAnyException() + } + } + /** * USE CASE demonstrations (outside of mainline Corda) *