diff --git a/client/mock/src/main/kotlin/net/corda/client/mock/EventGenerator.kt b/client/mock/src/main/kotlin/net/corda/client/mock/EventGenerator.kt index 99eed3a4e0..bb3af8db6d 100644 --- a/client/mock/src/main/kotlin/net/corda/client/mock/EventGenerator.kt +++ b/client/mock/src/main/kotlin/net/corda/client/mock/EventGenerator.kt @@ -60,10 +60,10 @@ class EventGenerator( val issueRefGenerator = Generator.intRange(0, 1).map { number -> OpaqueBytes(ByteArray(1, { number.toByte() })) } - val amountGenerator = Generator.intRange(0, 10000).combine(currencyGenerator) { quantity, currency -> Amount(quantity.toLong(), currency) } + val amountToIssueGenerator = Generator.intRange(10000, 1000000).combine(currencyGenerator) { quantity, currency -> Amount(quantity.toLong(), currency) } val issueCashGenerator = - amountGenerator.combine(partyGenerator, issueRefGenerator) { amount, to, issueRef -> + amountToIssueGenerator.combine(partyGenerator, issueRefGenerator) { amount, to, issueRef -> CashFlowCommand.IssueCash( amount, issueRef, @@ -81,10 +81,10 @@ class EventGenerator( } val exitCashGenerator = - amountIssuedGenerator.map { + amountToIssueGenerator.combine(partyGenerator, issueRefGenerator) { amount, to, issueRef -> CashFlowCommand.ExitCash( - it.withoutIssuer(), - it.token.issuer.reference + amount, + issueRef ) } diff --git a/core/src/main/kotlin/net/corda/core/node/services/Services.kt b/core/src/main/kotlin/net/corda/core/node/services/Services.kt index 96deae575d..2336c0c68a 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/Services.kt @@ -1,8 +1,10 @@ package net.corda.core.node.services +import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.ListenableFuture import net.corda.core.contracts.* import net.corda.core.crypto.* +import net.corda.core.flows.FlowException import net.corda.core.serialization.CordaSerializable import net.corda.core.toFuture import net.corda.core.transactions.TransactionBuilder @@ -49,7 +51,7 @@ class Vault(val states: Iterable>) { * other transactions observed, then the changes are observed "net" of those. */ @CordaSerializable - data class Update(val consumed: Set>, val produced: Set>) { + data class Update(val consumed: Set>, val produced: Set>, val flowId: UUID? = null) { /** Checks whether the update contains a state of the specified type. */ inline fun containsType() = consumed.any { it.state.data is T } || produced.any { it.state.data is T } @@ -197,19 +199,57 @@ interface VaultService { * there is insufficient quantity for a given currency (and optionally set of Issuer Parties). */ @Throws(InsufficientBalanceException::class) + @Suspendable fun generateSpend(tx: TransactionBuilder, amount: Amount, to: CompositeKey, onlyFromParties: Set? = null): Pair> + // DOCSTART VaultStatesQuery /** * Return [ContractState]s of a given [Contract] type and [Iterable] of [Vault.StateStatus]. + * Optionally may specify whether to include [StateRef] that have been marked as soft locked (default is true) */ - fun states(clazzes: Set>, statuses: EnumSet): Iterable> + fun states(clazzes: Set>, statuses: EnumSet, includeSoftLockedStates: Boolean = true): Iterable> + // DOCEND VaultStatesQuery + + /** + * Soft locking is used to prevent multiple transactions trying to use the same output simultaneously. + * Violation of a soft lock would result in a double spend being created and rejected by the notary. + */ + + // DOCSTART SoftLockAPI + + /** + * Reserve a set of [StateRef] for a given [UUID] unique identifier. + * Typically, the unique identifier will refer to a Flow id associated with a [Transaction] in an in-flight flow. + * In the case of coin selection, soft locks are automatically taken upon gathering relevant unconsumed input refs. + * + * @throws [StatesNotAvailableException] when not possible to softLock all of requested [StateRef] + */ + @Throws(StatesNotAvailableException::class) + fun softLockReserve(id: UUID, stateRefs: Set) + + /** + * Release all or an explicitly specified set of [StateRef] for a given [UUID] unique identifier. + * A vault soft lock manager is automatically notified of a Flows that are terminated, such that any soft locked states + * may be released. + * In the case of coin selection, softLock are automatically released once previously gathered unconsumed input refs + * are consumed as part of cash spending. + */ + fun softLockRelease(id: UUID, stateRefs: Set? = null) + + /** + * Retrieve softLockStates for a given [UUID] or return all softLockStates in vault for a given + * [ContractState] type + */ + fun softLockedStates(lockId: UUID? = null): List> + + // DOCEND SoftLockAPI } -inline fun VaultService.unconsumedStates(): Iterable> = - states(setOf(T::class.java), EnumSet.of(Vault.StateStatus.UNCONSUMED)) +inline fun VaultService.unconsumedStates(includeSoftLockedStates: Boolean = true): Iterable> = + states(setOf(T::class.java), EnumSet.of(Vault.StateStatus.UNCONSUMED), includeSoftLockedStates) inline fun VaultService.consumedStates(): Iterable> = states(setOf(T::class.java), EnumSet.of(Vault.StateStatus.CONSUMED)) @@ -223,6 +263,10 @@ inline fun VaultService.dealsWith(party: AbstractParty) it.state.data.parties.any { it == party } } +class StatesNotAvailableException(override val message: String?, override val cause: Throwable? = null) : FlowException(message, cause) { + override fun toString() = "Soft locking error: $message" +} + /** * The KMS is responsible for storing and using private keys to sign things. An implementation of this may, for example, * call out to a hardware security module that enforces various auditing and frequency-of-use requirements. diff --git a/core/src/main/kotlin/net/corda/core/transactions/TransactionBuilder.kt b/core/src/main/kotlin/net/corda/core/transactions/TransactionBuilder.kt index 5ff4e92044..024c4c51de 100644 --- a/core/src/main/kotlin/net/corda/core/transactions/TransactionBuilder.kt +++ b/core/src/main/kotlin/net/corda/core/transactions/TransactionBuilder.kt @@ -1,7 +1,9 @@ package net.corda.core.transactions +import co.paralleluniverse.strands.Strand import net.corda.core.contracts.* import net.corda.core.crypto.* +import net.corda.core.flows.FlowStateMachine import net.corda.core.serialization.serialize import java.security.KeyPair import java.time.Duration @@ -27,6 +29,7 @@ import java.util.* open class TransactionBuilder( protected val type: TransactionType = TransactionType.General(), var notary: Party? = null, + var lockId: UUID = (Strand.currentStrand() as? FlowStateMachine<*>)?.id?.uuid ?: UUID.randomUUID(), protected val inputs: MutableList = arrayListOf(), protected val attachments: MutableList = arrayListOf(), protected val outputs: MutableList> = arrayListOf(), diff --git a/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt b/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt index 77e0c5cd28..ef1f82d0e2 100644 --- a/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt +++ b/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt @@ -14,6 +14,7 @@ import net.corda.core.serialization.p2PKryo import net.corda.core.serialization.serialize import net.corda.core.utilities.Emoji import java.security.PublicKey +import java.util.* /** * A transaction ready for serialisation, without any signatures attached. A WireTransaction is usually wrapped diff --git a/docs/source/key-concepts-vault.rst b/docs/source/key-concepts-vault.rst index 19d1ca811e..4ccbf7b773 100644 --- a/docs/source/key-concepts-vault.rst +++ b/docs/source/key-concepts-vault.rst @@ -13,13 +13,18 @@ The vault keeps track of both unconsumed and consumed states: By fungible we refer to assets of measurable quantity (eg. a cash currency, units of stock) which can be combined together to represent a single ledger state. -Like with a cryptocurrency wallet, the Corda vault can create transactions that send value (eg. transfer of state) to someone else -by combining fungible states and possibly adding a change output that makes the values balance (this process is usually referred to as ‘coin selection’). -Vault spending ensures that transactions respect the fungibility rules in order to ensure that the issuer and reference data is preserved as the assets pass from hand to hand. +Like with a cryptocurrency wallet, the Corda vault can create transactions that send value (eg. transfer of state) to +someone else by combining fungible states and possibly adding a change output that makes the values balance (this +process is usually referred to as ‘coin selection’). Vault spending ensures that transactions respect the fungibility +rules in order to ensure that the issuer and reference data is preserved as the assets pass from hand to hand. + +A feature called **soft locking** provides the ability to automatically or explicitly reserve states to prevent +multiple transactions within the same node from trying to use the same output simultaneously. Whilst this scenario would +ultimately be detected by a notary, *soft locking* provides a mechanism of early detection for such unwarranted and +invalid scenarios. :doc:`soft-locking` describes this feature in detail. .. note:: Basic 'coin selection' is currently implemented. Future work includes fungible state optimisation (splitting and - merging of states in the background), 'soft locking' (ability to automatically or explicitly reserve states to prevent - multiple transactions trying to use the same output simultaneously), 'state re-issuance' (sending of states back to the + merging of states in the background), and 'state re-issuance' (sending of states back to the issuer for re-issuance, thus pruning long transaction chains and improving privacy). There is also a facility for attaching descriptive textual notes against any transaction stored in the vault. @@ -45,8 +50,7 @@ Note the following: * a vault update API is internally used by transaction recording flows. * the vault database schemas are directly accessible via JDBC for customer joins and queries -Section 8 of the `Technical white paper`_ describes features of the vault yet to be implemented including private key managament, -soft state locking, state splitting and merging, asset re-issuance and node event scheduling. +Section 8 of the `Technical white paper`_ describes features of the vault yet to be implemented including private key managament, state splitting and merging, asset re-issuance and node event scheduling. .. _`Technical white paper`: _static/corda-technical-whitepaper.pdf diff --git a/docs/source/soft-locking.rst b/docs/source/soft-locking.rst new file mode 100644 index 0000000000..8cf05b3985 --- /dev/null +++ b/docs/source/soft-locking.rst @@ -0,0 +1,59 @@ +Soft Locking +============ + +Soft Locking is implemented in the vault to try and prevent a node constructing transactions that attempt to use the same input(s) simultaneously. +Such transactions would result in naturally wasted work when the notary rejects them as double spend attempts. + +Soft locks are automatically applied to coin selection (eg. cash spending) to ensure that no two transactions attempt to +spend the same fungible states. The outcome of such an eventuality will result in an ``InsufficientBalanceException`` for one +of the requesters if there are insufficient number of fungible states available to satisfy both requests. + +.. note:: The Cash Contract schema table is now automatically generated upon node startup as Coin Selection now uses + this table to ensure correct locking and selection of states to satisfy minimum requested spending amounts. + +Soft locks are also automatically applied within flows that issue or receive new states. +These states are effectively soft locked until flow termination (exit or error) or by explicit release. + +In addition, the ``VaultService`` exposes a number of functions a developer may use to explicitly reserve, release and +query soft locks associated with states as required by their CorDapp application logic: + +.. literalinclude:: ../../core/src/main/kotlin/net/corda/core/node/services/Services.kt + :language: kotlin + :start-after: DOCSTART SoftLockAPI + :end-before: DOCEND SoftLockAPI + +You can also control whether soft locked states are retrieved in general vault queries by setting an optional boolean +`includeSoftLockedStates` flag (which is set to *true* by default) + +.. literalinclude:: ../../core/src/main/kotlin/net/corda/core/node/services/Services.kt + :language: kotlin + :start-after: DOCSTART VaultStatesQuery + :end-before: DOCEND VaultStatesQuery + +Explicit Usage +-------------- + +Soft locks are associated with transactions, and typically within the lifecycle of a flow. Specifically, every time a +flow is started a soft lock identifier is associated with that flow for its duration (and released upon it's natural +termination or in the event of an exception). The ``VaultSoftLockManager`` is responsible within the Node for +automatically managing this soft lock registration and release process for flows. The ``TransactionBuilder`` class has a +new ``lockId`` field for the purpose of tracking lockable states. By default, it is automatically set to a random +``UUID`` (outside of a flow) or to a flow's unique ID (within a flow). + +Upon building a new transaction to perform some action for a set of states on a contract, a developer must explicitly +register any states they may wish to hold until that transaction is committed to the ledger. These states will be effectively 'soft +locked' (not usable by any other transaction) until the developer explicitly releases these or the flow terminates or errors +(at which point they are automatically released). + +Use Cases +--------- + +A prime example where *soft locking* is automatically enabled is within the process of issuance and transfer of fungible +state (eg. Cash). An issuer of some fungible asset (eg. Bank of Corda) may wish to transfer that new issue immediately +to the issuance requester (eg. Big Corporation). This issuance and transfer operation must be *atomic* such that another +flow (or instance of the same flow) does not step in and unintentionally spend the states issued by Bank of Corda +before they are transferred to the intended recipient. Soft locking will automatically prevent new issued states within +``IssuerFlow`` from being spendable by any other flow until such time as the ``IssuerFlow`` itself terminates. + +Other use cases for *soft locking* may involve competing flows attempting to match trades or any other concurrent +activities that may involve operating on an identical set of unconsumed states. diff --git a/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt b/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt index be160dc427..cbb22fa9dd 100644 --- a/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt @@ -70,8 +70,6 @@ object IssuerFlow { return txn } - // TODO: resolve race conditions caused by the 2 separate Cashflow commands (Issue and Pay) not reusing the same - // state references (thus causing Notarisation double spend exceptions). @Suspendable private fun issueCashTo(amount: Amount, issueTo: Party, diff --git a/finance/src/test/kotlin/net/corda/contracts/CommercialPaperTests.kt b/finance/src/test/kotlin/net/corda/contracts/CommercialPaperTests.kt index e5a4569172..6e8b14fb20 100644 --- a/finance/src/test/kotlin/net/corda/contracts/CommercialPaperTests.kt +++ b/finance/src/test/kotlin/net/corda/contracts/CommercialPaperTests.kt @@ -17,6 +17,8 @@ import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.DUMMY_PUBKEY_1 import net.corda.core.utilities.TEST_TX_TIME +import net.corda.node.services.schema.HibernateObserver +import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.vault.NodeVaultService import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.databaseTransaction @@ -228,7 +230,7 @@ class CommercialPaperTestsGeneric { databaseTransaction(databaseAlice) { aliceServices = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this, dataSourcePropsAlice) + override val vaultService: VaultService = makeVaultService(dataSourcePropsAlice) override fun recordTransactions(txs: Iterable) { for (stx in txs) { @@ -248,7 +250,7 @@ class CommercialPaperTestsGeneric { databaseTransaction(databaseBigCorp) { bigCorpServices = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this, dataSourcePropsBigCorp) + override val vaultService: VaultService = makeVaultService(dataSourcePropsBigCorp) override fun recordTransactions(txs: Iterable) { for (stx in txs) { @@ -290,32 +292,38 @@ class CommercialPaperTestsGeneric { } databaseTransaction(databaseBigCorp) { - fun makeRedeemTX(time: Instant): SignedTransaction { - val ptx = TransactionType.General.Builder(DUMMY_NOTARY) - ptx.setTime(time, 30.seconds) - CommercialPaper().generateRedeem(ptx, moveTX.tx.outRef(1), bigCorpVaultService) - ptx.signWith(aliceServices.key) - ptx.signWith(bigCorpServices.key) - ptx.signWith(DUMMY_NOTARY_KEY) - return ptx.toSignedTransaction() - } - - val tooEarlyRedemption = makeRedeemTX(TEST_TX_TIME + 10.days) - val validRedemption = makeRedeemTX(TEST_TX_TIME + 31.days) - // Verify the txns are valid and insert into both sides. listOf(issueTX, moveTX).forEach { it.toLedgerTransaction(aliceServices).verify() aliceServices.recordTransactions(it) bigCorpServices.recordTransactions(it) } + } + databaseTransaction(databaseBigCorp) { + fun makeRedeemTX(time: Instant): Pair { + val ptx = TransactionType.General.Builder(DUMMY_NOTARY) + ptx.setTime(time, 30.seconds) + CommercialPaper().generateRedeem(ptx, moveTX.tx.outRef(1), bigCorpVaultService) + ptx.signWith(aliceServices.key) + ptx.signWith(bigCorpServices.key) + ptx.signWith(DUMMY_NOTARY_KEY) + return Pair(ptx.toSignedTransaction(), ptx.lockId) + } + + val redeemTX = makeRedeemTX(TEST_TX_TIME + 10.days) + val tooEarlyRedemption = redeemTX.first + val tooEarlyRedemptionLockId = redeemTX.second val e = assertFailsWith(TransactionVerificationException::class) { tooEarlyRedemption.toLedgerTransaction(aliceServices).verify() } + // manually release locks held by this failing transaction + aliceServices.vaultService.softLockRelease(tooEarlyRedemptionLockId) assertTrue(e.cause!!.message!!.contains("paper must have matured")) + val validRedemption = makeRedeemTX(TEST_TX_TIME + 31.days).first validRedemption.toLedgerTransaction(aliceServices).verify() + // soft lock not released after success either!!! (as transaction not recorded) } } } diff --git a/finance/src/test/kotlin/net/corda/contracts/asset/CashTests.kt b/finance/src/test/kotlin/net/corda/contracts/asset/CashTests.kt index 7519d4f7d6..7fccd4e586 100644 --- a/finance/src/test/kotlin/net/corda/contracts/asset/CashTests.kt +++ b/finance/src/test/kotlin/net/corda/contracts/asset/CashTests.kt @@ -58,7 +58,7 @@ class CashTests { databaseTransaction(database) { services = object : MockServices() { override val keyManagementService: MockKeyManagementService = MockKeyManagementService(MINI_CORP_KEY, MEGA_CORP_KEY, OUR_KEY) - override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) + override val vaultService: VaultService = makeVaultService(dataSourceProps) override fun recordTransactions(txs: Iterable) { for (stx in txs) { @@ -580,7 +580,7 @@ class CashTests { databaseTransaction(database) { val tx = TransactionType.General.Builder(DUMMY_NOTARY) - vault.generateSpend(tx, 80.DOLLARS, ALICE_PUBKEY, setOf(MINI_CORP)) + vault.generateSpend(tx, 80.DOLLARS, ALICE_PUBKEY, setOf(MINI_CORP.toAnonymous())) assertEquals(vaultStatesUnconsumed.elementAt(2).ref, tx.inputStates()[0]) } diff --git a/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt b/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt index fcab773981..0d22d6c5fe 100644 --- a/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt +++ b/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt @@ -1,6 +1,7 @@ package net.corda.flows import com.google.common.util.concurrent.ListenableFuture +import net.corda.contracts.testing.calculateRandomlySizedAmounts import net.corda.core.contracts.Amount import net.corda.core.contracts.DOLLARS import net.corda.core.contracts.PartyAndReference @@ -15,6 +16,7 @@ import net.corda.core.utilities.DUMMY_NOTARY import net.corda.flows.IssuerFlow.IssuanceRequester import net.corda.testing.* import net.corda.testing.node.MockNetwork +import net.corda.testing.node.MockNetwork.MockNode import org.junit.Test import java.util.* import kotlin.test.assertEquals @@ -22,9 +24,9 @@ import kotlin.test.assertFailsWith class IssuerFlowTest { lateinit var net: MockNetwork - lateinit var notaryNode: MockNetwork.MockNode - lateinit var bankOfCordaNode: MockNetwork.MockNode - lateinit var bankClientNode: MockNetwork.MockNode + lateinit var notaryNode: MockNode + lateinit var bankOfCordaNode: MockNode + lateinit var bankClientNode: MockNode @Test fun `test issuer flow`() { @@ -36,30 +38,71 @@ class IssuerFlowTest { // using default IssueTo Party Reference val issueToPartyAndRef = bankClientNode.info.legalIdentity.ref(OpaqueBytes.Companion.of(123)) - val (issuer, issuerResult) = runIssuerAndIssueRequester(1000000.DOLLARS, issueToPartyAndRef) + val (issuer, issuerResult) = runIssuerAndIssueRequester(bankOfCordaNode, bankClientNode, 1000000.DOLLARS, issueToPartyAndRef) assertEquals(issuerResult.get(), issuer.get().resultFuture.get()) // try to issue an amount of a restricted currency assertFailsWith { - runIssuerAndIssueRequester(Amount(100000L, currency("BRL")), issueToPartyAndRef).issueRequestResult.getOrThrow() + runIssuerAndIssueRequester(bankOfCordaNode, bankClientNode, Amount(100000L, currency("BRL")), issueToPartyAndRef).issueRequestResult.getOrThrow() } bankOfCordaNode.stop() bankClientNode.stop() - - bankOfCordaNode.manuallyCloseDB() - bankClientNode.manuallyCloseDB() } } - private fun runIssuerAndIssueRequester(amount: Amount, issueToPartyAndRef: PartyAndReference) : RunResult { - val resolvedIssuerParty = bankOfCordaNode.services.identityService.partyFromAnonymous(issueToPartyAndRef) ?: throw IllegalStateException() - val issuerFuture = bankOfCordaNode.initiateSingleShotFlow(IssuerFlow.IssuanceRequester::class) { + @Test + fun `test issue flow to self`() { + net = MockNetwork(false, true) + ledger { + notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name) + bankOfCordaNode = net.createPartyNode(notaryNode.info.address, BOC.name) + + // using default IssueTo Party Reference + val issueToPartyAndRef = bankOfCordaNode.info.legalIdentity.ref(OpaqueBytes.Companion.of(123)) + val (issuer, issuerResult) = runIssuerAndIssueRequester(bankOfCordaNode, bankOfCordaNode, 1000000.DOLLARS, issueToPartyAndRef) + assertEquals(issuerResult.get(), issuer.get().resultFuture.get()) + + bankOfCordaNode.stop() + } + } + + @Test + fun `test concurrent issuer flow`() { + + net = MockNetwork(false, true) + ledger { + notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name) + bankOfCordaNode = net.createPartyNode(notaryNode.info.address, BOC.name) + bankClientNode = net.createPartyNode(notaryNode.info.address, MEGA_CORP.name) + + // using default IssueTo Party Reference + val issueToPartyAndRef = bankClientNode.info.legalIdentity.ref(OpaqueBytes.Companion.of(123)) + + // this test exercises the Cashflow issue and move subflows to ensure consistent spending of issued states + val amount = 10000.DOLLARS + val amounts = calculateRandomlySizedAmounts(10000.DOLLARS, 10, 10, Random()) + val handles = amounts.map { pennies -> + runIssuerAndIssueRequester(bankOfCordaNode, bankClientNode, Amount(pennies, amount.token), issueToPartyAndRef) + } + handles.forEach { + require (it.issueRequestResult.get() is SignedTransaction) + } + + bankOfCordaNode.stop() + bankClientNode.stop() + } + } + + private fun runIssuerAndIssueRequester(issuerNode: MockNode, issueToNode: MockNode, + amount: Amount, issueToPartyAndRef: PartyAndReference) : RunResult { + val resolvedIssuerParty = issuerNode.services.identityService.partyFromAnonymous(issueToPartyAndRef) ?: throw IllegalStateException() + val issuerFuture = issuerNode.initiateSingleShotFlow(IssuerFlow.IssuanceRequester::class) { otherParty -> IssuerFlow.Issuer(resolvedIssuerParty) }.map { it.stateMachine } - val issueRequest = IssuanceRequester(amount, resolvedIssuerParty, issueToPartyAndRef.reference, bankOfCordaNode.info.legalIdentity) - val issueRequestResultFuture = bankClientNode.services.startFlow(issueRequest).resultFuture + val issueRequest = IssuanceRequester(amount, resolvedIssuerParty, issueToPartyAndRef.reference, issuerNode.info.legalIdentity) + val issueRequestResultFuture = issueToNode.services.startFlow(issueRequest).resultFuture return IssuerFlowTest.RunResult(issuerFuture, issueRequestResultFuture) } diff --git a/node-schemas/src/test/kotlin/net/corda/node/services/vault/schemas/VaultSchemaTest.kt b/node-schemas/src/test/kotlin/net/corda/node/services/vault/schemas/VaultSchemaTest.kt index bc89b95eda..cbee19cff0 100644 --- a/node-schemas/src/test/kotlin/net/corda/node/services/vault/schemas/VaultSchemaTest.kt +++ b/node-schemas/src/test/kotlin/net/corda/node/services/vault/schemas/VaultSchemaTest.kt @@ -2,8 +2,10 @@ package net.corda.node.services.vault.schemas import io.requery.Persistable import io.requery.TransactionIsolation +import io.requery.kotlin.`in` import io.requery.kotlin.eq import io.requery.kotlin.invoke +import io.requery.kotlin.isNull import io.requery.rx.KotlinRxEntityStore import io.requery.sql.* import io.requery.sql.platform.Generic @@ -525,6 +527,155 @@ class VaultSchemaTest { } } + /** + * Soft locking tests + */ + @Test + fun testSingleSoftLockUpdate() { + + // insert unconsumed state + val stateEntity = createStateEntity(transaction!!.inputs[0]) + data.invoke { + upsert(stateEntity) + } + + // reserve soft lock on state + stateEntity.apply { + this.lockId = "LOCK#1" + this.lockUpdateTime = Instant.now() + data.invoke { + upsert(stateEntity) + } + } + + // select unlocked states + data.invoke { + val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq stateEntity.txId) + .and (VaultSchema.VaultStates::lockId.isNull()) + assertEquals(0, result.get().count()) + } + + // release soft lock on state + data.invoke { + val update = update(VaultStatesEntity::class) + .set(VaultStatesEntity.LOCK_ID, null) + .set(VaultStatesEntity.LOCK_UPDATE_TIME, Instant.now()) + .where (VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED) + .and (VaultStatesEntity.LOCK_ID eq "LOCK#1").get() + assertEquals(1, update.value()) + } + + // select unlocked states + data.invoke { + val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId eq stateEntity.txId) + .and (VaultSchema.VaultStates::lockId.isNull()) + assertEquals(1, result.get().count()) + } + } + + @Test + fun testMultipleSoftLocksUpdate() { + + // insert unconsumed state + data.withTransaction(TransactionIsolation.REPEATABLE_READ) { + transaction!!.inputs.forEach { + val stateEntity = createStateEntity(it) + insert(stateEntity) + } + val result = select(VaultSchema.VaultStates::class) + Assert.assertSame(3, result().toList().size) + } + + // reserve soft locks on states + transaction!!.inputs.forEach { + val stateEntity = createStateEntity(it) + stateEntity.apply { + this.lockId = "LOCK#1" + this.lockUpdateTime = Instant.now() + data.invoke { + upsert(stateEntity) + } + } + } + + // select unlocked states + val txnIds = transaction!!.inputs.map { it.ref.txhash.toString() }.toSet() + data.invoke { + val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId `in` txnIds) + .and (VaultSchema.VaultStates::lockId eq "") + assertEquals(0, result.get().count()) + } + + // release soft lock on states + data.invoke { + val query = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId `in` txnIds) + .and(VaultSchema.VaultStates::lockId eq "LOCK#1") + val result = query.get() + assertEquals(3, result.count()) + result.forEach { + it.lockId = "" + it.lockUpdateTime = Instant.now() + upsert(it) + } + } + + // select unlocked states + data.invoke { + val result = select(VaultSchema.VaultStates::class) where (VaultSchema.VaultStates::txId `in` txnIds) + .and (VaultSchema.VaultStates::lockId eq "") + assertEquals(3, result.get().count()) + } + } + + @Test + fun testMultipleSoftLocksUsingNativeJDBC() { + // NOTE: + // - Requery using raw SelectForUpdate not working + // - Requery using raw Update not working + + // using native JDBC + val refs = transaction!!.inputs.map { it.ref } + + // insert unconsumed state + data.invoke { + transaction!!.inputs.forEach { + val stateEntity = createStateEntity(it) + insert(stateEntity) + } + } + + // update refs with soft lock id + val stateRefs = refs.fold("") { stateRefs, it -> stateRefs + "('${it.txhash}','${it.index}')," }.dropLast(1) + val lockId = "LOCK#1" + val selectForUpdateStatement = """ + SELECT transaction_id, output_index, lock_id, lock_timestamp FROM VAULT_STATES + WHERE ((transaction_id, output_index) IN ($stateRefs)) FOR UPDATE + """ + + val statement = jdbcConn.createStatement() + val rs = statement.executeQuery(selectForUpdateStatement) + while (rs.next()) { + val txHash = SecureHash.parse(rs.getString(1)) + val index = rs.getInt(2) + val statement = jdbcConn.createStatement() + val updateStatement = """ + UPDATE VAULT_STATES SET lock_id = '$lockId', lock_timestamp = '${Instant.now()}' + WHERE (transaction_id = '$txHash' AND output_index = $index) + """ + statement.executeUpdate(updateStatement) + } + + // count locked state refs + val selectStatement = """ + SELECT transaction_id, output_index, contract_state FROM VAULT_STATES + WHERE ((transaction_id, output_index) IN ($stateRefs)) AND (lock_id != '') + """ + val rsQuery = statement.executeQuery(selectStatement) + var countQuery = 0 + while (rsQuery.next()) countQuery++ + assertEquals(3, countQuery) + } + @Test fun insertWithBigCompositeKey() { val keys = (1..314).map { generateKeyPair().public.composite } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index a0387db8e3..a9fe847eb3 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -45,6 +45,7 @@ import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.* import net.corda.node.services.vault.CashBalanceAsMetricsObserver import net.corda.node.services.vault.NodeVaultService +import net.corda.node.services.vault.VaultSoftLockManager import net.corda.node.utilities.AddOrRemove.ADD import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.configureDatabase @@ -278,9 +279,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } private fun makeVaultObservers() { + VaultSoftLockManager(vault, smm) CashBalanceAsMetricsObserver(services, database) ScheduledActivityObserver(services) - HibernateObserver(services) + HibernateObserver(vault, schemas) } private fun makeInfo(): NodeInfo { diff --git a/node/src/main/kotlin/net/corda/node/services/api/SchemaService.kt b/node/src/main/kotlin/net/corda/node/services/api/SchemaService.kt index c3a701b841..a297c99cd3 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/SchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/SchemaService.kt @@ -12,7 +12,7 @@ interface SchemaService { /** * Represents any options configured on the node for a schema. */ - data class SchemaOptions(val databaseSchema: String?, val tablePrefix: String?) + data class SchemaOptions(val databaseSchema: String? = null, val tablePrefix: String? = null) /** * Options configured for this node's schemas. A missing entry for a schema implies all properties are null. diff --git a/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt b/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt index 5e32108c33..1e80a8b29a 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt @@ -4,12 +4,13 @@ import kotlinx.support.jdk7.use import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef +import net.corda.core.node.services.VaultService import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.PersistentStateRef import net.corda.core.schemas.QueryableState import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor -import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.api.SchemaService import org.hibernate.SessionFactory import org.hibernate.boot.model.naming.Identifier import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl @@ -25,17 +26,19 @@ import java.util.concurrent.ConcurrentHashMap * A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate. */ // TODO: Manage version evolution of the schemas via additional tooling. -class HibernateObserver(services: ServiceHubInternal) { +class HibernateObserver(vaultService: VaultService, val schemaService: SchemaService) { companion object { val logger = loggerFor() } - val schemaService = services.schemaService // TODO: make this a guava cache or similar to limit ability for this to grow forever. val sessionFactories = ConcurrentHashMap() init { - services.vaultService.rawUpdates.subscribe { persist(it.produced) } + schemaService.schemaOptions.map { it.key }.forEach { + makeSessionFactoryForSchema(it) + } + vaultService.rawUpdates.subscribe { persist(it.produced) } } private fun sessionFactoryForSchema(schema: MappedSchema): SessionFactory { diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 7202dffdf4..ffbad9a60e 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -5,6 +5,7 @@ import net.corda.core.schemas.PersistentState import net.corda.core.schemas.QueryableState import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.node.services.api.SchemaService +import net.corda.schemas.CashSchemaV1 /** * Most basic implementation of [SchemaService]. @@ -12,10 +13,15 @@ import net.corda.node.services.api.SchemaService * TODO: support loading schema options from node configuration. * TODO: support configuring what schemas are to be selected for persistence. * TODO: support plugins for schema version upgrading or custom mapping not supported by original [QueryableState]. + * TODO: create whitelisted tables when a CorDapp is first installed */ class NodeSchemaService : SchemaService, SingletonSerializeAsToken() { // Currently does not support configuring schema options. - override val schemaOptions: Map = emptyMap() + + // Whitelisted tables are those required by internal Corda services + // For example, cash is used by the vault for coin selection + // This whitelist will grow as we add further functionality (eg. other fungible assets) + override val schemaOptions: Map = mapOf(Pair(CashSchemaV1, SchemaService.SchemaOptions())) // Currently returns all schemas supported by the state, with no filtering or enrichment. override fun selectSchemas(state: QueryableState): Iterable { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 3152b8c6a6..993797b6a0 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -27,6 +27,7 @@ import org.jetbrains.exposed.sql.Transaction import org.jetbrains.exposed.sql.transactions.TransactionManager import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.sql.Connection import java.sql.SQLException import java.util.* import java.util.concurrent.TimeUnit @@ -46,6 +47,24 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, * Return the current [FlowStateMachineImpl] or null if executing outside of one. */ fun currentStateMachine(): FlowStateMachineImpl<*>? = Strand.currentStrand() as? FlowStateMachineImpl<*> + + /** + * Provide a mechanism to sleep within a Strand without locking any transactional state + */ + // TODO: inlined due to an intermittent Quasar error (to be fully investigated) + @Suppress("NOTHING_TO_INLINE") + @Suspendable + inline fun sleep(millis: Long) { + if (currentStateMachine() != null) { + val db = StrandLocalTransactionManager.database + TransactionManager.current().commit() + TransactionManager.current().close() + Strand.sleep(millis) + StrandLocalTransactionManager.database = db + TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ) + } + else Strand.sleep(millis) + } } // These fields shouldn't be serialised, so they are marked @Transient. @@ -92,7 +111,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, // Check if the FlowException was propagated by looking at where the stack trace originates (see suspendAndExpectReceive). val propagated = e.stackTrace[0].className == javaClass.name processException(e, propagated) - logger.debug(if (propagated) "Flow ended due to receiving exception" else "Flow finished with exception", e) + logger.error(if (propagated) "Flow ended due to receiving exception" else "Flow finished with exception", e) return } catch (t: Throwable) { recordDuration(startTime, success = false) diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 5a0eb29da4..c1b5e4c291 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -1,16 +1,22 @@ package net.corda.node.services.vault +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.Strand import io.requery.TransactionIsolation import io.requery.kotlin.`in` import io.requery.kotlin.eq +import io.requery.kotlin.isNull +import io.requery.kotlin.notNull import net.corda.contracts.asset.Cash import net.corda.core.ThreadBox import net.corda.core.bufferUntilSubscribed import net.corda.core.contracts.* import net.corda.core.crypto.AbstractParty import net.corda.core.crypto.CompositeKey +import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.node.ServiceHub +import net.corda.core.node.services.StatesNotAvailableException import net.corda.core.node.services.Vault import net.corda.core.node.services.VaultService import net.corda.core.node.services.unconsumedStates @@ -24,13 +30,20 @@ import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace import net.corda.node.services.database.RequeryConfiguration +import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.vault.schemas.* +import net.corda.node.utilities.StrandLocalTransactionManager import net.corda.node.utilities.bufferUntilDatabaseCommit import net.corda.node.utilities.wrapWithDatabaseTransaction +import org.jetbrains.exposed.sql.transactions.TransactionManager import rx.Observable import rx.subjects.PublishSubject import java.security.PublicKey +import java.sql.Connection +import java.sql.SQLException import java.util.* +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** * Currently, the node vault service is a very simple RDBMS backed implementation. It will change significantly when @@ -60,74 +73,85 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P // For use during publishing only. val updatesPublisher: rx.Observer get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher) + }) - fun recordUpdate(update: Vault.Update): Vault.Update { - if (update != Vault.NoUpdate) { - val producedStateRefs = update.produced.map { it.ref } - val producedStateRefsMap = update.produced.associateBy { it.ref } - val consumedStateRefs = update.consumed.map { it.ref } - log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." } + private fun recordUpdate(update: Vault.Update): Vault.Update { + if (update != Vault.NoUpdate) { + val producedStateRefs = update.produced.map { it.ref } + val producedStateRefsMap = update.produced.associateBy { it.ref } + val consumedStateRefs = update.consumed.map { it.ref } + log.trace { "Removing $consumedStateRefs consumed contract states and adding $producedStateRefs produced contract states to the database." } + + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + producedStateRefsMap.forEach { it -> + val state = VaultStatesEntity().apply { + txId = it.key.txhash.toString() + index = it.key.index + stateStatus = Vault.StateStatus.UNCONSUMED + contractStateClassName = it.value.state.data.javaClass.name + contractState = it.value.state.serialize(storageKryo()).bytes + notaryName = it.value.state.notary.name + notaryKey = it.value.state.notary.owningKey.toBase58String() + recordedTime = services.clock.instant() + } + insert(state) + } + // TODO: awaiting support of UPDATE WHERE IN in Requery DSL + consumedStateRefs.forEach { stateRef -> + val queryKey = io.requery.proxy.CompositeKey(mapOf(VaultStatesEntity.TX_ID to stateRef.txhash.toString(), + VaultStatesEntity.INDEX to stateRef.index)) + val state = findByKey(VaultStatesEntity::class, queryKey) + state?.run { + stateStatus = Vault.StateStatus.CONSUMED + consumedTime = services.clock.instant() + // remove lock (if held) + if (lockId != null) { + lockId = null + lockUpdateTime = services.clock.instant() + log.trace("Releasing soft lock on consumed state: $stateRef") + } + update(state) + } + } + } + } + return update + } + + // TODO: consider moving this logic outside the vault + // TODO: revisit the concurrency safety of this logic when we move beyond single threaded SMM. + // For example, we update currency totals in a non-deterministic order and so expose ourselves to deadlock. + private fun maybeUpdateCashBalances(update: Vault.Update) { + if (update.containsType()) { + val consumed = sumCashStates(update.consumed) + val produced = sumCashStates(update.produced) + (produced.keys + consumed.keys).map { currency -> + val producedAmount = produced[currency] ?: Amount(0, currency) + val consumedAmount = consumed[currency] ?: Amount(0, currency) + + val cashBalanceEntity = VaultCashBalancesEntity() + cashBalanceEntity.currency = currency.currencyCode + cashBalanceEntity.amount = producedAmount.quantity - consumedAmount.quantity session.withTransaction(TransactionIsolation.REPEATABLE_READ) { - producedStateRefsMap.forEach { it -> - val state = VaultStatesEntity().apply { - txId = it.key.txhash.toString() - index = it.key.index - stateStatus = Vault.StateStatus.UNCONSUMED - contractStateClassName = it.value.state.data.javaClass.name - contractState = it.value.state.serialize(storageKryo()).bytes - notaryName = it.value.state.notary.name - notaryKey = it.value.state.notary.owningKey.toBase58String() - recordedTime = services.clock.instant() - } - insert(state) - } - consumedStateRefs.forEach { stateRef -> - val queryKey = io.requery.proxy.CompositeKey(mapOf(VaultStatesEntity.TX_ID to stateRef.txhash.toString(), - VaultStatesEntity.INDEX to stateRef.index)) - val state = findByKey(VaultStatesEntity::class, queryKey) - state?.run { - stateStatus = Vault.StateStatus.CONSUMED - consumedTime = services.clock.instant() - update(state) - } - } - } - } - return update - } - - // TODO: consider moving this logic outside the vault - fun maybeUpdateCashBalances(update: Vault.Update) { - if (update.containsType()) { - val consumed = sumCashStates(update.consumed) - val produced = sumCashStates(update.produced) - (produced.keys + consumed.keys).map { currency -> - val producedAmount = produced[currency] ?: Amount(0, currency) - val consumedAmount = consumed[currency] ?: Amount(0, currency) - - val cashBalanceEntity = VaultCashBalancesEntity() - cashBalanceEntity.currency = currency.currencyCode - cashBalanceEntity.amount = producedAmount.quantity - consumedAmount.quantity - - session.withTransaction(TransactionIsolation.REPEATABLE_READ) { - val state = findByKey(VaultCashBalancesEntity::class, currency.currencyCode) - state?.run { - amount += producedAmount.quantity - consumedAmount.quantity - } - upsert(state ?: cashBalanceEntity) + val state = findByKey(VaultCashBalancesEntity::class, currency.currencyCode) + state?.run { + amount += producedAmount.quantity - consumedAmount.quantity } + upsert(state ?: cashBalanceEntity) + val total = state?.amount ?: cashBalanceEntity.amount + log.trace{"Updating Cash balance for $currency by ${cashBalanceEntity.amount} pennies (total: $total)"} } } } + } - @Suppress("UNCHECKED_CAST") - private fun sumCashStates(states: Iterable>): Map> { - return states.mapNotNull { (it.state.data as? FungibleAsset)?.amount } - .groupBy { it.token.product } - .mapValues { it.value.map { Amount(it.quantity, it.token.product) }.sumOrThrow() } - } - }) + @Suppress("UNCHECKED_CAST") + private fun sumCashStates(states: Iterable>): Map> { + return states.mapNotNull { (it.state.data as? FungibleAsset)?.amount } + .groupBy { it.token.product } + .mapValues { it.value.map { Amount(it.quantity, it.token.product) }.sumOrThrow() } + } override val cashBalances: Map> get() { val cashBalancesByCurrency = @@ -151,15 +175,17 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P } } - override fun states(clazzes: Set>, statuses: EnumSet): Iterable> { + override fun states(clazzes: Set>, statuses: EnumSet, includeSoftLockedStates: Boolean): Iterable> { val stateAndRefs = session.withTransaction(TransactionIsolation.REPEATABLE_READ) { - var result = select(VaultSchema.VaultStates::class) + var query = select(VaultSchema.VaultStates::class) .where(VaultSchema.VaultStates::stateStatus `in` statuses) // TODO: temporary fix to continue supporting track() function (until becomes Typed) if (!clazzes.map {it.name}.contains(ContractState::class.java.name)) - result.and (VaultSchema.VaultStates::contractStateClassName `in` (clazzes.map { it.name })) - val iterator = result.get().iterator() + query.and (VaultSchema.VaultStates::contractStateClassName `in` (clazzes.map { it.name })) + if (!includeSoftLockedStates) + query.and(VaultSchema.VaultStates::lockId.isNull()) + val iterator = query.get().iterator() Sequence{iterator} .map { it -> val stateRef = StateRef(SecureHash.parse(it.txId), it.index) @@ -195,10 +221,13 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P val ourKeys = services.keyManagementService.keys.keys val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, ourKeys) } if (netDelta != Vault.NoUpdate) { + recordUpdate(netDelta) + maybeUpdateCashBalances(netDelta) mutex.locked { - recordUpdate(netDelta) - maybeUpdateCashBalances(netDelta) - updatesPublisher.onNext(netDelta) + // flowId required by SoftLockManager to perform auto-registration of soft locks for new states + val uuid = (Strand.currentStrand() as? FlowStateMachineImpl<*>)?.id?.uuid + val vaultUpdate = if (uuid != null) netDelta.copy(flowId = uuid) else netDelta + updatesPublisher.onNext(vaultUpdate) } } } @@ -218,6 +247,208 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P } } + @Throws(StatesNotAvailableException::class) + override fun softLockReserve(id: UUID, stateRefs: Set) { + if (stateRefs.isNotEmpty()) { + val stateRefsAsStr = stateRefsToCompositeKeyStr(stateRefs.toList()) + val softLockTimestamp = services.clock.instant() + // TODO: awaiting support of UPDATE WHERE IN in Requery DSL + val updateStatement = """ + UPDATE VAULT_STATES SET lock_id = '$id', lock_timestamp = '$softLockTimestamp' + WHERE ((transaction_id, output_index) IN ($stateRefsAsStr)) + AND (state_status = 0) + AND ((lock_id = '$id') OR (lock_id is null)); + """ + val statement = configuration.jdbcSession().createStatement() + log.debug(updateStatement) + try { + val rs = statement.executeUpdate(updateStatement) + if (rs > 0 && rs == stateRefs.size) { + log.trace("Reserving soft lock states for $id: $stateRefs") + } + else { + // revert partial soft locks + val revertUpdateStatement = """ + UPDATE VAULT_STATES SET lock_id = null + WHERE ((transaction_id, output_index) IN ($stateRefsAsStr)) + AND (lock_timestamp = '$softLockTimestamp') AND (lock_id = '$id'); + """ + log.debug(revertUpdateStatement) + val rsr = statement.executeUpdate(revertUpdateStatement) + if (rsr > 0) { + log.trace("Reverting $rsr partially soft locked states for $id") + } + throw StatesNotAvailableException("Attempted to reserve $stateRefs for $id but only $rs rows available") + } + } + catch (e: SQLException) { + log.error("""soft lock update error attempting to reserve states: $stateRefs for $id + $e. + """) + throw StatesNotAvailableException("Failed to reserve $stateRefs for $id", e) + } + finally { statement.close() } + } + } + + override fun softLockRelease(id: UUID, stateRefs: Set?) { + if (stateRefs == null) { + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + val update = update(VaultStatesEntity::class) + .set(VaultStatesEntity.LOCK_ID, null) + .set(VaultStatesEntity.LOCK_UPDATE_TIME, services.clock.instant()) + .where (VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED) + .and (VaultStatesEntity.LOCK_ID eq id.toString()).get() + if (update.value() > 0) { + log.trace("Releasing ${update.value()} soft locked states for $id") + } + } + } + else if (stateRefs.isNotEmpty()) { + val stateRefsAsStr = stateRefsToCompositeKeyStr(stateRefs.toList()) + // TODO: awaiting support of UPDATE WHERE IN in Requery DSL + val updateStatement = """ + UPDATE VAULT_STATES SET lock_id = null, lock_timestamp = '${services.clock.instant()}' + WHERE (transaction_id, output_index) IN ($stateRefsAsStr) + AND (state_status = 0) AND (lock_id = '$id'); + """ + val statement = configuration.jdbcSession().createStatement() + log.debug(updateStatement) + try { + val rs = statement.executeUpdate(updateStatement) + if (rs > 0) { + log.trace("Releasing $rs soft locked states for $id and stateRefs $stateRefs") + } + } catch (e: SQLException) { + log.error("""soft lock update error attempting to release states for $id and $stateRefs") + $e. + """) + } finally { + statement.close() + } + } + } + + // coin selection retry loop counter, sleep (msecs) and lock for selecting states + val MAX_RETRIES = 5 + val RETRY_SLEEP = 100 + val spendLock: ReentrantLock = ReentrantLock() + + @Suspendable + internal fun unconsumedStatesForSpending(amount: Amount, onlyFromIssuerParties: Set? = null, notary: Party? = null, lockId: UUID): List> { + + val issuerKeysStr = onlyFromIssuerParties?.fold("") { left, right -> left + "('${right.owningKey.toBase58String()}')," }?.dropLast(1) + var stateAndRefs = mutableListOf>() + + // TODO: Need to provide a database provider independent means of performing this function. + // We are using an H2 specific means of selecting a minimum set of rows that match a request amount of coins: + // 1) There is no standard SQL mechanism of calculating a cumulative total on a field and restricting row selection on the + // running total of such an accumulator + // 2) H2 uses session variables to perform this accumulator function: + // http://www.h2database.com/html/functions.html#set + // 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries) + + for (retryCount in 1..MAX_RETRIES) { + + spendLock.withLock { + val statement = configuration.jdbcSession().createStatement() + try { + statement.execute("CALL SET(@t, 0);") + + // we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null) + // the softLockReserve update will detect whether we try to lock states locked by others + val selectJoin = """ + SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id + FROM vault_states AS vs, contract_cash_states AS ccs + WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index + AND vs.state_status = 0 + AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity} + AND (vs.lock_id = '$lockId' OR vs.lock_id is null) + """ + + (if (notary != null) + " AND vs.notary_key = '${notary.owningKey.toBase58String()}'" else "") + + (if (issuerKeysStr != null) + " AND ccs.issuer_key IN $issuerKeysStr" else "") + + // Retrieve spendable state refs + val rs = statement.executeQuery(selectJoin) + stateAndRefs.clear() + log.debug(selectJoin) + var totalPennies = 0L + while (rs.next()) { + val txHash = SecureHash.parse(rs.getString(1)) + val index = rs.getInt(2) + val stateRef = StateRef(txHash, index) + val state = rs.getBytes(3).deserialize>(storageKryo()) + val pennies = rs.getLong(4) + totalPennies = rs.getLong(5) + val rowLockId = rs.getString(6) + stateAndRefs.add(StateAndRef(state, stateRef)) + log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" } + } + + if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) { + // we should have a minimum number of states to satisfy our selection `amount` criteria + log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs") + + // update database + softLockReserve(lockId, stateAndRefs.map { it.ref }.toSet()) + return stateAndRefs + } + log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}") + // retry as more states may become available + } catch (e: SQLException) { + log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId] + $e. + """) + } catch (e: StatesNotAvailableException) { + stateAndRefs.clear() + log.warn(e.message) + // retry only if there are locked states that may become available again (or consumed with change) + } finally { + statement.close() + } + } + + log.warn("Coin selection failed on attempt $retryCount") + // TODO: revisit the back off strategy for contended spending. + if (retryCount != MAX_RETRIES) { + FlowStateMachineImpl.sleep(RETRY_SLEEP * retryCount.toLong()) + } + } + + log.warn("Insufficient spendable states identified for $amount") + return stateAndRefs + } + + override fun softLockedStates(lockId: UUID?): List> { + val stateAndRefs = + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + var query = select(VaultSchema.VaultStates::class) + .where(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED) + .and(VaultSchema.VaultStates::contractStateClassName eq Cash.State::class.java.name) + if (lockId != null) + query.and(VaultSchema.VaultStates::lockId eq lockId) + else + query.and(VaultSchema.VaultStates::lockId.notNull()) + query.get() + .map { it -> + val stateRef = StateRef(SecureHash.parse(it.txId), it.index) + val state = it.contractState.deserialize>(storageKryo()) + StateAndRef(state, stateRef) + }.toList() + } + return stateAndRefs + } + + /** + * Generate a transaction that moves an amount of currency to the given pubkey. + * + * @param onlyFromParties if non-null, the asset states will be filtered to only include those issued by the set + * of given parties. This can be useful if the party you're trying to pay has expectations + * about which type of asset claims they are willing to accept. + */ + @Suspendable override fun generateSpend(tx: TransactionBuilder, amount: Amount, to: CompositeKey, @@ -242,30 +473,25 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P // // Finally, we add the states to the provided partial transaction. - val assetsStates = unconsumedStates() + // Retrieve unspent and unlocked cash states that meet our spending criteria. + val acceptableCoins = unconsumedStatesForSpending(amount, onlyFromParties, tx.notary, tx.lockId) - val currency = amount.token - var acceptableCoins = run { - val ofCurrency = assetsStates.filter { it.state.data.amount.token.product == currency } - if (onlyFromParties != null) - ofCurrency.filter { it.state.data.amount.token.issuer.party in onlyFromParties } - else - ofCurrency - } - tx.notary = acceptableCoins.firstOrNull()?.state?.notary // TODO: We should be prepared to produce multiple transactions spending inputs from // different notaries, or at least group states by notary and take the set with the - // highest total value - acceptableCoins = acceptableCoins.filter { it.state.notary == tx.notary } + // highest total value. + + // notary may be associated with locked state only + tx.notary = acceptableCoins.firstOrNull()?.state?.notary val (gathered, gatheredAmount) = gatherCoins(acceptableCoins, amount) + val takeChangeFrom = gathered.firstOrNull() val change = if (takeChangeFrom != null && gatheredAmount > amount) { Amount(gatheredAmount.quantity - amount.quantity, takeChangeFrom.state.data.amount.token) } else { null } - val keysUsed = gathered.map { it.state.data.owner }.toSet() + val keysUsed = gathered.map { it.state.data.owner } val states = gathered.groupBy { it.state.data.amount.token.issuer }.map { val coins = it.value @@ -293,15 +519,14 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P for (state in outputs) tx.addOutputState(state) // What if we already have a move command with the right keys? Filter it out here or in platform code? - val keysList = keysUsed.toList() - tx.addCommand(Cash().generateMoveCommand(), keysList) + tx.addCommand(Cash().generateMoveCommand(), keysUsed) // update Vault // notify(tx.toWireTransaction()) // Vault update must be completed AFTER transaction is recorded to ledger storage!!! // (this is accomplished within the recordTransaction function) - return Pair(tx, keysList) + return Pair(tx, keysUsed) } private fun deriveState(txState: TransactionState, amount: Amount>, owner: CompositeKey) @@ -325,8 +550,12 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P gatheredAmount += Amount(c.state.data.amount.quantity, amount.token) } - if (gatheredAmount < amount) + if (gatheredAmount < amount) { + log.trace("Insufficient balance: requested $amount, available $gatheredAmount (total balance ${cashBalances[amount.token]})") throw InsufficientBalanceException(amount - gatheredAmount) + } + + log.trace("Gathered coins: requested $amount, available $gatheredAmount, change: ${gatheredAmount - amount}") return Pair(gathered, gatheredAmount) } @@ -339,20 +568,28 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P // Retrieve all unconsumed states for this transaction's inputs val consumedStates = HashSet>() if (tx.inputs.isNotEmpty()) { - val stateRefs = tx.inputs.fold("") { stateRefs, it -> stateRefs + "('${it.txhash}','${it.index}')," }.dropLast(1) + val stateRefs = stateRefsToCompositeKeyStr(tx.inputs) // TODO: using native JDBC until requery supports SELECT WHERE COMPOSITE_KEY IN // https://github.com/requery/requery/issues/434 val statement = configuration.jdbcSession().createStatement() - val rs = statement.executeQuery("SELECT transaction_id, output_index, contract_state " + - "FROM vault_states " + - "WHERE ((transaction_id, output_index) IN ($stateRefs)) " + - "AND (state_status = 0)") - while (rs.next()) { - val txHash = SecureHash.parse(rs.getString(1)) - val index = rs.getInt(2) - val state = rs.getBytes(3).deserialize>(storageKryo()) - consumedStates.add(StateAndRef(state, StateRef(txHash, index))) + try { + // TODO: upgrade to Requery 1.2.0 and rewrite with Requery DSL (https://github.com/requery/requery/issues/434) + val rs = statement.executeQuery("SELECT transaction_id, output_index, contract_state " + + "FROM vault_states " + + "WHERE ((transaction_id, output_index) IN ($stateRefs)) " + + "AND (state_status = 0)") + while (rs.next()) { + val txHash = SecureHash.parse(rs.getString(1)) + val index = rs.getInt(2) + val state = rs.getBytes(3).deserialize>(storageKryo()) + consumedStates.add(StateAndRef(state, StateRef(txHash, index))) + } + } catch (e: SQLException) { + log.error("""Failed retrieving state refs for: $stateRefs + $e. + """) } + finally { statement.close() } } // Is transaction irrelevant? @@ -387,4 +624,11 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P is LinearState -> state.isRelevant(ourKeys) else -> false } + + /** + * Helper method to generate a string formatted list of Composite Keys for SQL IN clause + */ + private fun stateRefsToCompositeKeyStr(stateRefs: List): String { + return stateRefs.fold("") { stateRefsAsStr, it -> stateRefsAsStr + "('${it.txhash}','${it.index}')," }.dropLast(1) + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt new file mode 100644 index 0000000000..a034c042ca --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt @@ -0,0 +1,61 @@ +package net.corda.node.services.vault +import net.corda.core.contracts.StateRef +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StateMachineRunId +import net.corda.core.node.services.VaultService +import net.corda.core.utilities.loggerFor +import net.corda.node.services.statemachine.StateMachineManager +import net.corda.node.utilities.AddOrRemove +import java.util.* + +class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) { + + private companion object { + val log = loggerFor() + } + + private val trackingFlowIds: MutableSet = Collections.synchronizedSet(HashSet()) + + init { + smm.changes.subscribe { change -> + if (change.addOrRemove == AddOrRemove.REMOVE && trackingFlowIds.contains(change.id.uuid)) { + log.trace( "${change.addOrRemove} Flow name ${change.logic.javaClass} with id ${change.id}") + unregisterSoftLocks(change.id, change.logic) + } + trackingFlowIds.remove(change.id.uuid) + } + + // Discussion + // + // The intent of the following approach is to support what might be a common pattern in a flow: + // 1. Create state + // 2. Do something with state + // without possibility of another flow intercepting the state between 1 and 2, + // since we cannot lock the state before it exists. e.g. Issue and then Move some Cash. + // + // The downside is we could have a long running flow that holds a lock for a long period of time. + // However, the lock can be programmatically released, like any other soft lock, + // should we want a long running flow that creates a visible state mid way through. + + vault.rawUpdates.subscribe { update -> + update.flowId?.let { + if (update.produced.isNotEmpty()) { + registerSoftLocks(update.flowId as UUID, update.produced.map { it.ref }) + trackingFlowIds.add(update.flowId as UUID) + } + } + } + } + + private fun registerSoftLocks(flowId: UUID, stateRefs: List) { + log.trace("Reserving soft locks for flow id $flowId and states $stateRefs") + vault.softLockReserve(flowId, stateRefs.toSet()) + } + + private fun unregisterSoftLocks(id: StateMachineRunId, logic: FlowLogic<*>) { + val flowClassName = logic.javaClass.simpleName + log.trace("Releasing soft locks for flow $flowClassName with flow id ${id.uuid}") + vault.softLockRelease(id.uuid) + + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/NodeVaultServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/NodeVaultServiceTest.kt deleted file mode 100644 index e520677ce5..0000000000 --- a/node/src/test/kotlin/net/corda/node/services/NodeVaultServiceTest.kt +++ /dev/null @@ -1,156 +0,0 @@ -package net.corda.node.services - -import net.corda.contracts.asset.Cash -import net.corda.contracts.testing.fillWithSomeTestCash -import net.corda.core.contracts.DOLLARS -import net.corda.core.contracts.POUNDS -import net.corda.core.contracts.TransactionType -import net.corda.core.contracts.`issued by` -import net.corda.core.crypto.composite -import net.corda.core.node.services.TxWritableStorageService -import net.corda.core.node.services.VaultService -import net.corda.core.node.services.unconsumedStates -import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.DUMMY_NOTARY -import net.corda.core.utilities.LogHelper -import net.corda.node.services.vault.NodeVaultService -import net.corda.node.utilities.configureDatabase -import net.corda.node.utilities.databaseTransaction -import net.corda.testing.MEGA_CORP -import net.corda.testing.MEGA_CORP_KEY -import net.corda.testing.node.MockServices -import net.corda.testing.node.makeTestDataSourceProperties -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.exposed.sql.Database -import org.junit.After -import org.junit.Before -import org.junit.Test -import java.io.Closeable -import java.util.* -import kotlin.test.assertEquals - -class NodeVaultServiceTest { - lateinit var dataSource: Closeable - lateinit var database: Database - private val dataSourceProps = makeTestDataSourceProperties() - - @Before - fun setUp() { - LogHelper.setLevel(NodeVaultService::class) - val dataSourceAndDatabase = configureDatabase(dataSourceProps) - dataSource = dataSourceAndDatabase.first - database = dataSourceAndDatabase.second - } - - @After - fun tearDown() { - dataSource.close() - LogHelper.reset(NodeVaultService::class) - } - - @Test - fun `states not local to instance`() { - databaseTransaction(database) { - val services1 = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) - - override fun recordTransactions(txs: Iterable) { - for (stx in txs) { - storageService.validatedTransactions.addTransaction(stx) - vaultService.notify(stx.tx) - } - } - } - services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) - - val w1 = services1.vaultService.unconsumedStates() - assertThat(w1).hasSize(3) - - val originalStorage = services1.storageService - val originalVault = services1.vaultService - val services2 = object : MockServices() { - override val vaultService: VaultService get() = originalVault - - // We need to be able to find the same transactions as before, too. - override val storageService: TxWritableStorageService get() = originalStorage - - override fun recordTransactions(txs: Iterable) { - for (stx in txs) { - storageService.validatedTransactions.addTransaction(stx) - vaultService.notify(stx.tx) - } - } - } - - val w2 = services2.vaultService.unconsumedStates() - assertThat(w2).hasSize(3) - } - } - - @Test - fun `states for refs`() { - databaseTransaction(database) { - val services1 = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) - - override fun recordTransactions(txs: Iterable) { - for (stx in txs) { - storageService.validatedTransactions.addTransaction(stx) - vaultService.notify(stx.tx) - } - } - } - services1.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) - - val w1 = services1.vaultService.unconsumedStates().toList() - assertThat(w1).hasSize(3) - - val stateRefs = listOf(w1[1].ref, w1[2].ref) - val states = services1.vaultService.statesForRefs(stateRefs) - assertThat(states).hasSize(2) - } - } - - @Test - fun addNoteToTransaction() { - databaseTransaction(database) { - val services = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) - - override fun recordTransactions(txs: Iterable) { - for (stx in txs) { - storageService.validatedTransactions.addTransaction(stx) - } - // Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions. - vaultService.notifyAll(txs.map { it.tx }) - } - } - - val freshKey = services.legalIdentityKey - - // Issue a txn to Send us some Money - val usefulTX = TransactionType.General.Builder(null).apply { - Cash().generateIssue(this, 100.DOLLARS `issued by` MEGA_CORP.ref(1), freshKey.public.composite, DUMMY_NOTARY) - signWith(MEGA_CORP_KEY) - }.toSignedTransaction() - - services.recordTransactions(listOf(usefulTX)) - - services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 1") - services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 2") - services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 3") - assertEquals(3, services.vaultService.getTransactionNotes(usefulTX.id).count()) - - // Issue more Money (GBP) - val anotherTX = TransactionType.General.Builder(null).apply { - Cash().generateIssue(this, 200.POUNDS `issued by` MEGA_CORP.ref(1), freshKey.public.composite, DUMMY_NOTARY) - signWith(MEGA_CORP_KEY) - }.toSignedTransaction() - - services.recordTransactions(listOf(anotherTX)) - - services.vaultService.addNoteToTransaction(anotherTX.id, "GPB Sample Note 1") - assertEquals(1, services.vaultService.getTransactionNotes(anotherTX.id).count()) - } - } -} diff --git a/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt b/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt index 345926ed36..735ade1011 100644 --- a/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt @@ -7,7 +7,6 @@ import net.corda.contracts.testing.fillWithSomeTestDeals import net.corda.contracts.testing.fillWithSomeTestLinearStates import net.corda.core.contracts.* import net.corda.core.crypto.composite -import net.corda.core.node.recordTransactions import net.corda.core.node.services.VaultService import net.corda.core.node.services.consumedStates import net.corda.core.node.services.unconsumedStates @@ -15,6 +14,8 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.LogHelper +import net.corda.node.services.schema.HibernateObserver +import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.vault.NodeVaultService import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.databaseTransaction @@ -32,6 +33,8 @@ import org.junit.Before import org.junit.Test import java.io.Closeable import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors import kotlin.test.assertEquals import kotlin.test.assertNull @@ -52,7 +55,7 @@ class VaultWithCashTest { database = dataSourceAndDatabase.second databaseTransaction(database) { services = object : MockServices() { - override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) + override val vaultService: VaultService = makeVaultService(dataSourceProps) override fun recordTransactions(txs: Iterable) { for (stx in txs) { @@ -128,6 +131,98 @@ class VaultWithCashTest { } } + @Test + fun `issue and attempt double spend`() { + val freshKey = services.keyManagementService.freshKey() + + databaseTransaction(database) { + // A tx that sends us money. + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 10, 10, Random(0L), + issuedBy = MEGA_CORP.ref(1), + issuerKey = MEGA_CORP_KEY, + ownedBy = freshKey.public.composite) + println("Cash balance: ${vault.cashBalances[USD]}") + + assertThat(vault.unconsumedStates()).hasSize(10) + assertThat(vault.softLockedStates()).hasSize(0) + } + + val backgroundExecutor = Executors.newFixedThreadPool(2) + val countDown = CountDownLatch(2) + // 1st tx that spends our money. + backgroundExecutor.submit { + databaseTransaction(database) { + try { + val txn1 = + TransactionType.General.Builder(DUMMY_NOTARY).apply { + vault.generateSpend(this, 60.DOLLARS, BOB_PUBKEY) + signWith(freshKey) + signWith(DUMMY_NOTARY_KEY) + }.toSignedTransaction() + println("txn1: ${txn1.id} spent ${((txn1.tx.outputs[0].data) as Cash.State).amount}") + println("""txn1 states: + UNCONSUMED: ${vault.unconsumedStates().count()} : ${vault.unconsumedStates()}, + CONSUMED: ${vault.consumedStates().count()} : ${vault.consumedStates()}, + LOCKED: ${vault.softLockedStates().count()} : ${vault.softLockedStates()} + """) + services.recordTransactions(txn1) + println("txn1: Cash balance: ${vault.cashBalances[USD]}") + println("""txn1 states: + UNCONSUMED: ${vault.unconsumedStates().count()} : ${vault.unconsumedStates()}, + CONSUMED: ${vault.consumedStates().count()} : ${vault.consumedStates()}, + LOCKED: ${vault.softLockedStates().count()} : ${vault.softLockedStates()} + """) + txn1 + } + catch(e: Exception) { + println(e) + } + } + println("txn1 COMMITTED!") + countDown.countDown() + } + + // 2nd tx that attempts to spend same money + backgroundExecutor.submit { + databaseTransaction(database) { + try { + val txn2 = + TransactionType.General.Builder(DUMMY_NOTARY).apply { + vault.generateSpend(this, 80.DOLLARS, BOB_PUBKEY) + signWith(freshKey) + signWith(DUMMY_NOTARY_KEY) + }.toSignedTransaction() + println("txn2: ${txn2.id} spent ${((txn2.tx.outputs[0].data) as Cash.State).amount}") + println("""txn2 states: + UNCONSUMED: ${vault.unconsumedStates().count()} : ${vault.unconsumedStates()}, + CONSUMED: ${vault.consumedStates().count()} : ${vault.consumedStates()}, + LOCKED: ${vault.softLockedStates().count()} : ${vault.softLockedStates()} + """) + services.recordTransactions(txn2) + println("txn2: Cash balance: ${vault.cashBalances[USD]}") + println("""txn2 states: + UNCONSUMED: ${vault.unconsumedStates().count()} : ${vault.unconsumedStates()}, + CONSUMED: ${vault.consumedStates().count()} : ${vault.consumedStates()}, + LOCKED: ${vault.softLockedStates().count()} : ${vault.softLockedStates()} + """) + txn2 + } + catch(e: Exception) { + println(e) + } + } + println("txn2 COMMITTED!") + + countDown.countDown() + } + + countDown.await() + databaseTransaction(database) { + println("Cash balance: ${vault.cashBalances[USD]}") + assertThat(vault.cashBalances[USD]).isIn(DOLLARS(20),DOLLARS(40)) + } + } + @Test fun `branching LinearStates fails to verify`() { databaseTransaction(database) { diff --git a/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt new file mode 100644 index 0000000000..fcb198ccac --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt @@ -0,0 +1,393 @@ +package net.corda.node.services.vault + +import net.corda.contracts.asset.Cash +import net.corda.contracts.testing.fillWithSomeTestCash +import net.corda.core.contracts.* +import net.corda.core.crypto.composite +import net.corda.core.flows.FlowException +import net.corda.core.node.services.TxWritableStorageService +import net.corda.core.node.services.VaultService +import net.corda.core.node.services.unconsumedStates +import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.DUMMY_NOTARY +import net.corda.core.utilities.LogHelper +import net.corda.node.services.schema.HibernateObserver +import net.corda.node.services.schema.NodeSchemaService +import net.corda.node.utilities.configureDatabase +import net.corda.node.utilities.databaseTransaction +import net.corda.testing.MEGA_CORP +import net.corda.testing.MEGA_CORP_KEY +import net.corda.testing.node.MockServices +import net.corda.testing.node.makeTestDataSourceProperties +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.jetbrains.exposed.sql.Database +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.io.Closeable +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import kotlin.test.assertEquals +import kotlin.test.assertNull + +class NodeVaultServiceTest { + lateinit var services: MockServices + val vault: VaultService get() = services.vaultService + lateinit var dataSource: Closeable + lateinit var database: Database + + @Before + fun setUp() { + LogHelper.setLevel(NodeVaultService::class) + val dataSourceProps = makeTestDataSourceProperties() + val dataSourceAndDatabase = configureDatabase(dataSourceProps) + dataSource = dataSourceAndDatabase.first + database = dataSourceAndDatabase.second + databaseTransaction(database) { + services = object : MockServices() { + override val vaultService: VaultService = makeVaultService(dataSourceProps) + + override fun recordTransactions(txs: Iterable) { + for (stx in txs) { + storageService.validatedTransactions.addTransaction(stx) + } + // Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions. + vaultService.notifyAll(txs.map { it.tx }) + } + } + } + } + + @After + fun tearDown() { + dataSource.close() + LogHelper.reset(NodeVaultService::class) + } + + @Test + fun `states not local to instance`() { + databaseTransaction(database) { + + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + + val w1 = services.vaultService.unconsumedStates() + assertThat(w1).hasSize(3) + + val originalStorage = services.storageService + val originalVault = services.vaultService + val services2 = object : MockServices() { + override val vaultService: VaultService get() = originalVault + + // We need to be able to find the same transactions as before, too. + override val storageService: TxWritableStorageService get() = originalStorage + + override fun recordTransactions(txs: Iterable) { + for (stx in txs) { + storageService.validatedTransactions.addTransaction(stx) + vaultService.notify(stx.tx) + } + } + } + + val w2 = services2.vaultService.unconsumedStates() + assertThat(w2).hasSize(3) + } + } + + @Test + fun `states for refs`() { + databaseTransaction(database) { + + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + + val w1 = services.vaultService.unconsumedStates().toList() + assertThat(w1).hasSize(3) + + val stateRefs = listOf(w1[1].ref, w1[2].ref) + val states = services.vaultService.statesForRefs(stateRefs) + assertThat(states).hasSize(2) + } + } + + @Test + fun `states soft locking reserve and release`() { + databaseTransaction(database) { + + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + + val unconsumedStates = services.vaultService.unconsumedStates().toList() + assertThat(unconsumedStates).hasSize(3) + + val stateRefsToSoftLock = setOf(unconsumedStates[1].ref, unconsumedStates[2].ref) + + // soft lock two of the three states + val softLockId = UUID.randomUUID() + services.vaultService.softLockReserve(softLockId, stateRefsToSoftLock) + + // all softlocked states + assertThat(services.vaultService.softLockedStates()).hasSize(2) + // my softlocked states + assertThat(services.vaultService.softLockedStates(softLockId)).hasSize(2) + + // excluding softlocked states + val unlockedStates1 = services.vaultService.unconsumedStates(includeSoftLockedStates = false) + assertThat(unlockedStates1).hasSize(1) + + // soft lock release one of the states explicitly + services.vaultService.softLockRelease(softLockId, setOf(unconsumedStates[1].ref)) + val unlockedStates2 = services.vaultService.unconsumedStates(includeSoftLockedStates = false) + assertThat(unlockedStates2).hasSize(2) + + // soft lock release the rest by id + services.vaultService.softLockRelease(softLockId) + val unlockedStates = services.vaultService.unconsumedStates(includeSoftLockedStates = false).toList() + assertThat(unlockedStates).hasSize(3) + + // should be back to original states + assertThat(unlockedStates).isEqualTo(unconsumedStates) + } + } + + @Test + fun `soft locking attempt concurrent reserve`() { + + val backgroundExecutor = Executors.newFixedThreadPool(2) + val countDown = CountDownLatch(2) + + val softLockId1 = UUID.randomUUID() + val softLockId2 = UUID.randomUUID() + + val vaultStates = + databaseTransaction(database) { + assertNull(vault.cashBalances[USD]) + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + } + val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet() + println("State Refs:: $stateRefsToSoftLock") + + // 1st tx locks states + backgroundExecutor.submit { + try { + databaseTransaction(database) { + vault.softLockReserve(softLockId1, stateRefsToSoftLock) + assertThat(vault.softLockedStates(softLockId1)).hasSize(3) + } + println("SOFT LOCK STATES #1 succeeded") + } catch(e: Throwable) { + println("SOFT LOCK STATES #1 failed") + } finally { + countDown.countDown() + } + } + + // 2nd tx attempts to lock same states + backgroundExecutor.submit { + try { + Thread.sleep(100) // let 1st thread soft lock them 1st + databaseTransaction(database) { + vault.softLockReserve(softLockId2, stateRefsToSoftLock) + assertThat(vault.softLockedStates(softLockId2)).hasSize(3) + } + println("SOFT LOCK STATES #2 succeeded") + } catch(e: Throwable) { + println("SOFT LOCK STATES #2 failed") + } finally { + countDown.countDown() + } + } + + countDown.await() + databaseTransaction(database) { + val lockStatesId1 = vault.softLockedStates(softLockId1) + println("SOFT LOCK #1 final states: $lockStatesId1") + assertThat(lockStatesId1.size).isIn(0, 3) + val lockStatesId2 = vault.softLockedStates(softLockId2) + println("SOFT LOCK #2 final states: $lockStatesId2") + assertThat(lockStatesId2.size).isIn(0, 3) + } + } + + @Test + fun `soft locking partial reserve states fails`() { + + val softLockId1 = UUID.randomUUID() + val softLockId2 = UUID.randomUUID() + + val vaultStates = + databaseTransaction(database) { + assertNull(vault.cashBalances[USD]) + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + } + val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet() + println("State Refs:: $stateRefsToSoftLock") + + // lock 1st state with LockId1 + databaseTransaction(database) { + vault.softLockReserve(softLockId1, setOf(stateRefsToSoftLock.first())) + assertThat(vault.softLockedStates(softLockId1)).hasSize(1) + } + + // attempt to lock all 3 states with LockId2 + databaseTransaction(database) { + assertThatExceptionOfType(FlowException::class.java).isThrownBy( + { vault.softLockReserve(softLockId2, stateRefsToSoftLock) } + ).withMessageContaining("only 2 rows available").withNoCause() + } + } + + @Test + fun `attempt to lock states already soft locked by me`() { + + val softLockId1 = UUID.randomUUID() + + val vaultStates = + databaseTransaction(database) { + assertNull(vault.cashBalances[USD]) + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + } + val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet() + println("State Refs:: $stateRefsToSoftLock") + + // lock states with LockId1 + databaseTransaction(database) { + vault.softLockReserve(softLockId1, stateRefsToSoftLock) + assertThat(vault.softLockedStates(softLockId1)).hasSize(3) + } + + // attempt to relock same states with LockId1 + databaseTransaction(database) { + vault.softLockReserve(softLockId1, stateRefsToSoftLock) + assertThat(vault.softLockedStates(softLockId1)).hasSize(3) + } + } + + @Test + fun `lock additional states to some already soft locked by me`() { + + val softLockId1 = UUID.randomUUID() + + val vaultStates = + databaseTransaction(database) { + assertNull(vault.cashBalances[USD]) + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L)) + } + val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet() + println("State Refs:: $stateRefsToSoftLock") + + // lock states with LockId1 + databaseTransaction(database) { + vault.softLockReserve(softLockId1, setOf(stateRefsToSoftLock.first())) + assertThat(vault.softLockedStates(softLockId1)).hasSize(1) + } + + // attempt to lock all states with LockId1 (including previously already locked one) + databaseTransaction(database) { + vault.softLockReserve(softLockId1, stateRefsToSoftLock) + assertThat(vault.softLockedStates(softLockId1)).hasSize(3) + } + } + + @Test + fun `unconsumedStatesForSpending exact amount`() { + databaseTransaction(database) { + + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 1, 1, Random(0L)) + + val unconsumedStates = services.vaultService.unconsumedStates().toList() + assertThat(unconsumedStates).hasSize(1) + + val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending(100.DOLLARS, lockId = UUID.randomUUID()) + spendableStatesUSD.forEach(::println) + assertThat(spendableStatesUSD).hasSize(1) + assertThat(spendableStatesUSD[0].state.data.amount.quantity).isEqualTo(100L*100) + assertThat(services.vaultService.softLockedStates()).hasSize(1) + } + } + + @Test + fun `unconsumedStatesForSpending insufficient amount`() { + databaseTransaction(database) { + + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 1, 1, Random(0L)) + + val unconsumedStates = services.vaultService.unconsumedStates().toList() + assertThat(unconsumedStates).hasSize(1) + + val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending(110.DOLLARS, lockId = UUID.randomUUID()) + spendableStatesUSD.forEach(::println) + assertThat(spendableStatesUSD).hasSize(1) + assertThat(services.vaultService.softLockedStates()).hasSize(0) + } + } + + @Test + fun `unconsumedStatesForSpending small amount`() { + databaseTransaction(database) { + + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 2, 2, Random(0L)) + + val unconsumedStates = services.vaultService.unconsumedStates().toList() + assertThat(unconsumedStates).hasSize(2) + + val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending(1.DOLLARS, lockId = UUID.randomUUID()) + spendableStatesUSD.forEach(::println) + assertThat(spendableStatesUSD).hasSize(1) + assertThat(spendableStatesUSD[0].state.data.amount.quantity).isGreaterThanOrEqualTo(1L*100) + assertThat(services.vaultService.softLockedStates()).hasSize(1) + } + } + + @Test + fun `states soft locking query granularity`() { + databaseTransaction(database) { + + services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 10, 10, Random(0L)) + services.fillWithSomeTestCash(100.POUNDS, DUMMY_NOTARY, 10, 10, Random(0L)) + services.fillWithSomeTestCash(100.SWISS_FRANCS, DUMMY_NOTARY, 10, 10, Random(0L)) + + val allStates = services.vaultService.unconsumedStates() + assertThat(allStates).hasSize(30) + + for (i in 1..5) { + val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending(20.DOLLARS, lockId = UUID.randomUUID()) + spendableStatesUSD.forEach(::println) + } + // note only 3 spend attempts succeed with a total of 8 states + assertThat(services.vaultService.softLockedStates()).hasSize(8) + } + } + + @Test + fun addNoteToTransaction() { + databaseTransaction(database) { + + val freshKey = services.legalIdentityKey + + // Issue a txn to Send us some Money + val usefulTX = TransactionType.General.Builder(null).apply { + Cash().generateIssue(this, 100.DOLLARS `issued by` MEGA_CORP.ref(1), freshKey.public.composite, DUMMY_NOTARY) + signWith(MEGA_CORP_KEY) + }.toSignedTransaction() + + services.recordTransactions(listOf(usefulTX)) + + services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 1") + services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 2") + services.vaultService.addNoteToTransaction(usefulTX.id, "USD Sample Note 3") + assertEquals(3, services.vaultService.getTransactionNotes(usefulTX.id).count()) + + // Issue more Money (GBP) + val anotherTX = TransactionType.General.Builder(null).apply { + Cash().generateIssue(this, 200.POUNDS `issued by` MEGA_CORP.ref(1), freshKey.public.composite, DUMMY_NOTARY) + signWith(MEGA_CORP_KEY) + }.toSignedTransaction() + + services.recordTransactions(listOf(anotherTX)) + + services.vaultService.addNoteToTransaction(anotherTX.id, "GPB Sample Note 1") + assertEquals(1, services.vaultService.getTransactionNotes(anotherTX.id).count()) + } + } +} diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt index 3a516322de..58b9419c87 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt @@ -4,6 +4,7 @@ import com.google.common.util.concurrent.Futures import net.corda.contracts.testing.calculateRandomlySizedAmounts import net.corda.core.contracts.Amount import net.corda.core.contracts.DOLLARS +import net.corda.core.contracts.Issued import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow @@ -24,14 +25,14 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) { val logger = loggerFor() } - fun runBuyer(amount: Amount = 30000.0.DOLLARS) { + fun runBuyer(amount: Amount = 30000.DOLLARS) { val bankOfCordaParty = rpc.partyFromName(BOC.name) ?: throw Exception("Unable to locate ${BOC.name} in Network Map Service") val me = rpc.nodeIdentity() - // TODO: revert back to multiple issue request amounts (3,10) when soft locking implemented - val amounts = calculateRandomlySizedAmounts(amount, 1, 1, Random()) - val resultFutures = amounts.map { - rpc.startFlow(::IssuanceRequester, amount, me.legalIdentity, OpaqueBytes.of(1), bankOfCordaParty).returnValue + val amounts = calculateRandomlySizedAmounts(amount, 3, 10, Random()) + // issuer random amounts of currency totaling 30000.DOLLARS in parallel + val resultFutures = amounts.map { pennies -> + rpc.startFlow(::IssuanceRequester, Amount(pennies, amount.token), me.legalIdentity, OpaqueBytes.of(1), bankOfCordaParty).returnValue } Futures.allAsList(resultFutures).getOrThrow() diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index 6426d5c59b..189beca997 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -15,6 +15,9 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY import net.corda.node.services.persistence.InMemoryStateMachineRecordedTransactionMappingStorage +import net.corda.node.services.schema.HibernateObserver +import net.corda.node.services.schema.NodeSchemaService +import net.corda.node.services.vault.NodeVaultService import net.corda.testing.MEGA_CORP import net.corda.testing.MINI_CORP import net.corda.testing.MOCK_VERSION @@ -65,6 +68,13 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub { override val clock: Clock get() = Clock.systemUTC() override val schedulerService: SchedulerService get() = throw UnsupportedOperationException() override val myInfo: NodeInfo get() = NodeInfo(object : SingleMessageRecipient {}, Party("MegaCorp", key.public.composite), MOCK_VERSION) + + fun makeVaultService(dataSourceProps: Properties): VaultService { + val vaultService = NodeVaultService(this, dataSourceProps) + // Vault cash spending requires access to contract_cash_states and their updates + HibernateObserver(vaultService, NodeSchemaService()) + return vaultService + } } @ThreadSafe @@ -170,7 +180,7 @@ class MockStorageService(override val attachments: AttachmentStorage = MockAttac fun makeTestDataSourceProperties(nodeName: String = SecureHash.randomSHA256().toString()): Properties { val props = Properties() props.setProperty("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource") - props.setProperty("dataSource.url", "jdbc:h2:mem:${nodeName}_persistence;DB_CLOSE_ON_EXIT=FALSE") + props.setProperty("dataSource.url", "jdbc:h2:mem:${nodeName}_persistence;LOCK_TIMEOUT=10000;DB_CLOSE_ON_EXIT=FALSE") props.setProperty("dataSource.user", "sa") props.setProperty("dataSource.password", "") return props diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt index da770d18f7..93c4ca0670 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt @@ -12,10 +12,14 @@ import joptsimple.OptionParser import net.corda.client.jfx.model.Models import net.corda.client.jfx.model.observableValue import net.corda.client.mock.EventGenerator +import net.corda.contracts.asset.Cash import net.corda.core.contracts.GBP import net.corda.core.contracts.USD +import net.corda.core.messaging.FlowHandle import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType +import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.loggerFor import net.corda.explorer.model.CordaViewModel import net.corda.explorer.model.SettingsModel import net.corda.explorer.views.* @@ -35,6 +39,9 @@ import tornadofx.App import tornadofx.addStageIcon import tornadofx.find import java.util.* +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.ExecutionException +import kotlin.concurrent.thread /** * Main class for Explorer, you will need Tornado FX to run the explorer. @@ -43,6 +50,10 @@ class Main : App(MainView::class) { private val loginView by inject() private val fullscreen by observableValue(SettingsModel::fullscreenProperty) + companion object { + val log = loggerFor
() + } + override fun start(stage: Stage) { // Login to Corda node super.start(stage) @@ -207,34 +218,93 @@ fun main(args: Array) { currencies = listOf(USD) ) - for (i in 0..1000) { + val maxIterations = 100000 + val flowHandles = mapOf( + "GBPIssuer" to ArrayBlockingQueue>(maxIterations+1), + "USDIssuer" to ArrayBlockingQueue>(maxIterations+1), + "Alice" to ArrayBlockingQueue>(maxIterations+1), + "Bob" to ArrayBlockingQueue>(maxIterations+1), + "GBPExit" to ArrayBlockingQueue>(maxIterations+1), + "USDExit" to ArrayBlockingQueue>(maxIterations+1) + ) + + flowHandles.forEach { + thread { + for (i in 0..maxIterations) { + val item = it.value.take() + val out = "[$i] ${it.key} ${item.id} :" + try { + val result = item.returnValue.get() + Main.log.info("$out ${result.id} ${(result.tx.outputs.first().data as Cash.State).amount}") + } catch(e: ExecutionException) { + Main.log.info("$out ${e.cause!!.message}") + } + } + } + } + + for (i in 0..maxIterations) { Thread.sleep(500) - // Party pay requests - listOf(aliceRPC, bobRPC).forEach { - eventGenerator.clientCommandGenerator.map { command -> - command.startFlow(it) + + // Issuer requests + if ((i % 5) == 0) { + issuerGBPEventGenerator.bankOfCordaIssueGenerator.map { command -> + println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}") + val cmd = command.startFlow(issuerRPCGBP) + flowHandles["GBPIssuer"]?.add(cmd) + cmd?.progress?.subscribe({},{})?.unsubscribe() + Unit + }.generate(SplittableRandom()) + issuerUSDEventGenerator.bankOfCordaIssueGenerator.map { command -> + println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}") + val cmd = command.startFlow(issuerRPCUSD) + flowHandles["USDIssuer"]?.add(cmd) + cmd?.progress?.subscribe({},{})?.unsubscribe() Unit }.generate(SplittableRandom()) } + // Exit requests - issuerGBPEventGenerator.bankOfCordaExitGenerator.map { command -> - command.startFlow(issuerRPCGBP) + if ((i % 10) == 0) { + issuerGBPEventGenerator.bankOfCordaExitGenerator.map { command -> + println("[$i] EXITING ${command.amount} with ref ${command.issueRef}") + val cmd = command.startFlow(issuerRPCGBP) + flowHandles["GBPExit"]?.add(cmd) + cmd?.progress?.subscribe({},{})?.unsubscribe() + Unit + }.generate(SplittableRandom()) + issuerUSDEventGenerator.bankOfCordaExitGenerator.map { command -> + println("[$i] EXITING ${command.amount} with ref ${command.issueRef}") + val cmd = command.startFlow(issuerRPCUSD) + flowHandles["USDExit"]?.add(cmd) + cmd?.progress?.subscribe({},{})?.unsubscribe() + Unit + }.generate(SplittableRandom()) + } + + // Party pay requests + + // Alice + eventGenerator.clientCommandGenerator.map { command -> + println("[$i] SENDING ${command.amount} from ${aliceRPC.nodeIdentity().legalIdentity} to ${command.recipient}") + val cmd = command.startFlow(aliceRPC) + flowHandles["Alice"]?.add(cmd) + cmd?.progress?.subscribe({},{})?.unsubscribe() Unit }.generate(SplittableRandom()) - issuerUSDEventGenerator.bankOfCordaExitGenerator.map { command -> - command.startFlow(issuerRPCUSD) - Unit - }.generate(SplittableRandom()) - // Issuer requests - issuerGBPEventGenerator.bankOfCordaIssueGenerator.map { command -> - command.startFlow(issuerRPCGBP) - Unit - }.generate(SplittableRandom()) - issuerUSDEventGenerator.bankOfCordaIssueGenerator.map { command -> - command.startFlow(issuerRPCUSD) + + // Bob + eventGenerator.clientCommandGenerator.map { command -> + println("[$i] SENDING ${command.amount} from ${bobRPC.nodeIdentity().legalIdentity} to ${command.recipient}") + val cmd = command.startFlow(bobRPC) + flowHandles["Bob"]?.add(cmd) + cmd?.progress?.subscribe({},{})?.unsubscribe() Unit }.generate(SplittableRandom()) } + + println("Simulation completed") + aliceClient.close() bobClient.close() issuerClientGBP.close()