Add CommitTransactionProtocol

Add new protocol which manages the entire process of taking a signed transaction ready
for notarisation, through notarisation and onto recording it both locally and informing
remote nodes.

This protocol also optionally can include the ClientToServiceCommand which triggered a transaction
being created, to give the remote nodes context on why a change occurred (i.e. "You are being sent
£100")
This commit is contained in:
Ross Nicoll 2016-08-16 22:53:59 +01:00
parent a5344f9578
commit ad8ffca0b4
26 changed files with 304 additions and 185 deletions

View File

@ -0,0 +1,49 @@
package com.r3corda.core.contracts
import com.r3corda.core.crypto.Party
import com.r3corda.core.serialization.OpaqueBytes
import java.security.PublicKey
import java.util.*
/**
* A command from the monitoring client, to the node.
*
* @param id ID used to tag event(s) resulting from a command.
*/
sealed class ClientToServiceCommand(val id: UUID) {
/**
* Issue cash state objects.
*
* @param amount the amount of currency to issue on to the ledger.
* @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is
* to use the single byte "0x01" as a default.
* @param recipient the party to issue the cash to.
* @param notary the notary to use for this transaction.
* @param id the ID to be provided in events resulting from this request.
*/
class IssueCash(val amount: Amount<Currency>,
val issueRef: OpaqueBytes,
val recipient: Party,
val notary: Party,
id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id)
/**
* Pay cash to someone else.
*
* @param amount the amount of currency to issue on to the ledger.
* @param recipient the party to issue the cash to.
* @param id the ID to be provided in events resulting from this request.
*/
class PayCash(val amount: Amount<Issued<Currency>>, val recipient: Party,
id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id)
/**
* Exit cash from the ledger.
*
* @param amount the amount of currency to exit from the ledger.
* @param issueRef the reference previously specified on the issuance.
* @param id the ID to be provided in events resulting from this request.
*/
class ExitCash(val amount: Amount<Currency>, val issueRef: OpaqueBytes,
id: UUID = UUID.randomUUID()) : ClientToServiceCommand(id)
}

View File

@ -1,6 +1,7 @@
package com.r3corda.core.protocols
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.crypto.Party
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.utilities.UntrustworthyData
@ -23,4 +24,9 @@ interface ProtocolStateMachine<R> {
val serviceHub: ServiceHub
val logger: Logger
/** Unique ID for this machine, valid only while it is in memory. */
val machineId: Long
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R>
}

View File

@ -0,0 +1,56 @@
package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.serialize
import java.util.*
/**
* Notify all involved parties about a transaction, including storing a copy. Normally this would be called via
* [FinalityProtocol].
*
* @param notarisedTransaction transaction which has been notarised (if needed) and is ready to notify nodes about.
* @param events information on the event(s) which triggered the transaction.
* @param participants a list of participants involved in the transaction.
* @return a list of participants who were successfully notified of the transaction.
*/
// TODO: Event needs to be replaced with something that's meaningful, but won't ever contain sensitive
// information (such as internal details of an account to take payment from). Suggest
// splitting ClientToServiceCommand into public and private parts, with only the public parts
// relayed here.
class BroadcastTransactionProtocol(val notarisedTransaction: SignedTransaction,
val events: Set<ClientToServiceCommand>,
val participants: Set<Party>) : ProtocolLogic<Unit>() {
companion object {
/** Topic for messages notifying a node of a new transaction */
val TOPIC = "platform.wallet.notify_tx"
}
override val topic: String = TOPIC
data class NotifyTxRequestMessage(
val tx: SignedTransaction,
val events: Set<ClientToServiceCommand>,
override val replyToParty: Party,
override val sessionID: Long
) : PartyRequestMessage
@Suspendable
override fun call() {
// Record it locally
serviceHub.recordTransactions(notarisedTransaction)
// TODO: Messaging layer should handle this broadcast for us (although we need to not be sending
// session ID, for that to work, as well).
participants.filter { it != serviceHub.storageService.myLegalIdentity }.forEach { participant ->
val sessionID = random63BitValue()
val msg = NotifyTxRequestMessage(notarisedTransaction, events, serviceHub.storageService.myLegalIdentity, sessionID)
send(participant, 0, msg)
}
}
}

View File

@ -0,0 +1,63 @@
package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.contracts.TransactionBuilder
import com.r3corda.core.contracts.WireTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.ProgressTracker
import java.util.*
/**
* Finalise a transaction by notarising it, then recording it locally, and then sending it to all involved parties.
*
* @param transaction to commit.
* @param events information on the event(s) which triggered the transaction.
* @param participants a list of participants involved in the transaction.
* @return a list of participants who were successfully notified of the transaction.
*/
// TODO: Event needs to be replaced with something that's meaningful, but won't ever contain sensitive
// information (such as internal details of an account to take payment from). Suggest
// splitting ClientToServiceCommand into public and private parts, with only the public parts
// relayed here.
class FinalityProtocol(val transaction: SignedTransaction,
val events: Set<ClientToServiceCommand>,
val participants: Set<Party>,
override val progressTracker: ProgressTracker = tracker()): ProtocolLogic<Unit>() {
companion object {
object NOTARISING : ProgressTracker.Step("Requesting signature by notary service")
object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants")
fun tracker() = ProgressTracker(NOTARISING, BROADCASTING)
}
override val topic: String
get() = throw UnsupportedOperationException()
@Suspendable
override fun call() {
progressTracker.currentStep = NOTARISING
// Notarise the transaction if needed
val notarisedTransaction = if (needsNotarySignature(transaction)) {
val notarySig = subProtocol(NotaryProtocol.Client(transaction))
transaction.withAdditionalSignature(notarySig)
} else {
transaction
}
// Let everyone else know about the transaction
progressTracker.currentStep = BROADCASTING
subProtocol(BroadcastTransactionProtocol(notarisedTransaction, events, participants))
}
private fun needsNotarySignature(transaction: SignedTransaction) = expectsNotarySignature(transaction.tx) && hasNoNotarySignature(transaction)
private fun expectsNotarySignature(transaction: WireTransaction) = transaction.notary != null && transaction.notary.owningKey in transaction.signers
private fun hasNoNotarySignature(transaction: SignedTransaction) = transaction.tx.notary?.owningKey !in transaction.sigs.map { it.by }
}

View File

@ -98,9 +98,26 @@ To run one of these services the node has to simply specify either ``SimpleNotar
Obtaining a signature
---------------------
To obtain a signature from a notary use ``NotaryProtocol.Client``, passing in a ``WireTransaction``.
The protocol will work out which notary needs to be called based on the input states and the timestamp command.
For example, the following snippet can be used when writing a custom protocol:
Once a transaction is built and ready to be finalised, normally you would call ``FinalityProtocol`` passing in a
``SignedTransaction`` (including signatures from the participants) and a list of participants to notify. This requests a
notary signature if needed, and then sends a copy of the notarised transaction to all participants for them to store.
``FinalityProtocol`` delegates to ``NotaryProtocol.Client`` followed by ``BroadcastTransactionProtocol`` to do the
actual work of notarising and broadcasting the transaction. For example:
.. sourcecode:: kotlin
fun finaliseTransaction(serviceHub: ServiceHubInternal, ptx: TransactionBuilder, participants: Set<Party>)
: ListenableFuture<Unit> {
// We conclusively cannot have all the signatures, as the notary has not signed yet
val tx = ptx.toSignedTransaction(checkSufficientSignatures = false)
// The empty set would be the trigger events, which are not used here
val protocol = FinalityProtocol(tx, emptySet(), participants)
return serviceHub.startProtocol("protocol.finalisation", protocol)
}
To manually obtain a signature from a notary you can call ``NotaryProtocol.Client`` directly. The protocol will work out
which notary needs to be called based on the input states and the timestamp command. For example, the following snippet
can be used when writing a custom protocol:
.. sourcecode:: kotlin

View File

@ -106,7 +106,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
override val protocolLogicRefFactory: ProtocolLogicRefFactory get() = protocolLogicFactory
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
return smm.add(loggerName, logic)
return smm.add(loggerName, logic).resultFuture
}
override fun recordTransactions(txs: Iterable<SignedTransaction>) =

View File

@ -130,9 +130,9 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
showProgressFor(listOf(node1, node2))
showConsensusFor(listOf(node1, node2, regulators[0]))
val instigatorFuture: ListenableFuture<SignedTransaction> = node1.smm.add("instigator", instigator)
val instigatorFuture: ListenableFuture<SignedTransaction> = node1.services.startProtocol("instigator", instigator)
return Futures.transformAsync(Futures.allAsList(instigatorFuture, node2.smm.add("acceptor", acceptor))) {
return Futures.transformAsync(Futures.allAsList(instigatorFuture, node2.services.startProtocol("acceptor", acceptor))) {
instigatorFuture
}
}

View File

@ -6,6 +6,7 @@ import com.r3corda.contracts.CommercialPaper
import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
import com.r3corda.contracts.testing.fillWithSomeTestCash
import com.r3corda.core.contracts.DOLLARS
import com.r3corda.core.contracts.OwnableState
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.contracts.`issued by`
import com.r3corda.core.days
@ -52,7 +53,7 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
val sellerProtocol = TwoPartyTradeProtocol.Seller(
buyer.info.identity,
notary.info,
issuance.tx.outRef(0),
issuance.tx.outRef<OwnableState>(0),
amount,
seller.storage.myLegalIdentityKey,
sessionID)
@ -60,8 +61,8 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
showConsensusFor(listOf(buyer, seller, notary))
showProgressFor(listOf(buyer, seller))
val buyerFuture = buyer.smm.add("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TOPIC}.buyer", buyerProtocol)
val sellerFuture = seller.smm.add("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TOPIC}.seller", sellerProtocol)
val buyerFuture = buyer.services.startProtocol("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TOPIC}.buyer", buyerProtocol)
val sellerFuture = seller.services.startProtocol("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TOPIC}.seller", sellerProtocol)
return Futures.successfulAsList(buyerFuture, sellerFuture)
}

View File

@ -47,41 +47,3 @@ sealed class TransactionBuildResult {
*/
class Failed(val message: String?) : TransactionBuildResult()
}
/**
* A command from the monitoring client, to the node.
*
* @param id ID used to tag event(s) resulting from a command.
*/
sealed class ClientToServiceCommand(val id: UUID) {
// TODO: Replace with a generic event for starting a protocol which then passes back required information, rather
// than using an event for every conceivable action.
/**
* Issue cash state objects.
*
* @param currency the currency to issue.
* @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is
* to use the single byte "0x01" as a default.
* @param pennies the amount to issue, in the smallest unit of the currency.
* @param recipient the public key of the recipient.
* @param notary the notary to use for this transaction.
* @param id the ID to be provided in events resulting from this request.
*/
class IssueCash(val currency: Currency,
val issueRef: OpaqueBytes,
val pennies: Long,
val recipient: PublicKey,
val notary: Party,
id: UUID = UUID.randomUUID())
: ClientToServiceCommand(id)
class PayCash(val tokenDef: Issued<Currency>, val pennies: Long, val owner: PublicKey,
id: UUID = UUID.randomUUID())
: ClientToServiceCommand(id)
/**
* @param id the ID to be provided in events resulting from this request.
*/
class ExitCash(val currency: Currency, val issueRef: OpaqueBytes, val pennies: Long,
id: UUID = UUID.randomUUID())
: ClientToServiceCommand(id)
}

View File

@ -1,5 +1,6 @@
package com.r3corda.node.services.monitor
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.protocols.DirectRequestMessage

View File

@ -1,28 +1,26 @@
package com.r3corda.node.services.monitor
import co.paralleluniverse.common.util.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.contracts.asset.Cash
import com.r3corda.contracts.asset.InsufficientBalanceException
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.toStringShort
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.Wallet
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.persistence.DataVending
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.AddOrRemove
import org.slf4j.LoggerFactory
import com.r3corda.protocols.BroadcastTransactionProtocol
import com.r3corda.protocols.FinalityProtocol
import java.security.KeyPair
import java.security.PublicKey
import java.time.Instant
import java.util.*
import javax.annotation.concurrent.ThreadSafe
@ -154,36 +152,22 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
monitor.recipients)
}
/**
* Notifies the node associated with the [recipient] public key. Returns a future holding a Boolean of whether the
* node accepted the transaction or not.
*/
private fun notifyRecipientAboutTransaction(
recipient: PublicKey,
transaction: SignedTransaction
): ListenableFuture<Unit> {
val recipientNodeInfo = services.networkMapCache.getNodeByPublicKey(recipient) ?: throw PublicKeyLookupFailed(recipient)
return DataVending.Service.notify(net, services.storageService.myLegalIdentity,
recipientNodeInfo, transaction)
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun initatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
// TODO: Have some way of restricting this to states the caller controls
try {
Cash().generateSpend(builder, Amount(req.pennies, req.tokenDef.product), req.owner,
Cash().generateSpend(builder, req.amount.withoutIssuer(), req.recipient.owningKey,
// TODO: Move cash state filtering by issuer down to the contract itself
services.walletService.currentWallet.statesOfType<Cash.State>().filter { it.state.data.amount.token == req.tokenDef },
setOf(req.tokenDef.issuer.party))
services.walletService.currentWallet.statesOfType<Cash.State>().filter { it.state.data.amount.token == req.amount.token },
setOf(req.amount.token.issuer.party))
.forEach {
val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}")
builder.signWith(KeyPair(it, key))
}
val tx = builder.toSignedTransaction()
services.walletService.notify(tx.tx)
notifyRecipientAboutTransaction(req.owner, tx)
return TransactionBuildResult.Complete(tx, "Cash payment completed")
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId, tx, "Cash payment transaction generated")
} catch(ex: InsufficientBalanceException) {
return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance")
}
@ -193,39 +177,40 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
Cash().generateExit(builder, Amount(req.pennies, Issued(issuer, req.currency)),
Cash().generateExit(builder, req.amount.issuedBy(issuer),
services.walletService.currentWallet.statesOfType<Cash.State>().filter { it.state.data.owner == issuer.party.owningKey })
builder.signWith(services.storageService.myLegalIdentityKey)
val tx = builder.toSignedTransaction()
services.walletService.notify(tx.tx)
// Notify the owners
val inputStatesNullable = services.walletService.statesForRefs(tx.tx.inputs)
// Work out who the owners of the burnt states were
val inputStatesNullable = services.walletService.statesForRefs(builder.inputStates())
val inputStates = inputStatesNullable.values.filterNotNull().map { it.data }
if (inputStatesNullable.size != inputStates.size) {
val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key }
throw InputStateRefResolveFailed(unresolvedStateRefs)
}
inputStates.filterIsInstance<Cash.State>().map { it.owner }.toSet().forEach {
notifyRecipientAboutTransaction(it, tx)
}
return TransactionBuildResult.Complete(tx, "Cash destruction completed")
// TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them
// count as a reason to fail?
val participants: Set<Party> = inputStates.filterIsInstance<Cash.State>().map { services.identityService.partyFromKey(it.owner) }.filterNotNull().toSet()
// Commit the transaction
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), participants)
return TransactionBuildResult.ProtocolStarted(smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId, tx, "Cash destruction transaction generated")
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(notary = req.notary)
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
Cash().generateIssue(builder, Amount(req.pennies, Issued(issuer, req.currency)), req.recipient, req.notary)
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)
builder.signWith(services.storageService.myLegalIdentityKey)
val tx = builder.toSignedTransaction()
services.walletService.notify(tx.tx)
notifyRecipientAboutTransaction(req.recipient, tx)
return TransactionBuildResult.Complete(tx, "Cash issuance completed")
val tx = builder.toSignedTransaction(checkSufficientSignatures = true)
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId, tx, "Cash issuance completed")
}
class PublicKeyLookupFailed(failedPublicKey: PublicKey) :
Exception("Failed to lookup public keys $failedPublicKey")
class InputStateRefResolveFailed(stateRefs: List<StateRef>) :
Exception("Failed to resolve input StateRefs $stateRefs")
}

View File

@ -47,34 +47,21 @@ object DataVending {
companion object {
val logger = loggerFor<DataVending.Service>()
/** Topic for messages notifying a node of a new transaction */
val NOTIFY_TX_PROTOCOL_TOPIC = "platform.wallet.notify_tx"
/**
* Notify a node of a transaction. Normally any notarisation required would happen before this is called.
*/
fun notify(net: MessagingService,
myIdentity: Party,
recipient: NodeInfo,
transaction: SignedTransaction): ListenableFuture<Unit> {
val future = SettableFuture.create<Unit>()
transaction: SignedTransaction) {
val sessionID = random63BitValue()
net.runOnNextMessage(NOTIFY_TX_PROTOCOL_TOPIC, sessionID) { msg ->
// TODO: Can we improve/simplify the response from the remote node?
val data = msg.data.deserialize<NotifyTxResponseMessage>()
if (data.accepted) {
future.set(Unit)
} else {
future.setException(TransactionRejectedError("Transaction $transaction rejected by remote party ${recipient.identity}"))
}
}
val msg = NotifyTxRequestMessage(transaction, myIdentity, sessionID)
net.send(net.createMessage(TopicSession(NOTIFY_TX_PROTOCOL_TOPIC, 0), msg.serialize().bits), recipient.address)
return future
val msg = BroadcastTransactionProtocol.NotifyTxRequestMessage(transaction, emptySet(), myIdentity, sessionID)
net.send(net.createMessage(TopicSession(BroadcastTransactionProtocol.TOPIC, 0), msg.serialize().bits), recipient.address)
}
}
val storage = services.storageService
data class NotifyTxRequestMessage(val tx: SignedTransaction, override val replyToParty: Party, override val sessionID: Long) : PartyRequestMessage
data class NotifyTxResponseMessage(val accepted: Boolean)
class TransactionRejectedError(msg: String) : Exception(msg)
init {
@ -86,29 +73,24 @@ object DataVending {
{ req: FetchDataProtocol.Request -> handleAttachmentRequest(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
addMessageHandler(NOTIFY_TX_PROTOCOL_TOPIC,
{ req: NotifyTxRequestMessage -> handleTXNotification(req) },
addMessageHandler(BroadcastTransactionProtocol.TOPIC,
{ req: BroadcastTransactionProtocol.NotifyTxRequestMessage -> handleTXNotification(req) },
{ message, e -> logger.error("Failure processing data vending request.", e) }
)
}
private fun handleTXNotification(req: NotifyTxRequestMessage): Unit {
private fun handleTXNotification(req: BroadcastTransactionProtocol.NotifyTxRequestMessage): Unit {
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
services.startProtocol(NOTIFY_TX_PROTOCOL_TOPIC, ResolveTransactionsProtocol(req.tx, req.replyToParty))
services.startProtocol("Resolving transactions", ResolveTransactionsProtocol(req.tx, req.replyToParty))
.success {
services.recordTransactions(req.tx)
val resp = NotifyTxResponseMessage(true)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
}.failure { throwable ->
val resp = NotifyTxResponseMessage(false)
val msg = net.createMessage(NOTIFY_TX_PROTOCOL_TOPIC, req.sessionID, resp.serialize().bits)
net.send(msg, req.getReplyTo(services.networkMapCache))
logger.warn("Received invalid transaction ${req.tx.id} from ${req.replyToParty}", throwable)
}
}
@ -136,4 +118,4 @@ object DataVending {
}
}
}
}
}

View File

@ -44,13 +44,18 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R> get() {
override val resultFuture: ListenableFuture<R> get() {
return _resultFuture ?: run {
val f = SettableFuture.create<R>()
_resultFuture = f
return f
}
}
/**
* Unique ID for the deserialized instance protocol state machine. This is NOT maintained across a state machine
* being serialized and then deserialized.
*/
override val machineId: Long get() = this.id
init {
logic.psm = this

View File

@ -12,6 +12,7 @@ import com.r3corda.core.messaging.TopicSession
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.send
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolStateMachine
import com.r3corda.core.serialization.*
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.core.utilities.trace
@ -215,7 +216,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
* The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is
* restarted with checkpointed state machines in the storage service.
*/
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ProtocolStateMachine<T> {
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
// Need to add before iterating in case of immediate completion
initFiber(fiber) {
@ -244,7 +245,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
if (e.cause !is ExecutionException)
throw e
}
return fiber.resultFuture
return fiber
}
private fun updateCheckpoint(psm: ProtocolStateMachineImpl<*>,

View File

@ -55,7 +55,7 @@ class AttachmentTests {
// Get node one to run a protocol to fetch it and insert it.
network.runNetwork()
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
val f1 = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
network.runNetwork()
assertEquals(0, f1.get().fromDisk.size)
@ -66,7 +66,7 @@ class AttachmentTests {
// Shut down node zero and ensure node one can still resolve the attachment.
n0.stop()
val response: FetchDataProtocol.Result<Attachment> = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)).get()
val response: FetchDataProtocol.Result<Attachment> = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)).get()
assertEquals(attachment, response.fromDisk[0])
}
@ -77,7 +77,7 @@ class AttachmentTests {
// Get node one to fetch a non-existent attachment.
val hash = SecureHash.randomSHA256()
network.runNetwork()
val f1 = n1.smm.add("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.info.identity))
val f1 = n1.services.startProtocol("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.info.identity))
network.runNetwork()
val e = assertFailsWith<FetchDataProtocol.HashNotFound> { rootCauseExceptions { f1.get() } }
assertEquals(hash, e.requested)
@ -110,7 +110,7 @@ class AttachmentTests {
// Get n1 to fetch the attachment. Should receive corrupted bytes.
network.runNetwork()
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
val f1 = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
network.runNetwork()
assertFailsWith<FetchDataProtocol.DownloadedVsRequestedDataMismatch> {
rootCauseExceptions { f1.get() }

View File

@ -56,14 +56,14 @@ class TwoPartyTradeProtocolTests {
otherSide: Party, assetToSell: StateAndRef<OwnableState>, price: Amount<Currency>,
myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture<SignedTransaction> {
val seller = TwoPartyTradeProtocol.Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.seller", seller)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.seller", seller).resultFuture
}
private fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo,
otherSide: Party, acceptablePrice: Amount<Currency>, typeToBuy: Class<out OwnableState>,
sessionID: Long): ListenableFuture<SignedTransaction> {
val buyer = TwoPartyTradeProtocol.Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.buyer", buyer)
return smm.add("${TwoPartyTradeProtocol.TOPIC}.buyer", buyer).resultFuture
}
@Before

View File

@ -114,7 +114,7 @@ class InMemoryNetworkMapServiceTest {
// Confirm all nodes have registered themselves
network.runNetwork()
var fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
var fetchPsm = registerNode.services.startProtocol(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
network.runNetwork()
assertEquals(2, fetchPsm.get()?.count())
@ -123,12 +123,12 @@ class InMemoryNetworkMapServiceTest {
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val seq = 2L
val reg = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires)
val registerPsm = registerNode.smm.add(NetworkMapService.REGISTER_PROTOCOL_TOPIC, TestRegisterPSM(mapServiceNode.info, reg, nodeKey.private))
val registerPsm = registerNode.services.startProtocol(NetworkMapService.REGISTER_PROTOCOL_TOPIC, TestRegisterPSM(mapServiceNode.info, reg, nodeKey.private))
network.runNetwork()
assertTrue(registerPsm.get().success)
// Now only map service node should be registered
fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
fetchPsm = registerNode.services.startProtocol(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
network.runNetwork()
assertEquals(mapServiceNode.info, fetchPsm.get()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single())
}
@ -140,7 +140,7 @@ class InMemoryNetworkMapServiceTest {
// Test subscribing to updates
network.runNetwork()
val subscribePsm = registerNode.smm.add(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
val subscribePsm = registerNode.services.startProtocol(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
TestSubscribePSM(mapServiceNode.info, true))
network.runNetwork()
subscribePsm.get()
@ -161,7 +161,7 @@ class InMemoryNetworkMapServiceTest {
// Send in an acknowledgment and verify the count goes down
val hash = SecureHash.sha256(wireReg.raw.bits)
val acknowledgePsm = registerNode.smm.add(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC,
val acknowledgePsm = registerNode.services.startProtocol(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC,
TestAcknowledgePSM(mapServiceNode.info, hash))
network.runNetwork()
acknowledgePsm.get()

View File

@ -62,7 +62,7 @@ open class MockServiceHubInternal(
lateinit var smm: StateMachineManager
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
return smm.add(loggerName, logic)
return smm.add(loggerName, logic).resultFuture
}
init {

View File

@ -111,7 +111,7 @@ class NodeInterestRatesTest {
val protocol = RatesFixProtocol(tx, n2.info.identity, fixOf, "0.675".bd, "0.1".bd)
LogHelper.setLevel("rates")
net.runNetwork()
val future = n1.smm.add("rates", protocol)
val future = n1.services.startProtocol("rates", protocol)
net.runNetwork()
future.get()

View File

@ -49,7 +49,7 @@ class NotaryChangeTests {
val state = issueState(clientNodeA)
val newNotary = newNotaryNode.info.identity
val protocol = Instigator(state, newNotary)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol)
val future = clientNodeA.services.startProtocol(NotaryChangeProtocol.TOPIC, protocol)
net.runNetwork()
@ -62,7 +62,7 @@ class NotaryChangeTests {
val state = issueMultiPartyState(clientNodeA, clientNodeB)
val newNotary = newNotaryNode.info.identity
val protocol = Instigator(state, newNotary)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol)
val future = clientNodeA.services.startProtocol(NotaryChangeProtocol.TOPIC, protocol)
net.runNetwork()
@ -78,7 +78,7 @@ class NotaryChangeTests {
val state = issueMultiPartyState(clientNodeA, clientNodeB)
val newEvilNotary = Party("Evil Notary", generateKeyPair().public)
val protocol = Instigator(state, newEvilNotary)
val future = clientNodeA.smm.add(NotaryChangeProtocol.TOPIC, protocol)
val future = clientNodeA.services.startProtocol(NotaryChangeProtocol.TOPIC, protocol)
net.runNetwork()

View File

@ -46,7 +46,7 @@ class NotaryServiceTests {
}
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
net.runNetwork()
val signature = future.get()
@ -62,7 +62,7 @@ class NotaryServiceTests {
}
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
net.runNetwork()
val signature = future.get()
@ -79,7 +79,7 @@ class NotaryServiceTests {
}
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
net.runNetwork()
val ex = assertFailsWith(ExecutionException::class) { future.get() }
@ -98,8 +98,8 @@ class NotaryServiceTests {
val firstSpend = NotaryProtocol.Client(stx)
val secondSpend = NotaryProtocol.Client(stx)
clientNode.smm.add("${NotaryProtocol.TOPIC}.first", firstSpend)
val future = clientNode.smm.add("${NotaryProtocol.TOPIC}.second", secondSpend)
clientNode.services.startProtocol("${NotaryProtocol.TOPIC}.first", firstSpend)
val future = clientNode.services.startProtocol("${NotaryProtocol.TOPIC}.second", secondSpend)
net.runNetwork()

View File

@ -45,7 +45,7 @@ class ValidatingNotaryServiceTests {
}
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
net.runNetwork()
val ex = assertFailsWith(ExecutionException::class) { future.get() }
@ -65,7 +65,7 @@ class ValidatingNotaryServiceTests {
}
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val future = clientNode.services.startProtocol(NotaryProtocol.TOPIC, protocol)
net.runNetwork()
val ex = assertFailsWith(ExecutionException::class) { future.get() }

View File

@ -22,10 +22,7 @@ import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.test.fail
import kotlin.test.*
/**
* Unit tests for the wallet monitoring service.
@ -44,7 +41,7 @@ class WalletMonitorServiceTests {
private fun authenticate(monitorServiceNode: MockNetwork.MockNode, registerNode: MockNetwork.MockNode): Long {
network.runNetwork()
val sessionID = random63BitValue()
val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC,
val authenticatePsm = registerNode.services.startProtocol(WalletMonitorService.REGISTER_TOPIC,
TestRegisterPSM(monitorServiceNode.info, sessionID))
network.runNetwork()
authenticatePsm.get(1, TimeUnit.SECONDS)
@ -78,7 +75,7 @@ class WalletMonitorServiceTests {
network.runNetwork()
val sessionID = random63BitValue()
val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC,
val authenticatePsm = registerNode.services.startProtocol(WalletMonitorService.REGISTER_TOPIC,
TestRegisterPSM(monitorServiceNode.info, sessionID))
network.runNetwork()
val result = authenticatePsm.get(1, TimeUnit.SECONDS)
@ -92,7 +89,7 @@ class WalletMonitorServiceTests {
fun `event received`() {
val (monitorServiceNode, registerNode) = network.createTwoNodes()
val sessionID = authenticate(monitorServiceNode, registerNode)
var receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC,
var receivePsm = registerNode.services.startProtocol(WalletMonitorService.IN_EVENT_TOPIC,
TestReceiveWalletUpdatePSM(sessionID))
var expected = Wallet.Update(emptySet(), emptySet())
monitorServiceNode.inNodeWalletMonitorService!!.notifyWalletUpdate(expected)
@ -102,7 +99,7 @@ class WalletMonitorServiceTests {
assertEquals(expected.produced, actual.produced)
// Check that states are passed through correctly
receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC,
receivePsm = registerNode.services.startProtocol(WalletMonitorService.IN_EVENT_TOPIC,
TestReceiveWalletUpdatePSM(sessionID))
val consumed = setOf(StateRef(SecureHash.randomSHA256(), 0))
val producedState = TransactionState(DummyContract.SingleOwnerState(newSecureRandom().nextInt(), DUMMY_PUBKEY_1), DUMMY_NOTARY)
@ -131,29 +128,30 @@ class WalletMonitorServiceTests {
assertFalse(monitorServiceNode.services.walletService.currentWallet.states.iterator().hasNext())
// Tell the monitoring service node to issue some cash
val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public
val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY)
val recipient = monitorServiceNode.services.storageService.myLegalIdentity
val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY)
val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits)
registerNode.net.send(message, monitorServiceNode.net.myAddress)
network.runNetwork()
val expectedState = Cash.State(Amount(quantity,
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
recipient.owningKey)
// Check we've received a response
events.forEach { event ->
when (event) {
is ServiceToClientEvent.TransactionBuild -> {
// Check the returned event is correct
val actual = event.state as TransactionBuildResult.Complete
val expected = TransactionBuildResult.Complete(actual.transaction, null)
assertEquals(expected, actual)
val tx = (event.state as TransactionBuildResult.ProtocolStarted).transaction
assertNotNull(tx)
assertEquals(expectedState, tx!!.tx.outputs.single().data)
}
is ServiceToClientEvent.OutputState -> {
// Check the generated state is correct
val actual = event.produced.single().state.data
val expected = Cash.State(Amount(quantity,
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
recipientKey)
assertEquals(expected, actual)
assertEquals(expectedState, actual)
}
else -> fail("Unexpected in event ${event}")
}
@ -178,32 +176,33 @@ class WalletMonitorServiceTests {
assertFalse(monitorServiceNode.services.walletService.currentWallet.states.iterator().hasNext())
// Tell the monitoring service node to issue some cash
val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public
val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY)
val recipient = monitorServiceNode.services.storageService.myLegalIdentity
val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY)
val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits)
registerNode.net.send(message, monitorServiceNode.net.myAddress)
network.runNetwork()
val expectedState = Cash.State(Amount(quantity,
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
recipient.owningKey)
// Check we've received a response
events.forEach { event ->
when (event) {
is ServiceToClientEvent.TransactionBuild -> {
// Check the returned event is correct
val actual = event.state as TransactionBuildResult.Complete
val expected = TransactionBuildResult.Complete(actual.transaction, null)
assertEquals(expected, actual)
val tx = (event.state as TransactionBuildResult.ProtocolStarted).transaction
assertNotNull(tx)
assertEquals(expectedState, tx!!.tx.outputs.single().data)
}
is ServiceToClientEvent.OutputState -> {
// Check the generated state is correct
val actual = event.produced.single().state.data
val expected = Cash.State(Amount(quantity,
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
recipientKey)
assertEquals(expected, actual)
assertEquals(expectedState, actual)
}
else -> fail("Unexpected in event ${event}")
}
}
}
}
}

View File

@ -41,12 +41,9 @@ class DataVendingServiceTests {
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
val tx = ptx.toSignedTransaction()
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
walletServiceNode.info, tx)
// Check it was accepted
network.runNetwork()
notifyPsm.get(1, TimeUnit.SECONDS)
// Check the transaction is in the receiving node
val actual = walletServiceNode.services.walletService.currentWallet.states.singleOrNull()
@ -73,14 +70,9 @@ class DataVendingServiceTests {
ptx.signWith(registerNode.services.storageService.myLegalIdentityKey)
val tx = ptx.toSignedTransaction(false)
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)
val notifyPsm = DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
DataVending.Service.notify(registerNode.net, registerNode.services.storageService.myLegalIdentity,
walletServiceNode.info, tx)
// Check it was not accepted
network.runNetwork()
assertFailsWith<DataVending.Service.TransactionRejectedError> {
rootCauseExceptions { notifyPsm.get() }
}
// Check the transaction is not in the receiving node
assertEquals(0, walletServiceNode.services.walletService.currentWallet.states.toList().size)

View File

@ -87,7 +87,7 @@ fun main(args: Array<String>) {
val tx = TransactionType.General.Builder(notaryNode.identity)
tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notaryNode.identity))
val protocol = RatesFixProtocol(tx, rateOracle.identity, fixOf, expectedRate, rateTolerance)
node.smm.add("demo.ratefix", protocol).get()
node.services.startProtocol("demo.ratefix", protocol).get()
node.stop()
// Show the user the output.

View File

@ -182,7 +182,7 @@ private fun runSeller(node: Node, amount: Amount<Currency>, otherSide: Party) {
tradeTX = node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).single().second
} else {
val seller = TraderDemoProtocolSeller(otherSide, amount)
tradeTX = node.smm.add("demo.seller", seller)
tradeTX = node.services.startProtocol("demo.seller", seller)
}
tradeTX.success {
@ -217,7 +217,7 @@ private fun runBuyer(node: Node, amount: Amount<Currency>) {
// We use a simple scenario-specific wrapper protocol to make things happen.
val otherSide = message.data.deserialize<Party>()
val buyer = TraderDemoProtocolBuyer(otherSide, attachmentsPath, amount)
node.smm.add("demo.buyer", buyer)
node.services.startProtocol("demo.buyer", buyer)
}
}