From fa33336d3886f720709761e1088f5147e41f6a71 Mon Sep 17 00:00:00 2001 From: josecoll Date: Thu, 16 Feb 2017 11:02:36 +0000 Subject: [PATCH] Initial implementation of Vault Persistence using Requery (#191) * Initial prototyping with Requery as a persistence replacement for Exposed/Hibernate Applied changes following PR review by RP Updated timestamp naming (removed committedTimestamp) and StateStatus (removed AWAITING_CONSENSUS) after discussion with RP. Removed FungibleState and LinearState schemas (and associated tests) - awaiting Requery uni-directional relationship fix. Added Transaction propagation such that requery re-uses any existing transaction context. Made requery default logging configurable (disabled by default) Nullable fields are now truly nullable (in the Kotlin and DDL sense) Fix for SimmValuation integration test. Workarounds applied to resolve Requery issues when sharing Transactional context. Addressed PR review comments from MH. Further updates following re-review by RP/MH Further updates following additional PR review comments by RP Minor update following additional PR review comments by RP Optimised makeUpdate state processing code. Resolved conflicts after rebase. Additional Unit tests and bug fix for correct spending of multiple contract state types within a single transaction. Required interface change to states() API to take a setOf (ContractStateClassTypes) Minor code clean-up. Re-write NodeVaultService consumed state makeUpdate function using SQL. * Resolve conflict after rebase from master --- build.gradle | 5 +- core/build.gradle | 6 +- .../net/corda/core/node/services/Services.kt | 52 +- .../net/corda/core/schemas/PersistentTypes.kt | 7 +- .../core/schemas/requery/PersistentState.kt | 26 + .../requery/converters/InstantConverter.kt | 30 + .../requery/converters/StateRefConverter.kt | 28 + .../converters/VaultStateStatusConverter.kt | 15 + .../core/flows/ContractUpgradeFlowTest.kt | 3 +- .../core/node/services/VaultEnumTypesTest.kt | 17 + .../corda/docs/FxTransactionBuildTutorial.kt | 3 +- finance/build.gradle | 3 + .../contracts/testing/DummyDealContract.kt | 31 + .../contracts/testing/DummyLinearContract.kt | 6 +- .../corda/contracts/testing/VaultFiller.kt | 32 +- .../kotlin/net/corda/flows/CashExitFlow.kt | 4 +- .../main/kotlin/net/corda/flows/CashFlow.kt | 0 .../kotlin/net/corda/schemas/CashSchemaV1.kt | 4 +- .../corda/schemas/CommercialPaperSchemaV1.kt | 2 + .../corda/contracts/CommercialPaperTests.kt | 14 +- .../net/corda/contracts/asset/CashTests.kt | 27 +- node-schemas/build.gradle | 34 ++ .../services/vault/schemas/VaultSchema.kt | 77 +++ .../services/vault/schemas/VaultSchemaTest.kt | 530 ++++++++++++++++++ node/build.gradle | 3 + .../net/corda/node/internal/APIServerImpl.kt | 0 .../net/corda/node/internal/AbstractNode.kt | 4 +- .../corda/node/internal/CordaRPCOpsImpl.kt | 2 +- .../KotlinConfigurationTransactionWrapper.kt | 149 +++++ .../services/database/RequeryConfiguration.kt | 52 ++ .../node/services/vault/NodeVaultService.kt | 236 ++++---- .../net/corda/node/CordaRPCOpsImplTest.kt | 3 +- .../node/messaging/TwoPartyTradeFlowTests.kt | 8 +- .../node/services/MockServiceHubInternal.kt | 7 +- .../node/services/NodeSchedulerServiceTest.kt | 6 +- .../node/services/NodeVaultServiceTest.kt | 43 +- .../corda/node/services/VaultWithCashTest.kt | 108 +++- .../database/RequeryConfigurationTest.kt | 172 ++++++ .../persistence/DBTransactionStorageTests.kt | 15 + .../persistence/DataVendingServiceTests.kt | 9 +- settings.gradle | 4 +- .../kotlin/net/corda/testing/node/MockNode.kt | 2 +- .../net/corda/testing/node/MockServices.kt | 3 +- 43 files changed, 1574 insertions(+), 208 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/schemas/requery/PersistentState.kt create mode 100644 core/src/main/kotlin/net/corda/core/schemas/requery/converters/InstantConverter.kt create mode 100644 core/src/main/kotlin/net/corda/core/schemas/requery/converters/StateRefConverter.kt create mode 100644 core/src/main/kotlin/net/corda/core/schemas/requery/converters/VaultStateStatusConverter.kt create mode 100644 core/src/test/kotlin/net/corda/core/node/services/VaultEnumTypesTest.kt create mode 100644 finance/src/main/kotlin/net/corda/contracts/testing/DummyDealContract.kt rename test-utils/src/main/kotlin/net/corda/testing/DummyLinearState.kt => finance/src/main/kotlin/net/corda/contracts/testing/DummyLinearContract.kt (94%) create mode 100644 finance/src/main/kotlin/net/corda/flows/CashFlow.kt create mode 100644 node-schemas/build.gradle create mode 100644 node-schemas/src/main/kotlin/net/corda/node/services/vault/schemas/VaultSchema.kt create mode 100644 node-schemas/src/test/kotlin/net/corda/node/services/vault/schemas/VaultSchemaTest.kt create mode 100644 node/src/main/kotlin/net/corda/node/internal/APIServerImpl.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/database/RequeryConfiguration.kt create mode 100644 node/src/test/kotlin/net/corda/node/services/database/RequeryConfigurationTest.kt diff --git a/build.gradle b/build.gradle index 78c6cd1a9d..244ec8ea9a 100644 --- a/build.gradle +++ b/build.gradle @@ -10,7 +10,7 @@ buildscript { // Dependency versions. Can run 'gradle dependencyUpdates' to find new versions of things. // // TODO: Sort this alphabetically. - ext.kotlin_version = '1.0.5-2' + ext.kotlin_version = '1.0.6' ext.quasar_version = '0.7.6' // TODO: Upgrade to 0.7.7+ when Quasar bug 238 is resolved. ext.asm_version = '0.5.3' ext.artemis_version = '1.5.1' @@ -29,6 +29,8 @@ buildscript { ext.jopt_simple_version = '5.0.2' ext.jansi_version = '1.14' ext.hibernate_version = '5.2.6.Final' + ext.rxjava_version = '1.2.4' + ext.requery_version = '1.1.1' ext.dokka_version = '0.9.13' repositories { @@ -43,6 +45,7 @@ buildscript { classpath "net.corda.plugins:quasar-utils:$gradle_plugins_version" classpath "net.corda.plugins:cordformation:$gradle_plugins_version" classpath 'com.github.ben-manes:gradle-versions-plugin:0.13.0' + classpath "org.jetbrains.kotlin:kotlin-noarg:$kotlin_version" classpath "org.jetbrains.dokka:dokka-gradle-plugin:${dokka_version}" } } diff --git a/core/build.gradle b/core/build.gradle index 2afd7e2634..0a191e70ca 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -31,6 +31,7 @@ sourceSets { } dependencies { + testCompile "junit:junit:$junit_version" testCompile "commons-fileupload:commons-fileupload:1.3.2" @@ -63,7 +64,7 @@ dependencies { compile "com.google.guava:guava:$guava_version" // RxJava: observable streams of events. - compile "io.reactivex:rxjava:1.2.4" + compile "io.reactivex:rxjava:$rxjava_version" // Kryo: object graph serialization. compile "com.esotericsoftware:kryo:4.0.0" @@ -88,4 +89,7 @@ dependencies { // RS API: Response type and codes for ApiUtils. compile "javax.ws.rs:javax.ws.rs-api:2.0" + + // Requery: SQL based query & persistence for Kotlin + compile "io.requery:requery-kotlin:$requery_version" } diff --git a/core/src/main/kotlin/net/corda/core/node/services/Services.kt b/core/src/main/kotlin/net/corda/core/node/services/Services.kt index c5acba89eb..688d4d8676 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/Services.kt @@ -32,13 +32,11 @@ val DEFAULT_SESSION_ID = 0L * * This abstract class has no references to Cash contracts. * - * [states] Holds the states that are *active* and *relevant*. + * [states] Holds a [VaultService] queried subset of states that are *active* and *relevant*. * Active means they haven't been consumed yet (or we don't know about it). * Relevant means they contain at least one of our pubkeys. */ -class Vault(val states: List>) { - @Suppress("UNCHECKED_CAST") - inline fun statesOfType() = states.filter { it.state.data is T } as List> +class Vault(val states: Iterable>) { /** * Represents an update observed by the vault that will be notified to observers. Include the [StateRef]s of @@ -81,6 +79,10 @@ class Vault(val states: List>) { companion object { val NoUpdate = Update(emptySet(), emptySet()) } + + enum class StateStatus { + UNCONSUMED, CONSUMED + } } /** @@ -92,11 +94,6 @@ class Vault(val states: List>) { * Note that transactions we've seen are held by the storage service, not the vault. */ interface VaultService { - /** - * Returns a read-only snapshot of the vault at the time the call is made. Note that if you consume states or - * keys in this vault, you must inform the vault service so it can update its internal state. - */ - val currentVault: Vault /** * Prefer the use of [updates] unless you know why you want to use this instead. @@ -125,25 +122,13 @@ interface VaultService { * Atomically get the current vault and a stream of updates. Note that the Observable buffers updates until the * first subscriber is registered so as to avoid racing with early updates. */ - fun track(): Pair> + fun track(): Pair, Observable> /** - * Returns a snapshot of the heads of LinearStates. + * Return unconsumed [ContractState]s for a given set of [StateRef]s + * TODO: revisit and generalize this exposed API function. */ - val linearHeads: Map> - - // TODO: When KT-10399 is fixed, rename this and remove the inline version below. - - /** Returns the [linearHeads] only when the type of the state would be considered an 'instanceof' the given type. */ - @Suppress("UNCHECKED_CAST") - fun linearHeadsOfType_(stateType: Class): Map> { - return linearHeads.filterValues { stateType.isInstance(it.state.data) }.mapValues { StateAndRef(it.value.state as TransactionState, it.value.ref) } - } - - fun statesForRefs(refs: List): Map?> { - val refsToStates = currentVault.states.associateBy { it.ref } - return refs.associateBy({ it }) { refsToStates[it]?.state } - } + fun statesForRefs(refs: List): Map?> /** * Possibly update the vault by marking as spent states that these transactions consume, and adding any relevant @@ -213,9 +198,24 @@ interface VaultService { amount: Amount, to: CompositeKey, onlyFromParties: Set? = null): Pair> + + /** + * Return [ContractState]s of a given [Contract] type and list of [Vault.StateStatus] + */ + fun states(clazzes: Set>, statuses: EnumSet): List> } -inline fun VaultService.linearHeadsOfType() = linearHeadsOfType_(T::class.java) +inline fun VaultService.unconsumedStates(): List> = + states(setOf(T::class.java), EnumSet.of(Vault.StateStatus.UNCONSUMED)) + +inline fun VaultService.consumedStates(): List> = + states(setOf(T::class.java), EnumSet.of(Vault.StateStatus.CONSUMED)) + +/** Returns the [linearState] heads only when the type of the state would be considered an 'instanceof' the given type. */ +inline fun VaultService.linearHeadsOfType() = + states(setOf(T::class.java), EnumSet.of(Vault.StateStatus.UNCONSUMED)) + .associateBy { it.state.data.linearId }.mapValues { it.value } + inline fun VaultService.dealsWith(party: AnonymousParty) = linearHeadsOfType().values.filter { it.state.data.parties.any { it == party } } diff --git a/core/src/main/kotlin/net/corda/core/schemas/PersistentTypes.kt b/core/src/main/kotlin/net/corda/core/schemas/PersistentTypes.kt index e690cd583d..7288f9fdcb 100644 --- a/core/src/main/kotlin/net/corda/core/schemas/PersistentTypes.kt +++ b/core/src/main/kotlin/net/corda/core/schemas/PersistentTypes.kt @@ -1,5 +1,6 @@ package net.corda.core.schemas +import io.requery.Persistable import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateRef import net.corda.core.serialization.toHexString @@ -48,7 +49,7 @@ abstract class MappedSchema(schemaFamily: Class<*>, * A super class for all mapped states exported to a schema that ensures the [StateRef] appears on the database row. The * [StateRef] will be set to the correct value by the framework (there's no need to set during mapping generation by the state itself). */ -@MappedSuperclass open class PersistentState(@EmbeddedId var stateRef: PersistentStateRef? = null) +@MappedSuperclass open class PersistentState(@EmbeddedId var stateRef: PersistentStateRef? = null) : Persistable /** * Embedded [StateRef] representation used in state mapping. @@ -62,5 +63,9 @@ data class PersistentStateRef( var index: Int? ) : Serializable { constructor(stateRef: StateRef) : this(stateRef.txhash.bytes.toHexString(), stateRef.index) + /* + JPA Query requirement: + @Entity classes should have a default (non-arg) constructor to instantiate the objects when retrieving them from the database. + */ constructor() : this(null, null) } diff --git a/core/src/main/kotlin/net/corda/core/schemas/requery/PersistentState.kt b/core/src/main/kotlin/net/corda/core/schemas/requery/PersistentState.kt new file mode 100644 index 0000000000..45db4afe24 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/schemas/requery/PersistentState.kt @@ -0,0 +1,26 @@ +package net.corda.core.schemas.requery + +import io.requery.Key +import io.requery.Persistable +import io.requery.Superclass +import net.corda.core.contracts.StateRef + +import javax.persistence.Column + +object Requery { + /** + * A super class for all mapped states exported to a schema that ensures the [StateRef] appears on the database row. The + * [StateRef] will be set to the correct value by the framework (there's no need to set during mapping generation by the state itself). + */ + // TODO: this interface will supercede the existing [PersistentState] interface defined in PersistentTypes.kt + // once we cut-over all existing Hibernate ContractState persistence to Requery + @Superclass interface PersistentState : Persistable { + @get:Key + @get:Column(name = "transaction_id", length = 64) + var txId: String + + @get:Key + @get:Column(name = "output_index") + var index: Int + } +} diff --git a/core/src/main/kotlin/net/corda/core/schemas/requery/converters/InstantConverter.kt b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/InstantConverter.kt new file mode 100644 index 0000000000..da58628dbf --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/InstantConverter.kt @@ -0,0 +1,30 @@ +package net.corda.core.schemas.requery.converters + +import io.requery.Converter + +import java.sql.* +import java.time.* + +/** + * Converts from a [Instant] to a [java.sql.Timestamp] for Java 8. Note that + * when converting between the time type and the database type all times will be converted to the + * UTC zone offset. + */ +class InstantConverter : Converter { + + override fun getMappedType(): Class { return Instant::class.java } + + override fun getPersistedType(): Class { return Timestamp::class.java } + + override fun getPersistedSize(): Int? { return null } + + override fun convertToPersisted(value: Instant?): Timestamp? { + if (value == null) { return null } + return Timestamp.from(value) + } + + override fun convertToMapped(type: Class, value: Timestamp?): Instant? { + if (value == null) { return null } + return value.toInstant() + } +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/schemas/requery/converters/StateRefConverter.kt b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/StateRefConverter.kt new file mode 100644 index 0000000000..11ab984bbc --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/StateRefConverter.kt @@ -0,0 +1,28 @@ +package net.corda.core.schemas.requery.converters + +import io.requery.Converter +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash + +/** + * Converts from a [StateRef] to a Composite Key defined by a [String] txnHash and an [Int] index + */ +class StateRefConverter : Converter> { + + override fun getMappedType(): Class { return StateRef::class.java } + + @Suppress("UNCHECKED_CAST") + override fun getPersistedType(): Class> { return Pair::class.java as Class> } + + override fun getPersistedSize(): Int? { return null } + + override fun convertToPersisted(value: StateRef?): Pair? { + if (value == null) { return null } + return Pair(value.txhash.toString(), value.index) + } + + override fun convertToMapped(type: Class, value: Pair?): StateRef? { + if (value == null) { return null } + return StateRef(SecureHash.parse(value.first), value.second) + } +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/schemas/requery/converters/VaultStateStatusConverter.kt b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/VaultStateStatusConverter.kt new file mode 100644 index 0000000000..83ec825296 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/schemas/requery/converters/VaultStateStatusConverter.kt @@ -0,0 +1,15 @@ +package net.corda.core.schemas.requery.converters + +import io.requery.Converter +import io.requery.converter.EnumOrdinalConverter +import io.requery.sql.Mapping +import net.corda.core.contracts.ContractState +import net.corda.core.node.services.Vault + +import java.sql.* +import java.time.* + +/** + * Converter which persists a [Vault.StateStatus] enum using its enum ordinal representation + */ +class VaultStateStatusConverter() : EnumOrdinalConverter(Vault.StateStatus::class.java) diff --git a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt index b87e9dc668..3e470d5487 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt @@ -7,6 +7,7 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow +import net.corda.core.node.services.unconsumedStates import net.corda.core.serialization.OpaqueBytes import net.corda.core.utilities.Emoji import net.corda.flows.CashIssueFlow @@ -159,7 +160,7 @@ class ContractUpgradeFlowTest { a.services.startFlow(ContractUpgradeFlow.Instigator(stateAndRef, CashV2::class.java)) mockNet.runNetwork() // Get contract state form the vault. - val state = databaseTransaction(a.database) { a.vault.currentVault.states } + val state = databaseTransaction(a.database) { a.vault.unconsumedStates() } assertTrue(state.single().state.data is CashV2.State, "Contract state is upgraded to the new version.") assertEquals(Amount(1000000, USD).`issued by`(a.info.legalIdentity.ref(1)), (state.first().state.data as CashV2.State).amount, "Upgraded cash contain the correct amount.") assertEquals(listOf(a.info.legalIdentity.owningKey), (state.first().state.data as CashV2.State).owners, "Upgraded cash belongs to the right owner.") diff --git a/core/src/test/kotlin/net/corda/core/node/services/VaultEnumTypesTest.kt b/core/src/test/kotlin/net/corda/core/node/services/VaultEnumTypesTest.kt new file mode 100644 index 0000000000..fd775e2ba2 --- /dev/null +++ b/core/src/test/kotlin/net/corda/core/node/services/VaultEnumTypesTest.kt @@ -0,0 +1,17 @@ +package net.corda.core.node.services + +import org.assertj.core.api.Assertions +import org.junit.Test + +class VaultEnumTypesTest { + @Test + fun vaultStatusReflectsOrdinalValues() { + /** + * Warning!!! Do not change the order of this Enum as ordinal values are stored in the database + */ + val vaultStateStatusUnconsumed = Vault.StateStatus.UNCONSUMED + Assertions.assertThat(vaultStateStatusUnconsumed.ordinal).isEqualTo(0) + val vaultStateStatusConsumed = Vault.StateStatus.CONSUMED + Assertions.assertThat(vaultStateStatusConsumed.ordinal).isEqualTo(1) + } +} \ No newline at end of file diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt index cf07bc3faf..7780be034c 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt @@ -13,6 +13,7 @@ import net.corda.core.crypto.signWithECDSA import net.corda.core.flows.FlowLogic import net.corda.core.node.PluginServiceHub import net.corda.core.node.ServiceHub +import net.corda.core.node.services.unconsumedStates import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.unwrap import net.corda.flows.FinalityFlow @@ -42,7 +43,7 @@ private fun gatherOurInputs(serviceHub: ServiceHub, amountRequired: Amount>, notary: Party?): Pair>, Long> { // Collect cash type inputs - val cashStates = serviceHub.vaultService.currentVault.statesOfType() + val cashStates = serviceHub.vaultService.unconsumedStates() // extract our key identity for convenience val ourKey = serviceHub.myInfo.legalIdentity.owningKey // Filter down to our own cash states with right currency and issuer diff --git a/finance/build.gradle b/finance/build.gradle index 4445138d1d..7c12ec55df 100644 --- a/finance/build.gradle +++ b/finance/build.gradle @@ -1,4 +1,7 @@ apply plugin: 'kotlin' +// Java Persistence API support: create no-arg constructor +// see: http://stackoverflow.com/questions/32038177/kotlin-with-jpa-default-constructor-hell +apply plugin: 'kotlin-jpa' apply plugin: CanonicalizerPlugin apply plugin: 'net.corda.plugins.publish-utils' apply plugin: 'net.corda.plugins.quasar-utils' diff --git a/finance/src/main/kotlin/net/corda/contracts/testing/DummyDealContract.kt b/finance/src/main/kotlin/net/corda/contracts/testing/DummyDealContract.kt new file mode 100644 index 0000000000..07e4dcb0da --- /dev/null +++ b/finance/src/main/kotlin/net/corda/contracts/testing/DummyDealContract.kt @@ -0,0 +1,31 @@ +package net.corda.contracts.testing + +import net.corda.core.contracts.Contract +import net.corda.core.contracts.DealState +import net.corda.core.contracts.TransactionForContract +import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.crypto.CompositeKey +import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash +import net.corda.core.transactions.TransactionBuilder +import java.security.PublicKey + +class DummyDealContract: Contract { + override val legalContractReference: SecureHash = SecureHash.sha256("TestDeal") + + override fun verify(tx: TransactionForContract) {} + + data class State( + override val contract: Contract = DummyDealContract(), + override val participants: List = listOf(), + override val linearId: UniqueIdentifier = UniqueIdentifier(), + override val ref: String, + override val parties: List = listOf()) : DealState { + override fun isRelevant(ourKeys: Set): Boolean { + return participants.any { it.containsAny(ourKeys) } + } + override fun generateAgreement(notary: Party): TransactionBuilder { + throw UnsupportedOperationException("not implemented") + } + } +} \ No newline at end of file diff --git a/test-utils/src/main/kotlin/net/corda/testing/DummyLinearState.kt b/finance/src/main/kotlin/net/corda/contracts/testing/DummyLinearContract.kt similarity index 94% rename from test-utils/src/main/kotlin/net/corda/testing/DummyLinearState.kt rename to finance/src/main/kotlin/net/corda/contracts/testing/DummyLinearContract.kt index 64368a5f3e..5aa1c6f80d 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/DummyLinearState.kt +++ b/finance/src/main/kotlin/net/corda/contracts/testing/DummyLinearContract.kt @@ -1,4 +1,4 @@ -package net.corda.testing +package net.corda.contracts.testing import net.corda.core.contracts.* import net.corda.core.contracts.clauses.Clause @@ -16,7 +16,7 @@ class DummyLinearContract: Contract { FilterOn(clause, { states -> states.filterIsInstance() }), emptyList()) - class State( + data class State( override val linearId: UniqueIdentifier = UniqueIdentifier(), override val contract: Contract = DummyLinearContract(), override val participants: List = listOf(), @@ -26,4 +26,4 @@ class DummyLinearContract: Contract { return participants.any { it.containsAny(ourKeys) } } } -} +} \ No newline at end of file diff --git a/finance/src/main/kotlin/net/corda/contracts/testing/VaultFiller.kt b/finance/src/main/kotlin/net/corda/contracts/testing/VaultFiller.kt index a4284a41a3..ae2e9c097e 100644 --- a/finance/src/main/kotlin/net/corda/contracts/testing/VaultFiller.kt +++ b/finance/src/main/kotlin/net/corda/contracts/testing/VaultFiller.kt @@ -11,14 +11,44 @@ import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.TransactionType import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party +import net.corda.core.crypto.composite import net.corda.core.node.ServiceHub +import net.corda.core.node.recordTransactions import net.corda.core.node.services.Vault import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY +import net.corda.core.utilities.DUMMY_NOTARY_KEY import java.security.KeyPair import java.util.* +fun ServiceHub.fillWithSomeTestDeals(dealIds: List) { + val freshKey = keyManagementService.freshKey() + val transactions: List = dealIds.map { + // Issue a deal state + val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { + addOutputState(DummyDealContract.State(ref = it, participants = listOf(freshKey.public.composite))) + signWith(freshKey) + signWith(DUMMY_NOTARY_KEY) + } + return@map dummyIssue.toSignedTransaction() + } + + recordTransactions(transactions) +} + +fun ServiceHub.fillWithSomeTestLinearStates(numberToCreate: Int) { + val freshKey = keyManagementService.freshKey() + for (i in 1..numberToCreate) { + // Issue a deal state + val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { + addOutputState(DummyLinearContract.State(participants = listOf(freshKey.public.composite))) + signWith(freshKey) + signWith(DUMMY_NOTARY_KEY) + } + recordTransactions(dummyIssue.toSignedTransaction()) + } +} /** * Creates a random set of between (by default) 3 and 10 cash states that add up to the given amount and adds them @@ -38,7 +68,7 @@ fun ServiceHub.fillWithSomeTestCash(howMuch: Amount, ref: OpaqueBytes = OpaqueBytes(ByteArray(1, { 1 })), ownedBy: CompositeKey? = null, issuedBy: PartyAndReference = DUMMY_CASH_ISSUER, - issuerKey: KeyPair = DUMMY_CASH_ISSUER_KEY): Vault { + issuerKey: KeyPair = DUMMY_CASH_ISSUER_KEY): Vault { val amounts = calculateRandomlySizedAmounts(howMuch, atLeastThisManyStates, atMostThisManyStates, rng) val myKey: CompositeKey = ownedBy ?: myInfo.legalIdentity.owningKey diff --git a/finance/src/main/kotlin/net/corda/flows/CashExitFlow.kt b/finance/src/main/kotlin/net/corda/flows/CashExitFlow.kt index de495bb9eb..e2664241d3 100644 --- a/finance/src/main/kotlin/net/corda/flows/CashExitFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/CashExitFlow.kt @@ -4,6 +4,8 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.contracts.asset.Cash import net.corda.core.contracts.* import net.corda.core.crypto.Party +import net.corda.core.node.services.Vault +import net.corda.core.node.services.unconsumedStates import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder @@ -34,7 +36,7 @@ class CashExitFlow(val amount: Amount, val issueRef: OpaqueBytes, prog Cash().generateExit( builder, amount.issuedBy(issuer), - serviceHub.vaultService.currentVault.statesOfType().filter { it.state.data.owner == issuer.party.owningKey }) + serviceHub.vaultService.unconsumedStates().filter { it.state.data.owner == issuer.party.owningKey }) } catch (e: InsufficientBalanceException) { throw CashException("Exiting more cash than exists", e) } diff --git a/finance/src/main/kotlin/net/corda/flows/CashFlow.kt b/finance/src/main/kotlin/net/corda/flows/CashFlow.kt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/finance/src/main/kotlin/net/corda/schemas/CashSchemaV1.kt b/finance/src/main/kotlin/net/corda/schemas/CashSchemaV1.kt index 8972dd9254..692257c2ac 100644 --- a/finance/src/main/kotlin/net/corda/schemas/CashSchemaV1.kt +++ b/finance/src/main/kotlin/net/corda/schemas/CashSchemaV1.kt @@ -2,9 +2,7 @@ package net.corda.schemas import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.PersistentState -import javax.persistence.Column -import javax.persistence.Entity -import javax.persistence.Table +import javax.persistence.* /** * An object used to fully qualify the [CashSchema] family name (i.e. independent of version). diff --git a/finance/src/main/kotlin/net/corda/schemas/CommercialPaperSchemaV1.kt b/finance/src/main/kotlin/net/corda/schemas/CommercialPaperSchemaV1.kt index 4181855b70..540004d127 100644 --- a/finance/src/main/kotlin/net/corda/schemas/CommercialPaperSchemaV1.kt +++ b/finance/src/main/kotlin/net/corda/schemas/CommercialPaperSchemaV1.kt @@ -1,7 +1,9 @@ package net.corda.schemas +import io.requery.Convert import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.PersistentState +import net.corda.core.schemas.requery.converters.InstantConverter import java.time.Instant import javax.persistence.Column import javax.persistence.Entity diff --git a/finance/src/test/kotlin/net/corda/contracts/CommercialPaperTests.kt b/finance/src/test/kotlin/net/corda/contracts/CommercialPaperTests.kt index 98c4acdaf4..e5a4569172 100644 --- a/finance/src/test/kotlin/net/corda/contracts/CommercialPaperTests.kt +++ b/finance/src/test/kotlin/net/corda/contracts/CommercialPaperTests.kt @@ -210,24 +210,25 @@ class CommercialPaperTestsGeneric { */ private lateinit var bigCorpServices: MockServices - private lateinit var bigCorpVault: Vault + private lateinit var bigCorpVault: Vault private lateinit var bigCorpVaultService: VaultService private lateinit var aliceServices: MockServices private lateinit var aliceVaultService: VaultService - private lateinit var alicesVault: Vault + private lateinit var alicesVault: Vault private lateinit var moveTX: SignedTransaction @Test fun `issue move and then redeem`() { - val dataSourceAndDatabaseAlice = configureDatabase(makeTestDataSourceProperties()) + val dataSourcePropsAlice = makeTestDataSourceProperties() + val dataSourceAndDatabaseAlice = configureDatabase(dataSourcePropsAlice) val databaseAlice = dataSourceAndDatabaseAlice.second databaseTransaction(databaseAlice) { aliceServices = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this) + override val vaultService: VaultService = NodeVaultService(this, dataSourcePropsAlice) override fun recordTransactions(txs: Iterable) { for (stx in txs) { @@ -241,12 +242,13 @@ class CommercialPaperTestsGeneric { aliceVaultService = aliceServices.vaultService } - val dataSourceAndDatabaseBigCorp = configureDatabase(makeTestDataSourceProperties()) + val dataSourcePropsBigCorp = makeTestDataSourceProperties() + val dataSourceAndDatabaseBigCorp = configureDatabase(dataSourcePropsBigCorp) val databaseBigCorp = dataSourceAndDatabaseBigCorp.second databaseTransaction(databaseBigCorp) { bigCorpServices = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this) + override val vaultService: VaultService = NodeVaultService(this, dataSourcePropsBigCorp) override fun recordTransactions(txs: Iterable) { for (stx in txs) { diff --git a/finance/src/test/kotlin/net/corda/contracts/asset/CashTests.kt b/finance/src/test/kotlin/net/corda/contracts/asset/CashTests.kt index e1e4c8e65d..da84c600d0 100644 --- a/finance/src/test/kotlin/net/corda/contracts/asset/CashTests.kt +++ b/finance/src/test/kotlin/net/corda/contracts/asset/CashTests.kt @@ -3,8 +3,8 @@ package net.corda.contracts.asset import net.corda.contracts.testing.fillWithSomeTestCash import net.corda.core.contracts.* import net.corda.core.crypto.* -import net.corda.core.node.services.Vault import net.corda.core.node.services.VaultService +import net.corda.core.node.services.unconsumedStates import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction @@ -46,18 +46,19 @@ class CashTests { val vault: VaultService get() = services.vaultService lateinit var dataSource: Closeable lateinit var database: Database - lateinit var vaultService: Vault + lateinit var vaultStatesUnconsumed: List> @Before fun setUp() { LogHelper.setLevel(NodeVaultService::class) - val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties()) + val dataSourceProps = makeTestDataSourceProperties() + val dataSourceAndDatabase = configureDatabase(dataSourceProps) dataSource = dataSourceAndDatabase.first database = dataSourceAndDatabase.second databaseTransaction(database) { services = object : MockServices() { override val keyManagementService: MockKeyManagementService = MockKeyManagementService(MINI_CORP_KEY, MEGA_CORP_KEY, OUR_KEY) - override val vaultService: VaultService = NodeVaultService(this) + override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) override fun recordTransactions(txs: Iterable) { for (stx in txs) { @@ -77,7 +78,7 @@ class CashTests { services.fillWithSomeTestCash(howMuch = 80.SWISS_FRANCS, atLeastThisManyStates = 1, atMostThisManyStates = 1, issuedBy = MINI_CORP.ref(1), issuerKey = MINI_CORP_KEY, ownedBy = OUR_PUBKEY_1) - vaultService = services.vaultService.currentVault + vaultStatesUnconsumed = services.vaultService.unconsumedStates() } } @@ -566,7 +567,7 @@ class CashTests { val wtx = makeSpend(100.DOLLARS, THEIR_PUBKEY_1) @Suppress("UNCHECKED_CAST") - val vaultState = vaultService.states.elementAt(0) as StateAndRef + val vaultState = vaultStatesUnconsumed.elementAt(0) assertEquals(vaultState.ref, wtx.inputs[0]) assertEquals(vaultState.state.data.copy(owner = THEIR_PUBKEY_1), wtx.outputs[0].data) assertEquals(OUR_PUBKEY_1, wtx.commands.single { it.value is Cash.Commands.Move }.signers[0]) @@ -581,7 +582,7 @@ class CashTests { val tx = TransactionType.General.Builder(DUMMY_NOTARY) vault.generateSpend(tx, 80.DOLLARS, ALICE_PUBKEY, setOf(MINI_CORP)) - assertEquals(vaultService.states.elementAt(2).ref, tx.inputStates()[0]) + assertEquals(vaultStatesUnconsumed.elementAt(2).ref, tx.inputStates()[0]) } } @@ -593,7 +594,7 @@ class CashTests { val wtx = makeSpend(10.DOLLARS, THEIR_PUBKEY_1) @Suppress("UNCHECKED_CAST") - val vaultState = vaultService.states.elementAt(0) as StateAndRef + val vaultState = vaultStatesUnconsumed.elementAt(0) assertEquals(vaultState.ref, wtx.inputs[0]) assertEquals(vaultState.state.data.copy(owner = THEIR_PUBKEY_1, amount = 10.DOLLARS `issued by` defaultIssuer), wtx.outputs[0].data) assertEquals(vaultState.state.data.copy(amount = 90.DOLLARS `issued by` defaultIssuer), wtx.outputs[1].data) @@ -608,8 +609,8 @@ class CashTests { val wtx = makeSpend(500.DOLLARS, THEIR_PUBKEY_1) @Suppress("UNCHECKED_CAST") - val vaultState0 = vaultService.states.elementAt(0) as StateAndRef - val vaultState1 = vaultService.states.elementAt(1) + val vaultState0 = vaultStatesUnconsumed.elementAt(0) + val vaultState1 = vaultStatesUnconsumed.elementAt(1) assertEquals(vaultState0.ref, wtx.inputs[0]) assertEquals(vaultState1.ref, wtx.inputs[1]) assertEquals(vaultState0.state.data.copy(owner = THEIR_PUBKEY_1, amount = 500.DOLLARS `issued by` defaultIssuer), wtx.outputs[0].data) @@ -625,10 +626,10 @@ class CashTests { assertEquals(3, wtx.inputs.size) @Suppress("UNCHECKED_CAST") - val vaultState0 = vaultService.states.elementAt(0) as StateAndRef - val vaultState1 = vaultService.states.elementAt(1) + val vaultState0 = vaultStatesUnconsumed.elementAt(0) + val vaultState1 = vaultStatesUnconsumed.elementAt(1) @Suppress("UNCHECKED_CAST") - val vaultState2 = vaultService.states.elementAt(2) as StateAndRef + val vaultState2 = vaultStatesUnconsumed.elementAt(2) assertEquals(vaultState0.ref, wtx.inputs[0]) assertEquals(vaultState1.ref, wtx.inputs[1]) assertEquals(vaultState2.ref, wtx.inputs[2]) diff --git a/node-schemas/build.gradle b/node-schemas/build.gradle new file mode 100644 index 0000000000..1a2980f413 --- /dev/null +++ b/node-schemas/build.gradle @@ -0,0 +1,34 @@ +apply plugin: 'kotlin' +apply plugin: 'kotlin-kapt' + +description 'Corda node database schemas' + +repositories { + mavenLocal() + mavenCentral() + maven { + url 'http://oss.sonatype.org/content/repositories/snapshots' + } + jcenter() + maven { + url 'https://dl.bintray.com/kotlin/exposed' + } +} + +sourceSets { + main { + kotlin { + srcDir "${buildDir}/generated/source/kapt/main/" + } + } +} + +dependencies { + compile project(':core') + + // Requery: SQL based query & persistence for Kotlin + kapt "io.requery:requery-processor:$requery_version" + + // For H2 database support in persistence + testCompile "com.h2database:h2:1.4.193" +} diff --git a/node-schemas/src/main/kotlin/net/corda/node/services/vault/schemas/VaultSchema.kt b/node-schemas/src/main/kotlin/net/corda/node/services/vault/schemas/VaultSchema.kt new file mode 100644 index 0000000000..3c0c566581 --- /dev/null +++ b/node-schemas/src/main/kotlin/net/corda/node/services/vault/schemas/VaultSchema.kt @@ -0,0 +1,77 @@ +package net.corda.node.services.vault.schemas + +import io.requery.* +import net.corda.core.node.services.Vault +import net.corda.core.schemas.requery.Requery +import net.corda.core.schemas.requery.converters.InstantConverter +import java.time.Instant + +object VaultSchema { + + @Table(name = "vault_transaction_notes") + @Entity(model = "vault") + interface VaultTxnNote : Persistable { + @get:Key + @get:Generated + @get:Column(name = "seq_no", index = true) + var seqNo: Int + + @get:Column(name = "transaction_id", length = 64, index = true) + var txId: String + + @get:Column(name = "note") + var note: String + } + + @Table(name = "vault_cash_balances") + @Entity(model = "vault") + interface VaultCashBalances : Persistable { + @get:Key + @get:Column(name = "currency_code", length = 3) + var currency: String + + @get:Column(name = "amount", value = "0") + var amount: Long + } + + @Table(name = "vault_states") + @Entity(model = "vault") + interface VaultStates : Requery.PersistentState { + /** refers to the notary a state is attached to */ + @get:Column(name = "notary_name") + var notaryName: String + + @get:Column(name = "notary_key") + var notaryKey: String + + /** references a concrete ContractState that is [QueryableState] and has a [MappedSchema] */ + @get:Column(name = "contract_state_class_name") + var contractStateClassName: String + + /** refers to serialized transaction Contract State */ + // TODO: define contract state size maximum size and adjust length accordingly + @get:Column(name = "contract_state", length = 10000) + var contractState: ByteArray + + /** state lifecycle: unconsumed, consumed */ + @get:Column(name = "state_status") + var stateStatus: Vault.StateStatus + + /** refers to timestamp recorded upon entering UNCONSUMED state */ + @get:Column(name = "recorded_timestamp") + var recordedTime: Instant + + /** refers to timestamp recorded upon entering CONSUMED state */ + @get:Column(name = "consumed_timestamp", nullable = true) + var consumedTime: Instant? + + /** used to denote a state has been soft locked (to prevent double spend) + * will contain a temporary unique [UUID] obtained from a flow session */ + @get:Column(name = "lock_id", nullable = true) + var lockId: String? + + /** refers to the last time a lock was taken (reserved) or updated (released, re-reserved) */ + @get:Column(name = "lock_timestamp", nullable = true) + var lockUpdateTime: Instant? + } +} \ No newline at end of file diff --git a/node-schemas/src/test/kotlin/net/corda/node/services/vault/schemas/VaultSchemaTest.kt b/node-schemas/src/test/kotlin/net/corda/node/services/vault/schemas/VaultSchemaTest.kt new file mode 100644 index 0000000000..c2f9023081 --- /dev/null +++ b/node-schemas/src/test/kotlin/net/corda/node/services/vault/schemas/VaultSchemaTest.kt @@ -0,0 +1,530 @@ +package net.corda.node.services.vault.schemas + +import io.requery.Persistable +import io.requery.TransactionIsolation +import io.requery.kotlin.eq +import io.requery.kotlin.invoke +import io.requery.rx.KotlinRxEntityStore +import io.requery.sql.* +import io.requery.sql.platform.Generic +import net.corda.core.contracts.* +import net.corda.core.crypto.CompositeKey +import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.composite +import net.corda.core.node.services.Vault +import net.corda.core.schemas.requery.converters.InstantConverter +import net.corda.core.schemas.requery.converters.VaultStateStatusConverter +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.utilities.DUMMY_NOTARY +import net.corda.core.utilities.DUMMY_NOTARY_KEY +import net.corda.core.utilities.DUMMY_PUBKEY_1 +import net.corda.core.utilities.DUMMY_PUBKEY_2 +import org.h2.jdbcx.JdbcDataSource +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.Test +import rx.Observable +import java.sql.Connection +import java.sql.DriverManager +import java.time.Instant +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class VaultSchemaTest { + + var instance : KotlinEntityDataStore? = null + val data : KotlinEntityDataStore get() = instance!! + + var oinstance : KotlinRxEntityStore? = null + val odata : KotlinRxEntityStore get() = oinstance!! + + var transaction : LedgerTransaction? = null + + var jdbcInstance : Connection? = null + val jdbcConn : Connection get() = jdbcInstance!! + + @Before + fun setup() { + val dataSource = JdbcDataSource() + dataSource.setURL("jdbc:h2:mem:vault_persistence;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1") + val configuration = KotlinConfiguration(dataSource = dataSource, model = Models.VAULT, mapping = setupCustomMapping()) + instance = KotlinEntityDataStore(configuration) + oinstance = KotlinRxEntityStore(KotlinEntityDataStore(configuration)) + val tables = SchemaModifier(configuration) + val mode = TableCreationMode.DROP_CREATE + tables.createTables(mode) + + jdbcInstance = DriverManager.getConnection(dataSource.getURL()) + + // create dummy test data + setupDummyData() + } + + private fun setupCustomMapping(): Mapping? { + val mapping = GenericMapping(Generic()) + val instantConverter = InstantConverter() + mapping.addConverter(instantConverter, instantConverter.mappedType) + val vaultStateStatusConverter = VaultStateStatusConverter() + mapping.addConverter(vaultStateStatusConverter, vaultStateStatusConverter.mappedType) + return mapping + } + + @After + fun tearDown() { + data.close() + } + + private class VaultNoopContract() : Contract { + override val legalContractReference = SecureHash.sha256("") + data class VaultNoopState(override val owner: CompositeKey) : OwnableState { + override val contract = VaultNoopContract() + override val participants: List + get() = listOf(owner) + override fun withNewOwner(newOwner: CompositeKey) = Pair(Commands.Create(), copy(owner = newOwner)) + } + interface Commands : CommandData { + class Create : TypeOnlyCommandData(), Commands + } + + override fun verify(tx: TransactionForContract) { + // Always accepts. + } + } + + private fun setupDummyData() { + // dummy Transaction + val notary: Party = DUMMY_NOTARY + val inState1 = TransactionState(DummyContract.SingleOwnerState(0, DUMMY_PUBKEY_1), notary) + val inState2 = TransactionState(DummyContract.MultiOwnerState(0, + listOf(DUMMY_PUBKEY_1, DUMMY_PUBKEY_2)), notary) + val inState3 = TransactionState(VaultNoopContract.VaultNoopState(DUMMY_PUBKEY_1), notary) + val outState1 = inState1.copy() + val outState2 = inState2.copy() + val outState3 = inState3.copy() + val inputs = listOf(StateAndRef(inState1, StateRef(SecureHash.randomSHA256(), 0)), + StateAndRef(inState2, StateRef(SecureHash.randomSHA256(), 0)), + StateAndRef(inState3, StateRef(SecureHash.randomSHA256(), 0))) + val outputs = listOf(outState1, outState2, outState3) + val commands = emptyList>() + val attachments = emptyList() + val id = SecureHash.randomSHA256() + val signers = listOf(DUMMY_NOTARY_KEY.public.composite) + val timestamp: Timestamp? = null + transaction = LedgerTransaction( + inputs, + outputs, + commands, + attachments, + id, + notary, + signers, + timestamp, + TransactionType.General() + ) + } + + private fun createTxnWithTwoStateTypes(): LedgerTransaction { + val notary: Party = DUMMY_NOTARY + val inState1 = TransactionState(DummyContract.SingleOwnerState(0, DUMMY_PUBKEY_1), notary) + val inState2 = TransactionState(DummyContract.MultiOwnerState(0, + listOf(DUMMY_PUBKEY_1, DUMMY_PUBKEY_2)), notary) + val outState1 = inState1.copy() + val outState2 = inState2.copy() + val state1TxHash = SecureHash.randomSHA256() + val state2TxHash = SecureHash.randomSHA256() + val inputs = listOf(StateAndRef(inState1, StateRef(state1TxHash, 0)), + StateAndRef(inState1, StateRef(state1TxHash, 1)), + StateAndRef(inState2, StateRef(state2TxHash, 0)), + StateAndRef(inState1, StateRef(state1TxHash, 2))) // bogus state not in db + val outputs = listOf(outState1, outState2) + val commands = emptyList>() + val attachments = emptyList() + val id = SecureHash.randomSHA256() + val signers = listOf(DUMMY_NOTARY_KEY.public.composite) + val timestamp: Timestamp? = null + return LedgerTransaction( + inputs, + outputs, + commands, + attachments, + id, + notary, + signers, + timestamp, + TransactionType.General() + ) + } + + private fun dummyStatesInsert(txn: LedgerTransaction) { + data.invoke { + // skip inserting the last txn state (to mimic spend attempt of non existent unconsumed state) + txn.inputs.subList(0 , txn.inputs.lastIndex).forEach { + insert(createStateEntity(it)) + // create additional state entities with idx >0 + for (i in 3..4) { + try { + createStateEntity(it, idx = i).apply { + insert(this) + } + } catch(e: Exception) {} + } + // create additional state entities with different txn id + for (i in 1..3) { + createStateEntity(it, txHash = SecureHash.randomSHA256().toString()).apply { + insert(this) + } + } + } + // insert an additional MultiOwnerState with idx 1 + insert(createStateEntity(txn.inputs[2], idx = 1)) + + // insert entities with other state type + for (i in 1..5) { + VaultStatesEntity().apply { + txId = SecureHash.randomSHA256().toString() + index = 0 + contractStateClassName = VaultNoopContract.VaultNoopState::class.java.name + stateStatus = Vault.StateStatus.UNCONSUMED + insert(this) + } + } + } + + // check total numner of inserted states + assertEquals(3+4+9+1+5, data.select(VaultSchema.VaultStates::class).get().count()) + } + + /** + * Vault Schema: VaultStates + */ + @Test + fun testInsertState() { + val state = VaultStatesEntity() + state.txId = "12345" + state.index = 0 + data.invoke { + insert(state) + val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq state.txId) + Assert.assertSame(state, result().first()) + } + } + + @Test + fun testUpsertUnconsumedState() { + val stateEntity = createStateEntity(transaction!!.inputs[0]) + data.invoke { + upsert(stateEntity) + val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq stateEntity.txId) + Assert.assertSame(stateEntity, result().first()) + } + } + + @Test + fun testUpsertConsumedState() { + val stateEntity = createStateEntity(transaction!!.inputs[0]) + data.invoke { + upsert(stateEntity) + } + val keys = mapOf(VaultStatesEntity.TX_ID to stateEntity.txId, + VaultStatesEntity.INDEX to stateEntity.index) + val key = io.requery.proxy.CompositeKey(keys) + data.invoke { + val state = findByKey(VaultStatesEntity::class, key) + state?.run { + stateStatus = Vault.StateStatus.CONSUMED + consumedTime = Instant.now() + update(state) + val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq state.txId) + assertEquals(Vault.StateStatus.CONSUMED, result().first().stateStatus) + } + } + } + + @Test + fun testCashBalanceUpdate() { + val cashBalanceEntity = VaultCashBalancesEntity() + cashBalanceEntity.currency = "USD" + cashBalanceEntity.amount = 100 + data.invoke { + val state = findByKey(VaultCashBalancesEntity::class, cashBalanceEntity.currency) + assertNull(state) + upsert(cashBalanceEntity) + } + data.invoke { + val state = findByKey(VaultCashBalancesEntity::class, cashBalanceEntity.currency) + state?.let { + state.amount -= 80 + upsert(state) + } + assertEquals(20, state!!.amount) + } + } + + @Test + fun testTransactionalUpsertState() { + data.withTransaction(TransactionIsolation.REPEATABLE_READ) { + transaction!!.inputs.forEach { + val stateEntity = createStateEntity(it) + insert(stateEntity) + } + val result = select(VaultSchema.VaultStates::class) + Assert.assertSame(3, result().toList().size) + } + data.invoke { + val result = select(VaultSchema.VaultStates::class) + Assert.assertSame(3, result().toList().size) + } + } + + private fun createStateEntity(stateAndRef: StateAndRef<*>, idx: Int? = null, txHash: String? = null): VaultStatesEntity { + val stateRef = stateAndRef.ref + val state = stateAndRef.state + return VaultStatesEntity().apply { + txId = txHash ?: stateRef.txhash.toString() + index = idx ?: stateRef.index + stateStatus = Vault.StateStatus.UNCONSUMED + contractStateClassName = state.data.javaClass.name + contractState = state.serialize().bytes + notaryName = state.notary.name + notaryKey = state.notary.owningKey.toBase58String() + recordedTime = Instant.now() + } + } + + /** + * Vault Schema: Transaction Notes + */ + @Test + fun testInsertTxnNote() { + val txnNoteEntity = VaultTxnNoteEntity() + txnNoteEntity.txId = "12345" + txnNoteEntity.note = "Sample transaction note" + data.invoke { + insert(txnNoteEntity) + val result = select(VaultSchema.VaultTxnNote::class) + Assert.assertSame(txnNoteEntity, result().first()) + } + } + + @Test + fun testFindTxnNote() { + val txnNoteEntity = VaultTxnNoteEntity() + txnNoteEntity.txId = "12345" + txnNoteEntity.note = "Sample transaction note #1" + val txnNoteEntity2 = VaultTxnNoteEntity() + txnNoteEntity2.txId = "23456" + txnNoteEntity2.note = "Sample transaction note #2" + data.invoke { + insert(txnNoteEntity) + insert(txnNoteEntity2) + } + data.invoke { + val result = select(VaultSchema.VaultTxnNote::class) where (VaultSchema.VaultTxnNote::txId eq txnNoteEntity2.txId) + assertEquals(result().count(), 1) + Assert.assertSame(txnNoteEntity2, result().first()) + } + } + + /** + * Vault Schema: Cash Balances + */ + @Test + fun testInsertCashBalance() { + val cashBalanceEntity = VaultCashBalancesEntity() + cashBalanceEntity.currency = "GPB" + cashBalanceEntity.amount = 12345 + data.invoke { + insert(cashBalanceEntity) + val result = select(VaultSchema.VaultCashBalances::class) + Assert.assertSame(cashBalanceEntity, result().first()) + } + } + + @Test + fun testUpdateCashBalance() { + val cashBalanceEntity = VaultCashBalancesEntity() + cashBalanceEntity.currency = "GPB" + cashBalanceEntity.amount = 12345 + data.invoke { + insert(cashBalanceEntity) + } + data.invoke { + val state = findByKey(VaultCashBalancesEntity::class, cashBalanceEntity.currency) + assertNotNull(state) + state?.let { + state.amount += 10000 + update(state) + val result = select(VaultCashBalancesEntity::class) + assertEquals(22345, result().first().amount) + } + } + } + + @Test + fun testUpsertCashBalance() { + val cashBalanceEntity = VaultCashBalancesEntity() + cashBalanceEntity.currency = "GPB" + cashBalanceEntity.amount = 12345 + data.invoke { + val state = findByKey(VaultCashBalancesEntity::class, cashBalanceEntity.currency) + state?.let { + state.amount += 10000 + } + val result = upsert(state ?: cashBalanceEntity) + assertEquals(12345, result.amount) + } + } + + @Test + fun testAllUnconsumedStates() { + data.invoke { + transaction!!.inputs.forEach { + insert(createStateEntity(it)) + } + } + val stateAndRefs = unconsumedStates() + assertNotNull(stateAndRefs) + assertTrue { stateAndRefs.size == 3 } + } + + @Test + fun tesUnconsumedDummyStates() { + data.invoke { + transaction!!.inputs.forEach { + insert(createStateEntity(it)) + } + } + val stateAndRefs = unconsumedStates() + assertNotNull(stateAndRefs) + assertTrue { stateAndRefs.size == 2 } + } + + @Test + fun tesUnconsumedDummySingleOwnerStates() { + data.invoke { + transaction!!.inputs.forEach { + insert(createStateEntity(it)) + } + } + val stateAndRefs = unconsumedStates() + assertNotNull(stateAndRefs) + assertTrue { stateAndRefs.size == 1 } + } + + inline fun unconsumedStates(): List> { + val stateAndRefs = + data.invoke { + val result = select(VaultSchema.VaultStates::class) + .where(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED) + result.get() + .map { it -> + val stateRef = StateRef(SecureHash.parse(it.txId), it.index) + val state = it.contractState.deserialize>() + StateAndRef(state, stateRef) + }.filter { + T::class.java.isAssignableFrom(it.state.data.javaClass) + }.toList() + } + return stateAndRefs + } + + /** + * Observables testing + */ + @Test + @Throws(Exception::class) + fun testInsert() { + val stateEntity = createStateEntity(transaction!!.inputs[0]) + val latch = CountDownLatch(1) + odata.insert(stateEntity).subscribe { stateEntity -> + Assert.assertNotNull(stateEntity.txId) + Assert.assertTrue(stateEntity.txId.isNotEmpty()) + val cached = data.select(VaultSchema.VaultStates::class) + .where(VaultSchema.VaultStates::txId.eq(stateEntity.txId)).get().first() + Assert.assertSame(cached, stateEntity) + latch.countDown() + } + latch.await() + } + + @Test + @Throws(Exception::class) + fun testInsertCount() { + val stateEntity = createStateEntity(transaction!!.inputs[0]) + Observable.just(stateEntity) + .concatMap { person -> odata.insert(person).toObservable() } + odata.insert(stateEntity).toBlocking().value() + Assert.assertNotNull(stateEntity.txId) + Assert.assertTrue(stateEntity.txId.isNotEmpty()) + val count = data.count(VaultSchema.VaultStates::class).get().value() + Assert.assertEquals(1, count.toLong()) + } + + @Test + @Throws(Exception::class) + fun testQueryEmpty() { + val latch = CountDownLatch(1) + odata.select(VaultSchema.VaultStates::class).get().toObservable() + .subscribe({ Assert.fail() }, { Assert.fail() }) { latch.countDown() } + if (!latch.await(1, TimeUnit.SECONDS)) { + Assert.fail() + } + } + + @Test + @Throws(Exception::class) + fun testQueryObservable() { + transaction!!.inputs.forEach { + val stateEntity = createStateEntity(it) + odata.insert(stateEntity).toBlocking().value() + } + val states = ArrayList() + odata.select(VaultSchema.VaultStates::class).get() + .toObservable() + .subscribe { it -> states.add(it as VaultStatesEntity) } + Assert.assertEquals(3, states.size) + } + + @Test + fun testQueryWithCompositeKey() { + // txn entity with 4 input states (SingleOwnerState x 3, MultiOwnerState x 1) + val txn = createTxnWithTwoStateTypes() + dummyStatesInsert(txn) + + data.invoke { + // Requery does not support SQL-92 select by composite key: + // Raised Issue: + // https://github.com/requery/requery/issues/434 + + // Test Requery raw query for single key field + val refs = txn.inputs.map { it.ref } + val objArgsTxHash = refs.map { it.txhash.toString() } + val objArgsIndex = refs.map { it.index } + + val queryByTxHashString = "SELECT * FROM VAULT_STATES WHERE transaction_id IN ?" + val resultRawQueryTxHash = raw(VaultStatesEntity::class, queryByTxHashString, *objArgsTxHash.toTypedArray()) + assertEquals(8, resultRawQueryTxHash.count()) + + val queryByIndexString = "SELECT * FROM VAULT_STATES WHERE output_index IN ?" + val resultRawQueryIndex = raw(VaultStatesEntity::class, queryByIndexString, *objArgsIndex.toTypedArray()) + assertEquals(18, resultRawQueryIndex.count()) + + // Use JDBC native query for composite key + val stateRefs = refs.fold("") { stateRefs, it -> stateRefs + "('${it.txhash}','${it.index}')," }.dropLast(1) + val statement = jdbcConn.createStatement() + val rs = statement.executeQuery("SELECT transaction_id, output_index, contract_state FROM VAULT_STATES WHERE ((transaction_id, output_index) IN ($stateRefs)) AND (state_status = 0)") + var count = 0 + while (rs.next()) count++ + assertEquals(3, count) + } + } +} \ No newline at end of file diff --git a/node/build.gradle b/node/build.gradle index a061ad7f4f..82dc0f5e18 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -51,7 +51,10 @@ sourceSets { // build/reports/project/dependencies/index.html for green highlighted parts of the tree. dependencies { + compile project(':finance') + compile project(':node-schemas') + compile "com.google.code.findbugs:jsr305:3.0.1" // Log4J: logging framework (with SLF4J bindings) diff --git a/node/src/main/kotlin/net/corda/node/internal/APIServerImpl.kt b/node/src/main/kotlin/net/corda/node/internal/APIServerImpl.kt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index a01f53c47e..d02d58b720 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -203,7 +203,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, netMapCache = InMemoryNetworkMapCache() net = makeMessagingService() schemas = makeSchemaService() - vault = makeVaultService() + vault = makeVaultService(configuration.dataSourceProperties) info = makeInfo() identity = makeIdentityService() @@ -452,7 +452,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } // TODO: sort out ordering of open & protected modifiers of functions in this class. - protected open fun makeVaultService(): VaultService = NodeVaultService(services) + protected open fun makeVaultService(dataSourceProperties: Properties): VaultService = NodeVaultService(services, dataSourceProperties) protected open fun makeSchemaService(): SchemaService = NodeSchemaService() diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 0b7dd19fd4..a0132f8423 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -48,7 +48,7 @@ class CordaRPCOpsImpl( override fun vaultAndUpdates(): Pair>, Observable> { return databaseTransaction(database) { val (vault, updates) = services.vaultService.track() - Pair(vault.states, updates) + Pair(vault.states.toList(), updates) } } diff --git a/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt b/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt new file mode 100644 index 0000000000..5c6973242f --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt @@ -0,0 +1,149 @@ +package net.corda.node.services.database + +import io.requery.EntityCache +import io.requery.TransactionIsolation +import io.requery.TransactionListener +import io.requery.cache.WeakEntityCache +import io.requery.meta.EntityModel +import io.requery.sql.* +import io.requery.sql.platform.H2 +import io.requery.util.function.Function +import io.requery.util.function.Supplier +import net.corda.core.schemas.requery.converters.InstantConverter +import net.corda.core.schemas.requery.converters.StateRefConverter +import net.corda.core.schemas.requery.converters.VaultStateStatusConverter +import org.jetbrains.exposed.sql.transactions.TransactionManager +import java.sql.Connection +import java.util.* +import java.util.concurrent.Executor +import javax.sql.DataSource + +/** + * Requery KotlinConfiguration wrapper class to enable us to pass in an existing database connection and + * associated transaction context. + */ +class KotlinConfigurationTransactionWrapper(private val model: EntityModel, + dataSource: DataSource, + private val mapping: Mapping? = null, + private val platform: Platform? = null, + private val cache: EntityCache = WeakEntityCache(), + private val useDefaultLogging: Boolean = false, + private val statementCacheSize: Int = 0, + private val batchUpdateSize: Int = 64, + private val quoteTableNames: Boolean = false, + private val quoteColumnNames: Boolean = false, + private val tableTransformer: Function? = null, + private val columnTransformer: Function? = null, + private val transactionMode: TransactionMode = TransactionMode.NONE, + private val transactionIsolation: TransactionIsolation? = null, + private val statementListeners: Set = LinkedHashSet(), + private val entityStateListeners: Set> = LinkedHashSet(), + private val transactionListeners: Set> = LinkedHashSet(), + private val writeExecutor: Executor? = null) : Configuration { + + private val connectionProvider = CordaDataSourceConnectionProvider(dataSource) + + override fun getBatchUpdateSize(): Int { + return batchUpdateSize + } + + override fun getConnectionProvider(): ConnectionProvider? { + return connectionProvider + } + + override fun getCache(): EntityCache? { + return cache + } + + override fun getEntityStateListeners(): Set> { + return entityStateListeners + } + + override fun getMapping(): Mapping? { + // TODO: database platform provider to become configurable and parameterised into this configuration + val customMapping = GenericMapping(H2()) + + // register our custom converters + val instantConverter = InstantConverter() + customMapping.addConverter(instantConverter, instantConverter.mappedType) + val vaultStateStatusConverter = VaultStateStatusConverter() + customMapping.addConverter(vaultStateStatusConverter, vaultStateStatusConverter.mappedType) + customMapping.addConverter(StateRefConverter(), StateRefConverter::getMappedType.javaClass) + return customMapping + } + + override fun getModel(): EntityModel { + return model + } + + override fun getPlatform(): Platform? { + return platform + } + + override fun getQuoteTableNames(): Boolean { + return quoteTableNames + } + + override fun getQuoteColumnNames(): Boolean { + return quoteColumnNames + } + + override fun getTableTransformer(): Function? { + return tableTransformer + } + + override fun getColumnTransformer(): Function? { + return columnTransformer + } + + override fun getStatementCacheSize(): Int { + return statementCacheSize + } + + override fun getStatementListeners(): Set? { + return statementListeners + } + + override fun getTransactionMode(): TransactionMode? { + return transactionMode + } + + override fun getTransactionIsolation(): TransactionIsolation? { + return transactionIsolation + } + + override fun getTransactionListenerFactories(): Set>? { + return transactionListeners + } + + override fun getUseDefaultLogging(): Boolean { + return useDefaultLogging + } + + override fun getWriteExecutor(): Executor? { + return writeExecutor + } + + class CordaDataSourceConnectionProvider(val dataSource: DataSource) : ConnectionProvider { + override fun getConnection(): Connection { + val tx = TransactionManager.manager.currentOrNull() + return CordaConnection( + tx?.connection ?: + TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ).connection + ) + } + } + + class CordaConnection(val connection: Connection) : Connection by connection { + override fun close() { + // TODO: address requery auto-closing the connection in SchemaModifier upon table creation + // https://github.com/requery/requery/issues/424 + } + + override fun setAutoCommit(autoCommit: Boolean) { + // TODO: address requery bug in ConnectionTransaction commit() + // https://github.com/requery/requery/issues/423 + connection.autoCommit = false + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/database/RequeryConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/database/RequeryConfiguration.kt new file mode 100644 index 0000000000..16ca32f87e --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/database/RequeryConfiguration.kt @@ -0,0 +1,52 @@ +package net.corda.node.services.database + +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import io.requery.Persistable +import io.requery.meta.EntityModel +import io.requery.sql.* +import net.corda.core.utilities.loggerFor +import org.jetbrains.exposed.sql.transactions.TransactionManager +import java.sql.Connection +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +class RequeryConfiguration(val properties: Properties, val useDefaultLogging: Boolean = false) { + + companion object { + val logger = loggerFor() + } + + // TODO: + // 1. schemaService schemaOptions needs to be applied: eg. default schema, table prefix + // 2. set other generic database configuration options: show_sql, format_sql + // 3. Configure Requery Database platform specific features (see http://requery.github.io/javadoc/io/requery/sql/Platform.html) + // 4. Configure Cache Manager and Cache Provider and set in Requery Configuration (see http://requery.github.io/javadoc/io/requery/EntityCache.html) + // 5. Consider database schema deployment/upgrade strategies to replace dynamic table creation. + + // Note: Annotations are pre-processed using (kapt) so no need to register dynamically + val config = HikariConfig(properties) + val dataSource = HikariDataSource(config) + + // TODO: make this a guava cache or similar to limit ability for this to grow forever. + private val sessionFactories = ConcurrentHashMap>() + + fun sessionForModel(model: EntityModel): KotlinEntityDataStore { + return sessionFactories.computeIfAbsent(model, { makeSessionFactoryForModel(it) }) + } + + fun makeSessionFactoryForModel(model: EntityModel): KotlinEntityDataStore { + val configuration = KotlinConfigurationTransactionWrapper(model, dataSource, useDefaultLogging = this.useDefaultLogging) + val tables = SchemaModifier(configuration) + val mode = TableCreationMode.DROP_CREATE + tables.createTables(mode) + return KotlinEntityDataStore(configuration) + } + + // TODO: remove once Requery supports QUERY WITH COMPOSITE_KEY IN + fun jdbcSession(): Connection { + val ctx = TransactionManager.manager.currentOrNull() + return ctx?.connection ?: TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ).connection + } +} + diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 6d11a1abbb..7ea95da214 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -1,6 +1,8 @@ package net.corda.node.services.vault -import com.google.common.collect.Sets +import io.requery.TransactionIsolation +import io.requery.kotlin.`in` +import io.requery.kotlin.eq import net.corda.contracts.asset.Cash import net.corda.core.ThreadBox import net.corda.core.bufferUntilSubscribed @@ -11,16 +13,20 @@ import net.corda.core.crypto.SecureHash import net.corda.core.node.ServiceHub import net.corda.core.node.services.Vault import net.corda.core.node.services.VaultService +import net.corda.core.node.services.unconsumedStates import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.createKryo +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize import net.corda.core.tee import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace -import net.corda.node.utilities.* -import org.jetbrains.exposed.sql.ResultRow -import org.jetbrains.exposed.sql.select -import org.jetbrains.exposed.sql.statements.InsertStatement +import net.corda.node.services.database.RequeryConfiguration +import net.corda.node.services.vault.schemas.* +import net.corda.node.utilities.bufferUntilDatabaseCommit +import net.corda.node.utilities.wrapWithDatabaseTransaction import rx.Observable import rx.subjects.PublishSubject import java.security.PublicKey @@ -37,66 +43,16 @@ import java.util.* * TODO: keep an audit trail with time stamps of previously unconsumed states "as of" a particular point in time. * TODO: have transaction storage do some caching. */ -class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsToken(), VaultService { +class NodeVaultService(private val services: ServiceHub, dataSourceProperties: Properties) : SingletonSerializeAsToken(), VaultService { private companion object { val log = loggerFor() } - private object StatesSetTable : JDBCHashedTable("${NODE_DATABASE_PREFIX}vault_unconsumed_states") { - val stateRef = stateRef("transaction_id", "output_index") - } - - private data class TxnNote(val txnId: SecureHash, val note: String) { - override fun toString() = "$txnId: $note" - } - - private object CashBalanceTable : JDBCHashedTable("${NODE_DATABASE_PREFIX}vault_cash_balances") { - val currency = varchar("currency", 3) - val amount = long("amount") - } - - private object TransactionNotesTable : JDBCHashedTable("${NODE_DATABASE_PREFIX}vault_txn_notes") { - val txnId = secureHash("txnId").index() - val note = text("note") - } + val configuration = RequeryConfiguration(dataSourceProperties) + val session = configuration.sessionForModel(Models.VAULT) private val mutex = ThreadBox(content = object { - val unconsumedStates = object : AbstractJDBCHashSet(StatesSetTable) { - override fun elementFromRow(row: ResultRow): StateRef = StateRef(row[table.stateRef.txId], row[table.stateRef.index]) - - override fun addElementToInsert(insert: InsertStatement, entry: StateRef, finalizables: MutableList<() -> Unit>) { - insert[table.stateRef.txId] = entry.txhash - insert[table.stateRef.index] = entry.index - } - } - - val transactionNotes = object : AbstractJDBCHashSet(TransactionNotesTable) { - override fun elementFromRow(row: ResultRow): TxnNote = TxnNote(row[table.txnId], row[table.note]) - - override fun addElementToInsert(insert: InsertStatement, entry: TxnNote, finalizables: MutableList<() -> Unit>) { - insert[table.txnId] = entry.txnId - insert[table.note] = entry.note - } - - // TODO: caching (2nd tier db cache) and db results filtering (max records, date, other) - fun select(txnId: SecureHash): Iterable { - return table.select { table.txnId.eq(txnId) }.map { row -> row[table.note] }.toSet().asIterable() - } - } - - val cashBalances = object : AbstractJDBCHashMap, CashBalanceTable>(CashBalanceTable) { - override fun keyFromRow(row: ResultRow): Currency = Currency.getInstance(row[table.currency]) - override fun valueFromRow(row: ResultRow): Amount = Amount(row[table.amount], keyFromRow(row)) - - override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry>, finalizables: MutableList<() -> Unit>) { - insert[table.currency] = entry.key.currencyCode - } - - override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry>, finalizables: MutableList<() -> Unit>) { - insert[table.amount] = entry.value.quantity - } - } val _updatesPublisher = PublishSubject.create() val _rawUpdatesPublisher = PublishSubject.create() @@ -105,22 +61,39 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT // For use during publishing only. val updatesPublisher: rx.Observer get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher) - fun allUnconsumedStates(): List> { - // Ideally we'd map this transform onto a sequence, but we can't have a lazy list here, since accessing it - // from a flow might end up trying to serialize the captured context - vault internal state or db context. - return unconsumedStates.map { - val storedTx = services.storageService.validatedTransactions.getTransaction(it.txhash) ?: throw Error("Found transaction hash ${it.txhash} in unconsumed contract states that is not in transaction storage.") - StateAndRef(storedTx.tx.outputs[it.index], it) - } - } - fun recordUpdate(update: Vault.Update): Vault.Update { if (update != Vault.NoUpdate) { val producedStateRefs = update.produced.map { it.ref } + val producedStateRefsMap = update.produced.associateBy { it.ref } val consumedStateRefs = update.consumed.map { it.ref } log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." } - unconsumedStates.removeAll(consumedStateRefs) - unconsumedStates.addAll(producedStateRefs) + + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + producedStateRefsMap.forEach { it -> + val state = VaultStatesEntity().apply { + txId = it.key.txhash.toString() + index = it.key.index + stateStatus = Vault.StateStatus.UNCONSUMED + contractStateClassName = it.value.state.data.javaClass.name + // TODO: revisit Kryo bug when using THREAD_LOCAL_KYRO + contractState = it.value.state.serialize(createKryo()).bytes + notaryName = it.value.state.notary.name + notaryKey = it.value.state.notary.owningKey.toBase58String() + recordedTime = services.clock.instant() + } + insert(state) + } + consumedStateRefs.forEach { stateRef -> + val queryKey = io.requery.proxy.CompositeKey(mapOf(VaultStatesEntity.TX_ID to stateRef.txhash.toString(), + VaultStatesEntity.INDEX to stateRef.index)) + val state = findByKey(VaultStatesEntity::class, queryKey) + state?.run { + stateStatus = Vault.StateStatus.CONSUMED + consumedTime = services.clock.instant() + update(state) + } + } + } } return update } @@ -133,8 +106,18 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT (produced.keys + consumed.keys).map { currency -> val producedAmount = produced[currency] ?: Amount(0, currency) val consumedAmount = consumed[currency] ?: Amount(0, currency) - val currentBalance = cashBalances[currency] ?: Amount(0, currency) - cashBalances[currency] = currentBalance + producedAmount - consumedAmount + + val cashBalanceEntity = VaultCashBalancesEntity() + cashBalanceEntity.currency = currency.currencyCode + cashBalanceEntity.amount = producedAmount.quantity - consumedAmount.quantity + + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + val state = findByKey(VaultCashBalancesEntity::class, currency.currencyCode) + state?.run { + amount += producedAmount.quantity - consumedAmount.quantity + } + upsert(state ?: cashBalanceEntity) + } } } } @@ -147,9 +130,15 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT } }) - override val cashBalances: Map> get() = mutex.locked { HashMap(cashBalances) } - - override val currentVault: Vault get() = mutex.locked { Vault(allUnconsumedStates()) } + override val cashBalances: Map> get() { + val cashBalancesByCurrency = + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + val balances = select(VaultSchema.VaultCashBalances::class) + balances.get().toList() + } + return cashBalancesByCurrency.associateBy({ Currency.getInstance(it.currency) }, + { Amount(it.amount, Currency.getInstance(it.currency)) }) + } override val rawUpdates: Observable get() = mutex.locked { _rawUpdatesPublisher } @@ -157,23 +146,55 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT override val updates: Observable get() = mutex.locked { _updatesInDbTx } - override fun track(): Pair> { + override fun track(): Pair, Observable> { return mutex.locked { - Pair(Vault(allUnconsumedStates()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + Pair(Vault(unconsumedStates()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) } } - /** - * Returns a snapshot of the heads of LinearStates. - * - * TODO: Represent this using an actual JDBCHashMap or look at vault design further. - */ - override val linearHeads: Map> - get() = currentVault.states.filterStatesOfType().associateBy { it.state.data.linearId }.mapValues { it.value } + override fun states(clazzes: Set>, statuses: EnumSet): List> { + val stateAndRefs = + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + var result = select(VaultSchema.VaultStates::class) + .where(VaultSchema.VaultStates::stateStatus `in` statuses) + // TODO: temporary fix to continue supporting track() function (until becomes Typed) + if (!clazzes.map {it.name}.contains(ContractState::class.java.name)) + result.and (VaultSchema.VaultStates::contractStateClassName `in` (clazzes.map { it.name })) + result.get() + .map { it -> + val stateRef = StateRef(SecureHash.parse(it.txId), it.index) + // TODO: revisit Kryo bug when using THREAD_LOCAL_KYRO + val state = it.contractState.deserialize>(createKryo()) + StateAndRef(state, stateRef) + }.toList() + } + return stateAndRefs + } + + override fun statesForRefs(refs: List): Map?> { + val stateAndRefs = + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + var results: List> = emptyList() + refs.forEach { + val result = select(VaultSchema.VaultStates::class) + .where(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED) + .and(VaultSchema.VaultStates::txId eq it.txhash.toString()) + .and(VaultSchema.VaultStates::index eq it.index) + result.get()?.each { + val stateRef = StateRef(SecureHash.parse(it.txId), it.index) + val state = it.contractState.deserialize>() + results += StateAndRef(state, stateRef) + } + } + results + } + + return stateAndRefs.associateBy({ it.ref }, { it.state }) + } override fun notifyAll(txns: Iterable) { val ourKeys = services.keyManagementService.keys.keys - val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, netDelta, ourKeys) } + val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, ourKeys) } if (netDelta != Vault.NoUpdate) { mutex.locked { recordUpdate(netDelta) @@ -184,14 +205,17 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT } override fun addNoteToTransaction(txnId: SecureHash, noteText: String) { - mutex.locked { - transactionNotes.add(TxnNote(txnId, noteText)) + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + val txnNoteEntity = VaultTxnNoteEntity() + txnNoteEntity.txId = txnId.toString() + txnNoteEntity.note = noteText + insert(txnNoteEntity) } } override fun getTransactionNotes(txnId: SecureHash): Iterable { - mutex.locked { - return transactionNotes.select(txnId) + return session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + (select(VaultSchema.VaultTxnNote::class) where (VaultSchema.VaultTxnNote::txId eq txnId.toString())).get().asIterable().map { it.note } } } @@ -219,7 +243,7 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT // // Finally, we add the states to the provided partial transaction. - val assetsStates = currentVault.statesOfType() + val assetsStates = unconsumedStates() val currency = amount.token var acceptableCoins = run { @@ -308,32 +332,36 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT return Pair(gathered, gatheredAmount) } - private fun makeUpdate(tx: WireTransaction, netDelta: Vault.Update, ourKeys: Set): Vault.Update { + private fun makeUpdate(tx: WireTransaction, ourKeys: Set): Vault.Update { val ourNewStates = tx.outputs. filter { isRelevant(it.data, ourKeys) }. map { tx.outRef(it.data) } - // Now calculate the states that are being spent by this transaction. - val consumedRefs = tx.inputs.toHashSet() - // We use Guava union here as it's lazy for contains() which is how retainAll() is implemented. - // i.e. retainAll() iterates over consumed, checking contains() on the parameter. Sets.union() does not physically create - // a new collection and instead contains() just checks the contains() of both parameters, and so we don't end up - // iterating over all (a potentially very large) unconsumedStates at any point. - mutex.locked { - consumedRefs.retainAll(Sets.union(netDelta.produced, unconsumedStates)) + // Retrieve all unconsumed states for this transaction's inputs + val consumedStates = HashSet>() + if (tx.inputs.isNotEmpty()) { + val stateRefs = tx.inputs.fold("") { stateRefs, it -> stateRefs + "('${it.txhash}','${it.index}')," }.dropLast(1) + // TODO: using native JDBC until requery supports SELECT WHERE COMPOSITE_KEY IN + // https://github.com/requery/requery/issues/434 + val statement = configuration.jdbcSession().createStatement() + val rs = statement.executeQuery("SELECT transaction_id, output_index, contract_state " + + "FROM vault_states " + + "WHERE ((transaction_id, output_index) IN ($stateRefs)) " + + "AND (state_status = 0)") + while (rs.next()) { + val txHash = SecureHash.parse(rs.getString(1)) + val index = rs.getInt(2) + val state = rs.getBytes(3).deserialize>(createKryo()) + consumedStates.add(StateAndRef(state, StateRef(txHash, index))) + } } // Is transaction irrelevant? - if (consumedRefs.isEmpty() && ourNewStates.isEmpty()) { + if (consumedStates.isEmpty() && ourNewStates.isEmpty()) { log.trace { "tx ${tx.id} was irrelevant to this vault, ignoring" } return Vault.NoUpdate } - val consumedStates = consumedRefs.map { - val state = services.loadState(it) - StateAndRef(state, it) - }.toSet() - return Vault.Update(consumedStates, ourNewStates.toHashSet()) } diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index 634a2855ac..82ced2da6a 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -7,6 +7,7 @@ import net.corda.core.messaging.StateMachineUpdate import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.Vault +import net.corda.core.node.services.unconsumedStates import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction import net.corda.flows.CashIssueFlow @@ -67,7 +68,7 @@ class CordaRPCOpsImplTest { // Check the monitoring service wallet is empty databaseTransaction(aliceNode.database) { - assertFalse(aliceNode.services.vaultService.currentVault.states.iterator().hasNext()) + assertFalse(aliceNode.services.vaultService.unconsumedStates().iterator().hasNext()) } // Tell the monitoring service node to issue some cash diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index de17df82a4..3f11bcda92 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -488,7 +488,7 @@ class TwoPartyTradeFlowTests { withError: Boolean, owner: CompositeKey, issuer: AnonymousParty, - notary: Party): Pair> { + notary: Party): Pair, List> { val interimOwnerKey = MEGA_CORP_PUBKEY // Bob (Buyer) has some cash he got from the Bank of Elbonia, Alice (Seller) has some commercial paper she // wants to sell to Bob. @@ -526,7 +526,7 @@ class TwoPartyTradeFlowTests { this.verifies() } - val vault = Vault(listOf("bob cash 1".outputStateAndRef(), "bob cash 2".outputStateAndRef())) + val vault = Vault(listOf("bob cash 1".outputStateAndRef(), "bob cash 2".outputStateAndRef())) return Pair(vault, listOf(eb1, bc1, bc2)) } @@ -535,7 +535,7 @@ class TwoPartyTradeFlowTests { owner: CompositeKey, amount: Amount>, attachmentID: SecureHash?, - notary: Party): Pair> { + notary: Party): Pair, List> { val ap = transaction(transactionBuilder = TransactionBuilder(notary = notary)) { output("alice's paper", notary = notary) { CommercialPaper.State(MEGA_CORP.ref(1, 2, 3), owner, amount, TEST_TX_TIME + 7.days) @@ -552,7 +552,7 @@ class TwoPartyTradeFlowTests { } } - val vault = Vault(listOf("alice's paper".outputStateAndRef())) + val vault = Vault(listOf("alice's paper".outputStateAndRef())) return Pair(vault, listOf(ap)) } diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt index 0309eaa766..2a9ee03122 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt @@ -16,7 +16,6 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.persistence.DataVending import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.StateMachineManager -import net.corda.node.services.vault.NodeVaultService import net.corda.testing.MOCK_IDENTITY_SERVICE import net.corda.testing.node.MockNetworkMapCache import net.corda.testing.node.MockStorageService @@ -24,9 +23,8 @@ import java.time.Clock import java.util.concurrent.ConcurrentHashMap import kotlin.reflect.KClass -@Suppress("LeakingThis") open class MockServiceHubInternal( - customVault: VaultService? = null, + val customVault: VaultService? = null, val keyManagement: KeyManagementService? = null, val net: MessagingServiceInternal? = null, val identity: IdentityService? = MOCK_IDENTITY_SERVICE, @@ -37,7 +35,8 @@ open class MockServiceHubInternal( val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(), val schemas: SchemaService? = NodeSchemaService() ) : ServiceHubInternal() { - override val vaultService: VaultService = customVault ?: NodeVaultService(this) + override val vaultService: VaultService + get() = customVault ?: throw UnsupportedOperationException() override val keyManagementService: KeyManagementService get() = keyManagement ?: throw UnsupportedOperationException() override val identityService: IdentityService diff --git a/node/src/test/kotlin/net/corda/node/services/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/NodeSchedulerServiceTest.kt index 1ed483629e..e75762dd48 100644 --- a/node/src/test/kotlin/net/corda/node/services/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/NodeSchedulerServiceTest.kt @@ -9,8 +9,10 @@ import net.corda.core.flows.FlowLogicRef import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.node.ServiceHub import net.corda.core.node.recordTransactions +import net.corda.core.node.services.VaultService import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.DUMMY_NOTARY +import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.StateMachineManager @@ -74,7 +76,8 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { countDown = CountDownLatch(1) smmHasRemovedAllFlows = CountDownLatch(1) calls = 0 - val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties()) + val dataSourceProps = makeTestDataSourceProperties() + val dataSourceAndDatabase = configureDatabase(dataSourceProps) dataSource = dataSourceAndDatabase.first database = dataSourceAndDatabase.second @@ -82,6 +85,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { val kms = MockKeyManagementService(ALICE_KEY) val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.PeerHandle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), database) services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference { + override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) override val testReference = this@NodeSchedulerServiceTest } scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor) diff --git a/node/src/test/kotlin/net/corda/node/services/NodeVaultServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/NodeVaultServiceTest.kt index 744e4c5073..3687d119ed 100644 --- a/node/src/test/kotlin/net/corda/node/services/NodeVaultServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/NodeVaultServiceTest.kt @@ -9,6 +9,7 @@ import net.corda.core.contracts.`issued by` import net.corda.core.crypto.composite import net.corda.core.node.services.TxWritableStorageService import net.corda.core.node.services.VaultService +import net.corda.core.node.services.unconsumedStates import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.LogHelper @@ -31,11 +32,12 @@ import kotlin.test.assertEquals class NodeVaultServiceTest { lateinit var dataSource: Closeable lateinit var database: Database + private val dataSourceProps = makeTestDataSourceProperties() @Before fun setUp() { LogHelper.setLevel(NodeVaultService::class) - val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties()) + val dataSourceAndDatabase = configureDatabase(dataSourceProps) dataSource = dataSourceAndDatabase.first database = dataSourceAndDatabase.second } @@ -50,7 +52,7 @@ class NodeVaultServiceTest { fun `states not local to instance`() { databaseTransaction(database) { val services1 = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this) + override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) override fun recordTransactions(txs: Iterable) { for (stx in txs) { @@ -61,12 +63,13 @@ class NodeVaultServiceTest { } services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) - val w1 = services1.vaultService.currentVault - assertThat(w1.states).hasSize(3) + val w1 = services1.vaultService.unconsumedStates() + assertThat(w1).hasSize(3) val originalStorage = services1.storageService + val originalVault = services1.vaultService val services2 = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this) + override val vaultService: VaultService get() = originalVault // We need to be able to find the same transactions as before, too. override val storageService: TxWritableStorageService get() = originalStorage @@ -79,8 +82,32 @@ class NodeVaultServiceTest { } } - val w2 = services2.vaultService.currentVault - assertThat(w2.states).hasSize(3) + val w2 = services2.vaultService.unconsumedStates() + assertThat(w2).hasSize(3) + } + } + + @Test + fun `states for refs`() { + databaseTransaction(database) { + val services1 = object : MockServices() { + override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) + + override fun recordTransactions(txs: Iterable) { + for (stx in txs) { + storageService.validatedTransactions.addTransaction(stx) + vaultService.notify(stx.tx) + } + } + } + services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + + val w1 = services1.vaultService.unconsumedStates() + assertThat(w1).hasSize(3) + + val stateRefs = listOf(w1[1].ref, w1[2].ref) + val states = services1.vaultService.statesForRefs(stateRefs) + assertThat(states).hasSize(2) } } @@ -88,7 +115,7 @@ class NodeVaultServiceTest { fun addNoteToTransaction() { databaseTransaction(database) { val services = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this) + override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) override fun recordTransactions(txs: Iterable) { for (stx in txs) { diff --git a/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt b/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt index c47d2906aa..b20830a935 100644 --- a/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt @@ -3,10 +3,14 @@ package net.corda.node.services import net.corda.contracts.asset.Cash import net.corda.contracts.asset.DUMMY_CASH_ISSUER import net.corda.contracts.testing.fillWithSomeTestCash +import net.corda.contracts.testing.fillWithSomeTestDeals +import net.corda.contracts.testing.fillWithSomeTestLinearStates import net.corda.core.contracts.* import net.corda.core.crypto.composite import net.corda.core.node.recordTransactions import net.corda.core.node.services.VaultService +import net.corda.core.node.services.consumedStates +import net.corda.core.node.services.unconsumedStates import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY_KEY @@ -14,7 +18,10 @@ import net.corda.core.utilities.LogHelper import net.corda.node.services.vault.NodeVaultService import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.databaseTransaction -import net.corda.testing.* +import net.corda.testing.BOB_KEY +import net.corda.testing.BOB_PUBKEY +import net.corda.testing.MEGA_CORP +import net.corda.testing.MEGA_CORP_KEY import net.corda.testing.node.MockServices import net.corda.testing.node.makeTestDataSourceProperties import org.assertj.core.api.Assertions.assertThatThrownBy @@ -37,13 +44,14 @@ class VaultWithCashTest { @Before fun setUp() { - LogHelper.setLevel(NodeVaultService::class) - val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties()) + LogHelper.setLevel(VaultWithCashTest::class) + val dataSourceProps = makeTestDataSourceProperties() + val dataSourceAndDatabase = configureDatabase(dataSourceProps) dataSource = dataSourceAndDatabase.first database = dataSourceAndDatabase.second databaseTransaction(database) { services = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this) + override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) override fun recordTransactions(txs: Iterable) { for (stx in txs) { @@ -58,7 +66,7 @@ class VaultWithCashTest { @After fun tearDown() { - LogHelper.reset(NodeVaultService::class) + LogHelper.reset(VaultWithCashTest::class) dataSource.close() } @@ -68,15 +76,15 @@ class VaultWithCashTest { // Fix the PRNG so that we get the same splits every time. services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) - val w = vault.currentVault - assertEquals(3, w.states.toList().size) + val w = vault.unconsumedStates() + assertEquals(3, w.toList().size) - val state = w.states.toList()[0].state.data as Cash.State + val state = w.toList()[0].state.data assertEquals(30.45.DOLLARS `issued by` DUMMY_CASH_ISSUER, state.amount) assertEquals(services.key.public.composite, state.owner) - assertEquals(34.70.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[2].state.data as Cash.State).amount) - assertEquals(34.85.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.states.toList()[1].state.data as Cash.State).amount) + assertEquals(34.70.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.toList()[2].state.data).amount) + assertEquals(34.85.DOLLARS `issued by` DUMMY_CASH_ISSUER, (w.toList()[1].state.data).amount) } } @@ -119,7 +127,6 @@ class VaultWithCashTest { } } - @Test fun `branching LinearStates fails to verify`() { databaseTransaction(database) { @@ -128,8 +135,8 @@ class VaultWithCashTest { // Issue a linear state val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { - addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite))) - addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite))) + addOutputState(net.corda.contracts.testing.DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite))) + addOutputState(net.corda.contracts.testing.DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite))) signWith(freshKey) signWith(DUMMY_NOTARY_KEY) }.toSignedTransaction() @@ -149,7 +156,7 @@ class VaultWithCashTest { // Issue a linear state val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { - addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite))) + addOutputState(net.corda.contracts.testing.DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite))) signWith(freshKey) signWith(DUMMY_NOTARY_KEY) }.toSignedTransaction() @@ -157,11 +164,11 @@ class VaultWithCashTest { dummyIssue.toLedgerTransaction(services).verify() services.recordTransactions(dummyIssue) - assertEquals(1, vault.currentVault.states.toList().size) + assertEquals(1, vault.unconsumedStates().size) // Move the same state val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { - addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite))) + addOutputState(net.corda.contracts.testing.DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public.composite))) addInputState(dummyIssue.tx.outRef(0)) signWith(DUMMY_NOTARY_KEY) }.toSignedTransaction() @@ -169,7 +176,74 @@ class VaultWithCashTest { dummyIssue.toLedgerTransaction(services).verify() services.recordTransactions(dummyMove) - assertEquals(1, vault.currentVault.states.toList().size) + assertEquals(1, vault.unconsumedStates().size) + } + } + + @Test + fun `spending cash in vault of mixed state types works`() { + + val freshKey = services.keyManagementService.freshKey() + databaseTransaction(database) { + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L), ownedBy = freshKey.public.composite) + services.fillWithSomeTestCash(100.SWISS_FRANCS, DUMMY_NOTARY, 2, 2, Random(0L)) + services.fillWithSomeTestCash(100.POUNDS, DUMMY_NOTARY, 1, 1, Random(0L)) + val cash = vault.unconsumedStates() + cash.forEach { println(it.state.data.amount) } + + services.fillWithSomeTestDeals(listOf("123","456","789")) + val deals = vault.unconsumedStates() + deals.forEach { println(it.state.data.ref) } + } + + databaseTransaction(database) { + // A tx that spends our money. + val spendTX = TransactionType.General.Builder(DUMMY_NOTARY).apply { + vault.generateSpend(this, 80.DOLLARS, BOB_PUBKEY) + signWith(freshKey) + signWith(DUMMY_NOTARY_KEY) + }.toSignedTransaction() + services.recordTransactions(spendTX) + + val consumedStates = vault.consumedStates() + assertEquals(3, consumedStates.count()) + + val unconsumedStates = vault.unconsumedStates() + assertEquals(7, unconsumedStates.count()) + } + } + + @Test + fun `consuming multiple contract state types in same transaction`() { + + val freshKey = services.keyManagementService.freshKey() + databaseTransaction(database) { + + services.fillWithSomeTestDeals(listOf("123","456","789")) + val deals = vault.unconsumedStates() + deals.forEach { println(it.state.data.ref) } + + services.fillWithSomeTestLinearStates(3) + val linearStates = vault.unconsumedStates() + linearStates.forEach { println(it.state.data.linearId) } + + // Create a txn consuming different contract types + val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply { + addOutputState(net.corda.contracts.testing.DummyLinearContract.State(participants = listOf(freshKey.public.composite))) + addOutputState(net.corda.contracts.testing.DummyDealContract.State(ref = "999", participants = listOf(freshKey.public.composite))) + addInputState(linearStates[0]) + addInputState(deals[0]) + signWith(DUMMY_NOTARY_KEY) + }.toSignedTransaction() + + dummyMove.toLedgerTransaction(services).verify() + services.recordTransactions(dummyMove) + + val consumedStates = vault.consumedStates() + assertEquals(2, consumedStates.count()) + + val unconsumedStates = vault.unconsumedStates() + assertEquals(6, unconsumedStates.count()) } } } diff --git a/node/src/test/kotlin/net/corda/node/services/database/RequeryConfigurationTest.kt b/node/src/test/kotlin/net/corda/node/services/database/RequeryConfigurationTest.kt new file mode 100644 index 0000000000..862b3fdeff --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/database/RequeryConfigurationTest.kt @@ -0,0 +1,172 @@ +package net.corda.node.services.database + +import io.requery.Persistable +import io.requery.kotlin.eq +import io.requery.sql.KotlinEntityDataStore +import net.corda.core.contracts.DummyContract +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionType +import net.corda.core.crypto.DigitalSignature +import net.corda.core.crypto.NullPublicKey +import net.corda.core.crypto.SecureHash +import net.corda.core.node.services.Vault +import net.corda.core.serialization.createKryo +import net.corda.core.serialization.serialize +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.WireTransaction +import net.corda.core.utilities.DUMMY_NOTARY +import net.corda.core.utilities.DUMMY_PUBKEY_1 +import net.corda.node.services.persistence.DBTransactionStorage +import net.corda.node.services.vault.schemas.Models +import net.corda.node.services.vault.schemas.VaultCashBalancesEntity +import net.corda.node.services.vault.schemas.VaultSchema +import net.corda.node.services.vault.schemas.VaultStatesEntity +import net.corda.node.utilities.configureDatabase +import net.corda.node.utilities.databaseTransaction +import net.corda.testing.node.makeTestDataSourceProperties +import org.assertj.core.api.Assertions +import org.jetbrains.exposed.sql.Database +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Before +import org.junit.Test +import java.io.Closeable +import java.time.Instant +import java.util.* + +class RequeryConfigurationTest { + + lateinit var dataSource: Closeable + lateinit var database: Database + lateinit var transactionStorage: DBTransactionStorage + lateinit var requerySession: KotlinEntityDataStore + + @Before + fun setUp() { + val dataSourceProperties = makeTestDataSourceProperties() + val dataSourceAndDatabase = configureDatabase(dataSourceProperties) + dataSource = dataSourceAndDatabase.first + database = dataSourceAndDatabase.second + newTransactionStorage() + newRequeryStorage(dataSourceProperties) + } + + @After + fun cleanUp() { + dataSource.close() + } + + @Test + fun `transaction inserts in same DB transaction scope across two persistence engines`() { + val txn = newTransaction() + + databaseTransaction(database) { + transactionStorage.addTransaction(txn) + requerySession.withTransaction { + insert(createVaultStateEntity(txn)) + } + } + + databaseTransaction(database) { + Assertions.assertThat(transactionStorage.transactions).containsOnly(txn) + requerySession.withTransaction { + val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq txn.tx.inputs[0].txhash.toString()) + Assertions.assertThat(result.get().first().txId).isEqualTo(txn.tx.inputs[0].txhash.toString()) + } + } + } + + @Test + fun `transaction operations in same DB transaction scope across two persistence engines`() { + val txn = newTransaction() + + databaseTransaction(database) { + transactionStorage.addTransaction(txn) + requerySession.withTransaction { + upsert(createCashBalance()) + select(VaultSchema.VaultCashBalances::class).get().first() + insert(createVaultStateEntity(txn)) + } + } + + databaseTransaction(database) { + Assertions.assertThat(transactionStorage.transactions).containsOnly(txn) + requerySession.withTransaction { + val cashQuery = select(VaultSchema.VaultCashBalances::class) where (VaultSchema.VaultCashBalances::currency eq "GBP") + assertEquals(12345, cashQuery.get().first().amount) + val stateQuery = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq txn.tx.inputs[0].txhash.toString()) + Assertions.assertThat(stateQuery.get().first().txId).isEqualTo(txn.tx.inputs[0].txhash.toString()) + } + } + } + + @Test + fun `transaction rollback in same DB transaction scope across two persistence engines`() { + val txn = newTransaction() + + databaseTransaction(database) { + transactionStorage.addTransaction(txn) + requerySession.withTransaction { + insert(createVaultStateEntity(txn)) + } + rollback() + } + + databaseTransaction(database) { + Assertions.assertThat(transactionStorage.transactions).isEmpty() + requerySession.withTransaction { + val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq txn.tx.inputs[0].txhash.toString()) + Assertions.assertThat(result.get().count() == 0) + } + } + } + + private fun createVaultStateEntity(txn: SignedTransaction): VaultStatesEntity { + val txnState = txn.tx.inputs[0] + val state = VaultStatesEntity().apply { + txId = txnState.txhash.toString() + index = txnState.index + stateStatus = Vault.StateStatus.UNCONSUMED + contractStateClassName = DummyContract.SingleOwnerState::class.java.name + contractState = DummyContract.SingleOwnerState(owner = DUMMY_PUBKEY_1).serialize(createKryo()).bytes + notaryName = txn.tx.notary!!.name + notaryKey = txn.tx.notary!!.owningKey.toBase58String() + recordedTime = Instant.now() + } + return state + } + + private fun createCashBalance(): VaultCashBalancesEntity { + val cashBalanceEntity = VaultCashBalancesEntity() + cashBalanceEntity.currency = "GBP" + cashBalanceEntity.amount = 12345 + return cashBalanceEntity + } + + private fun newTransactionStorage() { + databaseTransaction(database) { + transactionStorage = DBTransactionStorage() + } + } + + private fun newRequeryStorage(dataSourceProperties: Properties) { + databaseTransaction(database) { + val configuration = RequeryConfiguration(dataSourceProperties, true) + requerySession = configuration.sessionForModel(Models.VAULT) + } + } + + private fun newTransaction(): SignedTransaction { + val wtx = WireTransaction( + inputs = listOf(StateRef(SecureHash.randomSHA256(), 0)), + attachments = emptyList(), + outputs = emptyList(), + commands = emptyList(), + notary = DUMMY_NOTARY, + signers = emptyList(), + type = TransactionType.General(), + timestamp = null + ) + return SignedTransaction(wtx.serialized, listOf(DigitalSignature.WithKey(NullPublicKey, ByteArray(1))), wtx.id) + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt index 4456e5836d..4247bbe007 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt @@ -92,6 +92,21 @@ class DBTransactionStorageTests { } } + @Test + fun `two transactions with rollback`() { + val firstTransaction = newTransaction() + val secondTransaction = newTransaction() + databaseTransaction(database) { + transactionStorage.addTransaction(firstTransaction) + transactionStorage.addTransaction(secondTransaction) + rollback() + } + + databaseTransaction(database) { + assertThat(transactionStorage.transactions).isEmpty() + } + } + @Test fun `two transactions in same DB transaction scope`() { val firstTransaction = newTransaction() diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt index c5aeba062f..a0880cee88 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt @@ -8,6 +8,7 @@ import net.corda.core.contracts.TransactionType import net.corda.core.contracts.USD import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic +import net.corda.core.node.services.unconsumedStates import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY import net.corda.flows.BroadcastTransactionFlow.NotifyTxRequest @@ -47,12 +48,12 @@ class DataVendingServiceTests { ptx.signWith(registerKey) val tx = ptx.toSignedTransaction() databaseTransaction(vaultServiceNode.database) { - assertEquals(0, vaultServiceNode.services.vaultService.currentVault.states.toList().size) + assertEquals(0, vaultServiceNode.services.vaultService.unconsumedStates().size) registerNode.sendNotifyTx(tx, vaultServiceNode) // Check the transaction is in the receiving node - val actual = vaultServiceNode.services.vaultService.currentVault.states.singleOrNull() + val actual = vaultServiceNode.services.vaultService.unconsumedStates().singleOrNull() val expected = tx.tx.outRef(0) assertEquals(expected, actual) @@ -78,12 +79,12 @@ class DataVendingServiceTests { ptx.signWith(registerKey) val tx = ptx.toSignedTransaction(false) databaseTransaction(vaultServiceNode.database) { - assertEquals(0, vaultServiceNode.services.vaultService.currentVault.states.toList().size) + assertEquals(0, vaultServiceNode.services.vaultService.unconsumedStates().size) registerNode.sendNotifyTx(tx, vaultServiceNode) // Check the transaction is not in the receiving node - assertEquals(0, vaultServiceNode.services.vaultService.currentVault.states.toList().size) + assertEquals(0, vaultServiceNode.services.vaultService.unconsumedStates().size) } } diff --git a/settings.gradle b/settings.gradle index f20919b9d1..8fab36dfdb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,6 +4,7 @@ rootProject.name = 'corda-project' include 'finance' include 'finance:isolated' include 'core' +include 'node-schemas' include 'node' include 'node:capsule' include 'client' @@ -19,4 +20,5 @@ include 'samples:irs-demo' include 'samples:network-visualiser' include 'samples:simm-valuation-demo' include 'samples:raft-notary-demo' -include 'samples:bank-of-corda-demo' \ No newline at end of file +include 'samples:bank-of-corda-demo' + diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index a715b708d9..e17e4262da 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -149,7 +149,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override fun makeIdentityService() = MockIdentityService(mockNet.identities) - override fun makeVaultService(): VaultService = NodeVaultService(services) + override fun makeVaultService(dataSourceProperties: Properties): VaultService = NodeVaultService(services, dataSourceProperties) override fun makeKeyManagementService(): KeyManagementService { return E2ETestKeyManagementService(partyKeys + (overrideServices?.values ?: emptySet())) diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index c72bb66b17..80da704332 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -1,6 +1,5 @@ package net.corda.testing.node -import kotlinx.support.jdk8.collections.putIfAbsent import net.corda.core.contracts.Attachment import net.corda.core.contracts.PartyAndReference import net.corda.core.crypto.* @@ -60,7 +59,7 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub { override val vaultService: VaultService get() = throw UnsupportedOperationException() override val networkService: MessagingService get() = throw UnsupportedOperationException() override val networkMapCache: NetworkMapCache get() = throw UnsupportedOperationException() - override val clock: Clock get() = throw UnsupportedOperationException() + override val clock: Clock get() = Clock.systemUTC() override val schedulerService: SchedulerService get() = throw UnsupportedOperationException() override val myInfo: NodeInfo get() = NodeInfo(object : SingleMessageRecipient {}, Party("MegaCorp", key.public.composite)) }