@ -168,7 +168,7 @@ public class JavaCommercialPaper implements Contract {
if (!inputs.isEmpty()) {
throw new IllegalStateException("Failed Requirement: there is no input state");
if (output.faceValue.getPennies() == 0) {
if (output.faceValue.getQuantity() == 0) {
throw new IllegalStateException("Failed Requirement: the face value is not zero");
@ -117,7 +117,7 @@ class CommercialPaper : Contract {
// Don't allow people to issue commercial paper under other entities identities.
"the issuance is signed by the claimed issuer of the paper" by
(output.issuance.party.owningKey in command.signers)
"the face value is not zero" by (output.faceValue.pennies > 0)
"the face value is not zero" by (output.faceValue.quantity > 0)
"the maturity date is not in the past" by (time < output.maturityDate)
// Don't allow an existing CP state to be replaced by this issuance.
// TODO: Consider how to handle the case of mistaken issuances, or other need to patch.
@ -88,7 +88,7 @@ class CrowdFund : Contract {
"there is no input state" by tx.inStates.filterIsInstance<State>().isEmpty()
"the transaction is signed by the owner of the crowdsourcing" by (command.signers.contains(outputCrowdFund.campaign.owner))
"the output registration is empty of pledges" by (outputCrowdFund.pledges.isEmpty())
"the output registration has a non-zero target" by (outputCrowdFund.campaign.target.pennies > 0)
"the output registration has a non-zero target" by (outputCrowdFund.campaign.target.quantity > 0)
"the output registration has a name" by (outputCrowdFund.campaign.name.isNotBlank())
"the output registration has a closing time in the future" by (time < outputCrowdFund.campaign.closingTime)
"the output registration has an open state" by (!outputCrowdFund.closed)
@ -112,7 +112,7 @@ class FixedRatePaymentEvent(date: LocalDate,
override val flow: Amount<Currency> get() =
Amount<Currency>(dayCountFactor.times(BigDecimal(notional.pennies)).times(rate.ratioUnit!!.value).toLong(), notional.token)
Amount<Currency>(dayCountFactor.times(BigDecimal(notional.quantity)).times(rate.ratioUnit!!.value).toLong(), notional.token)
override fun toString(): String =
"FixedRatePaymentEvent $accrualStartDate -> $accrualEndDate : $dayCountFactor : $days : $date : $notional : $rate : $flow"
@ -138,7 +138,7 @@ class FloatingRatePaymentEvent(date: LocalDate,
override val flow: Amount<Currency> get() {
// TODO: Should an uncalculated amount return a zero ? null ? etc.
val v = rate.ratioUnit?.value ?: return Amount<Currency>(0, notional.token)
return Amount<Currency>(dayCountFactor.times(BigDecimal(notional.pennies)).times(v).toLong(), notional.token)
return Amount<Currency>(dayCountFactor.times(BigDecimal(notional.quantity)).times(v).toLong(), notional.token)
override fun toString(): String = "FloatingPaymentEvent $accrualStartDate -> $accrualEndDate : $dayCountFactor : $days : $date : $notional : $rate (fix on $fixingDate): $flow"
@ -456,7 +456,7 @@ class InterestRateSwap() : Contract {
fun checkLegAmounts(legs: Array<CommonLeg>) {
requireThat {
"The notional is non zero" by legs.any { it.notional.pennies > (0).toLong() }
"The notional is non zero" by legs.any { it.notional.quantity > (0).toLong() }
"The notional for all legs must be the same" by legs.all { it.notional == legs[0].notional }
for (leg: CommonLeg in legs) {
@ -505,7 +505,7 @@ class InterestRateSwap() : Contract {
"There are no in states for an agreement" by inputs.isEmpty()
"There are events in the fix schedule" by (irs.calculation.fixedLegPaymentSchedule.size > 0)
"There are events in the float schedule" by (irs.calculation.floatingLegPaymentSchedule.size > 0)
"All notionals must be non zero" by (irs.fixedLeg.notional.pennies > 0 && irs.floatingLeg.notional.pennies > 0)
"All notionals must be non zero" by (irs.fixedLeg.notional.quantity > 0 && irs.floatingLeg.notional.quantity > 0)
"The fixed leg rate must be positive" by (irs.fixedLeg.fixedRate.isPositive())
"The currency of the notionals must be the same" by (irs.fixedLeg.notional.token == irs.floatingLeg.notional.token)
"All leg notionals must be the same" by (irs.fixedLeg.notional == irs.floatingLeg.notional)
@ -95,7 +95,7 @@ class ReferenceRate(val oracle: String, val tenor: Tenor, val name: String) : Fl
// TODO: For further discussion.
operator fun Amount<Currency>.times(other: RatioUnit): Amount<Currency> = Amount<Currency>((BigDecimal(this.pennies).multiply(other.value)).longValueExact(), this.token)
operator fun Amount<Currency>.times(other: RatioUnit): Amount<Currency> = Amount<Currency>((BigDecimal(this.quantity).multiply(other.value)).longValueExact(), this.token)
//operator fun Amount<Currency>.times(other: FixedRate): Amount<Currency> = Amount<Currency>((BigDecimal(this.pennies).multiply(other.value)).longValueExact(), this.currency)
//fun Amount<Currency>.times(other: InterestRateSwap.RatioUnit): Amount<Currency> = Amount<Currency>((BigDecimal(this.pennies).multiply(other.value)).longValueExact(), this.currency)
@ -8,8 +8,8 @@ import java.util.*
* Subset of cash-like contract state, containing the issuance definition. If these definitions match for two
* contracts' states, those states can be aggregated.
interface CashIssuanceDefinition : IssuanceDefinition {
/** Where the underlying currency backing this ledger entry can be found (propagated) */
interface AssetIssuanceDefinition<T> : IssuanceDefinition {
/** Where the underlying asset backing this ledger entry can be found (propagated) */
val deposit: PartyAndReference
val currency: Currency
val token: T
@ -18,8 +18,6 @@ import java.util.*
val CASH_PROGRAM_ID = Cash()
class InsufficientBalanceException(val amountMissing: Amount<Currency>) : Exception()
* A cash transaction may split and merge money represented by a set of (issuer, depositRef) pairs, across multiple
* input and output states. Imagine a Bitcoin transaction but in which all UTXOs had a colour
@ -33,7 +31,7 @@ class InsufficientBalanceException(val amountMissing: Amount<Currency>) : Except
* At the same time, other contracts that just want money and don't care much who is currently holding it in their
* vaults can ignore the issuer/depositRefs and just examine the amount fields.
class Cash : Contract {
class Cash : FungibleAsset<Currency>() {
* 1) hash should be of the contents, not the URI
@ -46,12 +44,12 @@ class Cash : Contract {
override val legalContractReference: SecureHash = SecureHash.sha256("https://www.big-book-of-banking-law.gov/cash-claims.html")
data class IssuanceDefinition(
data class IssuanceDefinition<T>(
/** Where the underlying currency backing this ledger entry can be found (propagated) */
override val deposit: PartyAndReference,
override val currency: Currency
) : CashIssuanceDefinition
override val token: T
) : AssetIssuanceDefinition<T>
/** A state representing a cash claim against some party */
data class State(
@ -64,9 +62,9 @@ class Cash : Contract {
override val owner: PublicKey,
override val notary: Party
) : CommonCashState<Cash.IssuanceDefinition> {
override val issuanceDef: Cash.IssuanceDefinition
get() = Cash.IssuanceDefinition(deposit, amount.token)
) : FungibleAsset.State<Currency> {
override val issuanceDef: IssuanceDefinition<Currency>
get() = IssuanceDefinition(deposit, amount.token)
override val contract = CASH_PROGRAM_ID
override fun toString() = "${Emoji.bagOfCash}Cash($amount at $deposit owned by ${owner.toStringShort()})"
@ -76,93 +74,26 @@ class Cash : Contract {
// Just for grouping
interface Commands : CommandData {
class Move() : TypeOnlyCommandData(), Commands
class Move() : TypeOnlyCommandData(), FungibleAsset.Commands.Move
* Allows new cash states to be issued into existence: the nonce ("number used once") ensures the transaction
* has a unique ID even when there are no inputs.
data class Issue(val nonce: Long = SecureRandom.getInstanceStrong().nextLong()) : Commands
data class Issue(override val nonce: Long = SecureRandom.getInstanceStrong().nextLong()) : FungibleAsset.Commands.Issue
* A command stating that money has been withdrawn from the shared ledger and is now accounted for
* in some other way.
data class Exit(val amount: Amount<Currency>) : Commands
/** This is the function EVERYONE runs */
override fun verify(tx: TransactionForVerification) {
// Each group is a set of input/output states with distinct (deposit, currency) attributes. These types
// of cash are not fungible and must be kept separated for bookkeeping purposes.
val groups = tx.groupStates() { it: Cash.State -> it.issuanceDef }
for ((inputs, outputs, key) in groups) {
// Either inputs or outputs could be empty.
val deposit = key.deposit
val currency = key.currency
val issuer = deposit.party
requireThat {
"there are no zero sized outputs" by outputs.none { it.amount.pennies == 0L }
val issueCommand = tx.commands.select<Commands.Issue>().firstOrNull()
if (issueCommand != null) {
verifyIssueCommand(inputs, outputs, tx, issueCommand, currency, issuer)
} else {
val inputAmount = inputs.sumCashOrNull() ?: throw IllegalArgumentException("there is at least one cash input for this group")
val outputAmount = outputs.sumCashOrZero(currency)
// If we want to remove cash from the ledger, that must be signed for by the issuer.
// A mis-signed or duplicated exit command will just be ignored here and result in the exit amount being zero.
val exitCommand = tx.commands.select<Commands.Exit>(party = issuer).singleOrNull()
val amountExitingLedger = exitCommand?.value?.amount ?: Amount(0, currency)
requireThat {
"there are no zero sized inputs" by inputs.none { it.amount.pennies == 0L }
"for deposit ${deposit.reference} at issuer ${deposit.party.name} the amounts balance" by
(inputAmount == outputAmount + amountExitingLedger)
verifyMoveCommands<Commands.Move>(inputs, tx)
private fun verifyIssueCommand(inputs: List<State>,
outputs: List<State>,
tx: TransactionForVerification,
issueCommand: AuthenticatedObject<Commands.Issue>,
currency: Currency,
issuer: Party) {
// If we have an issue command, perform special processing: the group is allowed to have no inputs,
// and the output states must have a deposit reference owned by the signer.
// Whilst the transaction *may* have no inputs, it can have them, and in this case the outputs must
// sum to more than the inputs. An issuance of zero size is not allowed.
// Note that this means literally anyone with access to the network can issue cash claims of arbitrary
// amounts! It is up to the recipient to decide if the backing party is trustworthy or not, via some
// as-yet-unwritten identity service. See ADP-22 for discussion.
// The grouping ensures that all outputs have the same deposit reference and currency.
val inputAmount = inputs.sumCashOrZero(currency)
val outputAmount = outputs.sumCash()
val cashCommands = tx.commands.select<Cash.Commands>()
requireThat {
"the issue command has a nonce" by (issueCommand.value.nonce != 0L)
"output deposits are owned by a command signer" by (issuer in issueCommand.signingParties)
"output values sum to more than the inputs" by (outputAmount > inputAmount)
"there is only a single issue command" by (cashCommands.count() == 1)
data class Exit(override val amount: Amount<Currency>) : Commands, FungibleAsset.Commands.Exit<Currency>
* Puts together an issuance transaction from the given template, that starts out being owned by the given pubkey.
fun generateIssue(tx: TransactionBuilder, issuanceDef: CashIssuanceDefinition, pennies: Long, owner: PublicKey, notary: Party)
= generateIssue(tx, Amount(pennies, issuanceDef.currency), issuanceDef.deposit, owner, notary)
fun generateIssue(tx: TransactionBuilder, issuanceDef: AssetIssuanceDefinition<Currency>, pennies: Long, owner: PublicKey, notary: Party)
= generateIssue(tx, Amount(pennies, issuanceDef.token), issuanceDef.deposit, owner, notary)
* Puts together an issuance transaction for the specified amount that starts out being owned by the given pubkey.
@ -234,7 +165,7 @@ class Cash : Contract {
State(deposit, totalAmount, to, coins.first().state.notary)
val outputs = if (change.pennies > 0) {
val outputs = if (change.quantity > 0) {
// Just copy a key across as the change key. In real life of course, this works but leaks private data.
// In bitcoinj we derive a fresh key here and then shuffle the outputs to ensure it's hard to follow
// value flows through the transaction graph.
@ -0,0 +1,148 @@
package com.r3corda.contracts.cash
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.toStringShort
import com.r3corda.core.utilities.Emoji
import java.security.PublicKey
import java.security.SecureRandom
import java.util.*
// Cash-like
class InsufficientBalanceException(val amountMissing: Amount<*>) : Exception()
* Superclass for contracts representing assets which are fungible, countable and issued by a specific party. States
* contain assets which are equivalent (such as cash of the same currency), so records of their existence can
* be merged or split as needed where the issuer is the same. For instance, dollars issued by the Fed are fungible and
* countable (in cents), barrels of West Texas crude are fungible and countable (oil from two small containers
* can be poured into one large container), shares of the same class in a specific company are fungible and
* countable, and so on.
* See [Cash] for an example subclass that implements currency.
* @param T a type that represents the asset in question. This should describe the basic type of the asset
* (GBP, USD, oil, shares in company <X>, etc.) and any additional metadata (issuer, grade, class, etc.)
abstract class FungibleAsset<T> : Contract {
/** A state representing a claim against some party */
interface State<T> : FungibleAssetState<T, AssetIssuanceDefinition<T>> {
/** Where the underlying asset backing this ledger entry can be found (propagated) */
override val deposit: PartyAndReference
override val amount: Amount<T>
/** There must be a MoveCommand signed by this key to claim the amount */
override val owner: PublicKey
override val notary: Party
// Just for grouping
interface Commands : CommandData {
interface Move : Commands
* Allows new asset states to be issued into existence: the nonce ("number used once") ensures the transaction
* has a unique ID even when there are no inputs.
interface Issue : Commands { val nonce: Long }
* A command stating that money has been withdrawn from the shared ledger and is now accounted for
* in some other way.
interface Exit<T> : Commands { val amount: Amount<T> }
/** This is the function EVERYONE runs */
override fun verify(tx: TransactionForVerification) {
// Each group is a set of input/output states with distinct issuance definitions. These assets are not fungible
// and must be kept separated for bookkeeping purposes.
val groups = tx.groupStates() { it: FungibleAsset.State<T> -> it.issuanceDef }
for ((inputs, outputs, key) in groups) {
// Either inputs or outputs could be empty.
val deposit = key.deposit
val token = key.token
val issuer = deposit.party
requireThat {
"there are no zero sized outputs" by outputs.none { it.amount.quantity == 0L }
val issueCommand = tx.commands.select<Commands.Issue>().firstOrNull()
if (issueCommand != null) {
verifyIssueCommand(inputs, outputs, tx, issueCommand, token, issuer)
} else {
val inputAmount = inputs.sumFungibleOrNull<T>() ?: throw IllegalArgumentException("there is at least one asset input for this group")
val outputAmount = outputs.sumFungibleOrZero(token)
// If we want to remove assets from the ledger, that must be signed for by the issuer.
// A mis-signed or duplicated exit command will just be ignored here and result in the exit amount being zero.
val exitCommand = tx.commands.select<Commands.Exit<T>>(party = issuer).singleOrNull()
val amountExitingLedger = exitCommand?.value?.amount ?: Amount(0, token)
requireThat {
"there are no zero sized inputs" by inputs.none { it.amount.quantity == 0L }
"for deposit ${deposit.reference} at issuer ${deposit.party.name} the amounts balance" by
(inputAmount == outputAmount + amountExitingLedger)
verifyMoveCommands<Commands.Move>(inputs, tx)
private fun verifyIssueCommand(inputs: List<State<T>>,
outputs: List<State<T>>,
tx: TransactionForVerification,
issueCommand: AuthenticatedObject<Commands.Issue>,
token: T,
issuer: Party) {
// If we have an issue command, perform special processing: the group is allowed to have no inputs,
// and the output states must have a deposit reference owned by the signer.
// Whilst the transaction *may* have no inputs, it can have them, and in this case the outputs must
// sum to more than the inputs. An issuance of zero size is not allowed.
// Note that this means literally anyone with access to the network can issue asset claims of arbitrary
// amounts! It is up to the recipient to decide if the backing party is trustworthy or not, via some
// external mechanism (such as locally defined rules on which parties are trustworthy).
// The grouping ensures that all outputs have the same deposit reference and token.
val inputAmount = inputs.sumFungibleOrZero(token)
val outputAmount = outputs.sumFungible<T>()
val assetCommands = tx.commands.select<FungibleAsset.Commands>()
requireThat {
"the issue command has a nonce" by (issueCommand.value.nonce != 0L)
"output deposits are owned by a command signer" by (issuer in issueCommand.signingParties)
"output values sum to more than the inputs" by (outputAmount > inputAmount)
"there is only a single issue command" by (assetCommands.count() == 1)
// Small DSL extensions.
* Sums the asset states in the list belonging to a single owner, throwing an exception
* if there are none, or if any of the asset states cannot be added together (i.e. are
* different tokens).
fun <T> Iterable<ContractState>.sumFungibleBy(owner: PublicKey) = filterIsInstance<FungibleAsset.State<T>>().filter { it.owner == owner }.map { it.amount }.sumOrThrow()
* Sums the asset states in the list, throwing an exception if there are none, or if any of the asset
* states cannot be added together (i.e. are different tokens).
fun <T> Iterable<ContractState>.sumFungible() = filterIsInstance<FungibleAsset.State<T>>().map { it.amount }.sumOrThrow()
/** Sums the asset states in the list, returning null if there are none. */
fun <T> Iterable<ContractState>.sumFungibleOrNull() = filterIsInstance<FungibleAsset.State<T>>().map { it.amount }.sumOrNull()
/** Sums the asset states in the list, returning zero of the given token if there are none. */
fun <T> Iterable<ContractState>.sumFungibleOrZero(token: T) = filterIsInstance<FungibleAsset.State<T>>().map { it.amount }.sumOrZero(token)
@ -8,9 +8,9 @@ import java.util.Currency
* Common elements of cash contract states.
interface CommonCashState<I : CashIssuanceDefinition> : OwnableState {
interface FungibleAssetState<T, I : AssetIssuanceDefinition<T>> : OwnableState {
val issuanceDef: I
/** Where the underlying currency backing this ledger entry can be found (propagated) */
val deposit: PartyAndReference
val amount: Amount<Currency>
val amount: Amount<T>
@ -19,7 +19,7 @@ data class Zero(val dummy: Int = 0) : Kontract
// should be replaced with something that uses Corda assets and/or cash
data class Transfer(val amount: Observable<Long>, val currency: Currency, val from: Party, val to: Party) : Kontract {
constructor(amount: Amount<Currency>, from: Party, to: Party ) : this(const(amount.pennies), amount.token, from, to)
constructor(amount: Amount<Currency>, from: Party, to: Party ) : this(const(amount.quantity), amount.token, from, to)
data class And(val kontracts: Set<Kontract>) : Kontract
@ -97,7 +97,7 @@ object TwoPartyTradeProtocol {
private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.LegallyIdentifiable {
progressTracker.currentStep = NOTARY
return subProtocol(NotaryProtocol(stx.tx))
return subProtocol(NotaryProtocol.Client(stx.tx))
@ -340,18 +340,18 @@ class IRSTests {
fun `expression calculation testing`() {
val dummyIRS = singleIRS()
val stuffToPrint: ArrayList<String> = arrayListOf(
"fixedLeg.notional.pennies * 10",
"fixedLeg.notional.pennies * fixedLeg.fixedRate.ratioUnit.value",
"fixedLeg.notional.quantity * 10",
"fixedLeg.notional.quantity * fixedLeg.fixedRate.ratioUnit.value",
"(fixedLeg.notional.token.currencyCode.equals('GBP')) ? 365 : 360 ",
"(fixedLeg.notional.pennies * (fixedLeg.fixedRate.ratioUnit.value))"
"(fixedLeg.notional.quantity * (fixedLeg.fixedRate.ratioUnit.value))"
// "calculation.floatingLegPaymentSchedule.get(context.getDate('currentDate')).rate"
// "calculation.floatingLegPaymentSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value",
//"( fixedLeg.notional.pennies * (fixedLeg.fixedRate.ratioUnit.value)) - (floatingLeg.notional.pennies * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))",
@ -450,7 +450,7 @@ class IRSTests {
val irs = singleIRS()
transaction {
output() {
irs.copy(irs.fixedLeg.copy(notional = irs.fixedLeg.notional.copy(pennies = 0)))
irs.copy(irs.fixedLeg.copy(notional = irs.fixedLeg.notional.copy(quantity = 0)))
arg(MEGA_CORP_PUBKEY) { InterestRateSwap.Commands.Agree() }
@ -459,7 +459,7 @@ class IRSTests {
transaction {
output() {
irs.copy(irs.fixedLeg.copy(notional = irs.floatingLeg.notional.copy(pennies = 0)))
irs.copy(irs.fixedLeg.copy(notional = irs.floatingLeg.notional.copy(quantity = 0)))
arg(MEGA_CORP_PUBKEY) { InterestRateSwap.Commands.Agree() }
@ -487,7 +487,7 @@ class IRSTests {
fun `ensure same currency notionals`() {
val irs = singleIRS()
val modifiedIRS = irs.copy(fixedLeg = irs.fixedLeg.copy(notional = Amount(irs.fixedLeg.notional.pennies, Currency.getInstance("JPY"))))
val modifiedIRS = irs.copy(fixedLeg = irs.fixedLeg.copy(notional = Amount(irs.fixedLeg.notional.quantity, Currency.getInstance("JPY"))))
transaction {
output() {
@ -501,7 +501,7 @@ class IRSTests {
fun `ensure notional amounts are equal`() {
val irs = singleIRS()
val modifiedIRS = irs.copy(fixedLeg = irs.fixedLeg.copy(notional = Amount(irs.floatingLeg.notional.pennies + 1, irs.floatingLeg.notional.token)))
val modifiedIRS = irs.copy(fixedLeg = irs.fixedLeg.copy(notional = Amount(irs.floatingLeg.notional.quantity + 1, irs.floatingLeg.notional.token)))
transaction {
output() {
@ -619,7 +619,7 @@ class IRSTests {
val firstResetKey = newIRS.calculation.floatingLegPaymentSchedule.keys.first()
val firstResetValue = newIRS.calculation.floatingLegPaymentSchedule[firstResetKey]
var modifiedFirstResetValue = firstResetValue!!.copy(notional = Amount(firstResetValue.notional.pennies, Currency.getInstance("JPY")))
var modifiedFirstResetValue = firstResetValue!!.copy(notional = Amount(firstResetValue.notional.quantity, Currency.getInstance("JPY")))
output() {
@ -640,7 +640,7 @@ class IRSTests {
arg(ORACLE_PUBKEY) { Fix(FixOf("ICE LIBOR", ld, Tenor("3M")), bd) }
val latestReset = newIRS.calculation.floatingLegPaymentSchedule.filter { it.value.rate is FixedRate }.maxBy { it.key }
var modifiedLatestResetValue = latestReset!!.value.copy(notional = Amount(latestReset.value.notional.pennies, Currency.getInstance("JPY")))
var modifiedLatestResetValue = latestReset!!.value.copy(notional = Amount(latestReset.value.notional.quantity, Currency.getInstance("JPY")))
output() {
@ -41,7 +41,7 @@ class CashTests {
tweak {
output { outState }
// No command arguments
this `fails requirement` "required com.r3corda.contracts.cash.Cash.Commands.Move command"
this `fails requirement` "required com.r3corda.contracts.cash.FungibleAsset.Commands.Move command"
tweak {
output { outState }
@ -52,7 +52,7 @@ class CashTests {
output { outState }
output { outState `issued by` MINI_CORP }
arg(DUMMY_PUBKEY_1) { Cash.Commands.Move() }
this `fails requirement` "at least one cash input"
this `fails requirement` "at least one asset input"
// Simple reallocation works.
tweak {
@ -71,7 +71,7 @@ class CashTests {
output { outState }
arg(MINI_CORP_PUBKEY) { Cash.Commands.Move() }
this `fails requirement` "there is at least one cash input"
this `fails requirement` "there is at least one asset input"
// Check we can issue money only as long as the issuer institution is a command signer, i.e. any recognised
@ -112,7 +112,7 @@ class CashTests {
// Test issuance from the issuance definition
val issuanceDef = Cash.IssuanceDefinition(MINI_CORP.ref(12, 34), USD)
val templatePtx = TransactionBuilder()
Cash().generateIssue(templatePtx, issuanceDef, 100.DOLLARS.pennies, owner = DUMMY_PUBKEY_1, notary = DUMMY_NOTARY)
Cash().generateIssue(templatePtx, issuanceDef, 100.DOLLARS.quantity, owner = DUMMY_PUBKEY_1, notary = DUMMY_NOTARY)
assertEquals(ptx.outputStates()[0], templatePtx.outputStates()[0])
@ -297,7 +297,7 @@ class CashTests {
tweak {
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Exit(200.DOLLARS) }
this `fails requirement` "required com.r3corda.contracts.cash.Cash.Commands.Move command"
this `fails requirement` "required com.r3corda.contracts.cash.FungibleAsset.Commands.Move command"
tweak {
arg(DUMMY_PUBKEY_1) { Cash.Commands.Move() }
@ -30,6 +30,8 @@ val Double.bd: BigDecimal get() = BigDecimal(this)
val String.bd: BigDecimal get() = BigDecimal(this)
val Long.bd: BigDecimal get() = BigDecimal(this)
fun String.abbreviate(maxWidth: Int): String = if (length <= maxWidth) this else take(maxWidth - 1) + "…"
* Returns a random positive long generated using a secure RNG. This function sacrifies a bit of entropy in order to
* avoid potential bugs where the value is used in a context where negative numbers are not expected.
@ -33,40 +33,40 @@ import java.util.*
* @param T the type of the token, for example [Currency].
data class Amount<T>(val pennies: Long, val token: T) : Comparable<Amount<T>> {
data class Amount<T>(val quantity: Long, val token: T) : Comparable<Amount<T>> {
init {
// Negative amounts are of course a vital part of any ledger, but negative values are only valid in certain
// contexts: you cannot send a negative amount of cash, but you can (sometimes) have a negative balance.
// If you want to express a negative amount, for now, use a long.
require(pennies >= 0) { "Negative amounts are not allowed: $pennies" }
require(quantity >= 0) { "Negative amounts are not allowed: $quantity" }
constructor(amount: BigDecimal, currency: T) : this(amount.toLong(), currency)
operator fun plus(other: Amount<T>): Amount<T> {
return Amount(Math.addExact(pennies, other.pennies), token)
return Amount(Math.addExact(quantity, other.quantity), token)
operator fun minus(other: Amount<T>): Amount<T> {
return Amount(Math.subtractExact(pennies, other.pennies), token)
return Amount(Math.subtractExact(quantity, other.quantity), token)
private fun checkCurrency(other: Amount<T>) {
require(other.token == token) { "Currency mismatch: ${other.token} vs $token" }
operator fun div(other: Long): Amount<T> = Amount(pennies / other, token)
operator fun times(other: Long): Amount<T> = Amount(Math.multiplyExact(pennies, other), token)
operator fun div(other: Int): Amount<T> = Amount(pennies / other, token)
operator fun times(other: Int): Amount<T> = Amount(Math.multiplyExact(pennies, other.toLong()), token)
operator fun div(other: Long): Amount<T> = Amount(quantity / other, token)
operator fun times(other: Long): Amount<T> = Amount(Math.multiplyExact(quantity, other), token)
operator fun div(other: Int): Amount<T> = Amount(quantity / other, token)
operator fun times(other: Int): Amount<T> = Amount(Math.multiplyExact(quantity, other.toLong()), token)
override fun toString(): String = (BigDecimal(pennies).divide(BigDecimal(100))).setScale(2).toPlainString()
override fun toString(): String = (BigDecimal(quantity).divide(BigDecimal(100))).setScale(2).toPlainString()
override fun compareTo(other: Amount<T>): Int {
return pennies.compareTo(other.pennies)
return quantity.compareTo(other.quantity)
@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.serialization.serialize
import java.time.Instant
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.ThreadSafe
@ -68,14 +69,17 @@ interface MessagingService {
* take the registration object, unlike the callback to [MessagingService.addMessageHandler].
fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? = null, callback: (Message) -> Unit) {
val consumed = AtomicBoolean()
addMessageHandler(topic, executor) { msg, reg ->
check(!consumed.getAndSet(true)) { "Called more than once" }
check(msg.topic == topic) { "Topic mismatch: ${msg.topic} vs $topic" }
fun MessagingService.send(topic: String, obj: Any, to: MessageRecipients) {
send(createMessage(topic, obj.serialize().bits), to)
fun MessagingService.send(topic: String, payload: Any, to: MessageRecipients) {
send(createMessage(topic, payload.serialize().bits), to)
@ -12,9 +12,8 @@ import java.time.Clock
* mocked out. This class is useful to pass to chunks of pluggable code that might have need of many different kinds of
* functionality and you don't want to hard-code which types in the interface.
* All services exposed to protocols (public view) need to implement [SerializeAsToken] or similar to avoid being serialized in checkpoints.
* TODO: Split into a public (to contracts etc) and private (to node) view
* Any services exposed to protocols (public view) need to implement [SerializeAsToken] or similar to avoid their internal
* state from being serialized in checkpoints.
interface ServiceHub {
val walletService: WalletService
@ -1,11 +1,3 @@
@ -23,11 +15,13 @@ abstract class ServiceType(val id: String) {
override operator fun equals(other: Any?): Boolean =
if (other is ServiceType) {
id == other.id
} else {
if (other is ServiceType) {
id == other.id
} else {
fun isSubTypeOf(superType: ServiceType) = (id == superType.id) || id.startsWith(superType.id + ".")
override fun hashCode(): Int = id.hashCode()
override fun toString(): String = id.toString()
@ -1,4 +1,4 @@
package com.r3corda.node.services.transactions
package com.r3corda.core.node.services
import com.r3corda.core.contracts.TimestampCommand
import com.r3corda.core.seconds
@ -9,7 +9,7 @@ import java.time.Duration
* Checks if the given timestamp falls within the allowed tolerance interval
class TimestampChecker(val clock: Clock = Clock.systemDefaultZone(),
class TimestampChecker(val clock: Clock = Clock.systemUTC(),
val tolerance: Duration = 30.seconds) {
fun isValid(timestampCommand: TimestampCommand): Boolean {
val before = timestampCommand.before
@ -10,6 +10,7 @@ import com.r3corda.core.node.services.AttachmentStorage
import com.r3corda.core.node.services.IdentityService
import com.r3corda.core.node.services.KeyManagementService
import com.r3corda.core.node.services.StorageService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.RecordingMap
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
@ -24,7 +25,7 @@ import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
class MockIdentityService(val identities: List<Party>) : IdentityService {
class MockIdentityService(val identities: List<Party>) : IdentityService, SingletonSerializeAsToken() {
private val keyToParties: Map<PublicKey, Party>
get() = synchronized(identities) { identities.associateBy { it.owningKey } }
private val nameToParties: Map<String, Party>
@ -36,7 +37,7 @@ class MockIdentityService(val identities: List<Party>) : IdentityService {
class MockKeyManagementService(vararg initialKeys: KeyPair) : KeyManagementService {
class MockKeyManagementService(vararg initialKeys: KeyPair) : SingletonSerializeAsToken(), KeyManagementService {
override val keys: MutableMap<PublicKey, PrivateKey>
init {
@ -88,7 +89,7 @@ class MockStorageService(override val attachments: AttachmentStorage = MockAttac
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public),
// This parameter is for unit tests that want to observe operation details.
val recordingAs: (String) -> String = { tableName -> "" })
: StorageService {
: SingletonSerializeAsToken(), StorageService {
protected val tables = HashMap<String, MutableMap<*, *>>()
private fun <K, V> getMapOriginal(tableName: String): MutableMap<K, V> {
@ -3,12 +3,14 @@ package com.r3corda.core.serialization
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.JavaSerializer
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.crypto.sha256
@ -17,6 +19,8 @@ import com.r3corda.core.node.services.AttachmentStorage
import de.javakaffee.kryoserializers.ArraysAsListSerializer
import org.objenesis.strategy.StdInstantiatorStrategy
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.lang.reflect.InvocationTargetException
import java.nio.file.Files
import java.nio.file.Path
@ -252,7 +256,7 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
isRegistrationRequired = false
// Allow construction of objects using a JVM backdoor that skips invoking the constructors, if there is no
// no-arg constructor available.
instantiatorStrategy = Kryo.DefaultInstantiatorStrategy(StdInstantiatorStrategy())
instantiatorStrategy = DefaultInstantiatorStrategy(StdInstantiatorStrategy())
register(Arrays.asList("").javaClass, ArraysAsListSerializer());
@ -262,18 +266,16 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
register(Kryo::class.java, object : Serializer<Kryo>() {
override fun write(kryo: Kryo, output: Output, obj: Kryo) {
override fun read(kryo: Kryo, input: Input, type: Class<Kryo>): Kryo {
return createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo)
// Some things where the JRE provides an efficient custom serialisation.
val ser = JavaSerializer()
val keyPair = generateKeyPair()
register(keyPair.public.javaClass, ser)
register(keyPair.private.javaClass, ser)
register(Instant::class.java, ser)
register(keyPair.public.javaClass, ReferencesAwareJavaSerializer)
register(keyPair.private.javaClass, ReferencesAwareJavaSerializer)
register(Instant::class.java, ReferencesAwareJavaSerializer)
// Some classes have to be handled with the ImmutableClassSerializer because they need to have their
// constructors be invoked (typically for lazy members).
@ -284,6 +286,71 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
// This ensures a SerializedBytes<Foo> wrapper is written out as just a byte array.
register(SerializedBytes::class.java, SerializedBytesSerializer)
addDefaultSerializer(SerializeAsToken::class.java, SerializeAsTokenSerializer<SerializeAsToken>())
// This is required to make all the unit tests pass
* Use this method to mark any types which can have the same instance within it more than once. This will make sure
* the serialised form is stable across multiple serialise-deserialise cycles. Using this on a type with internal cyclic
* references will throw a stack overflow exception during serialisation.
inline fun <reified T : Any> Kryo.noReferencesWithin() {
register(T::class.java, NoReferencesSerializer(getSerializer(T::class.java)))
class NoReferencesSerializer<T>(val baseSerializer: Serializer<T>) : Serializer<T>() {
override fun read(kryo: Kryo, input: Input, type: Class<T>): T {
val previousValue = kryo.setReferences(false)
try {
return baseSerializer.read(kryo, input, type)
} finally {
kryo.references = previousValue
override fun write(kryo: Kryo, output: Output, obj: T) {
val previousValue = kryo.setReferences(false)
try {
baseSerializer.write(kryo, output, obj)
} finally {
kryo.references = previousValue
* Improvement to the builtin JavaSerializer by honouring the [Kryo.getReferences] setting.
object ReferencesAwareJavaSerializer : JavaSerializer() {
override fun write(kryo: Kryo, output: Output, obj: Any) {
if (kryo.references) {
super.write(kryo, output, obj)
else {
ObjectOutputStream(output).use {
override fun read(kryo: Kryo, input: Input, type: Class<Any>): Any {
return if (kryo.references) {
super.read(kryo, input, type)
else {
ObjectInputStream(input).use {
@ -1,12 +1,10 @@
package com.r3corda.core.serialization
import com.esotericsoftware.kryo.DefaultSerializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import java.lang.ref.WeakReference
import java.util.*
@ -23,76 +21,113 @@ import java.util.*
* they are serialized because they have a lot of internal state that does not serialize (well).
* This models a similar pattern to the readReplace/writeReplace methods in Java serialization.
* With Kryo serialisation, these classes should also annotate themselves with <code>@DefaultSerializer</code>. See below.
interface SerializeAsToken {
val token: SerializationToken
fun toToken(context: SerializeAsTokenContext): SerializationToken
* This represents a token in the serialized stream for an instance of a type that implements [SerializeAsToken]
interface SerializationToken {
fun fromToken(): Any
fun fromToken(context: SerializeAsTokenContext): Any
* A Kryo serializer for [SerializeAsToken] implementations.
* Annotate the [SerializeAsToken] with <code>@DefaultSerializer(SerializeAsTokenSerializer::class)</code>
* This is registered in [createKryo].
class SerializeAsTokenSerializer<T : SerializeAsToken> : Serializer<T>() {
override fun write(kryo: Kryo, output: Output, obj: T) {
kryo.writeClassAndObject(output, obj.token)
kryo.writeClassAndObject(output, obj.toToken(getContext(kryo) ?: throw KryoException("Attempt to write a ${SerializeAsToken::class.simpleName} instance of ${obj.javaClass.name} without initialising a context")))
override fun read(kryo: Kryo, input: Input, type: Class<T>): T {
val token = (kryo.readClassAndObject(input) as? SerializationToken) ?: throw KryoException("Non-token read for tokenized type: ${type.name}")
val fromToken = token.fromToken()
val fromToken = token.fromToken(getContext(kryo) ?: throw KryoException("Attempt to read a token for a ${SerializeAsToken::class.simpleName} instance of ${type.name} without initialising a context"))
if (type.isAssignableFrom(fromToken.javaClass)) {
return type.cast(fromToken)
} else {
throw KryoException("Token read did not return tokenized type: ${type.name}")
throw KryoException("Token read ($token) did not return expected tokenized type: ${type.name}")
companion object {
private fun getContext(kryo: Kryo): SerializeAsTokenContext? = kryo.context.get(SerializeAsTokenContext::class.java) as? SerializeAsTokenContext
fun setContext(kryo: Kryo, context: SerializeAsTokenContext) {
kryo.context.put(SerializeAsTokenContext::class.java, context)
fun clearContext(kryo: Kryo) {
* A class representing a [SerializationToken] for some object that is not serializable but can be re-created or looked up
* (when deserialized) via a [String] key.
* A context for mapping SerializationTokens to/from SerializeAsTokens.
* A context is initialised with an object containing all the instances of [SerializeAsToken] to eagerly register all the tokens.
* In our case this can be the [ServiceHub].
* Then it is a case of using the companion object methods on [SerializeAsTokenSerializer] to set and clear context as necessary
* on the Kryo instance when serializing to enable/disable tokenization.
private data class SerializationStringToken(private val key: String, private val className: String) : SerializationToken {
class SerializeAsTokenContext(toBeTokenized: Any, kryo: Kryo = createKryo()) {
internal val tokenToTokenized = HashMap<SerializationToken, SerializeAsToken>()
internal var readOnly = false
constructor(key: String, toBeProxied: SerializeAsStringToken) : this(key, toBeProxied.javaClass.name) {
tokenized.put(this, WeakReference(toBeProxied))
init {
* Go ahead and eagerly serialize the object to register all of the tokens in the context.
* This results in the toToken() method getting called for any [SerializeAsStringToken] instances which
* are encountered in the object graph as they are serialized by Kryo and will therefore register the token to
* object mapping for those instances. We then immediately set the readOnly flag to stop further adhoc or
* accidental registrations from occuring as these could not be deserialized in a deserialization-first
* scenario if they are not part of this iniital context construction serialization.
SerializeAsTokenSerializer.setContext(kryo, this)
readOnly = true
* A class representing a [SerializationToken] for some object that is not serializable but can be looked up
* (when deserialized) via just the class name.
data class SingletonSerializationToken private constructor(private val className: String) : SerializationToken {
constructor(toBeTokenized: SerializeAsToken) : this(toBeTokenized.javaClass.name)
override fun fromToken(context: SerializeAsTokenContext): Any = context.tokenToTokenized[this] ?:
throw IllegalStateException("Unable to find tokenized instance of ${className} in context $context")
companion object {
val tokenized = Collections.synchronizedMap(WeakHashMap<SerializationStringToken, WeakReference<SerializeAsStringToken>>())
fun registerWithContext(token: SingletonSerializationToken, toBeTokenized: SerializeAsToken, context: SerializeAsTokenContext): SerializationToken =
if (token in context.tokenToTokenized) token else registerNewToken(token, toBeTokenized, context)
override fun fromToken(): Any = tokenized.get(this)?.get() ?:
throw IllegalStateException("Unable to find tokenized instance of ${className} for key $key")
// Only allowable if we are in SerializeAsTokenContext init (readOnly == false)
private fun registerNewToken(token: SingletonSerializationToken, toBeTokenized: SerializeAsToken, context: SerializeAsTokenContext): SerializationToken {
if (context.readOnly) throw UnsupportedOperationException("Attempt to write token for lazy registered ${toBeTokenized.javaClass.name}. " +
"All tokens should be registered during context construction.")
context.tokenToTokenized[token] = toBeTokenized
return token
* A base class for implementing large objects / components / services that need to serialize themselves to a string token
* to indicate which instance the token is a serialized form of.
* This class will also double check that the class is annotated for Kryo serialization. Note it does this on every
* instance constructed but given this is designed to represent heavyweight services or components, this should not be significant.
abstract class SerializeAsStringToken(val key: String) : SerializeAsToken {
abstract class SingletonSerializeAsToken() : SerializeAsToken {
init {
// Verify we have the annotation
val annotation = javaClass.getAnnotation(DefaultSerializer::class.java)
if (annotation == null || annotation.value.java.name != SerializeAsTokenSerializer::class.java.name) {
throw IllegalStateException("${this.javaClass.name} is not annotated with @${DefaultSerializer::class.java.simpleName} set to ${SerializeAsTokenSerializer::class.java.simpleName}")
private val token = SingletonSerializationToken(this)
override val token: SerializationToken = SerializationStringToken(key, this)
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
@ -119,25 +119,7 @@ class ProgressTracker(vararg steps: Step) {
* Writable map that lets you insert child [ProgressTracker]s for particular steps. It's OK to edit this even
* after a progress tracker has been started.
var childrenFor = object : HashMap<Step, ProgressTracker>() {
override fun put(key: Step, value: ProgressTracker): ProgressTracker? {
val r = super.put(key, value)
childSubscriptions[value] = value.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) })
value.parent = this@ProgressTracker
_changes.onNext(Change.Structural(this@ProgressTracker, key))
return r
override fun remove(key: Step): ProgressTracker? {
val tracker = this[key]
if (tracker != null) {
tracker.parent = null
childSubscriptions[tracker]?.let { it.unsubscribe(); childSubscriptions.remove(tracker) }
_changes.onNext(Change.Structural(this@ProgressTracker, key))
return super.remove(key)
val childrenFor: ChildrenProgressTrackers = ChildrenProgressTrackersImpl()
/** The parent of this tracker: set automatically by the parent when a tracker is added as a child */
var parent: ProgressTracker? = null
@ -150,8 +132,6 @@ class ProgressTracker(vararg steps: Step) {
return cursor
private val childSubscriptions = HashMap<ProgressTracker, Subscription>()
private fun _allSteps(level: Int = 0): List<Pair<Int, Step>> {
val result = ArrayList<Pair<Int, Step>>()
for (step in steps) {
@ -188,4 +168,37 @@ class ProgressTracker(vararg steps: Step) {
* if a step changed its label or rendering).
val changes: Observable<Change> get() = _changes
// TODO remove this interface and add its three methods directly into ProgressTracker
interface ChildrenProgressTrackers {
operator fun get(step: ProgressTracker.Step): ProgressTracker?
operator fun set(step: ProgressTracker.Step, childProgressTracker: ProgressTracker)
fun remove(step: ProgressTracker.Step)
private inner class ChildrenProgressTrackersImpl : ChildrenProgressTrackers {
private val map = HashMap<Step, Pair<ProgressTracker, Subscription>>()
override fun get(step: Step): ProgressTracker? = map[step]?.first
override fun set(step: Step, childProgressTracker: ProgressTracker) {
val subscription = childProgressTracker.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) })
map[step] = Pair(childProgressTracker, subscription)
childProgressTracker.parent = this@ProgressTracker
_changes.onNext(Change.Structural(this@ProgressTracker, step))
override fun remove(step: Step) {
map.remove(step)?.let {
it.first.parent = null
_changes.onNext(Change.Structural(this@ProgressTracker, step))
@ -1,95 +1,191 @@
package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.crypto.Party
import com.r3corda.core.contracts.TimestampCommand
import com.r3corda.core.contracts.WireTransaction
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SignedData
import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.TimestampChecker
import com.r3corda.core.node.services.UniquenessException
import com.r3corda.core.node.services.UniquenessProvider
import com.r3corda.core.noneOrSingle
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.core.utilities.UntrustworthyData
import java.security.PublicKey
* A protocol to be used for obtaining a signature from a [NotaryService] ascertaining the transaction
* timestamp is correct and none of its inputs have been used in another completed transaction
* @throws NotaryException in case the any of the inputs to the transaction have been consumed
* by another transaction or the timestamp is invalid
class NotaryProtocol(private val wtx: WireTransaction,
override val progressTracker: ProgressTracker = NotaryProtocol.tracker()) : ProtocolLogic<DigitalSignature.LegallyIdentifiable>() {
companion object {
val TOPIC = "platform.notary.request"
object NotaryProtocol {
val TOPIC = "platform.notary.request"
val TOPIC_INITIATE = "platform.notary.initiate"
object REQUESTING : ProgressTracker.Step("Requesting signature by Notary service")
* A protocol to be used for obtaining a signature from a [NotaryService] ascertaining the transaction
* timestamp is correct and none of its inputs have been used in another completed transaction
* @throws NotaryException in case the any of the inputs to the transaction have been consumed
* by another transaction or the timestamp is invalid
class Client(private val wtx: WireTransaction,
override val progressTracker: ProgressTracker = Client.tracker()) : ProtocolLogic<DigitalSignature.LegallyIdentifiable>() {
companion object {
object VALIDATING : ProgressTracker.Step("Validating response from Notary service")
object REQUESTING : ProgressTracker.Step("Requesting signature by Notary service")
fun tracker() = ProgressTracker(REQUESTING, VALIDATING)
object VALIDATING : ProgressTracker.Step("Validating response from Notary service")
lateinit var notaryNode: NodeInfo
fun tracker() = ProgressTracker(REQUESTING, VALIDATING)
override fun call(): DigitalSignature.LegallyIdentifiable {
progressTracker.currentStep = REQUESTING
notaryNode = findNotaryNode()
lateinit var notaryNode: NodeInfo
val sessionID = random63BitValue()
val request = SignRequest(wtx.serialized, serviceHub.storageService.myLegalIdentity, serviceHub.networkService.myAddress, sessionID)
val response = sendAndReceive<Result>(TOPIC, notaryNode.address, 0, sessionID, request)
override fun call(): DigitalSignature.LegallyIdentifiable {
progressTracker.currentStep = REQUESTING
notaryNode = findNotaryNode()
val notaryResult = validateResponse(response)
return notaryResult.sig ?: throw NotaryException(notaryResult.error!!)
val sendSessionID = random63BitValue()
val receiveSessionID = random63BitValue()
private fun validateResponse(response: UntrustworthyData<Result>): Result {
progressTracker.currentStep = VALIDATING
val handshake = Handshake(serviceHub.networkService.myAddress, sendSessionID, receiveSessionID)
sendAndReceive<Unit>(TOPIC_INITIATE, notaryNode.address, 0, receiveSessionID, handshake)
response.validate {
if (it.sig != null) validateSignature(it.sig, wtx.serialized)
else if (it.error is NotaryError.Conflict) it.error.conflict.verified()
else if (it.error == null || it.error !is NotaryError)
throw IllegalStateException("Received invalid result from Notary service '${notaryNode.identity}'")
return it
val request = SignRequest(wtx.serialized, serviceHub.storageService.myLegalIdentity)
val response = sendAndReceive<Result>(TOPIC, notaryNode.address, sendSessionID, receiveSessionID, request)
val notaryResult = validateResponse(response)
return notaryResult.sig ?: throw NotaryException(notaryResult.error!!)
private fun validateResponse(response: UntrustworthyData<Result>): Result {
progressTracker.currentStep = VALIDATING
response.validate {
if (it.sig != null) validateSignature(it.sig, wtx.serialized)
else if (it.error is NotaryError.Conflict) it.error.conflict.verified()
else if (it.error == null || it.error !is NotaryError)
throw IllegalStateException("Received invalid result from Notary service '${notaryNode.identity}'")
return it
private fun validateSignature(sig: DigitalSignature.LegallyIdentifiable, data: SerializedBytes<WireTransaction>) {
check(sig.signer == notaryNode.identity) { "Notary result not signed by the correct service" }
private fun findNotaryNode(): NodeInfo {
var maybeNotaryKey: PublicKey? = null
val timestampCommand = wtx.commands.singleOrNull { it.value is TimestampCommand }
if (timestampCommand != null) maybeNotaryKey = timestampCommand.signers.first()
for (stateRef in wtx.inputs) {
val inputNotaryKey = serviceHub.loadState(stateRef).notary.owningKey
if (maybeNotaryKey != null)
check(maybeNotaryKey == inputNotaryKey) { "Input states and timestamp must have the same Notary" }
else maybeNotaryKey = inputNotaryKey
val notaryKey = maybeNotaryKey ?: throw IllegalStateException("Transaction does not specify a Notary")
val notaryNode = serviceHub.networkMapCache.getNodeByPublicKey(notaryKey)
return notaryNode ?: throw IllegalStateException("No Notary node can be found with the specified public key")
private fun validateSignature(sig: DigitalSignature.LegallyIdentifiable, data: SerializedBytes<WireTransaction>) {
check(sig.signer == notaryNode.identity) { "Notary result not signed by the correct service" }
* Checks that the timestamp command is valid (if present) and commits the input state, or returns a conflict
* if any of the input states have been previously committed.
* Extend this class, overriding _beforeCommit_ to add custom transaction processing/validation logic.
* TODO: the notary service should only be able to see timestamp commands and inputs
open class Service(val otherSide: SingleMessageRecipient,
val sendSessionID: Long,
val receiveSessionID: Long,
val timestampChecker: TimestampChecker,
val uniquenessProvider: UniquenessProvider) : ProtocolLogic<Unit>() {
override fun call() {
val request = receive<SignRequest>(TOPIC, receiveSessionID).validate { it }
val txBits = request.txBits
val reqIdentity = request.callerIdentity
private fun findNotaryNode(): NodeInfo {
var maybeNotaryKey: PublicKey? = null
val wtx = txBits.deserialize()
val result: Result
try {
beforeCommit(wtx, reqIdentity)
commitInputStates(wtx, reqIdentity)
val timestampCommand = wtx.commands.singleOrNull { it.value is TimestampCommand }
if (timestampCommand != null) maybeNotaryKey = timestampCommand.signers.first()
val sig = sign(txBits)
result = Result.noError(sig)
for (stateRef in wtx.inputs) {
val inputNotaryKey = serviceHub.loadState(stateRef).notary.owningKey
if (maybeNotaryKey != null)
check(maybeNotaryKey == inputNotaryKey) { "Input states and timestamp must have the same Notary" }
else maybeNotaryKey = inputNotaryKey
} catch(e: NotaryException) {
result = Result.withError(e.error)
send(TOPIC, otherSide, sendSessionID, result)
val notaryKey = maybeNotaryKey ?: throw IllegalStateException("Transaction does not specify a Notary")
val notaryNode = serviceHub.networkMapCache.getNodeByPublicKey(notaryKey)
return notaryNode ?: throw IllegalStateException("No Notary node can be found with the specified public key")
private fun validateTimestamp(tx: WireTransaction) {
val timestampCmd = try {
tx.commands.noneOrSingle { it.value is TimestampCommand } ?: return
} catch (e: IllegalArgumentException) {
throw NotaryException(NotaryError.MoreThanOneTimestamp())
val myIdentity = serviceHub.storageService.myLegalIdentity
if (!timestampCmd.signers.contains(myIdentity.owningKey))
throw NotaryException(NotaryError.NotForMe())
if (!timestampChecker.isValid(timestampCmd.value as TimestampCommand))
throw NotaryException(NotaryError.TimestampInvalid())
* No pre-commit processing is done. Transaction is not checked for contract-validity, as that would require fully
* resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction
* history chain.
* As a result, the Notary _will commit invalid transactions_ as well, but as it also records the identity of
* the caller, it is possible to raise a dispute and verify the validity of the transaction and subsequently
* undo the commit of the input states (the exact mechanism still needs to be worked out)
open fun beforeCommit(wtx: WireTransaction, reqIdentity: Party) {
private fun commitInputStates(tx: WireTransaction, reqIdentity: Party) {
try {
uniquenessProvider.commit(tx, reqIdentity)
} catch (e: UniquenessException) {
val conflictData = e.error.serialize()
val signedConflict = SignedData(conflictData, sign(conflictData))
throw NotaryException(NotaryError.Conflict(tx, signedConflict))
private fun <T : Any> sign(bits: SerializedBytes<T>): DigitalSignature.LegallyIdentifiable {
val mySigningKey = serviceHub.storageService.myLegalIdentityKey
val myIdentity = serviceHub.storageService.myLegalIdentity
return mySigningKey.signWithECDSA(bits, myIdentity)
class Handshake(
replyTo: SingleMessageRecipient,
val sendSessionID: Long,
sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
/** TODO: The caller must authenticate instead of just specifying its identity */
class SignRequest(val txBits: SerializedBytes<WireTransaction>,
val callerIdentity: Party,
replyTo: SingleMessageRecipient,
sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
val callerIdentity: Party)
data class Result private constructor(val sig: DigitalSignature.LegallyIdentifiable?, val error: NotaryError?) {
companion object {
@ -97,6 +193,24 @@ class NotaryProtocol(private val wtx: WireTransaction,
fun noError(sig: DigitalSignature.LegallyIdentifiable) = Result(sig, null)
interface Factory {
fun create(otherSide: SingleMessageRecipient,
sendSessionID: Long,
receiveSessionID: Long,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider): Service
object DefaultFactory : Factory {
override fun create(otherSide: SingleMessageRecipient,
sendSessionID: Long,
receiveSessionID: Long,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider): Service {
return Service(otherSide, sendSessionID, receiveSessionID, timestampChecker, uniquenessProvider)
class NotaryException(val error: NotaryError) : Exception() {
@ -115,4 +229,6 @@ sealed class NotaryError {
/** Thrown if the time specified in the timestamp command is outside the allowed tolerance */
class TimestampInvalid : NotaryError()
class TransactionInvalid : NotaryError()
@ -1,7 +1,6 @@
package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.*
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.messaging.SingleMessageRecipient
@ -158,7 +158,7 @@ object TwoPartyDealProtocol {
private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.LegallyIdentifiable {
progressTracker.currentStep = NOTARY
return subProtocol(NotaryProtocol(stx.tx))
return subProtocol(NotaryProtocol.Client(stx.tx))
open fun signWithOurKey(partialTX: SignedTransaction): DigitalSignature.WithKey {
@ -0,0 +1,48 @@
package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.contracts.TransactionVerificationException
import com.r3corda.core.contracts.WireTransaction
import com.r3corda.core.contracts.toLedgerTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.services.TimestampChecker
import com.r3corda.core.node.services.UniquenessProvider
import java.security.SignatureException
* A notary commit protocol that makes sure a given transaction is valid before committing it. This does mean that the calling
* party has to reveal the whole transaction history; however, we avoid complex conflict resolution logic where a party
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
* indeed valid
class ValidatingNotaryProtocol(otherSide: SingleMessageRecipient,
sessionIdForSend: Long,
sessionIdForReceive: Long,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider) : NotaryProtocol.Service(otherSide, sessionIdForSend, sessionIdForReceive, timestampChecker, uniquenessProvider) {
override fun beforeCommit(wtx: WireTransaction, reqIdentity: Party) {
try {
validateDependencies(reqIdentity, wtx)
} catch (e: Exception) {
when (e) {
is TransactionVerificationException,
is SignatureException -> throw NotaryException(NotaryError.TransactionInvalid())
else -> throw e
private fun checkContractValid(wtx: WireTransaction) {
val ltx = wtx.toLedgerTransaction(serviceHub.identityService, serviceHub.storageService.attachments)
private fun validateDependencies(reqIdentity: Party, wtx: WireTransaction) {
val otherSide = serviceHub.networkMapCache.getNodeByPublicKey(reqIdentity.owningKey)!!.address
subProtocol(ResolveTransactionsProtocol(wtx, otherSide))
@ -1,34 +1,68 @@
package com.r3corda.core.serialization
import com.esotericsoftware.kryo.Kryo
import com.google.common.primitives.Ints
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.time.Instant
import kotlin.test.assertEquals
import kotlin.test.assertNull
import java.util.*
class KryoTests {
data class Person(val name: String, val birthday: Instant?)
private val kryo: Kryo = createKryo()
private val kryo = createKryo()
fun ok() {
val april_17th = Instant.parse("1984-04-17T00:30:00.00Z")
val mike = Person("mike", april_17th)
val birthday = Instant.parse("1984-04-17T00:30:00.00Z")
val mike = Person("mike", birthday)
val bits = mike.serialize(kryo)
with(bits.deserialize<Person>(kryo)) {
assertEquals("mike", name)
assertEquals(april_17th, birthday)
assertThat(bits.deserialize(kryo)).isEqualTo(Person("mike", birthday))
fun nullables() {
val bob = Person("bob", null)
val bits = bob.serialize(kryo)
with(bits.deserialize<Person>(kryo)) {
assertEquals("bob", name)
assertThat(bits.deserialize(kryo)).isEqualTo(Person("bob", null))
fun `serialised form is stable when the same object instance is added to the deserialised object graph`() {
val obj = Ints.toByteArray(0x01234567).opaque()
val originalList = arrayListOf(obj)
val deserialisedList = originalList.serialize(kryo).deserialize(kryo)
originalList += obj
deserialisedList += obj
fun `serialised form is stable when the same object instance occurs more than once, and using java serialisation`() {
val instant = Instant.ofEpochMilli(123)
val instantCopy = Instant.ofEpochMilli(123)
val listWithCopies = arrayListOf(instant, instantCopy)
val listWithSameInstances = arrayListOf(instant, instant)
fun `cyclic object graph`() {
val cyclic = Cyclic(3)
val bits = cyclic.serialize(kryo)
private data class Person(val name: String, val birthday: Instant?)
private class Cyclic(val value: Int) {
val thisInstance = this
override fun equals(other: Any?): Boolean = (this === other) || (other is Cyclic && this.value == other.value)
override fun hashCode(): Int = value.hashCode()
override fun toString(): String = "Cyclic($value)"
@ -1,17 +1,34 @@
package com.r3corda.core.serialization
import com.esotericsoftware.kryo.DefaultSerializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.io.Output
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import java.io.ByteArrayOutputStream
class SerializationTokenTest {
lateinit var kryo: Kryo
fun setup() {
kryo = THREAD_LOCAL_KRYO.get()
fun cleanup() {
// Large tokenizable object so we can tell from the smaller number of serialized bytes it was actually tokenized
private class LargeTokenizable(size: Int) : SerializeAsStringToken(size.toString()) {
val bytes = OpaqueBytes(ByteArray(size))
private class LargeTokenizable : SingletonSerializeAsToken() {
val bytes = OpaqueBytes(ByteArray(1024))
val numBytes: Int
get() = bytes.size
override fun hashCode() = bytes.bits.size
@ -20,61 +37,78 @@ class SerializationTokenTest {
fun `write token and read tokenizable`() {
val numBytes = 1024
val tokenizableBefore = LargeTokenizable(numBytes)
val serializedBytes = tokenizableBefore.serialize()
val tokenizableAfter = serializedBytes.deserialize()
assertEquals(tokenizableBefore, tokenizableAfter)
fun `check same sized tokenizable equal`() {
val tokenizableBefore = LargeTokenizable(1024)
val tokenizableAfter = LargeTokenizable(1024)
assertEquals(tokenizableBefore, tokenizableAfter)
fun `check different sized tokenizable not equal`() {
val tokenizableBefore = LargeTokenizable(1024)
val tokenizableAfter = LargeTokenizable(1025)
assertNotEquals(tokenizableBefore, tokenizableAfter)
private class IntegerSerializeAsKeyedToken(val value: Int) : SerializeAsStringToken(value.toString())
fun `write and read keyed`() {
val tokenizableBefore1 = IntegerSerializeAsKeyedToken(123)
val tokenizableBefore2 = IntegerSerializeAsKeyedToken(456)
val serializedBytes1 = tokenizableBefore1.serialize()
val tokenizableAfter1 = serializedBytes1.deserialize()
val serializedBytes2 = tokenizableBefore2.serialize()
val tokenizableAfter2 = serializedBytes2.deserialize()
private class UnitSerializeAsSingletonToken : SerializeAsStringToken("Unit0")
fun `write and read singleton`() {
val tokenizableBefore = UnitSerializeAsSingletonToken()
val serializedBytes = tokenizableBefore.serialize()
val tokenizableAfter = serializedBytes.deserialize()
val tokenizableBefore = LargeTokenizable()
val context = SerializeAsTokenContext(tokenizableBefore, kryo)
SerializeAsTokenSerializer.setContext(kryo, context)
val serializedBytes = tokenizableBefore.serialize(kryo)
val tokenizableAfter = serializedBytes.deserialize(kryo)
private class UnannotatedSerializeAsSingletonToken : SerializeAsStringToken("Unannotated0")
private class UnitSerializeAsToken : SingletonSerializeAsToken()
@Test(expected = IllegalStateException::class)
fun `unannotated throws`() {
val tokenizableBefore = UnannotatedSerializeAsSingletonToken()
fun `write and read singleton`() {
val tokenizableBefore = UnitSerializeAsToken()
val context = SerializeAsTokenContext(tokenizableBefore, kryo)
SerializeAsTokenSerializer.setContext(kryo, context)
val serializedBytes = tokenizableBefore.serialize(kryo)
val tokenizableAfter = serializedBytes.deserialize(kryo)
@Test(expected = UnsupportedOperationException::class)
fun `new token encountered after context init`() {
val tokenizableBefore = UnitSerializeAsToken()
val context = SerializeAsTokenContext(emptyList<Any>(), kryo)
SerializeAsTokenSerializer.setContext(kryo, context)
@Test(expected = UnsupportedOperationException::class)
fun `deserialize unregistered token`() {
val tokenizableBefore = UnitSerializeAsToken()
val context = SerializeAsTokenContext(emptyList<Any>(), kryo)
SerializeAsTokenSerializer.setContext(kryo, context)
val serializedBytes = tokenizableBefore.toToken(SerializeAsTokenContext(emptyList<Any>(), kryo)).serialize(kryo)
@Test(expected = KryoException::class)
fun `no context set`() {
val tokenizableBefore = UnitSerializeAsToken()
@Test(expected = KryoException::class)
fun `deserialize non-token`() {
val tokenizableBefore = UnitSerializeAsToken()
val context = SerializeAsTokenContext(tokenizableBefore, kryo)
SerializeAsTokenSerializer.setContext(kryo, context)
val stream = ByteArrayOutputStream()
Output(stream).use {
kryo.writeClass(it, SingletonSerializeAsToken::class.java)
kryo.writeObject(it, emptyList<Any>())
val serializedBytes = SerializedBytes<Any>(stream.toByteArray())
private class WrongTypeSerializeAsToken : SerializeAsToken {
override fun toToken(context: SerializeAsTokenContext): SerializationToken {
return object : SerializationToken {
override fun fromToken(context: SerializeAsTokenContext): Any = UnitSerializeAsToken()
@Test(expected = KryoException::class)
fun `token returns unexpected type`() {
val tokenizableBefore = WrongTypeSerializeAsToken()
val context = SerializeAsTokenContext(tokenizableBefore, kryo)
SerializeAsTokenSerializer.setContext(kryo, context)
val serializedBytes = tokenizableBefore.serialize(kryo)
@ -10,6 +10,8 @@ Here are changes in git master that haven't yet made it to a snapshot release:
* The cash contract has moved from com.r3corda.contracts to com.r3corda.contracts.cash.
* Amount class is now generic, to support non-currency types (such as assets, or currency with additional information).
* Refactored the Cash contract to have a new FungibleAsset superclass, to model all countable assets that can be merged
and split (currency, barrels of oil, etc.)
Milestone 0
@ -20,9 +20,6 @@ import com.r3corda.node.services.api.AcceptsFileUpload
import com.r3corda.node.services.api.CheckpointStorage
import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.services.transactions.TimestampChecker
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.identity.InMemoryIdentityService
@ -36,6 +33,10 @@ import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
import com.r3corda.node.services.persistence.StorageServiceImpl
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.services.transactions.ValidatingNotaryService
import com.r3corda.node.services.wallet.NodeWalletService
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.node.utilities.AffinityExecutor
@ -87,7 +88,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
override val keyManagementService: KeyManagementService get() = keyManagement
override val identityService: IdentityService get() = identity
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val clock: Clock get() = platformClock
override val clock: Clock = platformClock
val info: NodeInfo by lazy {
@ -106,6 +107,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
lateinit var identity: IdentityService
lateinit var net: MessagingService
lateinit var api: APIServer
var isPreviousCheckpointsPresent = false
private set
/** Completes once the node has successfully registered with the network map service. Null until [start] returns. */
@Volatile var networkMapRegistrationFuture: ListenableFuture<Unit>? = null
@ -123,27 +126,35 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
storage = storageServices.first
checkpointStorage = storageServices.second
net = makeMessagingService()
smm = StateMachineManager(services, checkpointStorage, serverThread)
wallet = NodeWalletService(services)
keyManagement = E2ETestKeyManagementService()
api = APIServerImpl(this)
// Build services we're advertising
if (NetworkMapService.Type in info.advertisedServices) makeNetworkMapService()
if (NotaryService.Type in info.advertisedServices) makeNotaryService()
identity = makeIdentityService()
api = APIServerImpl(this)
smm = StateMachineManager(services, listOf(storage, net, wallet, keyManagement, identity, platformClock), checkpointStorage, serverThread)
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
DataVendingService(net, storage)
networkMapRegistrationFuture = registerWithNetworkMap()
isPreviousCheckpointsPresent = checkpointStorage.checkpoints.any()
started = true
return this
private fun buildAdvertisedServices() {
val serviceTypes = info.advertisedServices
if (NetworkMapService.Type in serviceTypes) makeNetworkMapService()
val notaryServiceType = serviceTypes.singleOrNull { it.isSubTypeOf(NotaryService.Type) }
if (notaryServiceType != null) makeNotaryService(notaryServiceType)
* Register this node with the network map cache, and load network map from a remote service (and register for
* updates) if one has been supplied.
@ -197,10 +208,15 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
inNodeNetworkMapService = InMemoryNetworkMapService(net, reg, services.networkMapCache)
open protected fun makeNotaryService() {
open protected fun makeNotaryService(type: ServiceType) {
val uniquenessProvider = InMemoryUniquenessProvider()
val timestampChecker = TimestampChecker(platformClock, 30.seconds)
inNodeNotaryService = NotaryService(net, storage.myLegalIdentity, storage.myLegalIdentityKey, uniquenessProvider, timestampChecker)
inNodeNotaryService = when (type) {
is SimpleNotaryService.Type -> SimpleNotaryService(smm, net, timestampChecker, uniquenessProvider)
is ValidatingNotaryService.Type -> ValidatingNotaryService(smm, net, timestampChecker, uniquenessProvider)
else -> null
lateinit var interestRatesService: NodeInterestRates.Service
@ -243,7 +259,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
val checkpointStorage = PerFileCheckpointStorage(dir.resolve("checkpoints"))
_servicesThatAcceptUploads += attachments
val (identity, keypair) = obtainKeyPair(dir)
return Pair(constructStorageService(attachments, keypair, identity),checkpointStorage)
return Pair(constructStorageService(attachments, keypair, identity), checkpointStorage)
protected open fun constructStorageService(attachments: NodeAttachmentService, keypair: KeyPair, identity: Party) =
@ -7,6 +7,7 @@ import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.api.APIServer
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.servlets.AttachmentDownloadServlet
@ -52,7 +53,7 @@ class ConfigurationException(message: String) : Exception(message)
class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration,
networkMapAddress: NodeInfo?, advertisedServices: Set<ServiceType>,
clock: Clock = Clock.systemUTC(),
clock: Clock = NodeClock(),
val clientAPIs: List<Class<*>> = listOf()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) {
companion object {
/** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */
@ -12,16 +12,16 @@ import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.testing.MockIdentityService
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.utilities.AffinityExecutor
import org.slf4j.Logger
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyPair
import java.time.Clock
import java.util.*
@ -66,7 +66,7 @@ class MockNetwork(private val threadPerNode: Boolean = false,
open class MockNode(dir: Path, config: NodeConfiguration, val mockNet: MockNetwork, networkMapAddr: NodeInfo?,
advertisedServices: Set<ServiceType>, val id: Int, val keyPair: KeyPair?) : AbstractNode(dir, config, networkMapAddr, advertisedServices, Clock.systemUTC()) {
advertisedServices: Set<ServiceType>, val id: Int, val keyPair: KeyPair?) : AbstractNode(dir, config, networkMapAddr, advertisedServices, NodeClock()) {
override val log: Logger = loggerFor<MockNode>()
override val serverThread: AffinityExecutor =
if (mockNet.threadPerNode)
@ -149,12 +149,12 @@ class MockNetwork(private val threadPerNode: Boolean = false,
fun createTwoNodes(nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = null): Pair<MockNode, MockNode> {
return Pair(
createNode(null, -1, nodeFactory, true, null, notaryKeyPair, NetworkMapService.Type, NotaryService.Type),
createNode(null, -1, nodeFactory, true, null, notaryKeyPair, NetworkMapService.Type, SimpleNotaryService.Type),
createNode(nodes[0].info, -1, nodeFactory, true, null)
fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, NetworkMapService.Type, NotaryService.Type)
fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null) = createNode(null, -1, defaultFactory, true, legalName, keyPair, NetworkMapService.Type, SimpleNotaryService.Type)
fun createPartyNode(networkMapAddr: NodeInfo, legalName: String? = null, keyPair: KeyPair? = null) = createNode(networkMapAddr, -1, defaultFactory, true, legalName, keyPair)
fun addressToNode(address: SingleMessageRecipient): MockNode = nodes.single { it.net.myAddress == address }
@ -9,11 +9,11 @@ import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.then
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService
import rx.Observable
import rx.subjects.PublishSubject
import java.nio.file.Path
@ -82,7 +82,7 @@ abstract class Simulation(val runAsync: Boolean,
object NotaryNodeFactory : MockNetwork.Factory {
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
val cfg = object : NodeConfiguration {
override val myLegalName: String = "Notary Service"
override val exportJMXto: String = ""
@ -134,7 +134,7 @@ abstract class Simulation(val runAsync: Boolean,
val networkMap: SimulatedNode
= network.createNode(null, nodeFactory = NetworkMapNodeFactory, advertisedServices = NetworkMapService.Type) as SimulatedNode
val notary: SimulatedNode
= network.createNode(networkMap.info, nodeFactory = NotaryNodeFactory, advertisedServices = NotaryService.Type) as SimulatedNode
= network.createNode(networkMap.info, nodeFactory = NotaryNodeFactory, advertisedServices = SimpleNotaryService.Type) as SimulatedNode
val regulators: List<SimulatedNode> = listOf(network.createNode(networkMap.info, start = false, nodeFactory = RegulatorFactory) as SimulatedNode)
val ratesOracle: SimulatedNode
= network.createNode(networkMap.info, start = false, nodeFactory = RatesOracleFactory, advertisedServices = NodeInterestRates.Type) as SimulatedNode
@ -1,20 +1,29 @@
package com.r3corda.node.testutils
package com.r3corda.node.internal.testing
import com.r3corda.contracts.DummyContract
import com.r3corda.core.contracts.StateRef
import com.r3corda.core.crypto.Party
import com.r3corda.core.seconds
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.DUMMY_NOTARY_KEY
import com.r3corda.node.internal.AbstractNode
import java.time.Instant
import java.util.*
fun issueState(node: AbstractNode, notary: Party = DUMMY_NOTARY): StateRef {
val tx = DummyContract().generateInitial(node.info.identity.ref(0), Random().nextInt(), DUMMY_NOTARY)
val tx = DummyContract().generateInitial(node.info.identity.ref(0), Random().nextInt(), notary)
val stx = tx.toSignedTransaction()
return StateRef(stx.id, 0)
fun issueInvalidState(node: AbstractNode, notary: Party = DUMMY_NOTARY): StateRef {
val tx = DummyContract().generateInitial(node.info.identity.ref(0), Random().nextInt(), notary)
tx.setTime(Instant.now(), notary, 30.seconds)
val stx = tx.toSignedTransaction(false)
return StateRef(stx.id, 0)
@ -0,0 +1,35 @@
package com.r3corda.node.serialization
import com.r3corda.core.serialization.SerializeAsToken
import com.r3corda.core.serialization.SerializeAsTokenContext
import com.r3corda.core.serialization.SingletonSerializationToken
import java.time.Clock
import java.time.Instant
import java.time.ZoneId
import javax.annotation.concurrent.ThreadSafe
* A [Clock] that tokenizes itself when serialized, and delegates to an underlying [Clock] implementation.
class NodeClock(private val delegateClock: Clock = Clock.systemUTC()) : Clock(), SerializeAsToken {
private val token = SingletonSerializationToken(this)
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
override fun instant(): Instant {
return delegateClock.instant()
// Do not use this. Instead seek to use ZonedDateTime methods.
override fun withZone(zone: ZoneId): Clock {
throw UnsupportedOperationException("Tokenized clock does not support withZone()")
override fun getZone(): ZoneId {
return delegateClock.zone
@ -1,8 +1,7 @@
package com.r3corda.node.services.api
import com.r3corda.core.crypto.sha256
import com.r3corda.core.protocols.ProtocolStateMachine
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.node.services.statemachine.ProtocolStateMachineImpl
* Thread-safe storage of fiber checkpoints.
@ -33,11 +32,8 @@ interface CheckpointStorage {
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
data class Checkpoint(
val serialisedFiber: SerializedBytes<out ProtocolStateMachine<*>>,
val awaitingTopic: String,
val awaitingObjectOfType: String // java class name
) {
override fun toString(): String {
return "Checkpoint(#serialisedFiber=${serialisedFiber.sha256()}, awaitingTopic=$awaitingTopic, awaitingObjectOfType=$awaitingObjectOfType)"
val serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
val awaitingTopic: String?,
val awaitingPayloadType: String?,
val receivedPayload: Any?
@ -1,10 +1,11 @@
package com.r3corda.node.services.api
import com.codahale.metrics.MetricRegistry
import com.r3corda.core.serialization.SingletonSerializeAsToken
* Provides access to various metrics and ways to notify monitoring services of things, for sysadmin purposes.
* This is not an interface because it is too lightweight to bother mocking out.
class MonitoringService(val metrics: MetricRegistry)
class MonitoringService(val metrics: MetricRegistry) : SingletonSerializeAsToken()
@ -2,6 +2,7 @@ package com.r3corda.node.services.identity
import com.r3corda.core.crypto.Party
import com.r3corda.core.node.services.IdentityService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import java.security.PublicKey
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.ThreadSafe
@ -10,7 +11,7 @@ import javax.annotation.concurrent.ThreadSafe
* Simple identity service which caches parties and provides functionality for efficient lookup.
class InMemoryIdentityService() : IdentityService {
class InMemoryIdentityService() : SingletonSerializeAsToken(), IdentityService {
private val keyToParties = ConcurrentHashMap<PublicKey, Party>()
private val nameToParties = ConcurrentHashMap<String, Party>()
@ -3,6 +3,7 @@ package com.r3corda.node.services.keys
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.node.services.KeyManagementService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
@ -21,7 +22,7 @@ import javax.annotation.concurrent.ThreadSafe
* etc
class E2ETestKeyManagementService : KeyManagementService {
class E2ETestKeyManagementService() : SingletonSerializeAsToken(), KeyManagementService {
private class InnerState {
val keys = HashMap<PublicKey, PrivateKey>()
@ -4,8 +4,9 @@ import com.google.common.net.HostAndPort
import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.ThreadBox
import com.r3corda.core.messaging.*
import com.r3corda.node.internal.Node
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.Node
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.*
@ -52,7 +53,7 @@ import javax.annotation.concurrent.ThreadSafe
class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
val defaultExecutor: Executor = RunOnCallerThread) : MessagingService {
val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingService {
// In future: can contain onion routing info, etc.
private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
@ -124,9 +125,8 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
val secManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, secConfig)
// Currently we cannot find out if something goes wrong during startup :( This is bug ARTEMIS-388 filed by me.
// TODO Currently we cannot find out if something goes wrong during startup :( This is bug ARTEMIS-388 filed by me.
// The fix should be in the 1.3.0 release:
// https://issues.apache.org/jira/browse/ARTEMIS-388
@ -136,12 +136,13 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
// Create a queue on which to receive messages and set up the handler.
session = clientFactory.createSession()
session.createQueue(myHostPort.toString(), "inbound", false)
inboundConsumer = session.createConsumer("inbound").setMessageHandler { message: ClientMessage ->
// This code runs for every inbound message.
try {
if (!message.containsProperty(TOPIC_PROPERTY)) {
log.warn("Received message without a ${TOPIC_PROPERTY} property, ignoring")
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
val topic = message.getStringProperty(TOPIC_PROPERTY)
@ -159,6 +160,8 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
} finally {
// TODO the message is delivered onto an executor and so we may be acking the message before we've
// finished processing it
@ -6,6 +6,7 @@ import com.google.common.util.concurrent.MoreExecutors
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.sha256
import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import org.slf4j.LoggerFactory
@ -28,7 +29,7 @@ import kotlin.concurrent.thread
* testing).
class InMemoryMessagingNetwork {
class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
companion object {
val MESSAGES_LOG_NAME = "messages"
private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME)
@ -167,7 +168,7 @@ class InMemoryMessagingNetwork {
* An instance can be obtained by creating a builder and then using the start method.
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : MessagingService {
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingService {
inner class Handler(val executor: Executor?, val topic: String,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@ -15,6 +15,7 @@ import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.node.services.api.RegulatorService
@ -30,7 +31,7 @@ import javax.annotation.concurrent.ThreadSafe
* Extremely simple in-memory cache of the network map.
open class InMemoryNetworkMapCache() : NetworkMapCache {
open class InMemoryNetworkMapCache() : SingletonSerializeAsToken(), NetworkMapCache {
override val networkMapNodes: List<NodeInfo>
get() = get(NetworkMapService.Type)
override val regulators: List<NodeInfo>
@ -46,7 +47,7 @@ open class InMemoryNetworkMapCache() : NetworkMapCache {
protected var registeredNodes = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
override fun get() = registeredNodes.map { it.value }
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value }
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.any { it.isSubTypeOf(serviceType) } }.map { it.value }
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
override fun getNodeByLegalName(name: String) = get().singleOrNull { it.identity.name == name }
override fun getNodeByPublicKey(publicKey: PublicKey) = get().singleOrNull { it.identity.owningKey == publicKey }
import co.paralleluniverse.common.util.VisibleForTesting
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.DummyPublicKey
import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
@ -1,10 +1,11 @@
package com.r3corda.node.services.persistence
import com.r3corda.core.crypto.Party
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.AttachmentStorage
import com.r3corda.core.node.services.StorageService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.RecordingMap
import org.slf4j.LoggerFactory
import java.security.KeyPair
@ -15,7 +16,7 @@ open class StorageServiceImpl(override val attachments: AttachmentStorage,
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public),
// This parameter is for unit tests that want to observe operation details.
val recordingAs: (String) -> String = { tableName -> "" })
: StorageService {
: SingletonSerializeAsToken(), StorageService {
protected val tables = HashMap<String, MutableMap<*, *>>()
private fun <K, V> getMapOriginal(tableName: String): MutableMap<K, V> {
@ -3,17 +3,11 @@ package com.r3corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolStateMachine
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.createKryo
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.UntrustworthyData
import com.r3corda.node.services.api.ServiceHubInternal
import org.slf4j.Logger
@ -27,11 +21,11 @@ import org.slf4j.LoggerFactory
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
* logic element gets to return the value that the entire state machine resolves to.
class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler, val loggerName: String) : Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler, private val loggerName: String) : Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
// These fields shouldn't be serialised, so they are marked @Transient.
@Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) -> Unit)? = null
@Transient private var resumeWithObject: Any? = null
@Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, fiber: ProtocolStateMachineImpl<*>) -> Unit)? = null
@Transient private var receivedPayload: Any? = null
@Transient lateinit override var serviceHub: ServiceHubInternal
@Transient private var _logger: Logger? = null
@ -58,11 +52,11 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberS
fun prepareForResumeWith(serviceHub: ServiceHubInternal,
withObject: Any?,
suspendAction: (StateMachineManager.FiberRequest, SerializedBytes<ProtocolStateMachineImpl<*>>) -> Unit) {
this.suspendAction = suspendAction
this.resumeWithObject = withObject
receivedPayload: Any?,
suspendAction: (StateMachineManager.FiberRequest, ProtocolStateMachineImpl<*>) -> Unit) {
this.serviceHub = serviceHub
this.receivedPayload = receivedPayload
this.suspendAction = suspendAction
@Suspendable @Suppress("UNCHECKED_CAST")
@ -81,9 +75,10 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberS
@Suspendable @Suppress("UNCHECKED_CAST")
private fun <T : Any> suspendAndExpectReceive(with: StateMachineManager.FiberRequest): UntrustworthyData<T> {
val tmp = resumeWithObject ?: throw IllegalStateException("Expected to receive something")
resumeWithObject = null
return UntrustworthyData(tmp as T)
check(receivedPayload != null) { "Expected to receive something" }
val untrustworthy = UntrustworthyData(receivedPayload as T)
receivedPayload = null
return untrustworthy
@Suspendable @Suppress("UNCHECKED_CAST")
@ -108,11 +103,14 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberS
private fun suspend(with: StateMachineManager.FiberRequest) {
parkAndSerialize { fiber, serializer ->
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
val deserializer = getFiberSerializer(false) as KryoSerializer
val kryo = createKryo(deserializer.kryo)
suspendAction!!(with, this.serialize(kryo))
try {
suspendAction!!(with, this)
} catch (t: Throwable) {
logger.warn("Captured exception which was swallowed by Quasar", t)
// TODO to throw or not to throw, that is the question
throw t
@ -4,18 +4,15 @@ import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.Kryo
import com.google.common.base.Throwables
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.abbreviate
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.send
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolStateMachine
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.THREAD_LOCAL_KRYO
import com.r3corda.core.serialization.createKryo
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.*
import com.r3corda.core.then
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.core.utilities.trace
@ -27,7 +24,6 @@ import java.io.PrintWriter
import java.io.StringWriter
import java.util.*
import java.util.Collections.synchronizedMap
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.ThreadSafe
@ -51,12 +47,10 @@ import javax.annotation.concurrent.ThreadSafe
* TODO: Timeouts
* TODO: Surfacing of exceptions via an API and/or management UI
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
* TODO: Make Kryo (de)serialize markers for heavy objects that are currently in the service hub. This avoids mistakes
* where services are temporarily put on the stack.
* TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor) {
class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableServices: List<Any>, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor) {
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
val scheduler = FiberScheduler()
@ -76,13 +70,16 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
private val totalStartedProtocols = metrics.counter("Protocols.Started")
private val totalFinishedProtocols = metrics.counter("Protocols.Finished")
// Context for tokenized services in checkpoints
private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo())
/** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */
fun <T> findStateMachines(klass: Class<out ProtocolLogic<T>>): List<Pair<ProtocolLogic<T>, ListenableFuture<T>>> {
fun <P : ProtocolLogic<T>, T> findStateMachines(protocolClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
synchronized(stateMachines) {
return stateMachines.keys
.map { it.logic }
.map { it to (it.psm as ProtocolStateMachineImpl<T>).resultFuture }
@ -94,57 +91,60 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
companion object {
var restoreCheckpointsOnStart = true
init {
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as ProtocolStateMachineImpl<*>).logger.error("Caught exception from protocol", throwable)
if (restoreCheckpointsOnStart)
/** Reads the database map and resurrects any serialised state machines. */
private fun restoreCheckpoints() {
for (checkpoint in checkpointStorage.checkpoints) {
// Grab the Kryo engine configured by Quasar for its own stuff, and then do our own configuration on top
// so we can deserialised the nested stream that holds the fiber.
val psm = deserializeFiber(checkpoint.serialisedFiber)
initFiber(psm, checkpoint)
val awaitingObjectOfType = Class.forName(checkpoint.awaitingObjectOfType)
val topic = checkpoint.awaitingTopic
fun start() {
checkpointStorage.checkpoints.forEach { restoreCheckpoint(it) }
psm.logger.info("restored ${psm.logic} - was previously awaiting on topic $topic")
private fun restoreCheckpoint(checkpoint: Checkpoint) {
val fiber = deserializeFiber(checkpoint.serialisedFiber)
initFiber(fiber, checkpoint)
// And now re-wire the deserialised continuation back up to the network service.
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
// TODO: See security note below.
val obj: Any = THREAD_LOCAL_KRYO.get().readClassAndObject(Input(netMsg.data))
if (!awaitingObjectOfType.isInstance(obj))
throw ClassCastException("Received message of unexpected type: ${obj.javaClass.name} vs ${awaitingObjectOfType.name}")
psm.logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
iterateStateMachine(psm, obj) {
val topic = checkpoint.awaitingTopic
if (topic != null) {
val awaitingPayloadType = Class.forName(checkpoint.awaitingPayloadType)
fiber.logger.info("Restored ${fiber.logic} - it was previously waiting for message of type ${awaitingPayloadType.name} on topic $topic")
iterateOnResponse(fiber, awaitingPayloadType, checkpoint.serialisedFiber, topic) {
try {
Fiber.unparkDeserialized(fiber, scheduler)
} catch (e: Throwable) {
logError(e, it, topic, fiber)
} else {
fiber.logger.info("Restored ${fiber.logic} - it was not waiting on any message; received payload: ${checkpoint.receivedPayload.toString().abbreviate(50)}")
executor.executeASAP {
iterateStateMachine(fiber, checkpoint.receivedPayload) {
try {
Fiber.unparkDeserialized(it, scheduler)
} catch(e: Throwable) {
logError(e, obj, topic, it)
Fiber.unparkDeserialized(fiber, scheduler)
} catch (e: Throwable) {
logError(e, it, null, fiber)
private fun deserializeFiber(serialisedFiber: SerializedBytes<out ProtocolStateMachine<*>>): ProtocolStateMachineImpl<*> {
val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer
val kryo = createKryo(deserializer.kryo)
return serialisedFiber.deserialize(kryo) as ProtocolStateMachineImpl<*>
private fun deserializeFiber(serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>): ProtocolStateMachineImpl<*> {
val kryo = quasarKryo()
// put the map of token -> tokenized into the kryo context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
return serialisedFiber.deserialize(kryo)
private fun logError(e: Throwable, obj: Any, topic: String, psm: ProtocolStateMachineImpl<*>) {
private fun quasarKryo(): Kryo {
val serializer = Fiber.getFiberSerializer(false) as KryoSerializer
return createKryo(serializer.kryo)
private fun logError(e: Throwable, payload: Any?, topic: String?, psm: ProtocolStateMachineImpl<*>) {
psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
"when handling a message of type ${obj.javaClass.name} on topic $topic")
"when handling a message of type ${payload?.javaClass?.name} on topic $topic")
if (psm.logger.isTraceEnabled) {
val s = StringWriter()
@ -152,11 +152,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
stateMachines[psm] = checkpoint
psm.resultFuture.then(executor) {
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
val finalCheckpoint = stateMachines.remove(psm)
private fun initFiber(fiber: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
stateMachines[fiber] = checkpoint
fiber.resultFuture.then(executor) {
fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE
val finalCheckpoint = stateMachines.remove(fiber)
if (finalCheckpoint != null) {
@ -176,7 +176,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
initFiber(fiber, null)
executor.executeASAP {
iterateStateMachine(fiber, null) {
@ -187,9 +187,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
private fun replaceCheckpoint(psm: ProtocolStateMachineImpl<*>, newCheckpoint: Checkpoint) {
// It's OK for this to be unsynchronised, as the prev/new byte arrays are specific to a continuation instance,
// and the underlying map provided by the database layer is expected to be thread safe.
private fun updateCheckpoint(psm: ProtocolStateMachineImpl<*>,
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
awaitingTopic: String?,
awaitingPayloadType: Class<*>?,
receivedPayload: Any?) {
val newCheckpoint = Checkpoint(serialisedFiber, awaitingTopic, awaitingPayloadType?.name, receivedPayload)
val previousCheckpoint = stateMachines.put(psm, newCheckpoint)
if (previousCheckpoint != null) {
@ -199,75 +202,93 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
private fun iterateStateMachine(psm: ProtocolStateMachineImpl<*>,
obj: Any?,
resumeFunc: (ProtocolStateMachineImpl<*>) -> Unit) {
receivedPayload: Any?,
resumeAction: (Any?) -> Unit) {
val onSuspend = fun(request: FiberRequest, serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) {
// We have a request to do something: send, receive, or send-and-receive.
if (request is FiberRequest.ExpectingResponse<*>) {
// Prepare a listener on the network that runs in the background thread when we received a message.
checkpointAndSetupMessageHandler(psm, request, serialisedFiber)
// If an object to send was provided (not null), send it now.
request.obj?.let {
val topic = "${request.topic}.${request.sessionIDForSend}"
psm.logger.trace { "-> ${request.destination}/$topic : message of type ${it.javaClass.name}" }
serviceHub.networkService.send(topic, it, request.destination!!)
if (request is FiberRequest.NotExpectingResponse) {
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
iterateStateMachine(psm, null) {
try {
Fiber.unpark(it, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, request.obj!!, request.topic, it)
psm.prepareForResumeWith(serviceHub, receivedPayload) { request, serialisedFiber ->
psm.logger.trace { "Suspended fiber ${psm.id} ${psm.logic}" }
onNextSuspend(psm, request, serialisedFiber)
psm.logger.trace { "Waking up fiber ${psm.id} ${psm.logic}" }
private fun onNextSuspend(psm: ProtocolStateMachineImpl<*>,
request: FiberRequest,
fiber: ProtocolStateMachineImpl<*>) {
// We have a request to do something: send, receive, or send-and-receive.
if (request is FiberRequest.ExpectingResponse<*>) {
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
val kryo = quasarKryo()
// add the map of tokens -> tokenizedServices to the kyro context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
val serialisedFiber = fiber.serialize(kryo)
// Prepare a listener on the network that runs in the background thread when we receive a message.
checkpointOnExpectingResponse(psm, request, serialisedFiber)
// If a non-null payload to send was provided, send it now.
request.payload?.let {
val topic = "${request.topic}.${request.sessionIDForSend}"
psm.logger.trace { "Sending message of type ${it.javaClass.name} using topic $topic to ${request.destination} (${it.toString().abbreviate(50)})" }
serviceHub.networkService.send(topic, it, request.destination!!)
if (request is FiberRequest.NotExpectingResponse) {
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
iterateStateMachine(psm, null) {
try {
Fiber.unpark(psm, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, request.payload, request.topic, psm)
psm.prepareForResumeWith(serviceHub, obj, onSuspend)
private fun checkpointAndSetupMessageHandler(psm: ProtocolStateMachineImpl<*>,
request: FiberRequest.ExpectingResponse<*>,
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) {
private fun checkpointOnExpectingResponse(psm: ProtocolStateMachineImpl<*>,
request: FiberRequest.ExpectingResponse<*>,
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) {
val topic = "${request.topic}.${request.sessionIDForReceive}"
val newCheckpoint = Checkpoint(serialisedFiber, topic, request.responseType.name)
replaceCheckpoint(psm, newCheckpoint)
psm.logger.trace { "Waiting for message of type ${request.responseType.name} on $topic" }
val consumed = AtomicBoolean()
updateCheckpoint(psm, serialisedFiber, topic, request.responseType, null)
psm.logger.trace { "Preparing to receive message of type ${request.responseType.name} on topic $topic" }
iterateOnResponse(psm, request.responseType, serialisedFiber, topic) {
try {
Fiber.unpark(psm, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, it, topic, psm)
private fun iterateOnResponse(psm: ProtocolStateMachineImpl<*>,
responseType: Class<*>,
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
topic: String,
resumeAction: (Any?) -> Unit) {
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
// Some assertions to ensure we don't execute on the wrong thread or get executed more than once.
// Assertion to ensure we don't execute on the wrong thread.
check(netMsg.topic == topic) { "Topic mismatch: ${netMsg.topic} vs $topic" }
// TODO: This is insecure: we should not deserialise whatever we find and *then* check.
// We should instead verify as we read the data that it's what we are expecting and throw as early as
// possible. We only do it this way for convenience during the prototyping stage. Note that this means
// we could simply not require the programmer to specify the expected return type at all, and catch it
// at the last moment when we do the downcast. However this would make protocol code harder to read and
// make it more difficult to migrate to a more explicit serialisation scheme later.
val obj: Any = THREAD_LOCAL_KRYO.get().readClassAndObject(Input(netMsg.data))
if (!request.responseType.isInstance(obj))
throw IllegalStateException("Expected message of type ${request.responseType.name} but got ${obj.javaClass.name}", request.stackTraceInCaseOfProblems)
iterateStateMachine(psm, obj) {
try {
Fiber.unpark(it, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, obj, topic, it)
val payload = netMsg.data.deserialize<Any>()
check(responseType.isInstance(payload)) { "Expected message of type ${responseType.name} but got ${payload.javaClass.name}" }
// Update the fiber's checkpoint so that it's no longer waiting on a response, but rather has the received payload
updateCheckpoint(psm, serialisedFiber, null, null, payload)
psm.logger.trace { "Received message of type ${payload.javaClass.name} on topic $topic (${payload.toString().abbreviate(50)})" }
iterateStateMachine(psm, payload, resumeAction)
// TODO: Clean this up
open class FiberRequest(val topic: String, val destination: MessageRecipients?,
val sessionIDForSend: Long, val sessionIDForReceive: Long, val obj: Any?) {
open class FiberRequest(val topic: String,
val destination: MessageRecipients?,
val sessionIDForSend: Long,
val sessionIDForReceive: Long,
val payload: Any?) {
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
// don't have the original stack trace because it's in a suspended fiber.
val stackTraceInCaseOfProblems = StackSnapshot()
@ -1,100 +1,46 @@
package com.r3corda.node.services.transactions
import com.r3corda.core.contracts.TimestampCommand
import com.r3corda.core.contracts.WireTransaction
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SignedData
import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.UniquenessException
import com.r3corda.core.node.services.TimestampChecker
import com.r3corda.core.node.services.UniquenessProvider
import com.r3corda.core.noneOrSingle
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.protocols.NotaryError
import com.r3corda.protocols.NotaryException
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.protocols.NotaryProtocol
import java.security.KeyPair
* A Notary service acts as the final signer of a transaction ensuring two things:
* - The (optional) timestamp of the transaction is valid
* - None of the referenced input states have previously been consumed by a transaction signed by this Notary
* A transaction has to be signed by a Notary to be considered valid (except for output-only transactions w/o a timestamp)
* A transaction has to be signed by a Notary to be considered valid (except for output-only transactions without a timestamp).
* This is the base implementation that can be customised with specific Notary transaction commit protocol
class NotaryService(net: MessagingService,
val identity: Party,
val signingKey: KeyPair,
val uniquenessProvider: UniquenessProvider,
val timestampChecker: TimestampChecker) : AbstractNodeService(net) {
abstract class NotaryService(val smm: StateMachineManager,
net: MessagingService,
val timestampChecker: TimestampChecker,
val uniquenessProvider: UniquenessProvider) : AbstractNodeService(net) {
object Type : ServiceType("corda.notary")
private val logger = loggerFor<NotaryService>()
abstract val logger: org.slf4j.Logger
/** Implement a factory that specifies the transaction commit protocol for the notary service to use */
abstract val protocolFactory: NotaryProtocol.Factory
init {
check(identity.owningKey == signingKey.public)
{ req: NotaryProtocol.SignRequest -> processRequest(req.txBits, req.callerIdentity) },
{ message, e -> logger.error("Exception during notary service request processing", e) }
{ req: NotaryProtocol.Handshake -> processRequest(req) }
* Checks that the timestamp command is valid (if present) and commits the input state, or returns a conflict
* if any of the input states have been previously committed
* Note that the transaction is not checked for contract-validity, as that would require fully resolving it
* into a [TransactionForVerification], for which the caller would have to reveal the whole transaction history chain.
* As a result, the Notary _will commit invalid transactions_ as well, but as it also records the identity of
* the caller, it is possible to raise a dispute and verify the validity of the transaction and subsequently
* undo the commit of the input states (the exact mechanism still needs to be worked out)
* TODO: the notary service should only be able to see timestamp commands and inputs
fun processRequest(txBits: SerializedBytes<WireTransaction>, reqIdentity: Party): NotaryProtocol.Result {
val wtx = txBits.deserialize()
try {
commitInputStates(wtx, reqIdentity)
} catch(e: NotaryException) {
return NotaryProtocol.Result.withError(e.error)
val sig = sign(txBits)
return NotaryProtocol.Result.noError(sig)
private fun processRequest(req: NotaryProtocol.Handshake) {
val protocol = protocolFactory.create(req.replyTo as SingleMessageRecipient,
smm.add(NotaryProtocol.TOPIC, protocol)
private fun validateTimestamp(tx: WireTransaction) {
val timestampCmd = try {
tx.commands.noneOrSingle { it.value is TimestampCommand } ?: return
} catch (e: IllegalArgumentException) {
throw NotaryException(NotaryError.MoreThanOneTimestamp())
if (!timestampCmd.signers.contains(identity.owningKey))
throw NotaryException(NotaryError.NotForMe())
if (!timestampChecker.isValid(timestampCmd.value as TimestampCommand))
throw NotaryException(NotaryError.TimestampInvalid())
private fun commitInputStates(tx: WireTransaction, reqIdentity: Party) {
try {
uniquenessProvider.commit(tx, reqIdentity)
} catch (e: UniquenessException) {
val conflictData = e.error.serialize()
val signedConflict = SignedData(conflictData, sign(conflictData))
throw NotaryException(NotaryError.Conflict(tx, signedConflict))
private fun <T : Any> sign(bits: SerializedBytes<T>): DigitalSignature.LegallyIdentifiable {
return signingKey.signWithECDSA(bits, identity)
@ -0,0 +1,22 @@
package com.r3corda.node.services.transactions
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TimestampChecker
import com.r3corda.core.node.services.UniquenessProvider
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.protocols.NotaryProtocol
/** A simple Notary service that does not perform transaction validation */
class SimpleNotaryService(
smm: StateMachineManager,
net: MessagingService,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider) : NotaryService(smm, net, timestampChecker, uniquenessProvider) {
object Type : ServiceType("corda.notary.simple")
override val logger = loggerFor<SimpleNotaryService>()
override val protocolFactory = NotaryProtocol.DefaultFactory
@ -0,0 +1,33 @@
package com.r3corda.node.services.transactions
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TimestampChecker
import com.r3corda.core.node.services.UniquenessProvider
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.protocols.NotaryProtocol
import com.r3corda.protocols.ValidatingNotaryProtocol
/** A Notary service that validates the transaction chain of he submitted transaction before committing it */
class ValidatingNotaryService(
smm: StateMachineManager,
net: MessagingService,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider
) : NotaryService(smm, net, timestampChecker, uniquenessProvider) {
object Type : ServiceType("corda.notary.validating")
override val logger = loggerFor<ValidatingNotaryService>()
override val protocolFactory = object : NotaryProtocol.Factory {
override fun create(otherSide: SingleMessageRecipient,
sendSessionID: Long,
receiveSessionID: Long,
timestampChecker: TimestampChecker,
uniquenessProvider: UniquenessProvider): NotaryProtocol.Service {
return ValidatingNotaryProtocol(otherSide, sendSessionID, receiveSessionID, timestampChecker, uniquenessProvider)
@ -8,6 +8,7 @@ import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.Wallet
import com.r3corda.core.node.services.WalletService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.ServiceHubInternal
@ -21,7 +22,7 @@ import javax.annotation.concurrent.ThreadSafe
* states relevant to us into a database and once such a wallet is implemented, this scaffolding can be removed.
class NodeWalletService(private val services: ServiceHubInternal) : WalletService {
class NodeWalletService(private val services: ServiceHubInternal) : SingletonSerializeAsToken(), WalletService {
private val log = loggerFor<NodeWalletService>()
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
@ -129,7 +130,7 @@ class NodeWalletService(private val services: ServiceHubInternal) : WalletServic
m.register("WalletBalances.${balance.key}Pennies", newMetric)
metric.pennies = balance.value.pennies
metric.pennies = balance.value.quantity
@ -171,7 +172,7 @@ class NodeWalletService(private val services: ServiceHubInternal) : WalletServic
private fun calculateRandomlySizedAmounts(howMuch: Amount<Currency>, min: Int, max: Int, rng: Random): LongArray {
val numStates = min + Math.floor(rng.nextDouble() * (max - min)).toInt()
val amounts = LongArray(numStates)
val baseSize = howMuch.pennies / numStates
val baseSize = howMuch.quantity / numStates
var filledSoFar = 0L
for (i in 0..numStates - 1) {
if (i < numStates - 1) {
@ -180,7 +181,7 @@ class NodeWalletService(private val services: ServiceHubInternal) : WalletServic
filledSoFar += baseSize
} else {
// Handle inexact rounding.
amounts[i] = howMuch.pennies - filledSoFar
amounts[i] = howMuch.quantity - filledSoFar
return amounts
@ -2,7 +2,7 @@
"fixedLeg": {
"fixedRatePayer": "Bank A",
"notional": {
"pennies": 2500000000,
"quantity": 2500000000,
"token": "USD"
"paymentFrequency": "SemiAnnual",
@ -27,7 +27,7 @@
"floatingLeg": {
"floatingRatePayer": "Bank B",
"notional": {
"pennies": 2500000000,
"quantity": 2500000000,
"token": "USD"
"paymentFrequency": "Quarterly",
@ -56,7 +56,7 @@
"calculation": {
"expression": "( fixedLeg.notional.pennies * (fixedLeg.fixedRate.ratioUnit.value)) -(floatingLeg.notional.pennies * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))",
"expression": "( fixedLeg.notional.quantity * (fixedLeg.fixedRate.ratioUnit.value)) -(floatingLeg.notional.quantity * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))",
"floatingLegPaymentSchedule": {
"fixedLegPaymentSchedule": {
@ -67,19 +67,19 @@
"eligibleCurrency": "EUR",
"eligibleCreditSupport": "Cash in an Eligible Currency",
"independentAmounts": {
"pennies": 0,
"quantity": 0,
"token": "EUR"
"threshold": {
"pennies": 0,
"quantity": 0,
"token": "EUR"
"minimumTransferAmount": {
"pennies": 25000000,
"quantity": 25000000,
"token": "EUR"
"rounding": {
"pennies": 1000000,
"quantity": 1000000,
"token": "EUR"
"valuationDate": "Every Local Business Day",
@ -12,11 +12,11 @@ import com.r3corda.node.internal.testing.MockNetwork
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.transactions.NotaryService
import org.junit.Before
import org.junit.Test
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.protocols.FetchAttachmentsProtocol
import com.r3corda.protocols.FetchDataProtocol
import org.junit.Before
import org.junit.Test
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
@ -100,7 +100,7 @@ class AttachmentTests {
}, true, null, null, NetworkMapService.Type, NotaryService.Type)
}, true, null, null, NetworkMapService.Type, SimpleNotaryService.Type)
val n1 = network.createNode(n0.info)
// Insert an attachment into node zero's store directly.
@ -28,11 +28,11 @@ import com.r3corda.node.services.persistence.StorageServiceImpl
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.wallet.NodeWalletService
import com.r3corda.node.services.wallet.WalletImpl
import com.r3corda.protocols.TwoPartyTradeProtocol
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import com.r3corda.protocols.TwoPartyTradeProtocol
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.nio.file.Path
@ -218,6 +218,9 @@ class TwoPartyTradeProtocolTests {
assertEquals(bobFuture.get(), aliceFuture.get())
@ -347,7 +350,7 @@ class TwoPartyTradeProtocolTests {
fun `dependency with error on buyer side`() {
transactionGroupFor<ContractState> {
runWithError(true, false, "at least one cash input")
runWithError(true, false, "at least one asset input")
@ -5,6 +5,7 @@ import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.testing.MockStorageService
import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.Checkpoint
import com.r3corda.node.services.api.CheckpointStorage
import com.r3corda.node.services.api.MonitoringService
@ -40,7 +41,7 @@ class MockServices(
val storage: StorageService? = MockStorageService(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val mapService: NetworkMapService? = null,
val overrideClock: Clock? = Clock.systemUTC()
val overrideClock: Clock? = NodeClock()
) : ServiceHubInternal {
override val walletService: WalletService = customWallet ?: NodeWalletService(this)
@ -1,16 +1,19 @@
package com.r3corda.node.services
import com.r3corda.core.contracts.TimestampCommand
import com.r3corda.core.contracts.TransactionBuilder
import com.r3corda.core.seconds
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.DUMMY_NOTARY_KEY
import com.r3corda.node.internal.testing.MockNetwork
import com.r3corda.node.testutils.issueState
import org.junit.Before
import org.junit.Test
import com.r3corda.node.internal.testing.issueState
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.protocols.NotaryError
import com.r3corda.protocols.NotaryException
import com.r3corda.protocols.NotaryProtocol
import org.junit.Before
import org.junit.Test
import java.time.Instant
import java.util.concurrent.ExecutionException
import kotlin.test.assertEquals
@ -22,12 +25,14 @@ class NotaryServiceTests {
lateinit var notaryNode: MockNetwork.MockNode
lateinit var clientNode: MockNetwork.MockNode
fun setup() {
// TODO: Move into MockNetwork
@Before fun setup() {
net = MockNetwork()
notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
clientNode = net.createPartyNode(networkMapAddr = notaryNode.info)
notaryNode = net.createNode(
legalName = DUMMY_NOTARY.name,
advertisedServices = *arrayOf(NetworkMapService.Type, SimpleNotaryService.Type)
clientNode = net.createNode(networkMapAddress = notaryNode.info)
net.runNetwork() // Clear network map registration messages
@ -35,9 +40,9 @@ class NotaryServiceTests {
val inputState = issueState(clientNode)
val tx = TransactionBuilder().withItems(inputState)
tx.setTime(Instant.now(), DUMMY_NOTARY, 30.seconds)
var wtx = tx.toWireTransaction()
val wtx = tx.toWireTransaction()
val protocol = NotaryProtocol(wtx, NotaryProtocol.Companion.tracker())
val protocol = NotaryProtocol.Client(wtx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
@ -49,7 +54,7 @@ class NotaryServiceTests {
val inputState = issueState(clientNode)
val wtx = TransactionBuilder().withItems(inputState).toWireTransaction()
val protocol = NotaryProtocol(wtx, NotaryProtocol.Companion.tracker())
val protocol = NotaryProtocol.Client(wtx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
@ -61,9 +66,9 @@ class NotaryServiceTests {
val inputState = issueState(clientNode)
val tx = TransactionBuilder().withItems(inputState)
tx.setTime(Instant.now().plusSeconds(3600), DUMMY_NOTARY, 30.seconds)
var wtx = tx.toWireTransaction()
val wtx = tx.toWireTransaction()
val protocol = NotaryProtocol(wtx, NotaryProtocol.Companion.tracker())
val protocol = NotaryProtocol.Client(wtx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
@ -72,14 +77,32 @@ class NotaryServiceTests {
assertTrue(error is NotaryError.TimestampInvalid)
@Test fun `should report error for transaction with more than one timestamp`() {
val inputState = issueState(clientNode)
val tx = TransactionBuilder().withItems(inputState)
val timestamp = TimestampCommand(Instant.now(), 30.seconds)
tx.addCommand(timestamp, DUMMY_NOTARY.owningKey)
tx.addCommand(timestamp, DUMMY_NOTARY.owningKey)
val wtx = tx.toWireTransaction()
val protocol = NotaryProtocol.Client(wtx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val ex = assertFailsWith(ExecutionException::class) { future.get() }
val error = (ex.cause as NotaryException).error
assertTrue(error is NotaryError.MoreThanOneTimestamp)
@Test fun `should report conflict for a duplicate transaction`() {
val inputState = issueState(clientNode)
val wtx = TransactionBuilder().withItems(inputState).toWireTransaction()
val firstSpend = NotaryProtocol(wtx)
val secondSpend = NotaryProtocol(wtx)
val firstSpend = NotaryProtocol.Client(wtx)
val secondSpend = NotaryProtocol.Client(wtx)
clientNode.smm.add("${NotaryProtocol.TOPIC}.first", firstSpend)
val future = clientNode.smm.add("${NotaryProtocol.TOPIC}.second", secondSpend)
val ex = assertFailsWith(ExecutionException::class) { future.get() }
@ -93,6 +93,6 @@ class PerFileCheckpointStorageTests {
private var checkpointCount = 1
private fun newCheckpoint() = Checkpoint(SerializedBytes(Ints.toByteArray(checkpointCount++)), "topic", "javaType")
private fun newCheckpoint() = Checkpoint(SerializedBytes(Ints.toByteArray(checkpointCount++)), "topic", "javaType", null)
@ -1,8 +1,8 @@
package com.r3corda.node.services
import com.r3corda.core.contracts.TimestampCommand
import com.r3corda.core.node.services.TimestampChecker
import com.r3corda.core.seconds
import com.r3corda.node.services.transactions.TimestampChecker
import org.junit.Test
import java.time.Clock
import java.time.Instant
@ -0,0 +1,47 @@
package com.r3corda.node.services
import com.r3corda.core.contracts.TransactionBuilder
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.DUMMY_NOTARY_KEY
import com.r3corda.node.internal.testing.MockNetwork
import com.r3corda.node.internal.testing.issueInvalidState
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.ValidatingNotaryService
import com.r3corda.protocols.NotaryError
import com.r3corda.protocols.NotaryException
import com.r3corda.protocols.NotaryProtocol
import org.junit.Before
import org.junit.Test
import java.util.concurrent.ExecutionException
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class ValidatingNotaryServiceTests {
lateinit var net: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var clientNode: MockNetwork.MockNode
@Before fun setup() {
net = MockNetwork()
notaryNode = net.createNode(
legalName = DUMMY_NOTARY.name,
advertisedServices = *arrayOf(NetworkMapService.Type, ValidatingNotaryService.Type)
clientNode = net.createNode(networkMapAddress = notaryNode.info)
net.runNetwork() // Clear network map registration messages
@Test fun `should report error for invalid transaction dependency`() {
val inputState = issueInvalidState(clientNode)
val wtx = TransactionBuilder().withItems(inputState).toWireTransaction()
val protocol = NotaryProtocol.Client(wtx)
val future = clientNode.smm.add(NotaryProtocol.TOPIC, protocol)
val ex = assertFailsWith(ExecutionException::class) { future.get() }
val notaryError = (ex.cause as NotaryException).error
assertTrue(notaryError is NotaryError.TransactionInvalid, "Received wrong Notary error")
@ -2,8 +2,8 @@
"fixedLeg": {
"fixedRatePayer": "Bank A",
"notional": {
"pennies": 2500000000,
"currency": "USD"
"quantity": 2500000000,
"token": "USD"
"paymentFrequency": "SemiAnnual",
"effectiveDate": "2016-03-11",
@ -27,8 +27,8 @@
"floatingLeg": {
"floatingRatePayer": "Bank B",
"notional": {
"pennies": 2500000000,
"currency": "USD"
"quantity": 2500000000,
"token": "USD"
"paymentFrequency": "Quarterly",
"effectiveDate": "2016-03-11",
@ -56,7 +56,7 @@
"calculation": {
"expression": "( fixedLeg.notional.pennies * (fixedLeg.fixedRate.ratioUnit.value)) -(floatingLeg.notional.pennies * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))",
"expression": "( fixedLeg.notional.quantity * (fixedLeg.fixedRate.ratioUnit.value)) -(floatingLeg.notional.quantity * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))",
"floatingLegPaymentSchedule": {
"fixedLegPaymentSchedule": {
@ -67,20 +67,20 @@
"eligibleCurrency": "EUR",
"eligibleCreditSupport": "Cash in an Eligible Currency",
"independentAmounts": {
"pennies": 0,
"currency": "EUR"
"quantity": 0,
"token": "EUR"
"threshold": {
"pennies": 0,
"currency": "EUR"
"quantity": 0,
"token": "EUR"
"minimumTransferAmount": {
"pennies": 25000000,
"currency": "EUR"
"quantity": 25000000,
"token": "EUR"
"rounding": {
"pennies": 1000000,
"currency": "EUR"
"quantity": 1000000,
"token": "EUR"
"valuationDate": "Every Local Business Day",
"notificationTime": "2:00pm London",
@ -96,9 +96,9 @@
"addressForTransfers": "",
"exposure": {},
"localBusinessDay": [ "London" , "NewYork" ],
"dailyInterestAmount": "(CashAmount * InterestRate ) / (fixedLeg.notional.currency.currencyCode.equals('GBP')) ? 365 : 360",
"dailyInterestAmount": "(CashAmount * InterestRate ) / (fixedLeg.notional.token.currencyCode.equals('GBP')) ? 365 : 360",
"tradeID": "tradeXXX",
"hashLegalDocs": "put hash here"
"notary": "Bank A"
@ -1,5 +1,8 @@
package com.r3corda.demos
import com.r3corda.core.serialization.SerializeAsToken
import com.r3corda.core.serialization.SerializeAsTokenContext
import com.r3corda.core.serialization.SingletonSerializationToken
import com.r3corda.node.utilities.MutableClock
import java.time.*
import javax.annotation.concurrent.ThreadSafe
@ -8,7 +11,11 @@ import javax.annotation.concurrent.ThreadSafe
* A [Clock] that can have the date advanced for use in demos
class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock() {
class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock(), SerializeAsToken {
private val token = SingletonSerializationToken(this)
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
@Synchronized fun updateDate(date: LocalDate): Boolean {
val currentDate = LocalDate.now(this)
@ -25,8 +32,9 @@ class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableC
return delegateClock.instant()
@Synchronized override fun withZone(zone: ZoneId): Clock {
return DemoClock(delegateClock.withZone(zone))
// Do not use this. Instead seek to use ZonedDateTime methods.
override fun withZone(zone: ZoneId): Clock {
throw UnsupportedOperationException("Tokenized clock does not support withZone()")
@Synchronized override fun getZone(): ZoneId {
@ -1,24 +1,24 @@
package com.r3corda.demos
import com.google.common.net.HostAndPort
import com.typesafe.config.ConfigFactory
import com.r3corda.core.crypto.Party
import com.r3corda.core.logElapsedTime
import com.r3corda.node.internal.Node
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.core.node.NodeInfo
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.core.node.services.ServiceType
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.demos.api.InterestRateSwapAPI
import com.r3corda.demos.protocols.AutoOfferProtocol
import com.r3corda.demos.protocols.ExitServerProtocol
import com.r3corda.demos.protocols.UpdateBusinessDayProtocol
import com.r3corda.node.internal.Node
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.typesafe.config.ConfigFactory
import joptsimple.OptionParser
import java.nio.file.Files
import java.nio.file.Path
@ -65,12 +65,12 @@ fun main(args: Array<String>) {
val networkMapId = if (options.valueOf(networkMapNetAddr).equals(options.valueOf(networkAddressArg))) {
// This node provides network map and notary services
advertisedServices = setOf(NetworkMapService.Type, NotaryService.Type)
advertisedServices = setOf(NetworkMapService.Type, SimpleNotaryService.Type)
} else {
advertisedServices = setOf(NodeInterestRates.Type)
try {
nodeInfo(options.valueOf(networkMapNetAddr), options.valueOf(networkMapIdentityFile), setOf(NetworkMapService.Type, NotaryService.Type))
nodeInfo(options.valueOf(networkMapNetAddr), options.valueOf(networkMapIdentityFile), setOf(NetworkMapService.Type, SimpleNotaryService.Type))
} catch (e: Exception) {
@ -24,8 +24,7 @@ import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.services.wallet.NodeWalletService
import com.r3corda.node.utilities.ANSIProgressRenderer
import com.r3corda.protocols.NotaryProtocol
@ -115,7 +114,7 @@ fun main(args: Array<String>) {
// the map is not very helpful, but we need one anyway. So just make the buyer side run the network map as it's
// the side that sticks around waiting for the seller.
val networkMapId = if (role == Role.BUYER) {
advertisedServices = setOf(NetworkMapService.Type, NotaryService.Type)
advertisedServices = setOf(NetworkMapService.Type, SimpleNotaryService.Type)
} else {
// In a real system, the identity file of the network map would be shipped with the server software, and there'd
@ -128,9 +127,6 @@ fun main(args: Array<String>) {
NodeInfo(ArtemisMessagingService.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type))
// TODO: Remove this once checkpoint resume works.
StateMachineManager.restoreCheckpointsOnStart = false
// And now construct then start the node object. It takes a little while.
val node = logElapsedTime("Node startup") {
Node(directory, myNetAddr, config, networkMapId, advertisedServices).start()
@ -175,10 +171,18 @@ fun runSeller(myNetAddr: HostAndPort, node: Node, theirNetAddr: HostAndPort) {
val otherSide = ArtemisMessagingService.makeRecipient(theirNetAddr)
val seller = TraderDemoProtocolSeller(myNetAddr, otherSide)
ANSIProgressRenderer.progressTracker = seller.progressTracker
node.smm.add("demo.seller", seller).get()
if (node.isPreviousCheckpointsPresent) {
node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).forEach {
ANSIProgressRenderer.progressTracker = it.first.progressTracker
} else {
val otherSide = ArtemisMessagingService.makeRecipient(theirNetAddr)
val seller = TraderDemoProtocolSeller(myNetAddr, otherSide)
ANSIProgressRenderer.progressTracker = seller.progressTracker
node.smm.add("demo.seller", seller).get()
@ -190,11 +194,18 @@ fun runBuyer(node: Node) {
// We use a simple scenario-specific wrapper protocol to make things happen.
val buyer = TraderDemoProtocolBuyer(attachmentsPath, node.info.identity)
ANSIProgressRenderer.progressTracker = buyer.progressTracker
// This thread will halt forever here.
node.smm.add("demo.buyer", buyer).get()
val future = if (node.isPreviousCheckpointsPresent) {
val (buyer, future) = node.smm.findStateMachines(TraderDemoProtocolBuyer::class.java).single()
ANSIProgressRenderer.progressTracker = buyer.progressTracker //TODO the SMM will soon be able to wire up the ANSIProgressRenderer automatially
} else {
// We use a simple scenario-specific wrapper protocol to make things happen.
val buyer = TraderDemoProtocolBuyer(attachmentsPath, node.info.identity)
ANSIProgressRenderer.progressTracker = buyer.progressTracker
node.smm.add("demo.buyer", buyer)
future.get() // This thread will halt forever here.
// We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic.
@ -339,7 +350,7 @@ class TraderDemoProtocolSeller(val myAddress: HostAndPort,
// Get the notary to sign it, thus committing the outputs.
val notarySig = subProtocol(NotaryProtocol(tx.toWireTransaction()))
val notarySig = subProtocol(NotaryProtocol.Client(tx.toWireTransaction()))
// Commit it to local storage.
@ -354,7 +365,7 @@ class TraderDemoProtocolSeller(val myAddress: HostAndPort,
val builder = TransactionBuilder()
CommercialPaper().generateMove(builder, issuance.tx.outRef(0), ownedBy)
val tx = builder.toSignedTransaction(true)
@ -363,4 +374,4 @@ class TraderDemoProtocolSeller(val myAddress: HostAndPort,
return move.tx.outRef(0)
Reference in New Issue
Block a user