From ad8ffca0b47fe7a2400cde2bcb441c408ad49c24 Mon Sep 17 00:00:00 2001 From: Ross Nicoll Date: Tue, 16 Aug 2016 22:53:59 +0100 Subject: [PATCH] Add CommitTransactionProtocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add new protocol which manages the entire process of taking a signed transaction ready for notarisation, through notarisation and onto recording it both locally and informing remote nodes. This protocol also optionally can include the ClientToServiceCommand which triggered a transaction being created, to give the remote nodes context on why a change occurred (i.e. "You are being sent £100") --- .../core/contracts/ClientToServiceCommand.kt | 49 +++++++++++++ .../core/protocols/ProtocolStateMachine.kt | 6 ++ .../protocols/BroadcastTransactionProtocol.kt | 56 +++++++++++++++ .../com/r3corda/protocols/FinalityProtocol.kt | 63 +++++++++++++++++ docs/source/consensus.rst | 23 ++++++- .../com/r3corda/node/internal/AbstractNode.kt | 2 +- .../node/internal/testing/IRSSimulation.kt | 4 +- .../node/internal/testing/TradeSimulation.kt | 7 +- .../r3corda/node/services/monitor/Events.kt | 38 ---------- .../r3corda/node/services/monitor/Messages.kt | 1 + .../services/monitor/WalletMonitorService.kt | 69 ++++++++----------- .../persistence/DataVendingService.kt | 42 ++++------- .../statemachine/ProtocolStateMachineImpl.kt | 7 +- .../statemachine/StateMachineManager.kt | 5 +- .../r3corda/node/messaging/AttachmentTests.kt | 8 +-- .../messaging/TwoPartyTradeProtocolTests.kt | 4 +- .../services/InMemoryNetworkMapServiceTest.kt | 10 +-- .../node/services/MockServiceHubInternal.kt | 2 +- .../node/services/NodeInterestRatesTest.kt | 2 +- .../node/services/NotaryChangeTests.kt | 6 +- .../node/services/NotaryServiceTests.kt | 10 +-- .../services/ValidatingNotaryServiceTests.kt | 4 +- .../services/WalletMonitorServiceTests.kt | 53 +++++++------- .../persistence/DataVendingServiceTests.kt | 12 +--- .../kotlin/com/r3corda/demos/RateFixDemo.kt | 2 +- .../kotlin/com/r3corda/demos/TraderDemo.kt | 4 +- 26 files changed, 304 insertions(+), 185 deletions(-) create mode 100644 core/src/main/kotlin/com/r3corda/core/contracts/ClientToServiceCommand.kt create mode 100644 core/src/main/kotlin/com/r3corda/protocols/BroadcastTransactionProtocol.kt create mode 100644 core/src/main/kotlin/com/r3corda/protocols/FinalityProtocol.kt diff --git a/core/src/main/kotlin/com/r3corda/core/contracts/ClientToServiceCommand.kt b/core/src/main/kotlin/com/r3corda/core/contracts/ClientToServiceCommand.kt new file mode 100644 index 0000000000..2e037f8439 --- /dev/null +++ b/core/src/main/kotlin/com/r3corda/core/contracts/ClientToServiceCommand.kt @@ -0,0 +1,49 @@ +package com.r3corda.core.contracts + +import com.r3corda.core.crypto.Party +import com.r3corda.core.serialization.OpaqueBytes +import java.security.PublicKey +import java.util.* + +/** + * A command from the monitoring client, to the node. + * + * @param id ID used to tag event(s) resulting from a command. + */ +sealed class ClientToServiceCommand(val id: UUID) { + /** + * Issue cash state objects. + * + * @param amount the amount of currency to issue on to the ledger. + * @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is + * to use the single byte "0x01" as a default. + * @param recipient the party to issue the cash to. + * @param notary the notary to use for this transaction. + * @param id the ID to be provided in events resulting from this request. + */ + class IssueCash(val amount: Amount, + val issueRef: OpaqueBytes, + val recipient: Party, + val notary: Party, + id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id) + + /** + * Pay cash to someone else. + * + * @param amount the amount of currency to issue on to the ledger. + * @param recipient the party to issue the cash to. + * @param id the ID to be provided in events resulting from this request. + */ + class PayCash(val amount: Amount>, val recipient: Party, + id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id) + + /** + * Exit cash from the ledger. + * + * @param amount the amount of currency to exit from the ledger. + * @param issueRef the reference previously specified on the issuance. + * @param id the ID to be provided in events resulting from this request. + */ + class ExitCash(val amount: Amount, val issueRef: OpaqueBytes, + id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id) +} diff --git a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt index c48af0afae..54a972d26f 100644 --- a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt +++ b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt @@ -1,6 +1,7 @@ package com.r3corda.core.protocols import co.paralleluniverse.fibers.Suspendable +import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.crypto.Party import com.r3corda.core.node.ServiceHub import com.r3corda.core.utilities.UntrustworthyData @@ -23,4 +24,9 @@ interface ProtocolStateMachine { val serviceHub: ServiceHub val logger: Logger + + /** Unique ID for this machine, valid only while it is in memory. */ + val machineId: Long + /** This future will complete when the call method returns. */ + val resultFuture: ListenableFuture } diff --git a/core/src/main/kotlin/com/r3corda/protocols/BroadcastTransactionProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/BroadcastTransactionProtocol.kt new file mode 100644 index 0000000000..c95fdfe780 --- /dev/null +++ b/core/src/main/kotlin/com/r3corda/protocols/BroadcastTransactionProtocol.kt @@ -0,0 +1,56 @@ +package com.r3corda.protocols + +import co.paralleluniverse.fibers.Suspendable +import com.r3corda.core.contracts.ClientToServiceCommand +import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.crypto.Party +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.random63BitValue +import com.r3corda.core.serialization.serialize +import java.util.* + + +/** + * Notify all involved parties about a transaction, including storing a copy. Normally this would be called via + * [FinalityProtocol]. + * + * @param notarisedTransaction transaction which has been notarised (if needed) and is ready to notify nodes about. + * @param events information on the event(s) which triggered the transaction. + * @param participants a list of participants involved in the transaction. + * @return a list of participants who were successfully notified of the transaction. + */ +// TODO: Event needs to be replaced with something that's meaningful, but won't ever contain sensitive +// information (such as internal details of an account to take payment from). Suggest +// splitting ClientToServiceCommand into public and private parts, with only the public parts +// relayed here. +class BroadcastTransactionProtocol(val notarisedTransaction: SignedTransaction, + val events: Set, + val participants: Set) : ProtocolLogic() { + companion object { + /** Topic for messages notifying a node of a new transaction */ + val TOPIC = "platform.wallet.notify_tx" + } + + override val topic: String = TOPIC + + data class NotifyTxRequestMessage( + val tx: SignedTransaction, + val events: Set, + override val replyToParty: Party, + override val sessionID: Long + ) : PartyRequestMessage + + @Suspendable + override fun call() { + // Record it locally + serviceHub.recordTransactions(notarisedTransaction) + + // TODO: Messaging layer should handle this broadcast for us (although we need to not be sending + // session ID, for that to work, as well). + participants.filter { it != serviceHub.storageService.myLegalIdentity }.forEach { participant -> + val sessionID = random63BitValue() + val msg = NotifyTxRequestMessage(notarisedTransaction, events, serviceHub.storageService.myLegalIdentity, sessionID) + send(participant, 0, msg) + } + } +} diff --git a/core/src/main/kotlin/com/r3corda/protocols/FinalityProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/FinalityProtocol.kt new file mode 100644 index 0000000000..4a8409a709 --- /dev/null +++ b/core/src/main/kotlin/com/r3corda/protocols/FinalityProtocol.kt @@ -0,0 +1,63 @@ +package com.r3corda.protocols + +import co.paralleluniverse.fibers.Suspendable +import com.google.common.util.concurrent.ListenableFuture +import com.r3corda.core.contracts.ClientToServiceCommand +import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.contracts.TransactionBuilder +import com.r3corda.core.contracts.WireTransaction +import com.r3corda.core.crypto.Party +import com.r3corda.core.node.ServiceHub +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.random63BitValue +import com.r3corda.core.serialization.serialize +import com.r3corda.core.utilities.ProgressTracker +import java.util.* + + +/** + * Finalise a transaction by notarising it, then recording it locally, and then sending it to all involved parties. + * + * @param transaction to commit. + * @param events information on the event(s) which triggered the transaction. + * @param participants a list of participants involved in the transaction. + * @return a list of participants who were successfully notified of the transaction. + */ +// TODO: Event needs to be replaced with something that's meaningful, but won't ever contain sensitive +// information (such as internal details of an account to take payment from). Suggest +// splitting ClientToServiceCommand into public and private parts, with only the public parts +// relayed here. +class FinalityProtocol(val transaction: SignedTransaction, + val events: Set, + val participants: Set, + override val progressTracker: ProgressTracker = tracker()): ProtocolLogic() { + companion object { + object NOTARISING : ProgressTracker.Step("Requesting signature by notary service") + object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants") + + fun tracker() = ProgressTracker(NOTARISING, BROADCASTING) + } + + override val topic: String + get() = throw UnsupportedOperationException() + + @Suspendable + override fun call() { + progressTracker.currentStep = NOTARISING + // Notarise the transaction if needed + val notarisedTransaction = if (needsNotarySignature(transaction)) { + val notarySig = subProtocol(NotaryProtocol.Client(transaction)) + transaction.withAdditionalSignature(notarySig) + } else { + transaction + } + + // Let everyone else know about the transaction + progressTracker.currentStep = BROADCASTING + subProtocol(BroadcastTransactionProtocol(notarisedTransaction, events, participants)) + } + + private fun needsNotarySignature(transaction: SignedTransaction) = expectsNotarySignature(transaction.tx) && hasNoNotarySignature(transaction) + private fun expectsNotarySignature(transaction: WireTransaction) = transaction.notary != null && transaction.notary.owningKey in transaction.signers + private fun hasNoNotarySignature(transaction: SignedTransaction) = transaction.tx.notary?.owningKey !in transaction.sigs.map { it.by } +} diff --git a/docs/source/consensus.rst b/docs/source/consensus.rst index 85e789acf9..e03c6f99c9 100644 --- a/docs/source/consensus.rst +++ b/docs/source/consensus.rst @@ -98,9 +98,26 @@ To run one of these services the node has to simply specify either ``SimpleNotar Obtaining a signature --------------------- -To obtain a signature from a notary use ``NotaryProtocol.Client``, passing in a ``WireTransaction``. -The protocol will work out which notary needs to be called based on the input states and the timestamp command. -For example, the following snippet can be used when writing a custom protocol: +Once a transaction is built and ready to be finalised, normally you would call ``FinalityProtocol`` passing in a +``SignedTransaction`` (including signatures from the participants) and a list of participants to notify. This requests a +notary signature if needed, and then sends a copy of the notarised transaction to all participants for them to store. +``FinalityProtocol`` delegates to ``NotaryProtocol.Client`` followed by ``BroadcastTransactionProtocol`` to do the +actual work of notarising and broadcasting the transaction. For example: + +.. sourcecode:: kotlin + + fun finaliseTransaction(serviceHub: ServiceHubInternal, ptx: TransactionBuilder, participants: Set) + : ListenableFuture { + // We conclusively cannot have all the signatures, as the notary has not signed yet + val tx = ptx.toSignedTransaction(checkSufficientSignatures = false) + // The empty set would be the trigger events, which are not used here + val protocol = FinalityProtocol(tx, emptySet(), participants) + return serviceHub.startProtocol("protocol.finalisation", protocol) + } + +To manually obtain a signature from a notary you can call ``NotaryProtocol.Client`` directly. The protocol will work out +which notary needs to be called based on the input states and the timestamp command. For example, the following snippet +can be used when writing a custom protocol: .. sourcecode:: kotlin diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 89a899e264..3060c3c9ff 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -106,7 +106,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, override val protocolLogicRefFactory: ProtocolLogicRefFactory get() = protocolLogicFactory override fun startProtocol(loggerName: String, logic: ProtocolLogic): ListenableFuture { - return smm.add(loggerName, logic) + return smm.add(loggerName, logic).resultFuture } override fun recordTransactions(txs: Iterable) = diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt index 9211f198c7..c15e83bff3 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt @@ -130,9 +130,9 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten showProgressFor(listOf(node1, node2)) showConsensusFor(listOf(node1, node2, regulators[0])) - val instigatorFuture: ListenableFuture = node1.smm.add("instigator", instigator) + val instigatorFuture: ListenableFuture = node1.services.startProtocol("instigator", instigator) - return Futures.transformAsync(Futures.allAsList(instigatorFuture, node2.smm.add("acceptor", acceptor))) { + return Futures.transformAsync(Futures.allAsList(instigatorFuture, node2.services.startProtocol("acceptor", acceptor))) { instigatorFuture } } diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt index fe751f55de..ba175ff811 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt @@ -6,6 +6,7 @@ import com.r3corda.contracts.CommercialPaper import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER import com.r3corda.contracts.testing.fillWithSomeTestCash import com.r3corda.core.contracts.DOLLARS +import com.r3corda.core.contracts.OwnableState import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.contracts.`issued by` import com.r3corda.core.days @@ -52,7 +53,7 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo val sellerProtocol = TwoPartyTradeProtocol.Seller( buyer.info.identity, notary.info, - issuance.tx.outRef(0), + issuance.tx.outRef(0), amount, seller.storage.myLegalIdentityKey, sessionID) @@ -60,8 +61,8 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo showConsensusFor(listOf(buyer, seller, notary)) showProgressFor(listOf(buyer, seller)) - val buyerFuture = buyer.smm.add("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TOPIC}.buyer", buyerProtocol) - val sellerFuture = seller.smm.add("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TOPIC}.seller", sellerProtocol) + val buyerFuture = buyer.services.startProtocol("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TOPIC}.buyer", buyerProtocol) + val sellerFuture = seller.services.startProtocol("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TOPIC}.seller", sellerProtocol) return Futures.successfulAsList(buyerFuture, sellerFuture) } diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt index 1f9e80307e..748af3f9c4 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt @@ -47,41 +47,3 @@ sealed class TransactionBuildResult { */ class Failed(val message: String?) : TransactionBuildResult() } - -/** - * A command from the monitoring client, to the node. - * - * @param id ID used to tag event(s) resulting from a command. - */ -sealed class ClientToServiceCommand(val id: UUID) { - // TODO: Replace with a generic event for starting a protocol which then passes back required information, rather - // than using an event for every conceivable action. - /** - * Issue cash state objects. - * - * @param currency the currency to issue. - * @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is - * to use the single byte "0x01" as a default. - * @param pennies the amount to issue, in the smallest unit of the currency. - * @param recipient the public key of the recipient. - * @param notary the notary to use for this transaction. - * @param id the ID to be provided in events resulting from this request. - */ - class IssueCash(val currency: Currency, - val issueRef: OpaqueBytes, - val pennies: Long, - val recipient: PublicKey, - val notary: Party, - id: UUID = UUID.randomUUID()) - : ClientToServiceCommand(id) - class PayCash(val tokenDef: Issued, val pennies: Long, val owner: PublicKey, - id: UUID = UUID.randomUUID()) - : ClientToServiceCommand(id) - - /** - * @param id the ID to be provided in events resulting from this request. - */ - class ExitCash(val currency: Currency, val issueRef: OpaqueBytes, val pennies: Long, - id: UUID = UUID.randomUUID()) - : ClientToServiceCommand(id) -} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt index e3eae18617..798f319c8f 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt @@ -1,5 +1,6 @@ package com.r3corda.node.services.monitor +import com.r3corda.core.contracts.ClientToServiceCommand import com.r3corda.core.contracts.ContractState import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.protocols.DirectRequestMessage diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt index b06cefbc6a..8c7690f0cb 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt @@ -1,28 +1,26 @@ package com.r3corda.node.services.monitor import co.paralleluniverse.common.util.VisibleForTesting -import com.google.common.util.concurrent.ListenableFuture import com.r3corda.contracts.asset.Cash import com.r3corda.contracts.asset.InsufficientBalanceException import com.r3corda.core.contracts.* +import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.toStringShort import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.DEFAULT_SESSION_ID -import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.Wallet import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.serialization.serialize import com.r3corda.core.utilities.loggerFor import com.r3corda.node.services.api.AbstractNodeService -import com.r3corda.node.services.persistence.DataVending import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.utilities.AddOrRemove -import org.slf4j.LoggerFactory +import com.r3corda.protocols.BroadcastTransactionProtocol +import com.r3corda.protocols.FinalityProtocol import java.security.KeyPair -import java.security.PublicKey import java.time.Instant import java.util.* import javax.annotation.concurrent.ThreadSafe @@ -154,36 +152,22 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager, monitor.recipients) } - /** - * Notifies the node associated with the [recipient] public key. Returns a future holding a Boolean of whether the - * node accepted the transaction or not. - */ - private fun notifyRecipientAboutTransaction( - recipient: PublicKey, - transaction: SignedTransaction - ): ListenableFuture { - val recipientNodeInfo = services.networkMapCache.getNodeByPublicKey(recipient) ?: throw PublicKeyLookupFailed(recipient) - return DataVending.Service.notify(net, services.storageService.myLegalIdentity, - recipientNodeInfo, transaction) - } - // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service private fun initatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult { val builder: TransactionBuilder = TransactionType.General.Builder(null) // TODO: Have some way of restricting this to states the caller controls try { - Cash().generateSpend(builder, Amount(req.pennies, req.tokenDef.product), req.owner, + Cash().generateSpend(builder, req.amount.withoutIssuer(), req.recipient.owningKey, // TODO: Move cash state filtering by issuer down to the contract itself - services.walletService.currentWallet.statesOfType().filter { it.state.data.amount.token == req.tokenDef }, - setOf(req.tokenDef.issuer.party)) + services.walletService.currentWallet.statesOfType().filter { it.state.data.amount.token == req.amount.token }, + setOf(req.amount.token.issuer.party)) .forEach { val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}") builder.signWith(KeyPair(it, key)) } - val tx = builder.toSignedTransaction() - services.walletService.notify(tx.tx) - notifyRecipientAboutTransaction(req.owner, tx) - return TransactionBuildResult.Complete(tx, "Cash payment completed") + val tx = builder.toSignedTransaction(checkSufficientSignatures = false) + val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient)) + return TransactionBuildResult.ProtocolStarted(smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId, tx, "Cash payment transaction generated") } catch(ex: InsufficientBalanceException) { return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance") } @@ -193,39 +177,40 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager, private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult { val builder: TransactionBuilder = TransactionType.General.Builder(null) val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef) - Cash().generateExit(builder, Amount(req.pennies, Issued(issuer, req.currency)), + Cash().generateExit(builder, req.amount.issuedBy(issuer), services.walletService.currentWallet.statesOfType().filter { it.state.data.owner == issuer.party.owningKey }) builder.signWith(services.storageService.myLegalIdentityKey) - val tx = builder.toSignedTransaction() - services.walletService.notify(tx.tx) - // Notify the owners - val inputStatesNullable = services.walletService.statesForRefs(tx.tx.inputs) + + // Work out who the owners of the burnt states were + val inputStatesNullable = services.walletService.statesForRefs(builder.inputStates()) val inputStates = inputStatesNullable.values.filterNotNull().map { it.data } if (inputStatesNullable.size != inputStates.size) { val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key } throw InputStateRefResolveFailed(unresolvedStateRefs) } - inputStates.filterIsInstance().map { it.owner }.toSet().forEach { - notifyRecipientAboutTransaction(it, tx) - } - return TransactionBuildResult.Complete(tx, "Cash destruction completed") + + // TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them + // count as a reason to fail? + val participants: Set = inputStates.filterIsInstance().map { services.identityService.partyFromKey(it.owner) }.filterNotNull().toSet() + + // Commit the transaction + val tx = builder.toSignedTransaction(checkSufficientSignatures = false) + val protocol = FinalityProtocol(tx, setOf(req), participants) + return TransactionBuildResult.ProtocolStarted(smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId, tx, "Cash destruction transaction generated") } // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult { val builder: TransactionBuilder = TransactionType.General.Builder(notary = req.notary) val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef) - Cash().generateIssue(builder, Amount(req.pennies, Issued(issuer, req.currency)), req.recipient, req.notary) + Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary) builder.signWith(services.storageService.myLegalIdentityKey) - val tx = builder.toSignedTransaction() - services.walletService.notify(tx.tx) - notifyRecipientAboutTransaction(req.recipient, tx) - return TransactionBuildResult.Complete(tx, "Cash issuance completed") + val tx = builder.toSignedTransaction(checkSufficientSignatures = true) + // Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it + val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient)) + return TransactionBuildResult.ProtocolStarted(smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId, tx, "Cash issuance completed") } - class PublicKeyLookupFailed(failedPublicKey: PublicKey) : - Exception("Failed to lookup public keys $failedPublicKey") - class InputStateRefResolveFailed(stateRefs: List) : Exception("Failed to resolve input StateRefs $stateRefs") } diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt index e94b3e35c6..aad4f93234 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt @@ -47,34 +47,21 @@ object DataVending { companion object { val logger = loggerFor() - /** Topic for messages notifying a node of a new transaction */ - val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx" - + /** + * Notify a node of a transaction. Normally any notarisation required would happen before this is called. + */ fun notify(net: MessagingService, myIdentity: Party, recipient: NodeInfo, - transaction: SignedTransaction): ListenableFuture { - val future = SettableFuture.create() + transaction: SignedTransaction) { val sessionID = random63BitValue() - net.runOnNextMessage(NOTIFY_TX_PROTOCOL_TOPIC, sessionID) { msg -> - // TODO: Can we improve/simplify the response from the remote node? - val data = msg.data.deserialize() - if (data.accepted) { - future.set(Unit) - } else { - future.setException(TransactionRejectedError("Transaction $transaction rejected by remote party ${recipient.identity}")) - } - } - val msg = NotifyTxRequestMessage(transaction, myIdentity, sessionID) - net.send(net.createMessage(TopicSession(NOTIFY_TX_PROTOCOL_TOPIC, 0), msg.serialize().bits), recipient.address) - return future + val msg = BroadcastTransactionProtocol.NotifyTxRequestMessage(transaction, emptySet(), myIdentity, sessionID) + net.send(net.createMessage(TopicSession(BroadcastTransactionProtocol.TOPIC, 0), msg.serialize().bits), recipient.address) } } val storage = services.storageService - data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage - data class NotifyTxResponseMessage(val accepted: Boolean) class TransactionRejectedError(msg: String) : Exception(msg) init { @@ -86,29 +73,24 @@ object DataVending { { req: FetchDataProtocol.Request -> handleAttachmentRequest(req) }, { message, e -> logger.error("Failure processing data vending request.", e) } ) - addMessageHandler(NOTIFY_TX_PROTOCOL_TOPIC, - { req: NotifyTxRequestMessage -> handleTXNotification(req) }, + addMessageHandler(BroadcastTransactionProtocol.TOPIC, + { req: BroadcastTransactionProtocol.NotifyTxRequestMessage -> handleTXNotification(req) }, { message, e -> logger.error("Failure processing data vending request.", e) } ) } - private fun handleTXNotification(req: NotifyTxRequestMessage): Unit { + private fun handleTXNotification(req: BroadcastTransactionProtocol.NotifyTxRequestMessage): Unit { // TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction // includes us in any outside that list. Potentially just if it includes any outside that list at all. // TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming // cash without from unknown parties? - services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty)) + services.startProtocol("Resolving transactions", ResolveTransactionsProtocol(req.tx, req.replyToParty)) .success { services.recordTransactions(req.tx) - val resp = NotifyTxResponseMessage(true) - val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) - net.send(msg, req.getReplyTo(services.networkMapCache)) }.failure { throwable -> - val resp = NotifyTxResponseMessage(false) - val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits) - net.send(msg, req.getReplyTo(services.networkMapCache)) + logger.warn("Received invalid transaction ${req.tx.id} from ${req.replyToParty}", throwable) } } @@ -136,4 +118,4 @@ object DataVending { } } } -} \ No newline at end of file +} diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt index 9ece842c44..8a4400b620 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt @@ -44,13 +44,18 @@ class ProtocolStateMachineImpl(val logic: ProtocolLogic, @Transient private var _resultFuture: SettableFuture? = SettableFuture.create() /** This future will complete when the call method returns. */ - val resultFuture: ListenableFuture get() { + override val resultFuture: ListenableFuture get() { return _resultFuture ?: run { val f = SettableFuture.create() _resultFuture = f return f } } + /** + * Unique ID for the deserialized instance protocol state machine. This is NOT maintained across a state machine + * being serialized and then deserialized. + */ + override val machineId: Long get() = this.id init { logic.psm = this diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt index a84f48d4a0..a44881da14 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt @@ -12,6 +12,7 @@ import com.r3corda.core.messaging.TopicSession import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.messaging.send import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.protocols.ProtocolStateMachine import com.r3corda.core.serialization.* import com.r3corda.core.utilities.ProgressTracker import com.r3corda.core.utilities.trace @@ -215,7 +216,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService * The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is * restarted with checkpointed state machines in the storage service. */ - fun add(loggerName: String, logic: ProtocolLogic): ListenableFuture { + fun add(loggerName: String, logic: ProtocolLogic): ProtocolStateMachine { val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName) // Need to add before iterating in case of immediate completion initFiber(fiber) { @@ -244,7 +245,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService if (e.cause !is ExecutionException) throw e } - return fiber.resultFuture + return fiber } private fun updateCheckpoint(psm: ProtocolStateMachineImpl<*>, diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt index cff507f1a0..4cdc8a6207 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt @@ -55,7 +55,7 @@ class AttachmentTests { // Get node one to run a protocol to fetch it and insert it. network.runNetwork() - val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)) + val f1 = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)) network.runNetwork() assertEquals(0, f1.get().fromDisk.size) @@ -66,7 +66,7 @@ class AttachmentTests { // Shut down node zero and ensure node one can still resolve the attachment. n0.stop() - val response: FetchDataProtocol.Result = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)).get() + val response: FetchDataProtocol.Result = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)).get() assertEquals(attachment, response.fromDisk[0]) } @@ -77,7 +77,7 @@ class AttachmentTests { // Get node one to fetch a non-existent attachment. val hash = SecureHash.randomSHA256() network.runNetwork() - val f1 = n1.smm.add("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.info.identity)) + val f1 = n1.services.startProtocol("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.info.identity)) network.runNetwork() val e = assertFailsWith { rootCauseExceptions { f1.get() } } assertEquals(hash, e.requested) @@ -110,7 +110,7 @@ class AttachmentTests { // Get n1 to fetch the attachment. Should receive corrupted bytes. network.runNetwork() - val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)) + val f1 = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)) network.runNetwork() assertFailsWith { rootCauseExceptions { f1.get() } diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt index 77291e4431..1ab602bf29 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -56,14 +56,14 @@ class TwoPartyTradeProtocolTests { otherSide: Party, assetToSell: StateAndRef, price: Amount, myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture { val seller = TwoPartyTradeProtocol.Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID) - return smm.add("${TwoPartyTradeProtocol.TOPIC}.seller", seller) + return smm.add("${TwoPartyTradeProtocol.TOPIC}.seller", seller).resultFuture } private fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo, otherSide: Party, acceptablePrice: Amount, typeToBuy: Class, sessionID: Long): ListenableFuture { val buyer = TwoPartyTradeProtocol.Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID) - return smm.add("${TwoPartyTradeProtocol.TOPIC}.buyer", buyer) + return smm.add("${TwoPartyTradeProtocol.TOPIC}.buyer", buyer).resultFuture } @Before diff --git a/node/src/test/kotlin/com/r3corda/node/services/InMemoryNetworkMapServiceTest.kt b/node/src/test/kotlin/com/r3corda/node/services/InMemoryNetworkMapServiceTest.kt index 6b90ddba87..33a3e29fc8 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/InMemoryNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/InMemoryNetworkMapServiceTest.kt @@ -114,7 +114,7 @@ class InMemoryNetworkMapServiceTest { // Confirm all nodes have registered themselves network.runNetwork() - var fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false)) + var fetchPsm = registerNode.services.startProtocol(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false)) network.runNetwork() assertEquals(2, fetchPsm.get()?.count()) @@ -123,12 +123,12 @@ class InMemoryNetworkMapServiceTest { val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD val seq = 2L val reg = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires) - val registerPsm = registerNode.smm.add(NetworkMapService.REGISTER_PROTOCOL_TOPIC, TestRegisterPSM(mapServiceNode.info, reg, nodeKey.private)) + val registerPsm = registerNode.services.startProtocol(NetworkMapService.REGISTER_PROTOCOL_TOPIC, TestRegisterPSM(mapServiceNode.info, reg, nodeKey.private)) network.runNetwork() assertTrue(registerPsm.get().success) // Now only map service node should be registered - fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false)) + fetchPsm = registerNode.services.startProtocol(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false)) network.runNetwork() assertEquals(mapServiceNode.info, fetchPsm.get()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single()) } @@ -140,7 +140,7 @@ class InMemoryNetworkMapServiceTest { // Test subscribing to updates network.runNetwork() - val subscribePsm = registerNode.smm.add(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, + val subscribePsm = registerNode.services.startProtocol(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, TestSubscribePSM(mapServiceNode.info, true)) network.runNetwork() subscribePsm.get() @@ -161,7 +161,7 @@ class InMemoryNetworkMapServiceTest { // Send in an acknowledgment and verify the count goes down val hash = SecureHash.sha256(wireReg.raw.bits) - val acknowledgePsm = registerNode.smm.add(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, + val acknowledgePsm = registerNode.services.startProtocol(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, TestAcknowledgePSM(mapServiceNode.info, hash)) network.runNetwork() acknowledgePsm.get() diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt index d1273e6dd6..bded65214b 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt @@ -62,7 +62,7 @@ open class MockServiceHubInternal( lateinit var smm: StateMachineManager override fun startProtocol(loggerName: String, logic: ProtocolLogic): ListenableFuture { - return smm.add(loggerName, logic) + return smm.add(loggerName, logic).resultFuture } init { diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt index 3dfeae6b26..93aeb339ed 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt @@ -111,7 +111,7 @@ class NodeInterestRatesTest { val protocol = RatesFixProtocol(tx, n2.info.identity, fixOf, "0.675".bd, "0.1".bd) LogHelper.setLevel("rates") net.runNetwork() - val future = n1.smm.add("rates", protocol) + val future = n1.services.startProtocol("rates", protocol) net.runNetwork() future.get() diff --git a/node/src/test/kotlin/com/r3corda/node/services/NotaryChangeTests.kt b/node/src/test/kotlin/com/r3corda/node/services/NotaryChangeTests.kt index 94bcddb9df..32b84ca8bf 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/NotaryChangeTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/NotaryChangeTests.kt @@ -49,7 +49,7 @@ class NotaryChangeTests { val state = issueState(clientNodeA) val newNotary = newNotaryNode.info.identity val protocol = Instigator(state, newNotary) - val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol) + val future = clientNodeA.services.startProtocol(NotaryChangeProtocol.TOPIC, protocol) net.runNetwork() @@ -62,7 +62,7 @@ class NotaryChangeTests { val state = issueMultiPartyState(clientNodeA, clientNodeB) val newNotary = newNotaryNode.info.identity val protocol = Instigator(state, newNotary) - val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol) + val future = clientNodeA.services.startProtocol(NotaryChangeProtocol.TOPIC, protocol) net.runNetwork() @@ -78,7 +78,7 @@ class NotaryChangeTests { val state = issueMultiPartyState(clientNodeA, clientNodeB) val newEvilNotary = Party("Evil Notary", generateKeyPair().public) val protocol = Instigator(state, newEvilNotary) - val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol) + val future = clientNodeA.services.startProtocol(NotaryChangeProtocol.TOPIC, protocol) net.runNetwork() diff --git a/node/src/test/kotlin/com/r3corda/node/services/NotaryServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/NotaryServiceTests.kt index eaa1a830a6..d37d7be1b5 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/NotaryServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/NotaryServiceTests.kt @@ -46,7 +46,7 @@ class NotaryServiceTests { } val protocol = NotaryProtocol.Client(stx) - val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol) + val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol) net.runNetwork() val signature = future.get() @@ -62,7 +62,7 @@ class NotaryServiceTests { } val protocol = NotaryProtocol.Client(stx) - val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol) + val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol) net.runNetwork() val signature = future.get() @@ -79,7 +79,7 @@ class NotaryServiceTests { } val protocol = NotaryProtocol.Client(stx) - val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol) + val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol) net.runNetwork() val ex = assertFailsWith(ExecutionException::class) { future.get() } @@ -98,8 +98,8 @@ class NotaryServiceTests { val firstSpend = NotaryProtocol.Client(stx) val secondSpend = NotaryProtocol.Client(stx) - clientNode.smm.add("${NotaryProtocol.TOPIC}.first", firstSpend) - val future = clientNode.smm.add("${NotaryProtocol.TOPIC}.second", secondSpend) + clientNode.services.startProtocol("${NotaryProtocol.TOPIC}.first", firstSpend) + val future = clientNode.services.startProtocol("${NotaryProtocol.TOPIC}.second", secondSpend) net.runNetwork() diff --git a/node/src/test/kotlin/com/r3corda/node/services/ValidatingNotaryServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ValidatingNotaryServiceTests.kt index f430be55e1..e03ab0afb6 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/ValidatingNotaryServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/ValidatingNotaryServiceTests.kt @@ -45,7 +45,7 @@ class ValidatingNotaryServiceTests { } val protocol = NotaryProtocol.Client(stx) - val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol) + val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol) net.runNetwork() val ex = assertFailsWith(ExecutionException::class) { future.get() } @@ -65,7 +65,7 @@ class ValidatingNotaryServiceTests { } val protocol = NotaryProtocol.Client(stx) - val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol) + val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol) net.runNetwork() val ex = assertFailsWith(ExecutionException::class) { future.get() } diff --git a/node/src/test/kotlin/com/r3corda/node/services/WalletMonitorServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/WalletMonitorServiceTests.kt index 853a52a641..ecaae0a627 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/WalletMonitorServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/WalletMonitorServiceTests.kt @@ -22,10 +22,7 @@ import org.junit.Before import org.junit.Test import java.util.* import java.util.concurrent.TimeUnit -import kotlin.test.assertEquals -import kotlin.test.assertFalse -import kotlin.test.assertTrue -import kotlin.test.fail +import kotlin.test.* /** * Unit tests for the wallet monitoring service. @@ -44,7 +41,7 @@ class WalletMonitorServiceTests { private fun authenticate(monitorServiceNode: MockNetwork.MockNode, registerNode: MockNetwork.MockNode): Long { network.runNetwork() val sessionID = random63BitValue() - val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC, + val authenticatePsm = registerNode.services.startProtocol(WalletMonitorService.REGISTER_TOPIC, TestRegisterPSM(monitorServiceNode.info, sessionID)) network.runNetwork() authenticatePsm.get(1, TimeUnit.SECONDS) @@ -78,7 +75,7 @@ class WalletMonitorServiceTests { network.runNetwork() val sessionID = random63BitValue() - val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC, + val authenticatePsm = registerNode.services.startProtocol(WalletMonitorService.REGISTER_TOPIC, TestRegisterPSM(monitorServiceNode.info, sessionID)) network.runNetwork() val result = authenticatePsm.get(1, TimeUnit.SECONDS) @@ -92,7 +89,7 @@ class WalletMonitorServiceTests { fun `event received`() { val (monitorServiceNode, registerNode) = network.createTwoNodes() val sessionID = authenticate(monitorServiceNode, registerNode) - var receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC, + var receivePsm = registerNode.services.startProtocol(WalletMonitorService.IN_EVENT_TOPIC, TestReceiveWalletUpdatePSM(sessionID)) var expected = Wallet.Update(emptySet(), emptySet()) monitorServiceNode.inNodeWalletMonitorService!!.notifyWalletUpdate(expected) @@ -102,7 +99,7 @@ class WalletMonitorServiceTests { assertEquals(expected.produced, actual.produced) // Check that states are passed through correctly - receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC, + receivePsm = registerNode.services.startProtocol(WalletMonitorService.IN_EVENT_TOPIC, TestReceiveWalletUpdatePSM(sessionID)) val consumed = setOf(StateRef(SecureHash.randomSHA256(), 0)) val producedState = TransactionState(DummyContract.SingleOwnerState(newSecureRandom().nextInt(), DUMMY_PUBKEY_1), DUMMY_NOTARY) @@ -131,29 +128,30 @@ class WalletMonitorServiceTests { assertFalse(monitorServiceNode.services.walletService.currentWallet.states.iterator().hasNext()) // Tell the monitoring service node to issue some cash - val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public - val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY) + val recipient = monitorServiceNode.services.storageService.myLegalIdentity + val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY) val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID, ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits) registerNode.net.send(message, monitorServiceNode.net.myAddress) network.runNetwork() + val expectedState = Cash.State(Amount(quantity, + Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)), + recipient.owningKey) + // Check we've received a response events.forEach { event -> when (event) { is ServiceToClientEvent.TransactionBuild -> { // Check the returned event is correct - val actual = event.state as TransactionBuildResult.Complete - val expected = TransactionBuildResult.Complete(actual.transaction, null) - assertEquals(expected, actual) + val tx = (event.state as TransactionBuildResult.ProtocolStarted).transaction + assertNotNull(tx) + assertEquals(expectedState, tx!!.tx.outputs.single().data) } is ServiceToClientEvent.OutputState -> { // Check the generated state is correct val actual = event.produced.single().state.data - val expected = Cash.State(Amount(quantity, - Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)), - recipientKey) - assertEquals(expected, actual) + assertEquals(expectedState, actual) } else -> fail("Unexpected in event ${event}") } @@ -178,32 +176,33 @@ class WalletMonitorServiceTests { assertFalse(monitorServiceNode.services.walletService.currentWallet.states.iterator().hasNext()) // Tell the monitoring service node to issue some cash - val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public - val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY) + val recipient = monitorServiceNode.services.storageService.myLegalIdentity + val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY) val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID, ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits) registerNode.net.send(message, monitorServiceNode.net.myAddress) network.runNetwork() + val expectedState = Cash.State(Amount(quantity, + Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)), + recipient.owningKey) + // Check we've received a response events.forEach { event -> when (event) { is ServiceToClientEvent.TransactionBuild -> { // Check the returned event is correct - val actual = event.state as TransactionBuildResult.Complete - val expected = TransactionBuildResult.Complete(actual.transaction, null) - assertEquals(expected, actual) + val tx = (event.state as TransactionBuildResult.ProtocolStarted).transaction + assertNotNull(tx) + assertEquals(expectedState, tx!!.tx.outputs.single().data) } is ServiceToClientEvent.OutputState -> { // Check the generated state is correct val actual = event.produced.single().state.data - val expected = Cash.State(Amount(quantity, - Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)), - recipientKey) - assertEquals(expected, actual) + assertEquals(expectedState, actual) } else -> fail("Unexpected in event ${event}") } } } -} \ No newline at end of file +} diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt index 66a09a4b3b..8a2f06773e 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/DataVendingServiceTests.kt @@ -41,12 +41,9 @@ class DataVendingServiceTests { ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) val tx = ptx.toSignedTransaction() assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size) - val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity, + DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity, walletServiceNode.info, tx) - - // Check it was accepted network.runNetwork() - notifyPsm.get(1, TimeUnit.SECONDS) // Check the transaction is in the receiving node val actual = walletServiceNode.services.walletService.currentWallet.states.singleOrNull() @@ -73,14 +70,9 @@ class DataVendingServiceTests { ptx.signWith(registerNode.services.storageService.myLegalIdentityKey) val tx = ptx.toSignedTransaction(false) assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size) - val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity, + DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity, walletServiceNode.info, tx) - - // Check it was not accepted network.runNetwork() - assertFailsWith { - rootCauseExceptions { notifyPsm.get() } - } // Check the transaction is not in the receiving node assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size) diff --git a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt index 2010f4f24a..407b261ca4 100644 --- a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt @@ -87,7 +87,7 @@ fun main(args: Array) { val tx = TransactionType.General.Builder(notaryNode.identity) tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notaryNode.identity)) val protocol = RatesFixProtocol(tx, rateOracle.identity, fixOf, expectedRate, rateTolerance) - node.smm.add("demo.ratefix", protocol).get() + node.services.startProtocol("demo.ratefix", protocol).get() node.stop() // Show the user the output. diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index 7c3ce5c88c..3b4ab79a40 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -182,7 +182,7 @@ private fun runSeller(node: Node, amount: Amount, otherSide: Party) { tradeTX = node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).single().second } else { val seller = TraderDemoProtocolSeller(otherSide, amount) - tradeTX = node.smm.add("demo.seller", seller) + tradeTX = node.services.startProtocol("demo.seller", seller) } tradeTX.success { @@ -217,7 +217,7 @@ private fun runBuyer(node: Node, amount: Amount) { // We use a simple scenario-specific wrapper protocol to make things happen. val otherSide = message.data.deserialize() val buyer = TraderDemoProtocolBuyer(otherSide, attachmentsPath, amount) - node.smm.add("demo.buyer", buyer) + node.services.startProtocol("demo.buyer", buyer) } }