From e0b684b3ea112f3785975058b6d42f7eb2a7d8eb Mon Sep 17 00:00:00 2001 From: Christian Sailer Date: Tue, 10 Oct 2017 16:33:16 +0100 Subject: [PATCH] Simple trade flow for commercial paper --- .../corda/ptflows/flows/TwoPartyTradeFlow.kt | 242 ++++++ .../contracts/PtCommercialPaperTests.kt | 4 +- .../contracts/flows/TwoPartyTradeFlowTest.kt | 779 ++++++++++++++++++ 3 files changed, 1023 insertions(+), 2 deletions(-) create mode 100644 perftestflows/src/main/kotlin/net/corda/ptflows/flows/TwoPartyTradeFlow.kt create mode 100644 perftestflows/src/test/kotlin/net/corda/ptflows/contracts/flows/TwoPartyTradeFlowTest.kt diff --git a/perftestflows/src/main/kotlin/net/corda/ptflows/flows/TwoPartyTradeFlow.kt b/perftestflows/src/main/kotlin/net/corda/ptflows/flows/TwoPartyTradeFlow.kt new file mode 100644 index 0000000000..0f40dda661 --- /dev/null +++ b/perftestflows/src/main/kotlin/net/corda/ptflows/flows/TwoPartyTradeFlow.kt @@ -0,0 +1,242 @@ +package net.corda.ptflows.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.confidential.IdentitySyncFlow +import net.corda.core.contracts.* +import net.corda.core.flows.* +import net.corda.core.identity.Party +import net.corda.core.identity.PartyAndCertificate +import net.corda.core.serialization.CordaSerializable +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker +import net.corda.core.utilities.seconds +import net.corda.core.utilities.unwrap +import net.corda.ptflows.contracts.asset.PtCash +import net.corda.ptflows.utils.sumCashBy +import java.security.PublicKey +import java.util.* + +/** + * This asset trading flow implements a "delivery vs payment" type swap. It has two parties (B and S for buyer + * and seller) and the following steps: + * + * 1. S sends the [StateAndRef] pointing to what they want to sell to B, along with info about the price they require + * B to pay. For example this has probably been agreed on an exchange. + * 2. B sends to S a [SignedTransaction] that includes the state as input, B's cash as input, the state with the new + * owner key as output, and any change cash as output. It contains a single signature from B but isn't valid because + * it lacks a signature from S authorising movement of the asset. + * 3. S signs it and commits it to the ledger, notarising it and distributing the final signed transaction back + * to B. + * + * Assuming no malicious termination, they both end the flow being in possession of a valid, signed transaction + * that represents an atomic asset swap. + * + * Note that it's the *seller* who initiates contact with the buyer, not vice-versa as you might imagine. + */ +object TwoPartyTradeFlow { + // TODO: Common elements in multi-party transaction consensus and signing should be refactored into a superclass of this + // and [AbstractStateReplacementFlow]. + + class UnacceptablePriceException(givenPrice: Amount) : FlowException("Unacceptable price: $givenPrice") + + class AssetMismatchException(val expectedTypeName: String, val typeName: String) : FlowException() { + override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName" + } + + /** + * This object is serialised to the network and is the first flow message the seller sends to the buyer. + * + * @param payToIdentity anonymous identity of the seller, for payment to be sent to. + */ + @CordaSerializable + data class SellerTradeInfo( + val price: Amount, + val payToIdentity: PartyAndCertificate + ) + + open class Seller(private val otherSideSession: FlowSession, + private val assetToSell: StateAndRef, + private val price: Amount, + private val myParty: PartyAndCertificate, // TODO Left because in tests it's used to pass anonymous party. + override val progressTracker: ProgressTracker = Seller.tracker()) : FlowLogic() { + + companion object { + object AWAITING_PROPOSAL : ProgressTracker.Step("Awaiting transaction proposal") + // DOCSTART 3 + object VERIFYING_AND_SIGNING : ProgressTracker.Step("Verifying and signing transaction proposal") { + override fun childProgressTracker() = SignTransactionFlow.tracker() + } + // DOCEND 3 + + fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING_AND_SIGNING) + } + + // DOCSTART 4 + @Suspendable + override fun call(): SignedTransaction { + progressTracker.currentStep = AWAITING_PROPOSAL + // Make the first message we'll send to kick off the flow. + val hello = SellerTradeInfo(price, myParty) + // What we get back from the other side is a transaction that *might* be valid and acceptable to us, + // but we must check it out thoroughly before we sign! + // SendTransactionFlow allows seller to access our data to resolve the transaction. + subFlow(SendStateAndRefFlow(otherSideSession, listOf(assetToSell))) + otherSideSession.send(hello) + + // Verify and sign the transaction. + progressTracker.currentStep = VERIFYING_AND_SIGNING + + // Sync identities to ensure we know all of the identities involved in the transaction we're about to + // be asked to sign + subFlow(IdentitySyncFlow.Receive(otherSideSession)) + + // DOCSTART 5 + val signTransactionFlow = object : SignTransactionFlow(otherSideSession, VERIFYING_AND_SIGNING.childProgressTracker()) { + override fun checkTransaction(stx: SignedTransaction) { + // Verify that we know who all the participants in the transaction are + val states: Iterable = stx.tx.inputs.map { serviceHub.loadState(it).data } + stx.tx.outputs.map { it.data } + states.forEach { state -> + state.participants.forEach { anon -> + require(serviceHub.identityService.wellKnownPartyFromAnonymous(anon) != null) { + "Transaction state $state involves unknown participant $anon" + } + } + } + + if (stx.tx.outputStates.sumCashBy(myParty.party).withoutIssuer() != price) + throw FlowException("Transaction is not sending us the right amount of cash") + } + } + + val txId = subFlow(signTransactionFlow).id + // DOCEND 5 + + return waitForLedgerCommit(txId) + } + // DOCEND 4 + + // Following comment moved here so that it doesn't appear in the docsite: + // There are all sorts of funny games a malicious secondary might play with it sends maybeSTX, + // we should fix them: + // + // - This tx may attempt to send some assets we aren't intending to sell to the secondary, if + // we're reusing keys! So don't reuse keys! + // - This tx may include output states that impose odd conditions on the movement of the cash, + // once we implement state pairing. + // + // but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to + // express flow state machines on top of the messaging layer. + } + + open class Buyer(private val sellerSession: FlowSession, + private val notary: Party, + private val acceptablePrice: Amount, + private val typeToBuy: Class, + private val anonymous: Boolean) : FlowLogic() { + constructor(otherSideSession: FlowSession, notary: Party, acceptablePrice: Amount, typeToBuy: Class) : + this(otherSideSession, notary, acceptablePrice, typeToBuy, true) + // DOCSTART 2 + object RECEIVING : ProgressTracker.Step("Waiting for seller trading info") + + object VERIFYING : ProgressTracker.Step("Verifying seller assets") + object SIGNING : ProgressTracker.Step("Generating and signing transaction proposal") + object COLLECTING_SIGNATURES : ProgressTracker.Step("Collecting signatures from other parties") { + override fun childProgressTracker() = CollectSignaturesFlow.tracker() + } + + object RECORDING : ProgressTracker.Step("Recording completed transaction") { + // TODO: Currently triggers a race condition on Team City. See https://github.com/corda/corda/issues/733. + // override fun childProgressTracker() = FinalityFlow.tracker() + } + + override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, COLLECTING_SIGNATURES, RECORDING) + // DOCEND 2 + + // DOCSTART 1 + @Suspendable + override fun call(): SignedTransaction { + // Wait for a trade request to come in from the other party. + progressTracker.currentStep = RECEIVING + val (assetForSale, tradeRequest) = receiveAndValidateTradeRequest() + + // Create the identity we'll be paying to, and send the counterparty proof we own the identity + val buyerAnonymousIdentity = if (anonymous) + serviceHub.keyManagementService.freshKeyAndCert(ourIdentityAndCert, false) + else + ourIdentityAndCert + // Put together a proposed transaction that performs the trade, and sign it. + progressTracker.currentStep = SIGNING + val (ptx, cashSigningPubKeys) = assembleSharedTX(assetForSale, tradeRequest, buyerAnonymousIdentity) + + // Now sign the transaction with whatever keys we need to move the cash. + val partSignedTx = serviceHub.signInitialTransaction(ptx, cashSigningPubKeys) + + // Sync up confidential identities in the transaction with our counterparty + subFlow(IdentitySyncFlow.Send(sellerSession, ptx.toWireTransaction(serviceHub))) + + // Send the signed transaction to the seller, who must then sign it themselves and commit + // it to the ledger by sending it to the notary. + progressTracker.currentStep = COLLECTING_SIGNATURES + val sellerSignature = subFlow(CollectSignatureFlow(partSignedTx, sellerSession, sellerSession.counterparty.owningKey)) + val twiceSignedTx = partSignedTx + sellerSignature + + // Notarise and record the transaction. + progressTracker.currentStep = RECORDING + return subFlow(FinalityFlow(twiceSignedTx)) + } + + @Suspendable + private fun receiveAndValidateTradeRequest(): Pair, SellerTradeInfo> { + val assetForSale = subFlow(ReceiveStateAndRefFlow(sellerSession)).single() + return assetForSale to sellerSession.receive().unwrap { + progressTracker.currentStep = VERIFYING + // What is the seller trying to sell us? + val asset = assetForSale.state.data + val assetTypeName = asset.javaClass.name + + // The asset must either be owned by the well known identity of the counterparty, or we must be able to + // prove the owner is a confidential identity of the counterparty. + val assetForSaleIdentity = serviceHub.identityService.wellKnownPartyFromAnonymous(asset.owner) + require(assetForSaleIdentity == sellerSession.counterparty) + + // Register the identity we're about to send payment to. This shouldn't be the same as the asset owner + // identity, so that anonymity is enforced. + val wellKnownPayToIdentity = serviceHub.identityService.verifyAndRegisterIdentity(it.payToIdentity) + require(wellKnownPayToIdentity?.party == sellerSession.counterparty) { "Well known identity to pay to must match counterparty identity" } + + if (it.price > acceptablePrice) + throw UnacceptablePriceException(it.price) + if (!typeToBuy.isInstance(asset)) + throw AssetMismatchException(typeToBuy.name, assetTypeName) + + it + } + } + + @Suspendable + private fun assembleSharedTX(assetForSale: StateAndRef, tradeRequest: SellerTradeInfo, buyerAnonymousIdentity: PartyAndCertificate): SharedTx { + val ptx = TransactionBuilder(notary) + + // Add input and output states for the movement of cash, by using the Cash contract to generate the states + val (tx, cashSigningPubKeys) = PtCash.generateSpend(serviceHub, ptx, tradeRequest.price, ourIdentityAndCert, tradeRequest.payToIdentity.party) + + // Add inputs/outputs/a command for the movement of the asset. + tx.addInputState(assetForSale) + + val (command, state) = assetForSale.state.data.withNewOwner(buyerAnonymousIdentity.party) + tx.addOutputState(state, assetForSale.state.contract, assetForSale.state.notary) + tx.addCommand(command, assetForSale.state.data.owner.owningKey) + + // We set the transaction's time-window: it may be that none of the contracts need this! + // But it can't hurt to have one. + val currentTime = serviceHub.clock.instant() + tx.setTimeWindow(currentTime, 30.seconds) + + return SharedTx(tx, cashSigningPubKeys) + } + // DOCEND 1 + + data class SharedTx(val tx: TransactionBuilder, val cashSigningPubKeys: List) + } +} diff --git a/perftestflows/src/test/kotlin/net/corda/ptflows/contracts/PtCommercialPaperTests.kt b/perftestflows/src/test/kotlin/net/corda/ptflows/contracts/PtCommercialPaperTests.kt index d6b11c698b..c1104548f5 100644 --- a/perftestflows/src/test/kotlin/net/corda/ptflows/contracts/PtCommercialPaperTests.kt +++ b/perftestflows/src/test/kotlin/net/corda/ptflows/contracts/PtCommercialPaperTests.kt @@ -9,8 +9,8 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.days import net.corda.core.utilities.seconds -import net.corda.finance.DOLLARS -import net.corda.finance.`issued by` +import net.corda.ptflows.DOLLARS +import net.corda.ptflows.`issued by` import net.corda.ptflows.contracts.asset.* import net.corda.testing.* import net.corda.ptflows.contracts.asset.fillWithSomeTestCash diff --git a/perftestflows/src/test/kotlin/net/corda/ptflows/contracts/flows/TwoPartyTradeFlowTest.kt b/perftestflows/src/test/kotlin/net/corda/ptflows/contracts/flows/TwoPartyTradeFlowTest.kt new file mode 100644 index 0000000000..81f7299dfe --- /dev/null +++ b/perftestflows/src/test/kotlin/net/corda/ptflows/contracts/flows/TwoPartyTradeFlowTest.kt @@ -0,0 +1,779 @@ +package net.corda.ptflows.contracts.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.concurrent.CordaFuture +import net.corda.core.contracts.* +import net.corda.core.crypto.* +import net.corda.core.flows.* +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.AnonymousParty +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.concurrent.map +import net.corda.core.internal.rootCause +import net.corda.core.messaging.DataFeed +import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.messaging.StateMachineTransactionMapping +import net.corda.core.node.services.Vault +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.toFuture +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.transactions.WireTransaction +import net.corda.core.utilities.days +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.toNonEmptySet +import net.corda.core.utilities.unwrap +import net.corda.ptflows.DOLLARS +import net.corda.ptflows.`issued by` +import net.corda.ptflows.contracts.PtCommercialPaper +import net.corda.ptflows.contracts.asset.CASH +import net.corda.ptflows.contracts.asset.PtCash +import net.corda.ptflows.contracts.asset.`issued by` +import net.corda.ptflows.contracts.asset.`owned by` +import net.corda.ptflows.flows.TwoPartyTradeFlow.Buyer +import net.corda.ptflows.flows.TwoPartyTradeFlow.Seller +import net.corda.node.internal.StartedNode +import net.corda.node.services.api.WritableTransactionStorage +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.persistence.DBTransactionStorage +import net.corda.node.utilities.CordaPersistence +import net.corda.nodeapi.internal.ServiceInfo +import net.corda.testing.* +import net.corda.ptflows.contracts.asset.fillWithSomeTestCash +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.MockNetwork +import net.corda.testing.node.pumpReceive +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before +import org.junit.Test +import rx.Observable +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream +import java.math.BigInteger +import java.security.KeyPair +import java.util.* +import java.util.jar.JarOutputStream +import java.util.zip.ZipEntry +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +/** + * In this example, Alice wishes to sell her commercial paper to Bob in return for $1,000,000 and they wish to do + * it on the ledger atomically. Therefore they must work together to build a transaction. + * + * We assume that Alice and Bob already found each other via some market, and have agreed the details already. + */ +class TwoPartyTradeFlowTests { + private lateinit var mockNet: MockNetwork + + @Before + fun before() { + setCordappPackages("net.corda.ptflows.contracts") + LogHelper.setLevel("platform.trade", "core.contract.TransactionGroup", "recordingmap") + } + + @After + fun after() { + mockNet.stopNodes() + LogHelper.reset("platform.trade", "core.contract.TransactionGroup", "recordingmap") + unsetCordappPackages() + } + + @Test + fun `trade cash for commercial paper`() { + // We run this in parallel threads to help catch any race conditions that may exist. The other tests + // we run in the unit test thread exclusively to speed things up, ensure deterministic results and + // allow interruption half way through. + mockNet = MockNetwork(false, true) + + ledger(initialiseSerialization = false) { + val basketOfNodes = mockNet.createSomeNodes(3) + val notaryNode = basketOfNodes.notaryNode + val aliceNode = basketOfNodes.partyNodes[0] + val bobNode = basketOfNodes.partyNodes[1] + val bankNode = basketOfNodes.partyNodes[2] + val cashIssuer = bankNode.info.chooseIdentity().ref(1) + val cpIssuer = bankNode.info.chooseIdentity().ref(1, 2, 3) + val notary = aliceNode.services.getDefaultNotary() + + + aliceNode.internals.disableDBCloseOnStop() + bobNode.internals.disableDBCloseOnStop() + + bobNode.database.transaction { + bobNode.services.fillWithSomeTestCash(2000.DOLLARS, bankNode.services, outputNotary = notary, + issuedBy = cashIssuer) + } + + val alicesFakePaper = aliceNode.database.transaction { + fillUpForSeller(false, cpIssuer, aliceNode.info.chooseIdentity(), + 1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), null, notary).second + } + + insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode) + + val (bobStateMachine, aliceResult) = runBuyerAndSeller(notary, aliceNode, bobNode, + "alice's paper".outputStateAndRef()) + + // TODO: Verify that the result was inserted into the transaction database. + // assertEquals(bobResult.get(), aliceNode.storage.validatedTransactions[aliceResult.get().id]) + assertEquals(aliceResult.getOrThrow(), bobStateMachine.getOrThrow().resultFuture.getOrThrow()) + + aliceNode.dispose() + bobNode.dispose() + +// aliceNode.database.transaction { +// assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty() +// } + aliceNode.internals.manuallyCloseDB() +// bobNode.database.transaction { +// assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty() +// } + bobNode.internals.manuallyCloseDB() + } + } + + @Test(expected = InsufficientBalanceException::class) + fun `trade cash for commercial paper fails using soft locking`() { + mockNet = MockNetwork(false, true) + + ledger(initialiseSerialization = false) { + val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name) + val aliceNode = mockNet.createPartyNode(notaryNode.network.myAddress, ALICE.name) + val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name) + val bankNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOC.name) + val issuer = bankNode.info.chooseIdentity().ref(1) + val notary = aliceNode.services.getDefaultNotary() + + aliceNode.internals.disableDBCloseOnStop() + bobNode.internals.disableDBCloseOnStop() + + val cashStates = bobNode.database.transaction { + bobNode.services.fillWithSomeTestCash(2000.DOLLARS, bankNode.services, notary, 3, 3, + issuedBy = issuer) + } + + val alicesFakePaper = aliceNode.database.transaction { + fillUpForSeller(false, issuer, aliceNode.info.chooseIdentity(), + 1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), null, notary).second + } + + insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode) + + val cashLockId = UUID.randomUUID() + bobNode.database.transaction { + // lock the cash states with an arbitrary lockId (to prevent the Buyer flow from claiming the states) + val refs = cashStates.states.map { it.ref } + if (refs.isNotEmpty()) { + bobNode.services.vaultService.softLockReserve(cashLockId, refs.toNonEmptySet()) + } + } + + val (bobStateMachine, aliceResult) = runBuyerAndSeller(notary, aliceNode, bobNode, + "alice's paper".outputStateAndRef()) + + assertEquals(aliceResult.getOrThrow(), bobStateMachine.getOrThrow().resultFuture.getOrThrow()) + + aliceNode.dispose() + bobNode.dispose() + +// aliceNode.database.transaction { +// assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty() +// } + aliceNode.internals.manuallyCloseDB() +// bobNode.database.transaction { +// assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty() +// } + bobNode.internals.manuallyCloseDB() + } + } + + @Test + fun `shutdown and restore`() { + mockNet = MockNetwork(false) + ledger(initialiseSerialization = false) { + val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name) + val aliceNode = mockNet.createPartyNode(notaryNode.network.myAddress, ALICE.name) + var bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name) + val bankNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOC.name) + val issuer = bankNode.info.chooseIdentity().ref(1, 2, 3) + + // Let the nodes know about each other - normally the network map would handle this + mockNet.registerIdentities() + + aliceNode.database.transaction { + aliceNode.services.identityService.verifyAndRegisterIdentity(bobNode.info.chooseIdentityAndCert()) + } + bobNode.database.transaction { + bobNode.services.identityService.verifyAndRegisterIdentity(aliceNode.info.chooseIdentityAndCert()) + } + aliceNode.internals.disableDBCloseOnStop() + bobNode.internals.disableDBCloseOnStop() + + val bobAddr = bobNode.network.myAddress as InMemoryMessagingNetwork.PeerHandle + val networkMapAddress = notaryNode.network.myAddress + + mockNet.runNetwork() // Clear network map registration messages + val notary = aliceNode.services.getDefaultNotary() + + bobNode.database.transaction { + bobNode.services.fillWithSomeTestCash(2000.DOLLARS, bankNode.services, outputNotary = notary, + issuedBy = issuer) + } + val alicesFakePaper = aliceNode.database.transaction { + fillUpForSeller(false, issuer, aliceNode.info.chooseIdentity(), + 1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), null, notary).second + } + insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode) + val aliceFuture = runBuyerAndSeller(notary, aliceNode, bobNode, "alice's paper".outputStateAndRef()).sellerResult + + // Everything is on this thread so we can now step through the flow one step at a time. + // Seller Alice already sent a message to Buyer Bob. Pump once: + bobNode.pumpReceive() + + // Bob sends a couple of queries for the dependencies back to Alice. Alice reponds. + aliceNode.pumpReceive() + bobNode.pumpReceive() + aliceNode.pumpReceive() + bobNode.pumpReceive() + aliceNode.pumpReceive() + bobNode.pumpReceive() + +// // OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature. +// bobNode.database.transaction { +// assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1) +// } + + val storage = bobNode.services.validatedTransactions + val bobTransactionsBeforeCrash = bobNode.database.transaction { + (storage as DBTransactionStorage).transactions + } + assertThat(bobTransactionsBeforeCrash).isNotEmpty + + // .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk. + bobNode.dispose() + + // Alice doesn't know that and carries on: she wants to know about the cash transactions he's trying to use. + // She will wait around until Bob comes back. + assertThat(aliceNode.pumpReceive()).isNotNull() + + // FIXME: Knowledge of confidential identities is lost on node shutdown, so Bob's node now refuses to sign the + // transaction because it has no idea who the parties are. + + // ... bring the node back up ... the act of constructing the SMM will re-register the message handlers + // that Bob was waiting on before the reboot occurred. + bobNode = mockNet.createNode(networkMapAddress, bobAddr.id, object : MockNetwork.Factory { + override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, + advertisedServices: Set, id: Int, overrideServices: Map?, + entropyRoot: BigInteger): MockNetwork.MockNode { + return MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, bobAddr.id, overrideServices, entropyRoot) + } + }, BOB.name) + + // Find the future representing the result of this state machine again. + val bobFuture = bobNode.smm.findStateMachines(BuyerAcceptor::class.java).single().second + + // And off we go again. + mockNet.runNetwork() + + // Bob is now finished and has the same transaction as Alice. + assertThat(bobFuture.getOrThrow()).isEqualTo(aliceFuture.getOrThrow()) + + assertThat(bobNode.smm.findStateMachines(Buyer::class.java)).isEmpty() +// bobNode.database.transaction { +// assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty() +// } +// aliceNode.database.transaction { +// assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty() +// } + + bobNode.database.transaction { + val restoredBobTransactions = bobTransactionsBeforeCrash.filter { + bobNode.services.validatedTransactions.getTransaction(it.id) != null + } + assertThat(restoredBobTransactions).containsAll(bobTransactionsBeforeCrash) + } + + aliceNode.internals.manuallyCloseDB() + bobNode.internals.manuallyCloseDB() + } + } + + // Creates a mock node with an overridden storage service that uses a RecordingMap, that lets us test the order + // of gets and puts. + private fun makeNodeWithTracking( + networkMapAddress: SingleMessageRecipient?, + name: CordaX500Name): StartedNode { + // Create a node in the mock network ... + return mockNet.createNode(networkMapAddress, nodeFactory = object : MockNetwork.Factory { + override fun create(config: NodeConfiguration, + network: MockNetwork, + networkMapAddr: SingleMessageRecipient?, + advertisedServices: Set, id: Int, + overrideServices: Map?, + entropyRoot: BigInteger): MockNetwork.MockNode { + return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { + // That constructs a recording tx storage + override fun makeTransactionStorage(): WritableTransactionStorage { + return RecordingTransactionStorage(database, super.makeTransactionStorage()) + } + } + } + }, legalName = name) + } + + @Test + fun `check dependencies of sale asset are resolved`() { + mockNet = MockNetwork(false) + + val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name) + val aliceNode = makeNodeWithTracking(notaryNode.network.myAddress, ALICE.name) + val bobNode = makeNodeWithTracking(notaryNode.network.myAddress, BOB.name) + val bankNode = makeNodeWithTracking(notaryNode.network.myAddress, BOC.name) + val issuer = bankNode.info.chooseIdentity().ref(1, 2, 3) + mockNet.runNetwork() + notaryNode.internals.ensureRegistered() + val notary = aliceNode.services.getDefaultNotary() + + mockNet.registerIdentities() + + ledger(aliceNode.services, initialiseSerialization = false) { + + // Insert a prospectus type attachment into the commercial paper transaction. + val stream = ByteArrayOutputStream() + JarOutputStream(stream).use { + it.putNextEntry(ZipEntry("Prospectus.txt")) + it.write("Our commercial paper is top notch stuff".toByteArray()) + it.closeEntry() + } + val attachmentID = aliceNode.database.transaction { + attachment(ByteArrayInputStream(stream.toByteArray())) + } + + val bobsFakeCash = bobNode.database.transaction { + fillUpForBuyer(false, issuer, AnonymousParty(bobNode.info.chooseIdentity().owningKey), notary) + }.second + val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode, notaryNode, bankNode) + val alicesFakePaper = aliceNode.database.transaction { + fillUpForSeller(false, issuer, aliceNode.info.chooseIdentity(), + 1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), attachmentID, notary).second + } + val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode) + + mockNet.runNetwork() // Clear network map registration messages + + runBuyerAndSeller(notary, aliceNode, bobNode, "alice's paper".outputStateAndRef()) + + mockNet.runNetwork() + + run { + val records = (bobNode.services.validatedTransactions as RecordingTransactionStorage).records + // Check Bobs's database accesses as Bob's cash transactions are downloaded by Alice. + records.expectEvents(isStrict = false) { + sequence( + // Buyer Bob is told about Alice's commercial paper, but doesn't know it .. + expect(TxRecord.Get(alicesFakePaper[0].id)), + // He asks and gets the tx, validates it, sees it's a self issue with no dependencies, stores. + expect(TxRecord.Add(alicesSignedTxns.values.first())), + // Alice gets Bob's proposed transaction and doesn't know his two cash states. She asks, Bob answers. + expect(TxRecord.Get(bobsFakeCash[1].id)), + expect(TxRecord.Get(bobsFakeCash[2].id)), + // Alice notices that Bob's cash txns depend on a third tx she also doesn't know. She asks, Bob answers. + expect(TxRecord.Get(bobsFakeCash[0].id)) + ) + } + + // Bob has downloaded the attachment. + bobNode.database.transaction { + bobNode.services.attachments.openAttachment(attachmentID)!!.openAsJAR().use { + it.nextJarEntry + val contents = it.reader().readText() + assertTrue(contents.contains("Our commercial paper is top notch stuff")) + } + } + } + + // And from Alice's perspective ... + run { + val records = (aliceNode.services.validatedTransactions as RecordingTransactionStorage).records + records.expectEvents(isStrict = false) { + sequence( + // Seller Alice sends her seller info to Bob, who wants to check the asset for sale. + // He requests, Alice looks up in her DB to send the tx to Bob + expect(TxRecord.Get(alicesFakePaper[0].id)), + // Seller Alice gets a proposed tx which depends on Bob's two cash txns and her own tx. + expect(TxRecord.Get(bobsFakeCash[1].id)), + expect(TxRecord.Get(bobsFakeCash[2].id)), + expect(TxRecord.Get(alicesFakePaper[0].id)), + // Alice notices that Bob's cash txns depend on a third tx she also doesn't know. + expect(TxRecord.Get(bobsFakeCash[0].id)), + // Bob answers with the transactions that are now all verifiable, as Alice bottomed out. + // Bob's transactions are valid, so she commits to the database + expect(TxRecord.Add(bobsSignedTxns[bobsFakeCash[0].id]!!)), + expect(TxRecord.Get(bobsFakeCash[0].id)), // Verify + expect(TxRecord.Add(bobsSignedTxns[bobsFakeCash[2].id]!!)), + expect(TxRecord.Get(bobsFakeCash[0].id)), // Verify + expect(TxRecord.Add(bobsSignedTxns[bobsFakeCash[1].id]!!)), + // Now she verifies the transaction is contract-valid (not signature valid) which means + // looking up the states again. + expect(TxRecord.Get(bobsFakeCash[1].id)), + expect(TxRecord.Get(bobsFakeCash[2].id)), + expect(TxRecord.Get(alicesFakePaper[0].id)), + // Alice needs to look up the input states to find out which Notary they point to + expect(TxRecord.Get(bobsFakeCash[1].id)), + expect(TxRecord.Get(bobsFakeCash[2].id)), + expect(TxRecord.Get(alicesFakePaper[0].id)) + ) + } + } + } + } + + @Test + fun `track works`() { + mockNet = MockNetwork(false) + + val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name) + val aliceNode = makeNodeWithTracking(notaryNode.network.myAddress, ALICE.name) + val bobNode = makeNodeWithTracking(notaryNode.network.myAddress, BOB.name) + val bankNode = makeNodeWithTracking(notaryNode.network.myAddress, BOC.name) + val issuer = bankNode.info.chooseIdentity().ref(1, 2, 3) + + mockNet.runNetwork() + notaryNode.internals.ensureRegistered() + val notary = aliceNode.services.getDefaultNotary() + + mockNet.registerIdentities() + + ledger(aliceNode.services, initialiseSerialization = false) { + // Insert a prospectus type attachment into the commercial paper transaction. + val stream = ByteArrayOutputStream() + JarOutputStream(stream).use { + it.putNextEntry(ZipEntry("Prospectus.txt")) + it.write("Our commercial paper is top notch stuff".toByteArray()) + it.closeEntry() + } + val attachmentID = aliceNode.database.transaction { + attachment(ByteArrayInputStream(stream.toByteArray())) + } + + val bobsKey = bobNode.services.keyManagementService.keys.single() + val bobsFakeCash = bobNode.database.transaction { + fillUpForBuyer(false, issuer, AnonymousParty(bobsKey), notary) + }.second + insertFakeTransactions(bobsFakeCash, bobNode, notaryNode, bankNode) + + val alicesFakePaper = aliceNode.database.transaction { + fillUpForSeller(false, issuer, aliceNode.info.chooseIdentity(), + 1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), attachmentID, notary).second + } + + insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode) + + mockNet.runNetwork() // Clear network map registration messages + + val aliceTxStream = aliceNode.services.validatedTransactions.track().updates + val aliceTxMappings = with(aliceNode) { + database.transaction { services.stateMachineRecordedTransactionMapping.track().updates } + } + val aliceSmId = runBuyerAndSeller(notary, aliceNode, bobNode, + "alice's paper".outputStateAndRef()).sellerId + + mockNet.runNetwork() + + // We need to declare this here, if we do it inside [expectEvents] kotlin throws an internal compiler error(!). + val aliceTxExpectations = sequence( + expect { tx: SignedTransaction -> + require(tx.id == bobsFakeCash[0].id) + }, + expect { tx: SignedTransaction -> + require(tx.id == bobsFakeCash[2].id) + }, + expect { tx: SignedTransaction -> + require(tx.id == bobsFakeCash[1].id) + } + ) + aliceTxStream.expectEvents { aliceTxExpectations } + val aliceMappingExpectations = sequence( + expect { (stateMachineRunId, transactionId) -> + require(stateMachineRunId == aliceSmId) + require(transactionId == bobsFakeCash[0].id) + }, + expect { (stateMachineRunId, transactionId) -> + require(stateMachineRunId == aliceSmId) + require(transactionId == bobsFakeCash[2].id) + }, + expect { (stateMachineRunId, transactionId) -> + require(stateMachineRunId == aliceSmId) + require(transactionId == bobsFakeCash[1].id) + } + ) + aliceTxMappings.expectEvents { aliceMappingExpectations } + } + } + + @Test + fun `dependency with error on buyer side`() { + mockNet = MockNetwork(false) + ledger(initialiseSerialization = false) { + runWithError(true, false, "at least one cash input") + } + } + + @Test + fun `dependency with error on seller side`() { + mockNet = MockNetwork(false) + ledger(initialiseSerialization = false) { + runWithError(false, true, "Issuances have a time-window") + } + } + + private data class RunResult( + // The buyer is not created immediately, only when the seller starts running + val buyer: CordaFuture>, + val sellerResult: CordaFuture, + val sellerId: StateMachineRunId + ) + + private fun runBuyerAndSeller(notary: Party, + sellerNode: StartedNode, + buyerNode: StartedNode, + assetToSell: StateAndRef, + anonymous: Boolean = true): RunResult { + val buyerFlows: Observable> = buyerNode.internals.registerInitiatedFlow(BuyerAcceptor::class.java) + val firstBuyerFiber = buyerFlows.toFuture().map { it.stateMachine } + val seller = SellerInitiator(buyerNode.info.chooseIdentity(), notary, assetToSell, 1000.DOLLARS, anonymous) + val sellerResult = sellerNode.services.startFlow(seller).resultFuture + return RunResult(firstBuyerFiber, sellerResult, seller.stateMachine.id) + } + + @InitiatingFlow + class SellerInitiator(private val buyer: Party, + private val notary: Party, + private val assetToSell: StateAndRef, + private val price: Amount, + private val anonymous: Boolean) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val myPartyAndCert = if (anonymous) { + serviceHub.keyManagementService.freshKeyAndCert(ourIdentityAndCert, false) + } else { + ourIdentityAndCert + } + val buyerSession = initiateFlow(buyer) + buyerSession.send(TestTx(notary, price, anonymous)) + return subFlow(Seller( + buyerSession, + assetToSell, + price, + myPartyAndCert)) + } + } + + @InitiatedBy(SellerInitiator::class) + class BuyerAcceptor(private val sellerSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val (notary, price, anonymous) = sellerSession.receive().unwrap { + require(serviceHub.networkMapCache.isNotary(it.notaryIdentity)) { "${it.notaryIdentity} is not a notary" } + it + } + return subFlow(Buyer(sellerSession, notary, price, PtCommercialPaper.State::class.java, anonymous)) + } + } + + @CordaSerializable + data class TestTx(val notaryIdentity: Party, val price: Amount, val anonymous: Boolean) + + private fun LedgerDSL.runWithError( + bobError: Boolean, + aliceError: Boolean, + expectedMessageSubstring: String + ) { + val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name) + val aliceNode = mockNet.createPartyNode(notaryNode.network.myAddress, ALICE.name) + val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name) + val bankNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOC.name) + val issuer = bankNode.info.chooseIdentity().ref(1, 2, 3) + + mockNet.runNetwork() + notaryNode.internals.ensureRegistered() + val notary = aliceNode.services.getDefaultNotary() + + // Let the nodes know about each other - normally the network map would handle this + mockNet.registerIdentities() + + val bobsBadCash = bobNode.database.transaction { + fillUpForBuyer(bobError, issuer, bobNode.info.chooseIdentity(), + notary).second + } + val alicesFakePaper = aliceNode.database.transaction { + fillUpForSeller(aliceError, issuer, aliceNode.info.chooseIdentity(), + 1200.DOLLARS `issued by` issuer, null, notary).second + } + + insertFakeTransactions(bobsBadCash, bobNode, notaryNode, bankNode) + insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode) + + mockNet.runNetwork() // Clear network map registration messages + + val (bobStateMachine, aliceResult) = runBuyerAndSeller(notary, aliceNode, bobNode, "alice's paper".outputStateAndRef()) + + mockNet.runNetwork() + + val e = assertFailsWith { + if (bobError) + aliceResult.getOrThrow() + else + bobStateMachine.getOrThrow().resultFuture.getOrThrow() + } + val underlyingMessage = e.rootCause.message!! + if (expectedMessageSubstring !in underlyingMessage) { + assertEquals(expectedMessageSubstring, underlyingMessage) + } + } + + + private fun insertFakeTransactions( + wtxToSign: List, + node: StartedNode<*>, + notaryNode: StartedNode<*>, + vararg extraSigningNodes: StartedNode<*>): Map { + + val signed = wtxToSign.map { + val id = it.id + val sigs = mutableListOf() + val nodeKey = node.info.chooseIdentity().owningKey + sigs.add(node.services.keyManagementService.sign(SignableData(id, SignatureMetadata(1, Crypto.findSignatureScheme(nodeKey).schemeNumberID)), nodeKey)) + sigs.add(notaryNode.services.keyManagementService.sign(SignableData(id, SignatureMetadata(1, + Crypto.findSignatureScheme(notaryNode.info.legalIdentities[1].owningKey).schemeNumberID)), notaryNode.info.legalIdentities[1].owningKey)) + extraSigningNodes.forEach { currentNode -> + sigs.add(currentNode.services.keyManagementService.sign( + SignableData(id, SignatureMetadata(1, Crypto.findSignatureScheme(currentNode.info.chooseIdentity().owningKey).schemeNumberID)), + currentNode.info.chooseIdentity().owningKey) + ) + } + SignedTransaction(it, sigs) + } + return node.database.transaction { + node.services.recordTransactions(signed) + val validatedTransactions = node.services.validatedTransactions + if (validatedTransactions is RecordingTransactionStorage) { + validatedTransactions.records.clear() + } + signed.associateBy { it.id } + } + } + + private fun LedgerDSL.fillUpForBuyer( + withError: Boolean, + issuer: PartyAndReference, + owner: AbstractParty, + notary: Party): Pair, List> { + val interimOwner = issuer.party + // Bob (Buyer) has some cash he got from the Bank of Elbonia, Alice (Seller) has some commercial paper she + // wants to sell to Bob. + val eb1 = transaction(transactionBuilder = TransactionBuilder(notary = notary)) { + // Issued money to itself. + output(PtCash.PROGRAM_ID, "elbonian money 1", notary = notary) { 800.DOLLARS.CASH `issued by` issuer `owned by` interimOwner } + output(PtCash.PROGRAM_ID, "elbonian money 2", notary = notary) { 1000.DOLLARS.CASH `issued by` issuer `owned by` interimOwner } + if (!withError) { + command(issuer.party.owningKey) { PtCash.Commands.Issue() } + } else { + // Put a broken command on so at least a signature is created + command(issuer.party.owningKey) { PtCash.Commands.Move() } + } + timeWindow(TEST_TX_TIME) + if (withError) { + this.fails() + } else { + this.verifies() + } + } + + // Bob gets some cash onto the ledger from BoE + val bc1 = transaction(transactionBuilder = TransactionBuilder(notary = notary)) { + input("elbonian money 1") + output(PtCash.PROGRAM_ID, "bob cash 1", notary = notary) { 800.DOLLARS.CASH `issued by` issuer `owned by` owner } + command(interimOwner.owningKey) { PtCash.Commands.Move() } + this.verifies() + } + + val bc2 = transaction(transactionBuilder = TransactionBuilder(notary = notary)) { + input("elbonian money 2") + output(PtCash.PROGRAM_ID, "bob cash 2", notary = notary) { 300.DOLLARS.CASH `issued by` issuer `owned by` owner } + output(PtCash.PROGRAM_ID, notary = notary) { 700.DOLLARS.CASH `issued by` issuer `owned by` interimOwner } // Change output. + command(interimOwner.owningKey) { PtCash.Commands.Move() } + this.verifies() + } + + val vault = Vault(listOf("bob cash 1".outputStateAndRef(), "bob cash 2".outputStateAndRef())) + return Pair(vault, listOf(eb1, bc1, bc2)) + } + + private fun LedgerDSL.fillUpForSeller( + withError: Boolean, + issuer: PartyAndReference, + owner: AbstractParty, + amount: Amount>, + attachmentID: SecureHash?, + notary: Party): Pair, List> { + val ap = transaction(transactionBuilder = TransactionBuilder(notary = notary)) { + output(PtCommercialPaper.CP_PROGRAM_ID, "alice's paper", notary = notary) { + PtCommercialPaper.State(issuer, owner, amount, TEST_TX_TIME + 7.days) + } + command(issuer.party.owningKey) { PtCommercialPaper.Commands.Issue() } + if (!withError) + timeWindow(time = TEST_TX_TIME) + if (attachmentID != null) + attachment(attachmentID) + if (withError) { + this.fails() + } else { + this.verifies() + } + } + + val vault = Vault(listOf("alice's paper".outputStateAndRef())) + return Pair(vault, listOf(ap)) + } + + + class RecordingTransactionStorage(val database: CordaPersistence, val delegate: WritableTransactionStorage) : WritableTransactionStorage, SingletonSerializeAsToken() { + override fun track(): DataFeed, SignedTransaction> { + return database.transaction { + delegate.track() + } + } + + val records: MutableList = Collections.synchronizedList(ArrayList()) + override val updates: Observable + get() = delegate.updates + + override fun addTransaction(transaction: SignedTransaction): Boolean { + database.transaction { + records.add(TxRecord.Add(transaction)) + delegate.addTransaction(transaction) + } + return true + } + + override fun getTransaction(id: SecureHash): SignedTransaction? { + return database.transaction { + records.add(TxRecord.Get(id)) + delegate.getTransaction(id) + } + } + } + + interface TxRecord { + data class Add(val transaction: SignedTransaction) : TxRecord + data class Get(val id: SecureHash) : TxRecord + } + +}