mirror of
synced 2025-03-23 12:35:23 +00:00
Merged in rnicoll-commit-tx-protocol (pull request #286)
Add CommitTransactionProtocol
This commit is contained in:
@ -61,7 +61,7 @@ class CashTests {
fun issueMoney() {
fun `issue by move`() {
// Check we can't "move" money into existence.
transaction {
input { DummyState() }
@ -70,7 +70,10 @@ class CashTests {
this `fails with` "there is at least one asset input"
fun issue() {
// Check we can issue money only as long as the issuer institution is a command signer, i.e. any recognised
// institution is allowed to issue as much cash as they want.
transaction {
@ -92,25 +95,38 @@ class CashTests {
command(MINI_CORP_PUBKEY) { Cash.Commands.Issue() }
fun generateIssueRaw() {
// Test generation works.
val ptx = TransactionType.General.Builder(DUMMY_NOTARY)
Cash().generateIssue(ptx, 100.DOLLARS `issued by` MINI_CORP.ref(12, 34), owner = DUMMY_PUBKEY_1, notary = DUMMY_NOTARY)
val s = ptx.outputStates()[0].data as Cash.State
val tx: WireTransaction = TransactionType.General.Builder(notary = null).apply {
Cash().generateIssue(this, 100.DOLLARS `issued by` MINI_CORP.ref(12, 34), owner = DUMMY_PUBKEY_1, notary = DUMMY_NOTARY)
val s = tx.outputs[0].data as Cash.State
assertEquals(100.DOLLARS `issued by` MINI_CORP.ref(12, 34), s.amount)
assertEquals(MINI_CORP, s.deposit.party)
assertEquals(DUMMY_PUBKEY_1, s.owner)
assertTrue(ptx.commands()[0].value is Cash.Commands.Issue)
assertEquals(MINI_CORP_PUBKEY, ptx.commands()[0].signers[0])
assertTrue(tx.commands[0].value is Cash.Commands.Issue)
assertEquals(MINI_CORP_PUBKEY, tx.commands[0].signers[0])
// Test issuance from the issuance definition
fun generateIssueFromAmount() {
// Test issuance from an issued amount
val amount = 100.DOLLARS `issued by` MINI_CORP.ref(12, 34)
val templatePtx = TransactionType.General.Builder(DUMMY_NOTARY)
Cash().generateIssue(templatePtx, amount, owner = DUMMY_PUBKEY_1, notary = DUMMY_NOTARY)
assertEquals(ptx.outputStates()[0], templatePtx.outputStates()[0])
val tx: WireTransaction = TransactionType.General.Builder(notary = null).apply {
Cash().generateIssue(this, amount, owner = DUMMY_PUBKEY_1, notary = DUMMY_NOTARY)
assertEquals(tx.outputs[0], tx.outputs[0])
fun `extended issue examples`() {
// We can consume $1000 in a transaction and output $2000 as long as it's signed by an issuer.
transaction {
input { issuerInState }
@ -119,19 +119,21 @@ class ObligationTests {
// Test generation works.
val ptx = TransactionType.General.Builder(DUMMY_NOTARY)
Obligation<Currency>().generateIssue(ptx, MINI_CORP, megaCorpDollarSettlement, 100.DOLLARS.quantity,
beneficiary = DUMMY_PUBKEY_1, notary = DUMMY_NOTARY)
val tx = TransactionType.General.Builder(notary = null).apply {
Obligation<Currency>().generateIssue(this, MINI_CORP, megaCorpDollarSettlement, 100.DOLLARS.quantity,
beneficiary = DUMMY_PUBKEY_1, notary = DUMMY_NOTARY)
val expected = Obligation.State(
obligor = MINI_CORP,
quantity = 100.DOLLARS.quantity,
beneficiary = DUMMY_PUBKEY_1,
template = megaCorpDollarSettlement
assertEquals(ptx.outputStates()[0].data, expected)
assertTrue(ptx.commands()[0].value is Obligation.Commands.Issue)
assertEquals(MINI_CORP_PUBKEY, ptx.commands()[0].signers[0])
assertEquals(tx.outputs[0].data, expected)
assertTrue(tx.commands[0].value is Obligation.Commands.Issue)
assertEquals(MINI_CORP_PUBKEY, tx.commands[0].signers[0])
// We can consume $1000 in a transaction and output $2000 as long as it's signed by an issuer.
transaction {
@ -0,0 +1,49 @@
package com.r3corda.core.contracts
import com.r3corda.core.crypto.Party
import com.r3corda.core.serialization.OpaqueBytes
import java.security.PublicKey
import java.util.*
* A command from the monitoring client, to the node.
* @param id ID used to tag event(s) resulting from a command.
sealed class ClientToServiceCommand(val id: UUID) {
* Issue cash state objects.
* @param amount the amount of currency to issue on to the ledger.
* @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is
* to use the single byte "0x01" as a default.
* @param recipient the party to issue the cash to.
* @param notary the notary to use for this transaction.
* @param id the ID to be provided in events resulting from this request.
class IssueCash(val amount: Amount<Currency>,
val issueRef: OpaqueBytes,
val recipient: Party,
val notary: Party,
id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id)
* Pay cash to someone else.
* @param amount the amount of currency to issue on to the ledger.
* @param recipient the party to issue the cash to.
* @param id the ID to be provided in events resulting from this request.
class PayCash(val amount: Amount<Issued<Currency>>, val recipient: Party,
id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id)
* Exit cash from the ledger.
* @param amount the amount of currency to exit from the ledger.
* @param issueRef the reference previously specified on the issuance.
* @param id the ID to be provided in events resulting from this request.
class ExitCash(val amount: Amount<Currency>, val issueRef: OpaqueBytes,
id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id)
@ -1,6 +1,7 @@
package com.r3corda.core.protocols
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.crypto.Party
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.utilities.UntrustworthyData
@ -23,4 +24,9 @@ interface ProtocolStateMachine<R> {
val serviceHub: ServiceHub
val logger: Logger
/** Unique ID for this machine, valid only while it is in memory. */
val machineId: Long
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R>
@ -0,0 +1,56 @@
package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.serialize
import java.util.*
* Notify all involved parties about a transaction, including storing a copy. Normally this would be called via
* [FinalityProtocol].
* @param notarisedTransaction transaction which has been notarised (if needed) and is ready to notify nodes about.
* @param events information on the event(s) which triggered the transaction.
* @param participants a list of participants involved in the transaction.
* @return a list of participants who were successfully notified of the transaction.
// TODO: Event needs to be replaced with something that's meaningful, but won't ever contain sensitive
// information (such as internal details of an account to take payment from). Suggest
// splitting ClientToServiceCommand into public and private parts, with only the public parts
// relayed here.
class BroadcastTransactionProtocol(val notarisedTransaction: SignedTransaction,
val events: Set<ClientToServiceCommand>,
val participants: Set<Party>) : ProtocolLogic<Unit>() {
companion object {
/** Topic for messages notifying a node of a new transaction */
val TOPIC = "platform.wallet.notify_tx"
override val topic: String = TOPIC
data class NotifyTxRequestMessage(
val tx: SignedTransaction,
val events: Set<ClientToServiceCommand>,
override val replyToParty: Party,
override val sessionID: Long
) : PartyRequestMessage
override fun call() {
// Record it locally
// TODO: Messaging layer should handle this broadcast for us (although we need to not be sending
// session ID, for that to work, as well).
participants.filter { it != serviceHub.storageService.myLegalIdentity }.forEach { participant ->
val sessionID = random63BitValue()
val msg = NotifyTxRequestMessage(notarisedTransaction, events, serviceHub.storageService.myLegalIdentity, sessionID)
send(participant, 0, msg)
@ -0,0 +1,63 @@
package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.contracts.TransactionBuilder
import com.r3corda.core.contracts.WireTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.ProgressTracker
import java.util.*
* Finalise a transaction by notarising it, then recording it locally, and then sending it to all involved parties.
* @param transaction to commit.
* @param events information on the event(s) which triggered the transaction.
* @param participants a list of participants involved in the transaction.
* @return a list of participants who were successfully notified of the transaction.
// TODO: Event needs to be replaced with something that's meaningful, but won't ever contain sensitive
// information (such as internal details of an account to take payment from). Suggest
// splitting ClientToServiceCommand into public and private parts, with only the public parts
// relayed here.
class FinalityProtocol(val transaction: SignedTransaction,
val events: Set<ClientToServiceCommand>,
val participants: Set<Party>,
override val progressTracker: ProgressTracker = tracker()): ProtocolLogic<Unit>() {
companion object {
object NOTARISING : ProgressTracker.Step("Requesting signature by notary service")
object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants")
fun tracker() = ProgressTracker(NOTARISING, BROADCASTING)
override val topic: String
get() = throw UnsupportedOperationException()
override fun call() {
progressTracker.currentStep = NOTARISING
// Notarise the transaction if needed
val notarisedTransaction = if (needsNotarySignature(transaction)) {
val notarySig = subProtocol(NotaryProtocol.Client(transaction))
} else {
// Let everyone else know about the transaction
progressTracker.currentStep = BROADCASTING
subProtocol(BroadcastTransactionProtocol(notarisedTransaction, events, participants))
private fun needsNotarySignature(transaction: SignedTransaction) = expectsNotarySignature(transaction.tx) && hasNoNotarySignature(transaction)
private fun expectsNotarySignature(transaction: WireTransaction) = transaction.notary != null && transaction.notary.owningKey in transaction.signers
private fun hasNoNotarySignature(transaction: SignedTransaction) = transaction.tx.notary?.owningKey !in transaction.sigs.map { it.by }
@ -98,9 +98,26 @@ To run one of these services the node has to simply specify either ``SimpleNotar
Obtaining a signature
To obtain a signature from a notary use ``NotaryProtocol.Client``, passing in a ``WireTransaction``.
The protocol will work out which notary needs to be called based on the input states and the timestamp command.
For example, the following snippet can be used when writing a custom protocol:
Once a transaction is built and ready to be finalised, normally you would call ``FinalityProtocol`` passing in a
``SignedTransaction`` (including signatures from the participants) and a list of participants to notify. This requests a
notary signature if needed, and then sends a copy of the notarised transaction to all participants for them to store.
``FinalityProtocol`` delegates to ``NotaryProtocol.Client`` followed by ``BroadcastTransactionProtocol`` to do the
actual work of notarising and broadcasting the transaction. For example:
.. sourcecode:: kotlin
fun finaliseTransaction(serviceHub: ServiceHubInternal, ptx: TransactionBuilder, participants: Set<Party>)
: ListenableFuture<Unit> {
// We conclusively cannot have all the signatures, as the notary has not signed yet
val tx = ptx.toSignedTransaction(checkSufficientSignatures = false)
// The empty set would be the trigger events, which are not used here
val protocol = FinalityProtocol(tx, emptySet(), participants)
return serviceHub.startProtocol("protocol.finalisation", protocol)
To manually obtain a signature from a notary you can call ``NotaryProtocol.Client`` directly. The protocol will work out
which notary needs to be called based on the input states and the timestamp command. For example, the following snippet
can be used when writing a custom protocol:
.. sourcecode:: kotlin
@ -106,7 +106,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
override val protocolLogicRefFactory: ProtocolLogicRefFactory get() = protocolLogicFactory
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
return smm.add(loggerName, logic)
return smm.add(loggerName, logic).resultFuture
override fun recordTransactions(txs: Iterable<SignedTransaction>) =
@ -130,9 +130,9 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
showProgressFor(listOf(node1, node2))
showConsensusFor(listOf(node1, node2, regulators[0]))
val instigatorFuture: ListenableFuture<SignedTransaction> = node1.smm.add("instigator", instigator)
val instigatorFuture: ListenableFuture<SignedTransaction> = node1.services.startProtocol("instigator", instigator)
return Futures.transformAsync(Futures.allAsList(instigatorFuture, node2.smm.add("acceptor", acceptor))) {
return Futures.transformAsync(Futures.allAsList(instigatorFuture, node2.services.startProtocol("acceptor", acceptor))) {
@ -6,6 +6,7 @@ import com.r3corda.contracts.CommercialPaper
import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
import com.r3corda.contracts.testing.fillWithSomeTestCash
import com.r3corda.core.contracts.DOLLARS
import com.r3corda.core.contracts.OwnableState
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.contracts.`issued by`
import com.r3corda.core.days
@ -52,7 +53,7 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
val sellerProtocol = TwoPartyTradeProtocol.Seller(
@ -60,8 +61,8 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
showConsensusFor(listOf(buyer, seller, notary))
showProgressFor(listOf(buyer, seller))
val buyerFuture = buyer.smm.add("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TOPIC}.buyer", buyerProtocol)
val sellerFuture = seller.smm.add("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TOPIC}.seller", sellerProtocol)
val buyerFuture = buyer.services.startProtocol("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TOPIC}.buyer", buyerProtocol)
val sellerFuture = seller.services.startProtocol("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TOPIC}.seller", sellerProtocol)
return Futures.successfulAsList(buyerFuture, sellerFuture)
@ -47,41 +47,3 @@ sealed class TransactionBuildResult {
class Failed(val message: String?) : TransactionBuildResult()
* A command from the monitoring client, to the node.
* @param id ID used to tag event(s) resulting from a command.
sealed class ClientToServiceCommand(val id: UUID) {
// TODO: Replace with a generic event for starting a protocol which then passes back required information, rather
// than using an event for every conceivable action.
* Issue cash state objects.
* @param currency the currency to issue.
* @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is
* to use the single byte "0x01" as a default.
* @param pennies the amount to issue, in the smallest unit of the currency.
* @param recipient the public key of the recipient.
* @param notary the notary to use for this transaction.
* @param id the ID to be provided in events resulting from this request.
class IssueCash(val currency: Currency,
val issueRef: OpaqueBytes,
val pennies: Long,
val recipient: PublicKey,
val notary: Party,
id: UUID = UUID.randomUUID())
: ClientToServiceCommand(id)
class PayCash(val tokenDef: Issued<Currency>, val pennies: Long, val owner: PublicKey,
id: UUID = UUID.randomUUID())
: ClientToServiceCommand(id)
* @param id the ID to be provided in events resulting from this request.
class ExitCash(val currency: Currency, val issueRef: OpaqueBytes, val pennies: Long,
id: UUID = UUID.randomUUID())
: ClientToServiceCommand(id)
@ -1,5 +1,6 @@
package com.r3corda.node.services.monitor
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.protocols.DirectRequestMessage
@ -1,28 +1,26 @@
package com.r3corda.node.services.monitor
import co.paralleluniverse.common.util.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.contracts.asset.Cash
import com.r3corda.contracts.asset.InsufficientBalanceException
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.toStringShort
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.Wallet
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.persistence.DataVending
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.AddOrRemove
import org.slf4j.LoggerFactory
import com.r3corda.protocols.BroadcastTransactionProtocol
import com.r3corda.protocols.FinalityProtocol
import java.security.KeyPair
import java.security.PublicKey
import java.time.Instant
import java.util.*
import javax.annotation.concurrent.ThreadSafe
@ -154,36 +152,22 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
* Notifies the node associated with the [recipient] public key. Returns a future holding a Boolean of whether the
* node accepted the transaction or not.
private fun notifyRecipientAboutTransaction(
recipient: PublicKey,
transaction: SignedTransaction
): ListenableFuture<Unit> {
val recipientNodeInfo = services.networkMapCache.getNodeByPublicKey(recipient) ?: throw PublicKeyLookupFailed(recipient)
return DataVending.Service.notify(net, services.storageService.myLegalIdentity,
recipientNodeInfo, transaction)
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun initatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
// TODO: Have some way of restricting this to states the caller controls
try {
Cash().generateSpend(builder, Amount(req.pennies, req.tokenDef.product), req.owner,
Cash().generateSpend(builder, req.amount.withoutIssuer(), req.recipient.owningKey,
// TODO: Move cash state filtering by issuer down to the contract itself
services.walletService.currentWallet.statesOfType<Cash.State>().filter { it.state.data.amount.token == req.tokenDef },
services.walletService.currentWallet.statesOfType<Cash.State>().filter { it.state.data.amount.token == req.amount.token },
.forEach {
val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}")
builder.signWith(KeyPair(it, key))
val tx = builder.toSignedTransaction()
notifyRecipientAboutTransaction(req.owner, tx)
return TransactionBuildResult.Complete(tx, "Cash payment completed")
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId, tx, "Cash payment transaction generated")
} catch(ex: InsufficientBalanceException) {
return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance")
@ -193,39 +177,40 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
Cash().generateExit(builder, Amount(req.pennies, Issued(issuer, req.currency)),
Cash().generateExit(builder, req.amount.issuedBy(issuer),
services.walletService.currentWallet.statesOfType<Cash.State>().filter { it.state.data.owner == issuer.party.owningKey })
val tx = builder.toSignedTransaction()
// Notify the owners
val inputStatesNullable = services.walletService.statesForRefs(tx.tx.inputs)
// Work out who the owners of the burnt states were
val inputStatesNullable = services.walletService.statesForRefs(builder.inputStates())
val inputStates = inputStatesNullable.values.filterNotNull().map { it.data }
if (inputStatesNullable.size != inputStates.size) {
val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key }
throw InputStateRefResolveFailed(unresolvedStateRefs)
inputStates.filterIsInstance<Cash.State>().map { it.owner }.toSet().forEach {
notifyRecipientAboutTransaction(it, tx)
return TransactionBuildResult.Complete(tx, "Cash destruction completed")
// TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them
// count as a reason to fail?
val participants: Set<Party> = inputStates.filterIsInstance<Cash.State>().map { services.identityService.partyFromKey(it.owner) }.filterNotNull().toSet()
// Commit the transaction
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), participants)
return TransactionBuildResult.ProtocolStarted(smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId, tx, "Cash destruction transaction generated")
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(notary = req.notary)
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
Cash().generateIssue(builder, Amount(req.pennies, Issued(issuer, req.currency)), req.recipient, req.notary)
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)
val tx = builder.toSignedTransaction()
notifyRecipientAboutTransaction(req.recipient, tx)
return TransactionBuildResult.Complete(tx, "Cash issuance completed")
val tx = builder.toSignedTransaction(checkSufficientSignatures = true)
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId, tx, "Cash issuance completed")
class PublicKeyLookupFailed(failedPublicKey: PublicKey) :
Exception("Failed to lookup public keys $failedPublicKey")
class InputStateRefResolveFailed(stateRefs: List<StateRef>) :
Exception("Failed to resolve input StateRefs $stateRefs")
@ -47,34 +47,21 @@ object DataVending {
companion object {
val logger = loggerFor<DataVending.Service>()
/** Topic for messages notifying a node of a new transaction */
val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx"
* Notify a node of a transaction. Normally any notarisation required would happen before this is called.
fun notify(net: MessagingService,
myIdentity: Party,
recipient: NodeInfo,
transaction: SignedTransaction): ListenableFuture<Unit> {
val future = SettableFuture.create<Unit>()
transaction: SignedTransaction) {
val sessionID = random63BitValue()
net.runOnNextMessage(NOTIFY_TX_PROTOCOL_TOPIC, sessionID) { msg ->
// TODO: Can we improve/simplify the response from the remote node?
val data = msg.data.deserialize<NotifyTxResponseMessage>()
if (data.accepted) {
} else {
future.setException(TransactionRejectedError("Transaction $transaction rejected by remote party ${recipient.identity}"))
val msg = NotifyTxRequestMessage(transaction, myIdentity, sessionID)
net.send(net.createMessage(TopicSession(NOTIFY_TX_PROTOCOL_TOPIC, 0), msg.serialize().bits), recipient.address)
return future
val msg = BroadcastTransactionProtocol.NotifyTxRequestMessage(transaction, emptySet(), myIdentity, sessionID)
net.send(net.createMessage(TopicSession(BroadcastTransactionProtocol.TOPIC, 0), msg.serialize().bits), recipient.address)
val storage = services.storageService
data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage
data class NotifyTxResponseMessage(val accepted: Boolean)
class TransactionRejectedError(msg: String) : Exception(msg)
init {
@ -86,29 +73,24 @@ object DataVending {
{ req: FetchDataProtocol.Request -> handleAttachmentRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
{ req: NotifyTxRequestMessage -> handleTXNotification(req) },
{ req: BroadcastTransactionProtocol.NotifyTxRequestMessage -> handleTXNotification(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
private fun handleTXNotification(req: NotifyTxRequestMessage): Unit {
private fun handleTXNotification(req: BroadcastTransactionProtocol.NotifyTxRequestMessage): Unit {
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
services.startProtocol("Resolving transactions", ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure { throwable ->
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
logger.warn("Received invalid transaction ${req.tx.id} from ${req.replyToParty}", throwable)
@ -136,4 +118,4 @@ object DataVending {
@ -44,13 +44,18 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R> get() {
override val resultFuture: ListenableFuture<R> get() {
return _resultFuture ?: run {
val f = SettableFuture.create<R>()
_resultFuture = f
return f
* Unique ID for the deserialized instance protocol state machine. This is NOT maintained across a state machine
* being serialized and then deserialized.
override val machineId: Long get() = this.id
init {
logic.psm = this
@ -12,6 +12,7 @@ import com.r3corda.core.messaging.TopicSession
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.send
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolStateMachine
import com.r3corda.core.serialization.*
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.core.utilities.trace
@ -215,7 +216,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
* The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is
* restarted with checkpointed state machines in the storage service.
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ProtocolStateMachine<T> {
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
// Need to add before iterating in case of immediate completion
initFiber(fiber) {
@ -244,7 +245,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
if (e.cause !is ExecutionException)
throw e
return fiber.resultFuture
return fiber
private fun updateCheckpoint(psm: ProtocolStateMachineImpl<*>,
@ -55,7 +55,7 @@ class AttachmentTests {
// Get node one to run a protocol to fetch it and insert it.
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
val f1 = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
assertEquals(0, f1.get().fromDisk.size)
@ -66,7 +66,7 @@ class AttachmentTests {
// Shut down node zero and ensure node one can still resolve the attachment.
val response: FetchDataProtocol.Result<Attachment> = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)).get()
val response: FetchDataProtocol.Result<Attachment> = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)).get()
assertEquals(attachment, response.fromDisk[0])
@ -77,7 +77,7 @@ class AttachmentTests {
// Get node one to fetch a non-existent attachment.
val hash = SecureHash.randomSHA256()
val f1 = n1.smm.add("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.info.identity))
val f1 = n1.services.startProtocol("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.info.identity))
val e = assertFailsWith<FetchDataProtocol.HashNotFound> { rootCauseExceptions { f1.get() } }
assertEquals(hash, e.requested)
@ -110,7 +110,7 @@ class AttachmentTests {
// Get n1 to fetch the attachment. Should receive corrupted bytes.
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
val f1 = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
assertFailsWith<FetchDataProtocol.DownloadedVsRequestedDataMismatch> {
rootCauseExceptions { f1.get() }
@ -56,14 +56,14 @@ class TwoPartyTradeProtocolTests {
otherSide: Party, assetToSell: StateAndRef<OwnableState>, price: Amount<Currency>,
myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture<SignedTransaction> {
val seller = TwoPartyTradeProtocol.Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.seller", seller)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.seller", seller).resultFuture
private fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo,
otherSide: Party, acceptablePrice: Amount<Currency>, typeToBuy: Class<out OwnableState>,
sessionID: Long): ListenableFuture<SignedTransaction> {
val buyer = TwoPartyTradeProtocol.Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.buyer", buyer)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.buyer", buyer).resultFuture
@ -114,7 +114,7 @@ class InMemoryNetworkMapServiceTest {
// Confirm all nodes have registered themselves
var fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
var fetchPsm = registerNode.services.startProtocol(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
assertEquals(2, fetchPsm.get()?.count())
@ -123,12 +123,12 @@ class InMemoryNetworkMapServiceTest {
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val seq = 2L
val reg = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires)
val registerPsm = registerNode.smm.add(NetworkMapService.REGISTER_PROTOCOL_TOPIC, TestRegisterPSM(mapServiceNode.info, reg, nodeKey.private))
val registerPsm = registerNode.services.startProtocol(NetworkMapService.REGISTER_PROTOCOL_TOPIC, TestRegisterPSM(mapServiceNode.info, reg, nodeKey.private))
// Now only map service node should be registered
fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
fetchPsm = registerNode.services.startProtocol(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
assertEquals(mapServiceNode.info, fetchPsm.get()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single())
@ -140,7 +140,7 @@ class InMemoryNetworkMapServiceTest {
// Test subscribing to updates
val subscribePsm = registerNode.smm.add(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
val subscribePsm = registerNode.services.startProtocol(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
TestSubscribePSM(mapServiceNode.info, true))
@ -161,7 +161,7 @@ class InMemoryNetworkMapServiceTest {
// Send in an acknowledgment and verify the count goes down
val hash = SecureHash.sha256(wireReg.raw.bits)
val acknowledgePsm = registerNode.smm.add(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC,
val acknowledgePsm = registerNode.services.startProtocol(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC,
TestAcknowledgePSM(mapServiceNode.info, hash))
@ -62,7 +62,7 @@ open class MockServiceHubInternal(
lateinit var smm: StateMachineManager
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
return smm.add(loggerName, logic)
return smm.add(loggerName, logic).resultFuture
init {
@ -111,7 +111,7 @@ class NodeInterestRatesTest {
val protocol = RatesFixProtocol(tx, n2.info.identity, fixOf, "0.675".bd, "0.1".bd)
val future = n1.smm.add("rates", protocol)
val future = n1.services.startProtocol("rates", protocol)
@ -49,7 +49,7 @@ class NotaryChangeTests {
val state = issueState(clientNodeA)
val newNotary = newNotaryNode.info.identity
val protocol = Instigator(state, newNotary)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol)
val future = clientNodeA.services.startProtocol(NotaryChangeProtocol.TOPIC, protocol)
@ -62,7 +62,7 @@ class NotaryChangeTests {
val state = issueMultiPartyState(clientNodeA, clientNodeB)
val newNotary = newNotaryNode.info.identity
val protocol = Instigator(state, newNotary)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol)
val future = clientNodeA.services.startProtocol(NotaryChangeProtocol.TOPIC, protocol)
@ -78,7 +78,7 @@ class NotaryChangeTests {
val state = issueMultiPartyState(clientNodeA, clientNodeB)
val newEvilNotary = Party("Evil Notary", generateKeyPair().public)
val protocol = Instigator(state, newEvilNotary)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol)
val future = clientNodeA.services.startProtocol(NotaryChangeProtocol.TOPIC, protocol)
@ -46,7 +46,7 @@ class NotaryServiceTests {
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
val signature = future.get()
@ -62,7 +62,7 @@ class NotaryServiceTests {
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
val signature = future.get()
@ -79,7 +79,7 @@ class NotaryServiceTests {
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
val ex = assertFailsWith(ExecutionException::class) { future.get() }
@ -98,8 +98,8 @@ class NotaryServiceTests {
val firstSpend = NotaryProtocol.Client(stx)
val secondSpend = NotaryProtocol.Client(stx)
clientNode.smm.add("${NotaryProtocol.TOPIC}.first", firstSpend)
val future = clientNode.smm.add("${NotaryProtocol.TOPIC}.second", secondSpend)
clientNode.services.startProtocol("${NotaryProtocol.TOPIC}.first", firstSpend)
val future = clientNode.services.startProtocol("${NotaryProtocol.TOPIC}.second", secondSpend)
@ -45,7 +45,7 @@ class ValidatingNotaryServiceTests {
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
val ex = assertFailsWith(ExecutionException::class) { future.get() }
@ -65,7 +65,7 @@ class ValidatingNotaryServiceTests {
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
val ex = assertFailsWith(ExecutionException::class) { future.get() }
@ -22,10 +22,7 @@ import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.test.fail
import kotlin.test.*
* Unit tests for the wallet monitoring service.
@ -44,7 +41,7 @@ class WalletMonitorServiceTests {
private fun authenticate(monitorServiceNode: MockNetwork.MockNode, registerNode: MockNetwork.MockNode): Long {
val sessionID = random63BitValue()
val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC,
val authenticatePsm = registerNode.services.startProtocol(WalletMonitorService.REGISTER_TOPIC,
TestRegisterPSM(monitorServiceNode.info, sessionID))
authenticatePsm.get(1, TimeUnit.SECONDS)
@ -78,7 +75,7 @@ class WalletMonitorServiceTests {
val sessionID = random63BitValue()
val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC,
val authenticatePsm = registerNode.services.startProtocol(WalletMonitorService.REGISTER_TOPIC,
TestRegisterPSM(monitorServiceNode.info, sessionID))
val result = authenticatePsm.get(1, TimeUnit.SECONDS)
@ -92,7 +89,7 @@ class WalletMonitorServiceTests {
fun `event received`() {
val (monitorServiceNode, registerNode) = network.createTwoNodes()
val sessionID = authenticate(monitorServiceNode, registerNode)
var receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC,
var receivePsm = registerNode.services.startProtocol(WalletMonitorService.IN_EVENT_TOPIC,
var expected = Wallet.Update(emptySet(), emptySet())
@ -102,7 +99,7 @@ class WalletMonitorServiceTests {
assertEquals(expected.produced, actual.produced)
// Check that states are passed through correctly
receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC,
receivePsm = registerNode.services.startProtocol(WalletMonitorService.IN_EVENT_TOPIC,
val consumed = setOf(StateRef(SecureHash.randomSHA256(), 0))
val producedState = TransactionState(DummyContract.SingleOwnerState(newSecureRandom().nextInt(), DUMMY_PUBKEY_1), DUMMY_NOTARY)
@ -131,29 +128,30 @@ class WalletMonitorServiceTests {
// Tell the monitoring service node to issue some cash
val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public
val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY)
val recipient = monitorServiceNode.services.storageService.myLegalIdentity
val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY)
val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits)
registerNode.net.send(message, monitorServiceNode.net.myAddress)
val expectedState = Cash.State(Amount(quantity,
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
// Check we've received a response
events.forEach { event ->
when (event) {
is ServiceToClientEvent.TransactionBuild -> {
// Check the returned event is correct
val actual = event.state as TransactionBuildResult.Complete
val expected = TransactionBuildResult.Complete(actual.transaction, null)
assertEquals(expected, actual)
val tx = (event.state as TransactionBuildResult.ProtocolStarted).transaction
assertEquals(expectedState, tx!!.tx.outputs.single().data)
is ServiceToClientEvent.OutputState -> {
// Check the generated state is correct
val actual = event.produced.single().state.data
val expected = Cash.State(Amount(quantity,
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
assertEquals(expected, actual)
assertEquals(expectedState, actual)
else -> fail("Unexpected in event ${event}")
@ -178,32 +176,33 @@ class WalletMonitorServiceTests {
// Tell the monitoring service node to issue some cash
val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public
val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY)
val recipient = monitorServiceNode.services.storageService.myLegalIdentity
val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY)
val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits)
registerNode.net.send(message, monitorServiceNode.net.myAddress)
val expectedState = Cash.State(Amount(quantity,
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
// Check we've received a response
events.forEach { event ->
when (event) {
is ServiceToClientEvent.TransactionBuild -> {
// Check the returned event is correct
val actual = event.state as TransactionBuildResult.Complete
val expected = TransactionBuildResult.Complete(actual.transaction, null)
assertEquals(expected, actual)
val tx = (event.state as TransactionBuildResult.ProtocolStarted).transaction
assertEquals(expectedState, tx!!.tx.outputs.single().data)
is ServiceToClientEvent.OutputState -> {
// Check the generated state is correct
val actual = event.produced.single().state.data
val expected = Cash.State(Amount(quantity,
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
assertEquals(expected, actual)
assertEquals(expectedState, actual)
else -> fail("Unexpected in event ${event}")
@ -41,12 +41,9 @@ class DataVendingServiceTests {
val tx = ptx.toSignedTransaction()
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
walletServiceNode.info, tx)
// Check it was accepted
notifyPsm.get(1, TimeUnit.SECONDS)
// Check the transaction is in the receiving node
val actual = walletServiceNode.services.walletService.currentWallet.states.singleOrNull()
@ -73,14 +70,9 @@ class DataVendingServiceTests {
val tx = ptx.toSignedTransaction(false)
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
walletServiceNode.info, tx)
// Check it was not accepted
assertFailsWith<DataVending.Service.TransactionRejectedError> {
rootCauseExceptions { notifyPsm.get() }
// Check the transaction is not in the receiving node
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
@ -87,7 +87,7 @@ fun main(args: Array<String>) {
val tx = TransactionType.General.Builder(notaryNode.identity)
tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notaryNode.identity))
val protocol = RatesFixProtocol(tx, rateOracle.identity, fixOf, expectedRate, rateTolerance)
node.smm.add("demo.ratefix", protocol).get()
node.services.startProtocol("demo.ratefix", protocol).get()
// Show the user the output.
@ -182,7 +182,7 @@ private fun runSeller(node: Node, amount: Amount<Currency>, otherSide: Party) {
tradeTX = node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).single().second
} else {
val seller = TraderDemoProtocolSeller(otherSide, amount)
tradeTX = node.smm.add("demo.seller", seller)
tradeTX = node.services.startProtocol("demo.seller", seller)
tradeTX.success {
@ -217,7 +217,7 @@ private fun runBuyer(node: Node, amount: Amount<Currency>) {
// We use a simple scenario-specific wrapper protocol to make things happen.
val otherSide = message.data.deserialize<Party>()
val buyer = TraderDemoProtocolBuyer(otherSide, attachmentsPath, amount)
node.smm.add("demo.buyer", buyer)
node.services.startProtocol("demo.buyer", buyer)
Reference in New Issue
Block a user