Merged in cor-133-events-modelling (pull request #132)

Scheduled activities / events support
This commit is contained in:
Rick Parker 2016-06-30 09:11:37 +01:00
commit 4213da31b2
31 changed files with 1375 additions and 308 deletions

View File

@ -3,6 +3,9 @@ package com.r3corda.contracts
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.utilities.suggestInterestRateAnnouncementTimeWindow
import com.r3corda.protocols.TwoPartyDealProtocol
import org.apache.commons.jexl3.JexlBuilder
import org.apache.commons.jexl3.MapContext
import java.math.BigDecimal
@ -588,7 +591,7 @@ class InterestRateSwap() : Contract {
val floatingLeg: FloatingLeg,
val calculation: Calculation,
val common: Common
) : FixableDealState {
) : FixableDealState, SchedulableState {
override val contract = IRS_PROGRAM_ID
override val thread = SecureHash.sha256(common.tradeID)
@ -604,6 +607,14 @@ class InterestRateSwap() : Contract {
override val parties: Array<Party>
get() = arrayOf(fixedLeg.fixedRatePayer, floatingLeg.floatingRatePayer)
override fun nextScheduledActivity(thisStateRef: StateRef, protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity? {
val nextFixingOf = nextFixingOf() ?: return null
// This is perhaps not how we should determine the time point in the business day, but instead expect the schedule to detail some of these aspects
val (instant, duration) = suggestInterestRateAnnouncementTimeWindow(index = nextFixingOf.name, source = floatingLeg.indexSource, date = nextFixingOf.forDay)
return ScheduledActivity(protocolLogicRefFactory.create(TwoPartyDealProtocol.FixingRoleDecider::class.java, thisStateRef, duration), instant)
}
// TODO: This changing of the public key violates the assumption that Party is a fixed identity key.
override fun withPublicKey(before: Party, after: PublicKey): DealState {
val newParty = Party(before.name, after)

View File

@ -14,7 +14,6 @@ import java.nio.file.Path
import java.time.Duration
import java.time.temporal.Temporal
import java.util.concurrent.Executor
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import java.util.zip.ZipInputStream
import kotlin.concurrent.withLock
@ -143,9 +142,13 @@ inline fun <T> logElapsedTime(label: String, logger: Logger? = null, body: () ->
*
* val ii = state.locked { i }
*/
class ThreadBox<T>(content: T, val lock: Lock = ReentrantLock()) {
class ThreadBox<T>(content: T, val lock: ReentrantLock = ReentrantLock()) {
val content = content
inline fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
inline fun <R> alreadyLocked(body: T.() -> R): R {
check(lock.isHeldByCurrentThread, { "Expected $lock to already be locked." })
return body(content)
}
}
/**

View File

@ -3,6 +3,8 @@ package com.r3corda.core.contracts
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.toStringShort
import com.r3corda.core.protocols.ProtocolLogicRef
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.core.serialization.serialize
import java.io.FileNotFoundException
@ -141,6 +143,33 @@ interface OwnableState : ContractState {
fun withNewOwner(newOwner: PublicKey): Pair<CommandData, OwnableState>
}
/** Something which is scheduled to happen at a point in time */
interface Scheduled {
val scheduledAt: Instant
}
/**
* Represents a contract state (unconsumed output) of type [LinearState] and a point in time that a lifecycle event is expected to take place
* for that contract state.
*
* This is effectively the input to a scheduler, which wakes up at that point in time and asks the contract state what
* lifecycle processing needs to take place. e.g. a fixing or a late payment etc.
*/
data class ScheduledStateRef(val ref: StateRef, override val scheduledAt: Instant) : Scheduled
/**
* This class represents the lifecycle activity that a contract state of type [LinearState] would like to perform at a given point in time.
* e.g. run a fixing protocol
*
* Note the use of [ProtocolLogicRef] to represent a safe way to transport a [ProtocolLogic] out of the contract sandbox.
*
* Currently we support only protocol based activities as we expect there to be a transaction generated off the back of
* the activity, otherwise we have to start tracking secondary state on the platform of which scheduled activities
* for a particular [ContractState] have been processed/fired etc. If the activity is not "on ledger" then the
* scheduled activity shouldn't be either.
*/
data class ScheduledActivity(val logicRef: ProtocolLogicRef, override val scheduledAt: Instant) : Scheduled
/**
* A state that evolves by superseding itself, all of which share the common "thread"
*
@ -154,12 +183,24 @@ interface LinearState : ContractState {
fun isRelevant(ourKeys: Set<PublicKey>): Boolean
}
interface SchedulableState : ContractState {
/**
* Indicate whether there is some activity to be performed at some future point in time with respect to this
* [ContractState], what that activity is and at what point in time it should be initiated.
* This can be used to implement deadlines for payment or processing of financial instruments according to a schedule.
*
* The state has no reference to it's own StateRef, so supply that for use as input to any ProtocolLogic constructed.
*
* @return null if there is no activity to schedule
*/
fun nextScheduledActivity(thisStateRef: StateRef, protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity?
}
/**
* Interface representing an agreement that exposes various attributes that are common. Implementing it simplifies
* implementation of general protocols that manipulate many agreement types.
*/
interface DealState : LinearState {
/** Human readable well known reference (e.g. trade reference) */
val ref: String
@ -187,8 +228,6 @@ interface DealState : LinearState {
interface FixableDealState : DealState {
/**
* When is the next fixing and what is the fixing for?
*
* TODO: In future we would use this to register for an event to trigger a/the fixing protocol
*/
fun nextFixingOf(): FixOf?

View File

@ -20,6 +20,7 @@ interface ServiceHub {
val storageService: StorageService
val networkService: MessagingService
val networkMapCache: NetworkMapCache
val schedulerService: SchedulerService
val clock: Clock
/**

View File

@ -1,5 +1,7 @@
package com.r3corda.core.node.services
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
@ -113,6 +115,17 @@ interface WalletService {
* the update.
*/
val updates: rx.Observable<Wallet.Update>
/**
* Provide a [Future] for when a [StateRef] is consumed, which can be very useful in building tests.
*/
fun whenConsumed(ref: StateRef): ListenableFuture<Wallet.Update> {
val future = SettableFuture.create<Wallet.Update>()
updates.filter { ref in it.consumed }.first().subscribe {
future.set(it)
}
return future
}
}
inline fun <reified T : LinearState> WalletService.linearHeadsOfType() = linearHeadsOfType_(T::class.java)
@ -172,4 +185,24 @@ interface TxWritableStorageService : StorageService {
override val validatedTransactions: TransactionStorage
}
/**
* Provides access to schedule activity at some point in time. This interface might well be expanded to
* increase the feature set in the future.
*
* If the point in time is in the past, the expectation is that the activity will happen shortly after it is scheduled.
*
* The main consumer initially is an observer of the wallet to schedule activities based on transactions as they are
* recorded.
*/
interface SchedulerService {
/**
* Schedule a new activity for a TX output, probably because it was just produced.
*
* Only one activity can be scheduled for a particular [StateRef] at any one time. Scheduling a [ScheduledStateRef]
* replaces any previously scheduled [ScheduledStateRef] for any one [StateRef].
*/
fun scheduleStateActivity(action: ScheduledStateRef)
/** Unschedule all activity for a TX output, probably because it was consumed. */
fun unscheduleStateActivity(ref: StateRef)
}

View File

@ -71,9 +71,15 @@ abstract class ProtocolLogic<T> {
private fun maybeWireUpProgressTracking(subLogic: ProtocolLogic<*>) {
val ours = progressTracker
val theirs = subLogic.progressTracker
if (ours != null && theirs != null)
if (ours != null && theirs != null) {
if (ours.currentStep == ProgressTracker.UNSTARTED) {
logger.warn("ProgressTracker has not been started for $this")
ours.nextStep()
}
ours.setChildProgressTracker(ours.currentStep, theirs)
}
}
/**

View File

@ -0,0 +1,163 @@
package com.r3corda.core.protocols
import com.google.common.primitives.Primitives
import com.r3corda.core.contracts.StateRef
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.protocols.TwoPartyDealProtocol
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.time.Duration
import java.util.*
import kotlin.reflect.KFunction
import kotlin.reflect.KParameter
import kotlin.reflect.jvm.javaType
import kotlin.reflect.primaryConstructor
/**
* A class for conversion to and from [ProtocolLogic] and [ProtocolLogicRef] instances
*
* Validation of types is performed on the way in and way out in case this object is passed between JVMs which might have differing
* whitelists.
*
* TODO: Ways to populate whitelist of "blessed" protocols per node/party
* TODO: Ways to populate argument types whitelist. Per node/party or global?
* TODO: Align with API related logic for passing in ProtocolLogic references (ProtocolRef)
* TODO: Actual support for AppContext / AttachmentsClassLoader
*/
class ProtocolLogicRefFactory(private val protocolLogicClassNameWhitelist: Set<String>, private val argsClassNameWhitelist: Set<String>) : SingletonSerializeAsToken() {
constructor() : this(setOf(TwoPartyDealProtocol.FixingRoleDecider::class.java.name), setOf(StateRef::class.java.name, Duration::class.java.name))
// Pending real dependence on AppContext for class loading etc
@Suppress("UNUSED_PARAMETER")
private fun validateProtocolClassName(className: String, appContext: AppContext) {
// TODO: make this specific to the attachments in the [AppContext] by including [SecureHash] in whitelist check
require(className in protocolLogicClassNameWhitelist) { "${ProtocolLogic::class.java.simpleName} of ${ProtocolLogicRef::class.java.simpleName} must have type on the whitelist: $className" }
}
// Pending real dependence on AppContext for class loading etc
@Suppress("UNUSED_PARAMETER")
private fun validateArgClassName(className: String, appContext: AppContext) {
// TODO: make this specific to the attachments in the [AppContext] by including [SecureHash] in whitelist check
require(className in argsClassNameWhitelist) { "Args to ${ProtocolLogicRef::class.java.simpleName} must have types on the args whitelist: $className" }
}
/**
* Create a [ProtocolLogicRef] for the Kotlin primary constructor or Java constructor and the given args.
*/
fun create(type: Class<out ProtocolLogic<*>>, vararg args: Any?): ProtocolLogicRef {
val constructor = type.kotlin.primaryConstructor ?: return createJava(type, *args)
if (constructor.parameters.size < args.size) {
throw IllegalProtocolLogicException(type, "due to too many arguments supplied to kotlin primary constructor")
}
// Build map of args from array
val argsMap = args.zip(constructor.parameters).map { Pair(it.second.name!!, it.first) }.toMap()
return createKotlin(type, argsMap)
}
/**
* Create a [ProtocolLogicRef] by trying to find a Kotlin constructor that matches the given args.
*
* TODO: Rethink language specific naming.
*/
fun createKotlin(type: Class<out ProtocolLogic<*>>, args: Map<String, Any?>): ProtocolLogicRef {
// TODO: we need to capture something about the class loader or "application context" into the ref,
// perhaps as some sort of ThreadLocal style object. For now, just create an empty one.
val appContext = AppContext(emptyList())
validateProtocolClassName(type.name, appContext)
// Check we can find a constructor and populate the args to it, but don't call it
createConstructor(appContext, type, args)
return ProtocolLogicRef(type.name, appContext, args)
}
/**
* Create a [ProtocolLogicRef] by trying to find a Java constructor that matches the given args.
*/
private fun createJava(type: Class<out ProtocolLogic<*>>, vararg args: Any?): ProtocolLogicRef {
// Build map for each
val argsMap = HashMap<String, Any?>(args.size)
var index = 0
args.forEach { argsMap["arg${index++}"] = it }
return createKotlin(type, argsMap)
}
fun toProtocolLogic(ref: ProtocolLogicRef): ProtocolLogic<*> {
validateProtocolClassName(ref.protocolLogicClassName, ref.appContext)
val klass = Class.forName(ref.protocolLogicClassName, true, ref.appContext.classLoader).asSubclass(ProtocolLogic::class.java)
return createConstructor(ref.appContext, klass, ref.args)()
}
private fun createConstructor(appContext: AppContext, clazz: Class<out ProtocolLogic<*>>, args: Map<String, Any?>): () -> ProtocolLogic<*> {
for (constructor in clazz.kotlin.constructors) {
val params = buildParams(appContext, constructor, args) ?: continue
// If we get here then we matched every parameter
return { constructor.callBy(params) }
}
throw IllegalProtocolLogicException(clazz, "as could not find matching constructor for: $args")
}
private fun buildParams(appContext: AppContext, constructor: KFunction<ProtocolLogic<*>>, args: Map<String, Any?>): HashMap<KParameter, Any?>? {
val params = hashMapOf<KParameter, Any?>()
val usedKeys = hashSetOf<String>()
for (parameter in constructor.parameters) {
if (!tryBuildParam(args, parameter, params)) {
return null
} else {
usedKeys += parameter.name!!
}
}
if ((args.keys - usedKeys).isNotEmpty()) {
// Not all args were used
return null
}
params.values.forEach { if (it is Any) validateArgClassName(it.javaClass.name, appContext) }
return params
}
private fun tryBuildParam(args: Map<String, Any?>, parameter: KParameter, params: HashMap<KParameter, Any?>): Boolean {
val containsKey = parameter.name in args
// OK to be missing if optional
return (parameter.isOptional && !containsKey) || (containsKey && paramCanBeBuilt(args, parameter, params))
}
private fun paramCanBeBuilt(args: Map<String, Any?>, parameter: KParameter, params: HashMap<KParameter, Any?>): Boolean {
val value = args[parameter.name]
params[parameter] = value
return (value is Any && parameterAssignableFrom(parameter.type.javaType, value)) || parameter.type.isMarkedNullable
}
private fun parameterAssignableFrom(type: Type, value: Any): Boolean {
if (type is Class<*>) {
if (type.isPrimitive) {
return Primitives.unwrap(value.javaClass) == type
} else {
return type.isAssignableFrom(value.javaClass)
}
} else if (type is ParameterizedType) {
return parameterAssignableFrom(type.rawType, value)
} else {
return false
}
}
}
class IllegalProtocolLogicException(type: Class<*>, msg: String) : IllegalArgumentException("${ProtocolLogicRef::class.java.simpleName} cannot be constructed for ${ProtocolLogic::class.java.simpleName} of type ${type.name} $msg")
/**
* A class representing a [ProtocolLogic] instance which would be possible to safely pass out of the contract sandbox
*
* Only allows a String reference to the ProtocolLogic class, and only allows restricted argument types as per [ProtocolLogicRefFactory]
*/
// TODO: align this with the existing [ProtocolRef] in the bank-side API (probably replace some of the API classes)
data class ProtocolLogicRef internal constructor(val protocolLogicClassName: String, val appContext: AppContext, val args: Map<String, Any?>)
/**
* This is just some way to track what attachments need to be in the class loader, but may later include some app
* properties loaded from the attachments. And perhaps the authenticated user for an API call?
*/
data class AppContext(val attachments: List<SecureHash>) {
// TODO: build a real [AttachmentsClassLoader] etc
val classLoader: ClassLoader
get() = this.javaClass.classLoader
}

View File

@ -0,0 +1,19 @@
package com.r3corda.core.utilities
import java.time.*
/**
* This whole file exists as short cuts to get demos working. In reality we'd have static data and/or rules engine
* defining things like this. It currently resides in the core module because it needs to be visible to the IRS
* contract.
*/
// We at some future point may implement more than just this constant announcement window and thus use the params.
@Suppress("UNUSED_PARAMETER")
fun suggestInterestRateAnnouncementTimeWindow(index: String, source: String, date: LocalDate): TimeWindow {
// TODO: we would ordinarily convert clock to same time zone as the index/source would announce in
// and suggest an announcement time for the interest rate
// Here we apply a blanket announcement time of 11:45 London irrespective of source or index
val time = LocalTime.of(11, 45)
val zoneId = ZoneId.of("Europe/London")
return TimeWindow(ZonedDateTime.of(date, time, zoneId).toInstant(), Duration.ofHours(24))
}

View File

@ -1,7 +1,6 @@
package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.*
import com.r3corda.core.contracts.Fix
import com.r3corda.core.contracts.FixOf
import com.r3corda.core.contracts.TransactionBuilder
@ -10,8 +9,12 @@ import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.core.utilities.suggestInterestRateAnnouncementTimeWindow
import java.math.BigDecimal
import java.time.Duration
import java.time.Instant
import java.util.*
// This code is unit tested in NodeInterestRates.kt
@ -29,6 +32,7 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder,
private val fixOf: FixOf,
private val expectedRate: BigDecimal,
private val rateTolerance: BigDecimal,
private val timeOut: Duration,
override val progressTracker: ProgressTracker = RatesFixProtocol.tracker(fixOf.name)) : ProtocolLogic<Unit>() {
companion object {
val TOPIC = "platform.rates.interest.fix"
@ -44,7 +48,7 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder,
class FixOutOfRange(val byAmount: BigDecimal) : Exception()
class QueryRequest(val queries: List<FixOf>, replyTo: SingleMessageRecipient, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
class QueryRequest(val queries: List<FixOf>, replyTo: SingleMessageRecipient, sessionID: Long, val deadline: Instant) : AbstractRequestMessage(replyTo, sessionID)
class SignRequest(val tx: WireTransaction, replyTo: SingleMessageRecipient, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
@Suspendable
@ -92,7 +96,9 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder,
@Suspendable
fun query(): Fix {
val sessionID = random63BitValue()
val req = QueryRequest(listOf(fixOf), serviceHub.networkService.myAddress, sessionID)
val deadline = suggestInterestRateAnnouncementTimeWindow(fixOf.name, oracle.identity.name, fixOf.forDay).end
val req = QueryRequest(listOf(fixOf), serviceHub.networkService.myAddress, sessionID, deadline)
// TODO: add deadline to receive
val resp = sendAndReceive<ArrayList<Fix>>(TOPIC_QUERY, oracle.address, 0, sessionID, req)
return resp.validate {

View File

@ -1,6 +1,7 @@
package com.r3corda.protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.TransientProperty
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
@ -17,6 +18,7 @@ import java.math.BigDecimal
import java.security.KeyPair
import java.security.PublicKey
import java.security.SignatureException
import java.time.Duration
/**
* Classes for manipulating a two party deal or agreement.
@ -25,6 +27,8 @@ import java.security.SignatureException
*
* TODO: Also, the term Deal is used here where we might prefer Agreement.
*
* TODO: Consider whether we can merge this with [TwoPartyTradeProtocol]
*
*/
object TwoPartyDealProtocol {
val DEAL_TOPIC = "platform.deal"
@ -52,12 +56,7 @@ object TwoPartyDealProtocol {
* There's a good chance we can push at least some of this logic down into core protocol logic
* and helper methods etc.
*/
abstract class Primary<U>(val payload: U,
val otherSide: SingleMessageRecipient,
val otherSessionID: Long,
val myKeyPair: KeyPair,
val notaryNode: NodeInfo,
override val progressTracker: ProgressTracker = Primary.tracker()) : ProtocolLogic<SignedTransaction>() {
abstract class Primary<U>(override val progressTracker: ProgressTracker = Primary.tracker()) : ProtocolLogic<SignedTransaction>() {
companion object {
object AWAITING_PROPOSAL : ProgressTracker.Step("Handshaking and awaiting transaction proposal")
@ -71,6 +70,12 @@ object TwoPartyDealProtocol {
fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, NOTARY, SENDING_SIGS, RECORDING, COPYING_TO_REGULATOR)
}
abstract val payload: U
abstract val notaryNode: NodeInfo
abstract val otherSide: SingleMessageRecipient
abstract val otherSessionID: Long
abstract val myKeyPair: KeyPair
@Suspendable
fun getPartialTransaction(): UntrustworthyData<SignedTransaction> {
progressTracker.currentStep = AWAITING_PROPOSAL
@ -79,7 +84,6 @@ object TwoPartyDealProtocol {
// Make the first message we'll send to kick off the protocol.
val hello = Handshake(payload, myKeyPair.public, sessionID)
val maybeSTX = sendAndReceive<SignedTransaction>(DEAL_TOPIC, otherSide, otherSessionID, sessionID, hello)
return maybeSTX
@ -143,6 +147,7 @@ object TwoPartyDealProtocol {
logger.trace { "Deal stored" }
progressTracker.currentStep = COPYING_TO_REGULATOR
val regulators = serviceHub.networkMapCache.regulators
if (regulators.isNotEmpty()) {
// Copy the transaction to every regulator in the network. This is obviously completely bogus, it's
@ -186,10 +191,7 @@ object TwoPartyDealProtocol {
* There's a good chance we can push at least some of this logic down into core protocol logic
* and helper methods etc.
*/
abstract class Secondary<U>(val otherSide: SingleMessageRecipient,
val notary: Party,
val sessionID: Long,
override val progressTracker: ProgressTracker = Secondary.tracker()) : ProtocolLogic<SignedTransaction>() {
abstract class Secondary<U>(override val progressTracker: ProgressTracker = Secondary.tracker()) : ProtocolLogic<SignedTransaction>() {
companion object {
object RECEIVING : ProgressTracker.Step("Waiting for deal info")
@ -201,6 +203,9 @@ object TwoPartyDealProtocol {
fun tracker() = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES, RECORDING)
}
abstract val otherSide: SingleMessageRecipient
abstract val sessionID: Long
@Suspendable
override fun call(): SignedTransaction {
val handshake = receiveAndValidateHandshake()
@ -241,7 +246,7 @@ object TwoPartyDealProtocol {
@Suspendable
private fun swapSignaturesWithPrimary(stx: SignedTransaction, theirSessionID: Long): SignaturesFromPrimary {
progressTracker.currentStep = SWAPPING_SIGNATURES
logger.trace { "Sending partially signed transaction to seller" }
logger.trace { "Sending partially signed transaction to other party" }
// TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx.
@ -271,45 +276,47 @@ object TwoPartyDealProtocol {
/**
* One side of the protocol for inserting a pre-agreed deal.
*/
open class Instigator<T : DealState>(otherSide: SingleMessageRecipient,
notaryNode: NodeInfo,
dealBeingOffered: T,
myKeyPair: KeyPair,
buyerSessionID: Long,
override val progressTracker: ProgressTracker = Primary.tracker()) : Primary<T>(dealBeingOffered, otherSide, buyerSessionID, myKeyPair, notaryNode)
open class Instigator<T : DealState>(override val otherSide: SingleMessageRecipient,
val notary: Party,
override val payload: T,
override val myKeyPair: KeyPair,
override val otherSessionID: Long,
override val progressTracker: ProgressTracker = Primary.tracker()) : Primary<T>() {
override val notaryNode: NodeInfo get() =
serviceHub.networkMapCache.notaryNodes.filter { it.identity == notary }.single()
}
/**
* One side of the protocol for inserting a pre-agreed deal.
*/
open class Acceptor<T : DealState>(otherSide: SingleMessageRecipient,
notary: Party,
open class Acceptor<T : DealState>(override val otherSide: SingleMessageRecipient,
val notary: Party,
val dealToBuy: T,
sessionID: Long,
override val progressTracker: ProgressTracker = Secondary.tracker()) : Secondary<T>(otherSide, notary, sessionID) {
override val sessionID: Long,
override val progressTracker: ProgressTracker = Secondary.tracker()) : Secondary<T>() {
override fun validateHandshake(handshake: Handshake<T>): Handshake<T> {
with(handshake) {
// What is the seller trying to sell us?
val deal: T = handshake.payload
val otherKey = handshake.publicKey
logger.trace { "Got deal request for: ${handshake.payload}" }
// What is the seller trying to sell us?
val deal: T = handshake.payload
val otherKey = handshake.publicKey
logger.trace { "Got deal request for: ${handshake.payload.ref}" }
// Check the start message for acceptability.
check(handshake.sessionID > 0)
if (dealToBuy != deal)
throw DealMismatchException(dealToBuy, deal)
// Check the start message for acceptability.
check(handshake.sessionID > 0)
check(dealToBuy == deal)
// We need to substitute in the new public keys for the Parties
val myName = serviceHub.storageService.myLegalIdentity.name
val myOldParty = deal.parties.single { it.name == myName }
val theirOldParty = deal.parties.single { it.name != myName }
// We need to substitute in the new public keys for the Parties
val myName = serviceHub.storageService.myLegalIdentity.name
val myOldParty = deal.parties.single { it.name == myName }
val theirOldParty = deal.parties.single { it.name != myName }
@Suppress("UNCHECKED_CAST")
val newDeal = deal.
withPublicKey(myOldParty, serviceHub.keyManagementService.freshKey().public).
withPublicKey(theirOldParty, otherKey) as T
@Suppress("UNCHECKED_CAST")
val newDeal = deal.
withPublicKey(myOldParty, serviceHub.keyManagementService.freshKey().public).
withPublicKey(theirOldParty, otherKey) as T
return handshake.copy(payload = newDeal)
}
return handshake.copy(payload = newDeal)
}
@ -328,59 +335,62 @@ object TwoPartyDealProtocol {
* One side of the fixing protocol for an interest rate swap, but could easily be generalised further.
*
* Do not infer too much from the name of the class. This is just to indicate that it is the "side"
* of the protocol that is run by the party with the fixed leg of swap deal, which is the basis for decided
* of the protocol that is run by the party with the fixed leg of swap deal, which is the basis for deciding
* who does what in the protocol.
*/
open class Fixer<T : FixableDealState>(otherSide: SingleMessageRecipient,
notary: Party,
val dealToFix: StateAndRef<T>,
sessionID: Long,
val replacementProgressTracker: ProgressTracker? = null) : Secondary<StateRef>(otherSide, notary, sessionID) {
private val ratesFixTracker = RatesFixProtocol.tracker(dealToFix.state.data.nextFixingOf()!!.name)
class Fixer(val initiation: FixingSessionInitiation, override val progressTracker: ProgressTracker = Secondary.tracker()) : Secondary<StateRef>() {
override val progressTracker: ProgressTracker = replacementProgressTracker ?: createTracker()
override val sessionID: Long get() = initiation.sessionID
fun createTracker(): ProgressTracker = Secondary.tracker().apply {
setChildProgressTracker(SIGNING, ratesFixTracker)
}
override val otherSide: SingleMessageRecipient get() = initiation.sender
private lateinit var txState: TransactionState<*>
private lateinit var deal: FixableDealState
override fun validateHandshake(handshake: Handshake<StateRef>): Handshake<StateRef> {
with(handshake) {
logger.trace { "Got fixing request for: ${dealToFix.state}" }
logger.trace { "Got fixing request for: ${handshake.payload}" }
// Check the start message for acceptability.
if (dealToFix.ref != handshake.payload)
throw DealRefMismatchException(dealToFix.ref, handshake.payload)
// Check the handshake and initiation for acceptability.
check(handshake.sessionID > 0)
txState = serviceHub.loadState(handshake.payload)
deal = txState.data as FixableDealState
return handshake
}
// validate the party that initiated is the one on the deal and that the recipient corresponds with it.
// TODO: this is in no way secure and will be replaced by general session initiation logic in the future
val myName = serviceHub.storageService.myLegalIdentity.name
val otherParty = deal.parties.filter { it.name != myName }.single()
check(otherParty == initiation.party)
val otherPartyAddress = serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)!!.address
check(otherPartyAddress == otherSide)
// Also check we are one of the parties
deal.parties.filter { it.name == myName }.single()
return handshake
}
@Suspendable
override fun assembleSharedTX(handshake: Handshake<StateRef>): Pair<TransactionBuilder, List<PublicKey>> {
val fixOf = dealToFix.state.data.nextFixingOf()!!
@Suppress("UNCHECKED_CAST")
val fixOf = deal.nextFixingOf()!!
// TODO Do we need/want to substitute in new public keys for the Parties?
val myName = serviceHub.storageService.myLegalIdentity.name
val deal: T = dealToFix.state.data
val myOldParty = deal.parties.single { it.name == myName }
@Suppress("UNCHECKED_CAST")
val newDeal = deal
val ptx = TransactionType.General.Builder()
val addFixing = object : RatesFixProtocol(ptx, serviceHub.networkMapCache.ratesOracleNodes[0], fixOf, BigDecimal.ZERO, BigDecimal.ONE) {
val addFixing = object : RatesFixProtocol(ptx, serviceHub.networkMapCache.ratesOracleNodes[0], fixOf, BigDecimal.ZERO, BigDecimal.ONE, initiation.timeout) {
@Suspendable
override fun beforeSigning(fix: Fix) {
newDeal.generateFix(ptx, dealToFix, fix)
newDeal.generateFix(ptx, StateAndRef(txState, handshake.payload), fix)
// And add a request for timestamping: it may be that none of the contracts need this! But it can't hurt
// to have one.
ptx.setTime(serviceHub.clock.instant(), notary, 30.seconds)
ptx.setTime(serviceHub.clock.instant(), txState.notary, 30.seconds)
}
}
subProtocol(addFixing)
return Pair(ptx, arrayListOf(myOldParty.owningKey))
}
}
@ -392,11 +402,72 @@ object TwoPartyDealProtocol {
* is just the "side" of the protocol run by the party with the floating leg as a way of deciding who
* does what in the protocol.
*/
open class Floater<T : FixableDealState>(otherSide: SingleMessageRecipient,
otherSessionID: Long,
notary: NodeInfo,
dealToFix: StateAndRef<T>,
myKeyPair: KeyPair,
val sessionID: Long,
override val progressTracker: ProgressTracker = Primary.tracker()) : Primary<StateRef>(dealToFix.ref, otherSide, otherSessionID, myKeyPair, notary)
class Floater(override val payload: StateRef,
override val otherSessionID: Long,
override val progressTracker: ProgressTracker = Primary.tracker()) : Primary<StateRef>() {
@Suppress("UNCHECKED_CAST")
internal val dealToFix: StateAndRef<FixableDealState> by TransientProperty {
val state = serviceHub.loadState(payload) as TransactionState<FixableDealState>
StateAndRef(state, payload)
}
override val myKeyPair: KeyPair get() {
val myName = serviceHub.storageService.myLegalIdentity.name
val publicKey = dealToFix.state.data.parties.filter { it.name == myName }.single().owningKey
return serviceHub.keyManagementService.toKeyPair(publicKey)
}
override val otherSide: SingleMessageRecipient get() {
// TODO: what happens if there's no node? Move to messaging taking Party and then handled in messaging layer
val myName = serviceHub.storageService.myLegalIdentity.name
val otherParty = dealToFix.state.data.parties.filter { it.name != myName }.single()
return serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)!!.address
}
override val notaryNode: NodeInfo get() =
serviceHub.networkMapCache.notaryNodes.filter { it.identity == dealToFix.state.notary }.single()
}
/** This topic exists purely for [FixingSessionInitiation] to be sent from [FixingRoleDecider] to [FixingSessionInitiationHandler] */
val FIX_INITIATE_TOPIC = "platform.fix.initiate"
/** Used to set up the session between [Floater] and [Fixer] */
data class FixingSessionInitiation(val sessionID: Long, val party: Party, val sender: SingleMessageRecipient, val timeout: Duration)
/**
* This protocol looks at the deal and decides whether to be the Fixer or Floater role in agreeing a fixing.
*
* It is kicked off as an activity on both participant nodes by the scheduler when it's time for a fixing. If the
* Fixer role is chosen, then that will be initiated by the [FixingSessionInitiation] message sent from the other party and
* handled by the [FixingSessionInitiationHandler].
*
* TODO: Replace [FixingSessionInitiation] and [FixingSessionInitiationHandler] with generic session initiation logic once it exists.
*/
class FixingRoleDecider(val ref: StateRef, val timeout: Duration, override val progressTracker: ProgressTracker = tracker(ref.toString())) : ProtocolLogic<Unit>() {
companion object {
class LOADING(ref: String) : ProgressTracker.Step("Loading state $ref to decide fixing role")
fun tracker(ref: String) = ProgressTracker(LOADING(ref))
}
@Suspendable
override fun call(): Unit {
progressTracker.nextStep()
val dealToFix = serviceHub.loadState(ref)
// TODO: this is not the eventual mechanism for identifying the parties
val sortedParties = (dealToFix.data as FixableDealState).parties.sortedBy { it.name }
if (sortedParties[0].name == serviceHub.storageService.myLegalIdentity.name) {
// Generate sessionID
val sessionID = random63BitValue()
val initation = FixingSessionInitiation(sessionID, sortedParties[0], serviceHub.networkService.myAddress, timeout)
// Send initiation to other side to launch one side of the fixing protocol (the Fixer).
send(FIX_INITIATE_TOPIC, serviceHub.networkMapCache.getNodeByLegalName(sortedParties[1].name)!!.address, 0, initation)
// Then start the other side of the fixing protocol.
val protocol = Floater(ref, sessionID)
subProtocol(protocol)
}
}
}
}

View File

@ -0,0 +1,42 @@
package com.r3corda.core.protocols;
import com.google.common.collect.Sets;
import org.junit.Test;
public class ProtocolLogicRefFromJavaTest {
public static class JavaProtocolLogic extends ProtocolLogic<Void> {
public JavaProtocolLogic(int A, String b) {
}
@Override
public Void call() {
return null;
}
}
public static class JavaNoArgProtocolLogic extends ProtocolLogic<Void> {
public JavaNoArgProtocolLogic() {
}
@Override
public Void call() {
return null;
}
}
@Test
public void test() {
ProtocolLogicRefFactory factory = new ProtocolLogicRefFactory(Sets.newHashSet(JavaProtocolLogic.class.getName()), Sets.newHashSet(Integer.class.getName(), String.class.getName()));
factory.create(JavaProtocolLogic.class, 1, "Hello Jack");
}
@Test
public void testNoArg() {
ProtocolLogicRefFactory factory = new ProtocolLogicRefFactory(Sets.newHashSet(JavaNoArgProtocolLogic.class.getName()), Sets.newHashSet(Integer.class.getName(), String.class.getName()));
factory.create(JavaNoArgProtocolLogic.class);
}
}

View File

@ -0,0 +1,80 @@
package com.r3corda.core.protocols
import com.google.common.collect.Sets
import com.r3corda.core.days
import org.junit.Before
import org.junit.Test
import java.time.Duration
class ProtocolLogicRefTest {
@Suppress("UNUSED_PARAMETER") // We will never use A or b
class KotlinProtocolLogic(A: Int, b: String) : ProtocolLogic<Unit>() {
constructor() : this(1, "2")
constructor(C: String) : this(1, C)
constructor(illegal: Duration) : this(1, illegal.toString())
override fun call(): Unit {
}
}
class KotlinNoArgProtocolLogic : ProtocolLogic<Unit>() {
override fun call(): Unit {
}
}
@Suppress("UNUSED_PARAMETER") // We will never use A or b
class NotWhiteListedKotlinProtocolLogic(A: Int, b: String) : ProtocolLogic<Unit>() {
override fun call(): Unit {
}
}
lateinit var factory: ProtocolLogicRefFactory
@Before
fun setup() {
// We have to allow Java boxed primitives but Kotlin warns we shouldn't be using them
factory = ProtocolLogicRefFactory(Sets.newHashSet(KotlinProtocolLogic::class.java.name, KotlinNoArgProtocolLogic::class.java.name),
Sets.newHashSet(@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") Integer::class.java.name, String::class.java.name))
}
@Test
fun testCreateKotlinNoArg() {
factory.create(KotlinNoArgProtocolLogic::class.java)
}
@Test
fun testCreateKotlin() {
val args = mapOf(Pair("A", 1), Pair("b", "Hello Jack"))
factory.createKotlin(KotlinProtocolLogic::class.java, args)
}
@Test
fun testCreatePrimary() {
factory.create(KotlinProtocolLogic::class.java, 1, "Hello Jack")
}
@Test(expected = IllegalArgumentException::class)
fun testCreateNotWhiteListed() {
factory.create(NotWhiteListedKotlinProtocolLogic::class.java, 1, "Hello Jack")
}
@Test
fun testCreateKotlinVoid() {
factory.createKotlin(KotlinProtocolLogic::class.java, emptyMap())
}
@Test
fun testCreateKotlinNonPrimary() {
val args = mapOf(Pair("C", "Hello Jack"))
factory.createKotlin(KotlinProtocolLogic::class.java, args)
}
@Test(expected = IllegalArgumentException::class)
fun testCreateArgNotWhiteListed() {
val args = mapOf(Pair("illegal", 1.days))
factory.createKotlin(KotlinProtocolLogic::class.java, args)
}
}

View File

@ -0,0 +1,102 @@
.. highlight:: kotlin
.. raw:: html
<script type="text/javascript" src="_static/jquery.js"></script>
<script type="text/javascript" src="_static/codesets.js"></script>
Event scheduling
================
This article explains our experimental approach to modelling time based events in code. It explains how a contract
state can expose an upcoming event and what action to take if the scheduled time for that event is reached.
Introduction
------------
Many financial instruments have time sensitive components to them. For example, an Interest Rate Swap has a schedule
for when:
* Interest rate fixings should take place for floating legs, so that the interest rate used as the basis for payments
can be agreed.
* Any payments between the parties are expected to take place.
* Any payments between the parties become overdue.
Each of these is dependent on the current state of the financial instrument. What payments and interest rate fixings
have already happened should already be recorded in the state, for example. This means that the *next* time sensitive
event is thus a property of the current contract state. By next, we mean earliest in chronological terms, that is still
due. If a contract state is consumed in the UTXO model, then what *was* the next event becomes irrelevant and obsolete
and the next time sensitive event is determined by any successor contract state.
Knowing when the next time sensitive event is due to occur is useful, but typically some *activity* is expected to take
place when this event occurs. We already have a model for business processes in the form of the protocol state machines,
so in the platform we have introduced the concept of *scheduled activities* that can invoke protocol state machines
at a scheduled time. A contract state can optionally described the next scheduled activity for itself. If it omits
to do so, then nothing will be scheduled.
How to implement scheduled events
---------------------------------
There are two main steps to implementing scheduled events:
* Have your ``ContractState`` implementation also implement ``SchedulableState``. This requires a method named
``nextScheduledActivity`` to be implemented which returns an optional ``ScheduledActivity`` instance.
``ScheduledActivity`` captures what ``ProtocolLogic`` instance each node will run, to perform the activity, and when it
will run is described by a ``java.time.Instant``. Once your state implements this interface and is tracked by the
wallet, it can expect to be queried for the next activity when recorded via the ``ServiceHub.recordTransactions``
method during protocols execution.
* If nothing suitable exists, implement a ``ProtocolLogic`` to be executed by each node as the activity itself.
The important thing to remember is that each node that is party to the transaction, in the current implementation,
will execute the same ``ProtocolLogic`` so that needs to establish roles in the business process based on the contract
state and the node it is running on, and follow different but complementary paths through the business logic.
.. note:: The scheduler's clock always operates in the UTC time zone for uniformity, so any time zone logic must be
performed by the contract, using ``ZonedDateTime``.
In the short term, until we have automatic protocol session set up, you will also likely need to install a network
handler to help with obtaining a unqiue and secure random session. An example is described below.
The production and consumption of ``ContractStates`` is observed by the scheduler and the activities associated with
any consumed states are unscheduled. Any newly produced states are then queried via the ``nextScheduledActivity``
method and if they do not return ``null`` then that activity is scheduled based on the content of the
``ScheduledActivity`` object returned.
An example
----------
Let's take an example of the Interest Rate Swap fixings for our scheduled events. The first task is to implement the
``nextScheduledActivity`` method on the ``State``.
.. container:: codeset
.. sourcecode:: kotlin
override fun nextScheduledActivity(thisStateRef: StateRef,
protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity? {
val nextFixingOf = nextFixingOf() ?: return null
// This is perhaps not how we should determine the time point in the business day, but instead expect the
// schedule to detail some of these aspects.
val (instant, duration) = suggestInterestRateAnnouncementTimeWindow(index = nextFixingOf.name,
source = floatingLeg.indexSource,
date = nextFixingOf.forDay)
return ScheduledActivity(protocolLogicRefFactory.create(TwoPartyDealProtocol.FixingRoleDecider::class.java,
thisStateRef, duration), instant)
}
The first thing this does is establish if there are any remaining fixings. If there are none, then it returns ``null``
to indicate that there is no activity to schedule. Otherwise it calculates the ``Instant`` at which the interest rate
should become available and schedules an activity at that time to work out what roles each node will take in the fixing
business process and to take on those roles. That ``ProtocolLogic`` will be handed the ``StateRef`` for the interest
rate swap ``State`` in question, as well as a tolerance ``Duration`` of how long to wait after the activity is triggered
for the interest rate before indicating an error.
.. note:: The use of the factory to create a ``ProtocolLogicRef`` instance to embed in the ``ScheduledActivity``. This is a
way to create a reference to the ``ProtocolLogic`` class and it's constructor parameters to instantiate that can be
checked against a per node whitelist of approved and allowable types as part of our overall security sandboxing.
As previously mentioned, we currently need a small network handler to assist with session setup until the work to
automate that is complete. See the interest rate swap specific implementation ``FixingSessionInitiationHandler`` which
is responsible for starting a ``ProtocolLogic`` to perform one role in the fixing protocol with the ``sessionID`` sent
by the ``FixingRoleDecider`` on the other node which then launches the other role in the fixing protocol. Currently
the handler needs to be manually installed in the node.

View File

@ -41,6 +41,7 @@ Read on to learn:
tutorial-contract
protocol-state-machines
oracles
event-scheduling
.. toctree::
:maxdepth: 2

View File

@ -12,6 +12,8 @@ import com.r3corda.core.node.CityDatabase
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.PhysicalLocation
import com.r3corda.core.node.services.*
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.random63BitValue
import com.r3corda.core.seconds
import com.r3corda.core.serialization.deserialize
@ -24,6 +26,8 @@ import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.events.NodeSchedulerService
import com.r3corda.node.services.events.ScheduledActivityObserver
import com.r3corda.node.services.identity.InMemoryIdentityService
import com.r3corda.node.services.keys.E2ETestKeyManagementService
import com.r3corda.node.services.network.InMemoryNetworkMapCache
@ -48,7 +52,6 @@ import java.nio.file.Path
import java.security.KeyPair
import java.security.Security
import java.time.Clock
import java.time.Instant
import java.util.*
/**
@ -89,9 +92,17 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
override val walletService: WalletService get() = wallet
override val keyManagementService: KeyManagementService get() = keyManagement
override val identityService: IdentityService get() = identity
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val schedulerService: SchedulerService get() = scheduler
override val clock: Clock = platformClock
// Internal only
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val protocolLogicRefFactory = ProtocolLogicRefFactory()
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
return smm.add(loggerName, logic)
}
override fun recordTransactions(txs: Iterable<SignedTransaction>) =
recordTransactionsInternal(storage, txs)
}
@ -112,6 +123,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
lateinit var identity: IdentityService
lateinit var net: MessagingService
lateinit var api: APIServer
lateinit var scheduler: SchedulerService
var isPreviousCheckpointsPresent = false
private set
@ -140,7 +152,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// the identity key. But the infrastructure to make that easy isn't here yet.
keyManagement = E2ETestKeyManagementService(setOf(storage.myLegalIdentityKey))
api = APIServerImpl(this)
smm = StateMachineManager(services, listOf(storage, net, wallet, keyManagement, identity, platformClock), checkpointStorage, serverThread)
scheduler = NodeSchedulerService(services)
smm = StateMachineManager(services,
listOf(storage, net, wallet, keyManagement, identity, platformClock, scheduler, interestRatesService),
checkpointStorage,
serverThread)
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
@ -154,6 +170,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
ANSIProgressObserver(smm)
// Add wallet observers
CashBalanceAsMetricsObserver(services)
ScheduledActivityObserver(services)
startMessagingService()
_networkMapRegistrationFuture.setFuture(registerWithNetworkMap())
@ -210,7 +227,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
private fun updateRegistration(serviceInfo: NodeInfo, type: AddOrRemove): ListenableFuture<NetworkMapService.RegistrationResponse> {
// Register this node against the network
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val expires = platformClock.instant() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, networkMapSeq++, type, expires)
val sessionID = random63BitValue()
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID)
@ -227,7 +244,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
}
open protected fun makeNetworkMapService() {
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val expires = platformClock.instant() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, Long.MAX_VALUE, AddOrRemove.ADD, expires)
inNodeNetworkMapService = InMemoryNetworkMapService(net, reg, services.networkMapCache)
}

View File

@ -15,6 +15,7 @@ import com.r3corda.core.node.services.linearHeadsOfType
import com.r3corda.core.node.services.testing.MockIdentityService
import com.r3corda.core.random63BitValue
import com.r3corda.core.success
import com.r3corda.node.services.FixingSessionInitiationHandler
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.utilities.JsonSupport
import com.r3corda.protocols.TwoPartyDealProtocol
@ -30,7 +31,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val om = JsonSupport.createDefaultMapper(MockIdentityService(network.identities))
init {
currentDay = LocalDate.of(2016, 3, 10) // Should be 12th but the actual first fixing date gets rolled backwards.
currentDateAndTime = LocalDate.of(2016, 3, 8).atStartOfDay()
}
private var nodeAKey: KeyPair? = null
@ -39,6 +40,11 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>())
override fun startMainSimulation(): ListenableFuture<Unit> {
// TODO: until we have general session initiation
FixingSessionInitiationHandler.register(banks[0])
FixingSessionInitiationHandler.register(banks[1])
val future = SettableFuture.create<Unit>()
nodeAKey = banks[0].keyManagement.freshKey()
@ -80,7 +86,6 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val node1: SimulatedNode = banks[i]
val node2: SimulatedNode = banks[j]
val sessionID = random63BitValue()
val swaps: Map<SecureHash, StateAndRef<InterestRateSwap.State>> = node1.services.walletService.linearHeadsOfType<InterestRateSwap.State>()
val theDealRef: StateAndRef<InterestRateSwap.State> = swaps.values.single()
@ -89,30 +94,23 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
extraNodeLabels[node1] = "Fixing event on $nextFixingDate"
extraNodeLabels[node2] = "Fixing event on $nextFixingDate"
// For some reason the first fix is always before the effective date.
if (nextFixingDate > currentDay)
currentDay = nextFixingDate
val sideA = TwoPartyDealProtocol.Floater(node2.net.myAddress, sessionID, notary.info, theDealRef, nodeAKey!!, sessionID)
val sideB = TwoPartyDealProtocol.Fixer(node1.net.myAddress, notary.info.identity, theDealRef, sessionID)
linkConsensus(listOf(node1, node2, regulators[0]), sideB)
linkProtocolProgress(node1, sideA)
linkProtocolProgress(node2, sideB)
// We have to start the protocols in separate iterations, as adding to the SMM effectively 'iterates' that node
// in the simulation, so if we don't do this then the two sides seem to act simultaneously.
val retFuture = SettableFuture.create<Unit>()
val futA = node1.smm.add("floater", sideA)
val futB = node2.smm.add("fixer", sideB)
executeOnNextIteration += {
Futures.allAsList(futA, futB) success {
retFuture.set(null)
} failure { throwable ->
retFuture.setException(throwable)
}
// Complete the future when the state has been consumed on both nodes
val futA = node1.services.walletService.whenConsumed(theDealRef.ref)
val futB = node2.services.walletService.whenConsumed(theDealRef.ref)
Futures.allAsList(futA, futB) success {
retFuture.set(null)
} failure { throwable ->
retFuture.setException(throwable)
}
showProgressFor(listOf(node1, node2))
showConsensusFor(listOf(node1, node2, regulators[0]))
// For some reason the first fix is always before the effective date.
if (nextFixingDate > currentDateAndTime.toLocalDate())
currentDateAndTime = nextFixingDate.atTime(15, 0)
return retFuture
}
@ -132,13 +130,11 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val sessionID = random63BitValue()
val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info, irs, nodeAKey!!, sessionID)
val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info.identity, irs, nodeAKey!!, sessionID)
val acceptor = TwoPartyDealProtocol.Acceptor(node1.net.myAddress, notary.info.identity, irs, sessionID)
// TODO: Eliminate the need for linkProtocolProgress
linkConsensus(listOf(node1, node2, regulators[0]), acceptor)
linkProtocolProgress(node1, instigator)
linkProtocolProgress(node2, acceptor)
showProgressFor(listOf(node1, node2))
showConsensusFor(listOf(node1, node2, regulators[0]))
val instigatorFuture: ListenableFuture<SignedTransaction> = node1.smm.add("instigator", instigator)

View File

@ -1,6 +1,5 @@
package com.r3corda.node.internal.testing
import com.google.common.base.Function
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.node.CityDatabase
@ -15,11 +14,14 @@ import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.utilities.AddOrRemove
import rx.Observable
import rx.subjects.PublishSubject
import java.nio.file.Path
import java.security.KeyPair
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.*
/**
@ -145,6 +147,8 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
val serviceProviders: List<SimulatedNode> = listOf(notary, ratesOracle, networkMap)
val banks: List<SimulatedNode> = bankFactory.createAll()
val clocks = (serviceProviders + regulators + banks).map { it.services.clock as TestClock }
private val _allProtocolSteps = PublishSubject.create<Pair<SimulatedNode, ProgressTracker.Change>>()
private val _doneSteps = PublishSubject.create<Collection<SimulatedNode>>()
val allProtocolSteps: Observable<Pair<SimulatedNode, ProgressTracker.Change>> = _allProtocolSteps
@ -156,14 +160,21 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
* The current simulated date. By default this never changes. If you want it to change, you should do so from
* within your overridden [iterate] call. Changes in the current day surface in the [dateChanges] observable.
*/
var currentDay: LocalDate = LocalDate.now()
var currentDateAndTime: LocalDateTime = LocalDate.now().atStartOfDay()
protected set(value) {
field = value
_dateChanges.onNext(value)
}
private val _dateChanges = PublishSubject.create<LocalDate>()
val dateChanges: Observable<LocalDate> = _dateChanges
private val _dateChanges = PublishSubject.create<LocalDateTime>()
val dateChanges: Observable<LocalDateTime> get() = _dateChanges
init {
// Advance node clocks when current time is changed
dateChanges.subscribe {
clocks.setTo(currentDateAndTime.toInstant(ZoneOffset.UTC))
}
}
/**
* A place for simulations to stash human meaningful text about what the node is "thinking", which might appear
@ -201,7 +212,15 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
return null
}
protected fun linkProtocolProgress(node: SimulatedNode, protocol: ProtocolLogic<*>) {
protected fun showProgressFor(nodes: List<SimulatedNode>) {
nodes.forEach { node ->
node.smm.changes.filter { it.second == AddOrRemove.ADD }.first().subscribe {
linkProtocolProgress(node, it.first)
}
}
}
private fun linkProtocolProgress(node: SimulatedNode, protocol: ProtocolLogic<*>) {
val pt = protocol.progressTracker ?: return
pt.changes.subscribe { change: ProgressTracker.Change ->
// Runs on node thread.
@ -211,7 +230,15 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
_allProtocolSteps.onNext(Pair(node, ProgressTracker.Change.Position(pt, pt.steps[1])))
}
protected fun linkConsensus(nodes: Collection<SimulatedNode>, protocol: ProtocolLogic<*>) {
protected fun showConsensusFor(nodes: List<SimulatedNode>) {
val node = nodes.first()
node.smm.changes.filter { it.second == AddOrRemove.ADD }.first().subscribe {
linkConsensus(nodes, it.first)
}
}
private fun linkConsensus(nodes: Collection<SimulatedNode>, protocol: ProtocolLogic<*>) {
protocol.progressTracker?.changes?.subscribe { change: ProgressTracker.Change ->
// Runs on node thread.
if (protocol.progressTracker!!.currentStep == ProgressTracker.DONE) {

View File

@ -49,3 +49,9 @@ class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableC
return delegateClock.zone
}
}
/**
* A helper method to set several [TestClock]s to approximately the same time. The clocks may drift by the time it
* takes for each [TestClock] to have it's time set and any observers to execute.
*/
fun Iterable<TestClock>.setTo(instant: Instant) = this.forEach { it.setTo(instant) }

View File

@ -49,9 +49,8 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
val sellerProtocol = TwoPartyTradeProtocol.Seller(buyer.net.myAddress, notary.info,
issuance.tx.outRef(0), amount, seller.storage.myLegalIdentityKey, sessionID)
linkConsensus(listOf(buyer, seller, notary), sellerProtocol)
linkProtocolProgress(buyer, buyerProtocol)
linkProtocolProgress(seller, sellerProtocol)
showConsensusFor(listOf(buyer, seller, notary))
showProgressFor(listOf(buyer, seller))
val buyerFuture = buyer.smm.add("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TRADE_TOPIC}.buyer", buyerProtocol)
val sellerFuture = seller.smm.add("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TRADE_TOPIC}.seller", sellerProtocol)

View File

@ -0,0 +1,22 @@
package com.r3corda.node.services
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.internal.AbstractNode
import com.r3corda.protocols.TwoPartyDealProtocol
/**
* This is a temporary handler required for establishing random sessionIDs for the [Fixer] and [Floater] as part of
* running scheduled fixings for the [InterestRateSwap] contract.
*
* TODO: This will be replaced with the automatic sessionID / session setup work.
*/
object FixingSessionInitiationHandler {
fun register(node: AbstractNode) {
node.net.addMessageHandler("${TwoPartyDealProtocol.FIX_INITIATE_TOPIC}.0") { msg, registration ->
val initiation = msg.data.deserialize<TwoPartyDealProtocol.FixingSessionInitiation>()
val protocol = TwoPartyDealProtocol.Fixer(initiation)
node.smm.add("fixings", protocol)
}
}
}

View File

@ -1,11 +1,15 @@
package com.r3corda.node.services.api
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.TxWritableStorageService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
abstract class ServiceHubInternal : ServiceHub {
abstract val monitoringService: MonitoringService
abstract val protocolLogicRefFactory: ProtocolLogicRefFactory
/**
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
@ -18,4 +22,11 @@ abstract class ServiceHubInternal : ServiceHub {
txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) }
walletService.notifyAll(txs.map { it.tx })
}
/**
* TODO: borrowing this method from service manager work in another branch. It's required to avoid circular dependency
* between SMM and the scheduler. That particular problem should also be resolved by the service manager work
* itself, at which point this method would not be needed (by the scheduler)
*/
abstract fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T>
}

View File

@ -1,5 +1,7 @@
package com.r3corda.node.services.clientapi
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.RetryableException
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
@ -8,14 +10,19 @@ import com.r3corda.core.math.CubicSplineInterpolator
import com.r3corda.core.math.Interpolator
import com.r3corda.core.math.InterpolatorFactory
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.AcceptsFileUpload
import org.slf4j.LoggerFactory
import com.r3corda.node.utilities.FiberBox
import com.r3corda.protocols.RatesFixProtocol
import org.slf4j.LoggerFactory
import java.io.InputStream
import java.math.BigDecimal
import java.security.KeyPair
import java.time.Clock
import java.time.Instant
import java.time.LocalDate
import java.util.*
import javax.annotation.concurrent.ThreadSafe
@ -36,7 +43,7 @@ object NodeInterestRates {
*/
class Service(node: AbstractNode) : AcceptsFileUpload, AbstractNodeService(node.services.networkService) {
val ss = node.services.storageService
val oracle = NodeInterestRates.Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey)
val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, node.services.clock)
private val logger = LoggerFactory.getLogger(Service::class.java)
@ -46,11 +53,42 @@ object NodeInterestRates {
{ message, e -> logger.error("Exception during interest rate oracle request processing", e) }
)
addMessageHandler(RatesFixProtocol.TOPIC_QUERY,
{ req: RatesFixProtocol.QueryRequest -> oracle.query(req.queries) },
{ req: RatesFixProtocol.QueryRequest ->
/**
* We put this into a protocol so that if it blocks waiting for the interest rate to become
* available, we a) don't block this thread and b) allow the fact we are waiting
* to be persisted/checkpointed.
* Interest rates become available when they are uploaded via the web as per [DataUploadServlet],
* if they haven't already been uploaded that way.
*/
node.smm.add("fixing", FixQueryHandler(this, req))
return@addMessageHandler
},
{ message, e -> logger.error("Exception during interest rate oracle request processing", e) }
)
}
private class FixQueryHandler(val service: Service, val request: RatesFixProtocol.QueryRequest) : ProtocolLogic<Unit>() {
companion object {
object RECEIVED : ProgressTracker.Step("Received fix request")
object SENDING : ProgressTracker.Step("Sending fix response")
}
override val progressTracker = ProgressTracker(RECEIVED, SENDING)
init {
progressTracker.currentStep = RECEIVED
}
@Suspendable
override fun call(): Unit {
val answers = service.oracle.query(request.queries, request.deadline)
progressTracker.currentStep = SENDING
send("${RatesFixProtocol.TOPIC}.query", request.replyTo, request.sessionID!!, answers)
}
}
// File upload support
override val dataTypePrefix = "interest-rates"
override val acceptableFileExtensions = listOf(".rates", ".txt")
@ -73,25 +111,44 @@ object NodeInterestRates {
* The oracle will try to interpolate the missing value of a tenor for the given fix name and date.
*/
@ThreadSafe
class Oracle(val identity: Party, private val signingKey: KeyPair) {
class Oracle(val identity: Party, private val signingKey: KeyPair, val clock: Clock) {
private class InnerState {
var container: FixContainer = FixContainer(emptyList<Fix>())
}
private val mutex = FiberBox(InnerState())
var knownFixes: FixContainer
set(value) {
require(value.size > 0)
mutex.write {
container = value
}
}
get() = mutex.read { container }
// Make this the last bit of initialisation logic so fully constructed when entered into instances map
init {
require(signingKey.public == identity.owningKey)
}
@Volatile var knownFixes = FixContainer(emptyList<Fix>())
set(value) {
require(value.size > 0)
field = value
}
fun query(queries: List<FixOf>): List<Fix> {
/**
* This method will now wait until the given deadline if the fix for the given [FixOf] is not immediately
* available. To implement this, [readWithDeadline] will loop if the deadline is not reached and we throw
* [UnknownFix] as it implements [RetryableException] which has special meaning to this function.
*/
@Suspendable
fun query(queries: List<FixOf>, deadline: Instant): List<Fix> {
require(queries.isNotEmpty())
val knownFixes = knownFixes // Snapshot
val answers: List<Fix?> = queries.map { knownFixes[it] }
val firstNull = answers.indexOf(null)
if (firstNull != -1)
throw UnknownFix(queries[firstNull])
return answers.filterNotNull()
return mutex.readWithDeadline(clock, deadline) {
val answers: List<Fix?> = queries.map { container[it] }
val firstNull = answers.indexOf(null)
if (firstNull != -1) {
throw UnknownFix(queries[firstNull])
} else {
answers.filterNotNull()
}
}
}
fun sign(wtx: WireTransaction): DigitalSignature.LegallyIdentifiable {
@ -120,9 +177,8 @@ object NodeInterestRates {
}
}
class UnknownFix(val fix: FixOf) : Exception() {
override fun toString() = "Unknown fix: $fix"
}
// TODO: can we split into two? Fix not available (retryable/transient) and unknown (permanent)
class UnknownFix(val fix: FixOf) : RetryableException("Unknown fix: $fix")
/** Fix container, for every fix name & date pair stores a tenor to interest rate map - [InterpolatingRateMap] */
class FixContainer(val fixes: List<Fix>, val factory: InterpolatorFactory = CubicSplineInterpolator) {

View File

@ -0,0 +1,177 @@
package com.r3corda.node.services.events
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.ThreadBox
import com.r3corda.core.contracts.SchedulableState
import com.r3corda.core.contracts.ScheduledStateRef
import com.r3corda.core.contracts.StateRef
import com.r3corda.core.node.services.SchedulerService
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.utilities.awaitWithDeadline
import java.time.Instant
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import javax.annotation.concurrent.ThreadSafe
/**
* A first pass of a simple [SchedulerService] that works with [MutableClock]s for testing, demonstrations and simulations
* that also encompasses the [Wallet] observer for processing transactions.
*
* This will observe transactions as they are stored and schedule and unschedule activities based on the States consumed
* or produced.
*
* TODO: Needs extensive support from persistence and protocol frameworks to be truly reliable and atomic.
*
* Currently does not provide any system state other than the ContractState so the expectation is that a transaction
* is the outcome of the activity in order to schedule another activity. Once we have implemented more persistence
* in the nodes, maybe we can consider multiple activities and whether the activities have been completed or not,
* but that starts to sound a lot like off-ledger state.
*
* @param services Core node services.
* @param protocolLogicRefFactory Factory for restoring [ProtocolLogic] instances from references.
* @param schedulerTimerExecutor The executor the scheduler blocks on waiting for the clock to advance to the next
* activity. Only replace this for unit testing purposes. This is not the executor the [ProtocolLogic] is launched on.
*/
@ThreadSafe
class NodeSchedulerService(private val services: ServiceHubInternal,
private val protocolLogicRefFactory: ProtocolLogicRefFactory = ProtocolLogicRefFactory(),
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor())
: SchedulerService, SingletonSerializeAsToken() {
private val log = loggerFor<NodeSchedulerService>()
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
// to somewhere.
private class InnerState {
// TODO: This has no persistence, and we don't consider initialising from non-empty map if we add persistence.
// If we were to rebuild the wallet at start up by replaying transactions and re-calculating, then
// persistence here would be unnecessary.
var scheduledStates = HashMap<StateRef, ScheduledStateRef>()
var earliestState: ScheduledStateRef? = null
var rescheduled: SettableFuture<Boolean>? = null
internal fun recomputeEarliest() {
earliestState = scheduledStates.map { it.value }.sortedBy { it.scheduledAt }.firstOrNull()
}
}
private val mutex = ThreadBox(InnerState())
override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" }
mutex.locked {
scheduledStates[action.ref] = action
if (action.scheduledAt.isBefore(earliestState?.scheduledAt ?: Instant.MAX)) {
// We are earliest
earliestState = action
rescheduleWakeUp()
} else if (earliestState?.ref == action.ref && earliestState!!.scheduledAt != action.scheduledAt) {
// We were earliest but might not be any more
recomputeEarliest()
rescheduleWakeUp()
}
}
}
override fun unscheduleStateActivity(ref: StateRef) {
log.trace { "Unschedule $ref" }
mutex.locked {
val removedAction = scheduledStates.remove(ref)
if (removedAction == earliestState && removedAction != null) {
recomputeEarliest()
rescheduleWakeUp()
}
}
}
/**
* This method first cancels the [Future] for any pending action so that the [awaitWithDeadline] used below
* drops through without running the action. We then create a new [Future] for the new action (so it too can be
* cancelled), and then await the arrival of the scheduled time. If we reach the scheduled time (the deadline)
* without the [Future] being cancelled then we run the scheduled action. Finally we remove that action from the
* scheduled actions and recompute the next scheduled action.
*/
private fun rescheduleWakeUp() {
// Note, we already have the mutex but we need the scope again here
val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked {
rescheduled?.cancel(false)
rescheduled = SettableFuture.create()
Pair(earliestState, rescheduled!!)
}
if (scheduledState != null) {
schedulerTimerExecutor.execute() {
log.trace { "Scheduling as next $scheduledState" }
// This will block the scheduler single thread until the scheduled time (returns false) OR
// the Future is cancelled due to rescheduling (returns true).
if (!services.clock.awaitWithDeadline(scheduledState.scheduledAt, ourRescheduledFuture)) {
log.trace { "Invoking as next $scheduledState" }
onTimeReached(scheduledState)
} else {
log.trace { "Recheduled $scheduledState" }
}
}
}
}
private fun onTimeReached(scheduledState: ScheduledStateRef) {
try {
runScheduledActionForState(scheduledState)
} finally {
// Unschedule once complete (or checkpointed)
mutex.locked {
// need to remove us from those scheduled, but only if we are still next
scheduledStates.compute(scheduledState.ref) { ref, value ->
if (value === scheduledState) null else value
}
// and schedule the next one
recomputeEarliest()
rescheduleWakeUp()
}
}
}
private fun runScheduledActionForState(scheduledState: ScheduledStateRef) {
val txState = services.loadState(scheduledState.ref)
// It's OK to return if it's null as there's nothing scheduled
// TODO: implement sandboxing as necessary
val scheduledActivity = sandbox {
val state = txState.data as SchedulableState
state.nextScheduledActivity(scheduledState.ref, protocolLogicRefFactory)
} ?: return
if (scheduledActivity.scheduledAt.isAfter(services.clock.instant())) {
// I suppose it might turn out that the action is no longer due (a bug, maybe), so we need to defend against that and re-schedule
// TODO: warn etc
mutex.locked {
// Replace with updated instant
scheduledStates.compute(scheduledState.ref) { ref, value ->
if (value === scheduledState) ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt) else value
}
}
} else {
/**
* TODO: align with protocol invocation via API... make it the same code
* TODO: Persistence and durability issues:
* a) Need to consider potential to run activity twice if restart between here and removing from map if we add persistence
* b) But if remove from map first, there's potential to run zero times if restart
* c) Address by switch to 3rd party scheduler? Only benefit of this impl. is support for DemoClock or other MutableClocks (e.g. for testing)
* TODO: ProtocolLogicRefFactory needs to sort out the class loader etc
*/
val logic = protocolLogicRefFactory.toProtocolLogic(scheduledActivity.logicRef)
log.trace { "Firing ProtocolLogic $logic" }
// TODO: ProtocolLogic should be checkpointed by the time this returns
services.startProtocol("scheduled", logic)
}
}
// TODO: Does nothing right now, but beware we are calling dynamically loaded code in the contract inside here.
private inline fun <T : Any> sandbox(code: () -> T?): T? {
return code()
}
}

View File

@ -0,0 +1,35 @@
package com.r3corda.node.services.events
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.SchedulableState
import com.r3corda.core.contracts.ScheduledStateRef
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.node.services.api.ServiceHubInternal
/**
* This observes the wallet and schedules and unschedules activities appropriately based on state production and
* consumption.
*/
class ScheduledActivityObserver(val services: ServiceHubInternal) {
init {
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
services.walletService.updates.subscribe { update ->
update.consumed.forEach { services.schedulerService.unscheduleStateActivity(it) }
update.produced.forEach { scheduleStateActivity(it, services.protocolLogicRefFactory) }
}
}
private fun scheduleStateActivity(produced: StateAndRef<ContractState>, protocolLogicRefFactory: ProtocolLogicRefFactory) {
val producedState = produced.state.data
if (producedState is SchedulableState) {
val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, protocolLogicRefFactory)?.scheduledAt } ?: return
services.schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt))
}
}
// TODO: Beware we are calling dynamically loaded contract code inside here.
private inline fun <T : Any> sandbox(code: () -> T?): T? {
return code()
}
}

View File

@ -2,9 +2,12 @@ package com.r3corda.node.services
import com.codahale.metrics.MetricRegistry
import com.r3corda.core.contracts.SignedTransaction
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.testing.MockStorageService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MonitoringService
@ -12,6 +15,7 @@ import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.network.MockNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.DataVendingService
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.wallet.NodeWalletService
import java.time.Clock
@ -23,10 +27,11 @@ open class MockServices(
val storage: TxWritableStorageService? = MockStorageService(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val mapService: NetworkMapService? = null,
val overrideClock: Clock? = NodeClock()
val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(),
val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory()
) : ServiceHubInternal() {
override val walletService: WalletService = customWallet ?: NodeWalletService(this)
override val keyManagementService: KeyManagementService
get() = keyManagement ?: throw UnsupportedOperationException()
override val identityService: IdentityService
@ -37,10 +42,15 @@ open class MockServices(
get() = mapCache ?: throw UnsupportedOperationException()
override val storageService: StorageService
get() = storage ?: throw UnsupportedOperationException()
override val schedulerService: SchedulerService
get() = scheduler ?: throw UnsupportedOperationException()
override val clock: Clock
get() = overrideClock ?: throw UnsupportedOperationException()
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val protocolLogicRefFactory: ProtocolLogicRefFactory
get() = protocolFactory ?: throw UnsupportedOperationException()
// We isolate the storage service with writable TXes so that it can't be accessed except via recordTransactions()
private val txStorageService: TxWritableStorageService
get() = storage ?: throw UnsupportedOperationException()
@ -48,6 +58,12 @@ open class MockServices(
override fun recordTransactions(txs: Iterable<SignedTransaction>) =
recordTransactionsInternal(txStorageService, txs)
lateinit var smm: StateMachineManager
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
return smm.add(loggerName, logic)
}
init {
if (net != null && storage != null) {
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener

View File

@ -21,6 +21,8 @@ import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.protocols.RatesFixProtocol
import org.junit.Assert
import org.junit.Test
import java.time.Clock
import java.time.Duration
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@ -37,11 +39,12 @@ class NodeInterestRatesTest {
val DUMMY_CASH_ISSUER_KEY = generateKeyPair()
val DUMMY_CASH_ISSUER = Party("Cash issuer", DUMMY_CASH_ISSUER_KEY.public)
val oracle = NodeInterestRates.Oracle(MEGA_CORP, MEGA_CORP_KEY).apply { knownFixes = TEST_DATA }
val clock = Clock.systemUTC()
val oracle = NodeInterestRates.Oracle(MEGA_CORP, MEGA_CORP_KEY, clock).apply { knownFixes = TEST_DATA }
@Test fun `query successfully`() {
val q = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")
val res = oracle.query(listOf(q))
val res = oracle.query(listOf(q), clock.instant())
assertEquals(1, res.size)
assertEquals("0.678".bd, res[0].value)
assertEquals(q, res[0].of)
@ -50,13 +53,13 @@ class NodeInterestRatesTest {
@Test fun `query with one success and one missing`() {
val q1 = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")
val q2 = NodeInterestRates.parseFixOf("LIBOR 2016-03-15 1M")
val e = assertFailsWith<NodeInterestRates.UnknownFix> { oracle.query(listOf(q1, q2)) }
val e = assertFailsWith<NodeInterestRates.UnknownFix> { oracle.query(listOf(q1, q2), clock.instant()) }
assertEquals(e.fix, q2)
}
@Test fun `query successfully with interpolated rate`() {
val q = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 5M")
val res = oracle.query(listOf(q))
val res = oracle.query(listOf(q), clock.instant())
assertEquals(1, res.size)
Assert.assertEquals(0.7316228, res[0].value.toDouble(), 0.0000001)
assertEquals(q, res[0].of)
@ -64,11 +67,11 @@ class NodeInterestRatesTest {
@Test fun `rate missing and unable to interpolate`() {
val q = NodeInterestRates.parseFixOf("EURIBOR 2016-03-15 3M")
assertFailsWith<NodeInterestRates.UnknownFix> { oracle.query(listOf(q)) }
assertFailsWith<NodeInterestRates.UnknownFix> { oracle.query(listOf(q), clock.instant()) }
}
@Test fun `empty query`() {
assertFailsWith<IllegalArgumentException> { oracle.query(emptyList()) }
assertFailsWith<IllegalArgumentException> { oracle.query(emptyList(), clock.instant()) }
}
@Test fun `refuse to sign with no relevant commands`() {
@ -80,7 +83,7 @@ class NodeInterestRatesTest {
@Test fun `sign successfully`() {
val tx = makeTX()
val fix = oracle.query(listOf(NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M"))).first()
val fix = oracle.query(listOf(NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")), clock.instant()).first()
tx.addCommand(fix, oracle.identity.owningKey)
// Sign successfully.
@ -106,7 +109,7 @@ class NodeInterestRatesTest {
val tx = TransactionType.General.Builder()
val fixOf = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")
val protocol = RatesFixProtocol(tx, n2.info, fixOf, "0.675".bd, "0.1".bd)
val protocol = RatesFixProtocol(tx, n2.info, fixOf, "0.675".bd, "0.1".bd, Duration.ofNanos(1))
BriefLogFormatter.initVerbose("rates")
val future = n1.smm.add("rates", protocol)

View File

@ -0,0 +1,247 @@
package com.r3corda.node.services
import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.days
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.testing.MockKeyManagementService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRef
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.testing.ALICE_KEY
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.node.internal.testing.TestClock
import com.r3corda.node.services.events.NodeSchedulerService
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.AffinityExecutor
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Test
import java.nio.file.FileSystem
import java.security.PublicKey
import java.time.Clock
import java.time.Instant
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
// Use an in memory file system for testing attachment storage.
val fs: FileSystem = Jimfs.newFileSystem(Configuration.unix())
val realClock: Clock = Clock.systemUTC()
val stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
val testClock = TestClock(stoppedClock)
val smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
val schedulerGatedExecutor = AffinityExecutor.Gate(true)
// We have to allow Java boxed primitives but Kotlin warns we shouldn't be using them
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
val factory = ProtocolLogicRefFactory(setOf(TestProtocolLogic::class.java.name), setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name))
val scheduler: NodeSchedulerService
val services: ServiceHub
/**
* Have a reference to this test added to [ServiceHub] so that when the [ProtocolLogic] runs it can access the test instance.
* The [TestState] is serialized and deserialized so attempting to use a transient field won't work, as it just
* results in NPE.
*/
interface TestReference {
val testReference: NodeSchedulerServiceTest
}
init {
val kms = MockKeyManagementService(ALICE_KEY)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"))
val mockServices = object : MockServices(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val testReference = this@NodeSchedulerServiceTest
}
services = mockServices
scheduler = NodeSchedulerService(mockServices, factory, schedulerGatedExecutor)
val mockSMM = StateMachineManager(mockServices, listOf(mockServices), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor)
mockServices.smm = mockSMM
}
lateinit var countDown: CountDownLatch
var calls: Int = 0
@Before
fun setup() {
countDown = CountDownLatch(1)
calls = 0
}
class TestState(val protocolLogicRef: ProtocolLogicRef, val instant: Instant) : LinearState, SchedulableState {
override val participants: List<PublicKey>
get() = throw UnsupportedOperationException()
override val thread = SecureHash.sha256("does not matter but we need it to be unique ${Math.random()}")
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean = true
override fun nextScheduledActivity(thisStateRef: StateRef, protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity? = ScheduledActivity(protocolLogicRef, instant)
override val contract: Contract
get() = throw UnsupportedOperationException()
}
class TestProtocolLogic(val increment: Int = 1) : ProtocolLogic<Unit>() {
override fun call() {
(serviceHub as TestReference).testReference.calls += increment
(serviceHub as TestReference).testReference.countDown.countDown()
}
}
class Command : TypeOnlyCommandData()
@Test
fun `test activity due now`() {
val time = stoppedClock.instant()
scheduleTX(time)
assertThat(calls).isEqualTo(0)
schedulerGatedExecutor.waitAndRun()
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
}
@Test
fun `test activity due in the past`() {
val time = stoppedClock.instant() - 1.days
scheduleTX(time)
assertThat(calls).isEqualTo(0)
schedulerGatedExecutor.waitAndRun()
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
}
@Test
fun `test activity due in the future`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
testClock.advanceBy(1.days)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
}
@Test
fun `test activity due in the future and schedule another earlier`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time + 1.days)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time, 3)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
testClock.advanceBy(1.days)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(3)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
@Test
fun `test activity due in the future and schedule another later`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time + 1.days, 3)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
testClock.advanceBy(1.days)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
testClock.advanceBy(1.days)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
@Test
fun `test activity due in the future and schedule another for same time`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time, 3)
testClock.advanceBy(1.days)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
@Test
fun `test activity due in the future and schedule another for same time then unschedule original`() {
val time = stoppedClock.instant() + 1.days
var scheduledRef1 = scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time, 3)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
testClock.advanceBy(1.days)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(3)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
@Test
fun `test activity due in the future then unschedule`() {
var scheduledRef1 = scheduleTX(stoppedClock.instant() + 1.days)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
testClock.advanceBy(1.days)
assertThat(calls).isEqualTo(0)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
private fun scheduleTX(instant: Instant, increment: Int = 1): ScheduledStateRef? {
var scheduledRef: ScheduledStateRef? = null
apply {
val freshKey = services.keyManagementService.freshKey()
val state = TestState(factory.create(TestProtocolLogic::class.java, increment), instant)
val usefulTX = TransactionType.General.Builder(DUMMY_NOTARY).apply {
addOutputState(state)
addCommand(Command(), freshKey.public)
signWith(freshKey)
}.toSignedTransaction()
val txHash = usefulTX.id
services.recordTransactions(usefulTX)
scheduledRef = ScheduledStateRef(StateRef(txHash, 0), state.instant)
scheduler.scheduleStateActivity(scheduledRef!!)
}
return scheduledRef
}
}

View File

@ -1,18 +1,11 @@
package com.r3corda.demos
import com.google.common.net.HostAndPort
import com.typesafe.config.ConfigFactory
import com.r3corda.core.crypto.Party
import com.r3corda.core.logElapsedTime
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.node.internal.Node
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.core.node.NodeInfo
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.core.node.services.ServiceType
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.demos.api.InterestRateSwapAPI
@ -20,13 +13,23 @@ import com.r3corda.demos.protocols.AutoOfferProtocol
import com.r3corda.demos.protocols.ExitServerProtocol
import com.r3corda.demos.protocols.UpdateBusinessDayProtocol
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.internal.Node
import com.r3corda.node.internal.testing.MockNetwork
import com.r3corda.node.services.FixingSessionInitiationHandler
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.typesafe.config.ConfigFactory
import joptsimple.OptionParser
import joptsimple.OptionSet
import org.apache.commons.io.IOUtils
import java.io.DataOutputStream
import java.io.File
import java.net.HttpURLConnection
import java.net.SocketTimeoutException
import java.net.URL
import java.nio.file.Files
import java.nio.file.Path
@ -34,8 +37,6 @@ import java.nio.file.Paths
import java.util.*
import kotlin.concurrent.fixedRateTimer
import kotlin.system.exitProcess
import org.apache.commons.io.IOUtils
import java.net.SocketTimeoutException
// IRS DEMO
//
@ -304,6 +305,7 @@ private fun runNode(cliParams: CliParams.RunNode): Int {
AutoOfferProtocol.Handler.register(node)
UpdateBusinessDayProtocol.Handler.register(node)
ExitServerProtocol.Handler.register(node)
FixingSessionInitiationHandler.register(node)
if (cliParams.uploadRates) {
runUploadRates(cliParams.apiAddress)

View File

@ -4,6 +4,7 @@ import com.google.common.net.HostAndPort
import com.r3corda.contracts.cash.Cash
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.hours
import com.r3corda.core.logElapsedTime
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.ServiceType
@ -83,7 +84,7 @@ fun main(args: Array<String>) {
// Make a garbage transaction that includes a rate fix.
val tx = TransactionType.General.Builder()
tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notary.identity))
val protocol = RatesFixProtocol(tx, oracleNode, fixOf, expectedRate, rateTolerance)
val protocol = RatesFixProtocol(tx, oracleNode, fixOf, expectedRate, rateTolerance, 24.hours)
node.smm.add("demo.ratefix", protocol).get()
node.stop()

View File

@ -25,6 +25,7 @@ object AutoOfferProtocol {
val TOPIC = "autooffer.topic"
data class AutoOfferMessage(val otherSide: SingleMessageRecipient,
val notary: Party,
val otherSessionID: Long, val dealBeingOffered: DealState)
object Handler {
@ -53,9 +54,7 @@ object AutoOfferProtocol {
val autoOfferMessage = msg.data.deserialize<AutoOfferMessage>()
// Put the deal onto the ledger
progressTracker.currentStep = DEALING
// TODO required as messaging layer does not currently queue messages that arrive before we expect them
Thread.sleep(100)
val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, node.services.networkMapCache.notaryNodes.first(),
val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, autoOfferMessage.notary,
autoOfferMessage.dealBeingOffered, node.services.keyManagementService.freshKey(), autoOfferMessage.otherSessionID, progressTracker.getChildProgressTracker(DEALING)!!)
val future = node.smm.add("${TwoPartyDealProtocol.DEAL_TOPIC}.seller", seller)
// This is required because we are doing child progress outside of a subprotocol. In future, we should just wrap things like this in a protocol to avoid it
@ -94,16 +93,16 @@ object AutoOfferProtocol {
require(serviceHub.networkMapCache.notaryNodes.isNotEmpty()) { "No notary nodes registered" }
val ourSessionID = random63BitValue()
val notary = serviceHub.networkMapCache.notaryNodes.first()
val notary = serviceHub.networkMapCache.notaryNodes.first().identity
// need to pick which ever party is not us
val otherParty = notUs(*dealToBeOffered.parties).single()
val otherNode = (serviceHub.networkMapCache.getNodeByLegalName(otherParty.name))
requireNotNull(otherNode) { "Cannot identify other party " + otherParty.name + ", know about: " + serviceHub.networkMapCache.partyNodes.map { it.identity } }
val otherSide = otherNode!!.address
progressTracker.currentStep = ANNOUNCING
send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.networkService.myAddress, ourSessionID, dealToBeOffered))
send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.networkService.myAddress, notary, ourSessionID, dealToBeOffered))
progressTracker.currentStep = DEALING
val stx = subProtocol(TwoPartyDealProtocol.Acceptor(otherSide, notary.identity, dealToBeOffered, ourSessionID, progressTracker.getChildProgressTracker(DEALING)!!))
val stx = subProtocol(TwoPartyDealProtocol.Acceptor(otherSide, notary, dealToBeOffered, ourSessionID, progressTracker.getChildProgressTracker(DEALING)!!))
return stx
}

View File

@ -1,171 +1,50 @@
package com.r3corda.demos.protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.contracts.InterestRateSwap
import com.r3corda.core.contracts.DealState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.contracts.TransactionState
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.linearHeadsOfType
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.demos.DemoClock
import com.r3corda.node.internal.Node
import com.r3corda.node.services.network.MockNetworkMapCache
import com.r3corda.node.utilities.ANSIProgressRenderer
import com.r3corda.protocols.TwoPartyDealProtocol
import java.time.LocalDate
/**
* This is a very temporary, demo-oriented way of initiating processing of temporal events and is not
* intended as the way things will necessarily be done longer term
* This is a less temporary, demo-oriented way of initiating processing of temporal events
*/
object UpdateBusinessDayProtocol {
val TOPIC = "businessday.topic"
class Updater(val date: LocalDate, val sessionID: Long,
override val progressTracker: ProgressTracker = Updater.tracker()) : ProtocolLogic<Boolean>() {
companion object {
object FETCHING : ProgressTracker.Step("Fetching deals")
object ITERATING_DEALS : ProgressTracker.Step("Interating over deals")
object ITERATING_FIXINGS : ProgressTracker.Step("Iterating over fixings")
object FIXING : ProgressTracker.Step("Fixing deal")
fun tracker() = ProgressTracker(FETCHING, ITERATING_DEALS, ITERATING_FIXINGS, FIXING)
}
@Suspendable
override fun call(): Boolean {
// Get deals
progressTracker.currentStep = FETCHING
val dealStateRefs = serviceHub.walletService.linearHeadsOfType<DealState>()
val otherPartyToDeals = dealStateRefs.values.groupBy { otherParty(it.state.data) }
// TODO we need to process these in parallel to stop there being an ordering problem across more than two nodes
val sortedParties = otherPartyToDeals.keys.sortedBy { it.identity.name }
for (party in sortedParties) {
val sortedDeals = otherPartyToDeals[party]!!.sortedBy { it.state.data.ref }
for (deal in sortedDeals) {
progressTracker.currentStep = ITERATING_DEALS
processDeal(party, deal, date, sessionID)
}
}
return false
}
// This assumes we definitely have one key or the other
fun otherParty(deal: DealState): NodeInfo {
val ourKeys = serviceHub.keyManagementService.keys.keys
return serviceHub.networkMapCache.getNodeByLegalName(deal.parties.single { !ourKeys.contains(it.owningKey) }.name)!!
}
// TODO we should make this more object oriented when we can ask a state for it's contract
@Suspendable
fun processDeal(party: NodeInfo, deal: StateAndRef<DealState>, date: LocalDate, sessionID: Long) {
val s = deal.state.data
when (s) {
is InterestRateSwap.State -> processInterestRateSwap(party, StateAndRef(TransactionState(s, deal.state.notary), deal.ref), date, sessionID)
}
}
// TODO and this would move to the InterestRateSwap and cope with permutations of Fixed/Floating and Floating/Floating etc
@Suspendable
fun processInterestRateSwap(party: NodeInfo, deal: StateAndRef<InterestRateSwap.State>, date: LocalDate, sessionID: Long) {
var dealStateAndRef: StateAndRef<InterestRateSwap.State>? = deal
var nextFixingDate = deal.state.data.calculation.nextFixingDate()
while (nextFixingDate != null && !nextFixingDate.isAfter(date)) {
progressTracker.currentStep = ITERATING_FIXINGS
/*
* Note that this choice of fixed versus floating leg is simply to assign roles in
* the fixing protocol and doesn't infer roles or responsibilities in a business sense.
* One of the parties needs to take the lead in the coordination and this is a reliable deterministic way
* to do it.
*/
if (party.identity.name == deal.state.data.fixedLeg.fixedRatePayer.name) {
dealStateAndRef = nextFixingFloatingLeg(dealStateAndRef!!, party, sessionID)
} else {
dealStateAndRef = nextFixingFixedLeg(dealStateAndRef!!, party, sessionID)
}
nextFixingDate = dealStateAndRef?.state?.data?.calculation?.nextFixingDate()
}
}
@Suspendable
private fun nextFixingFloatingLeg(dealStateAndRef: StateAndRef<InterestRateSwap.State>, party: NodeInfo, sessionID: Long): StateAndRef<InterestRateSwap.State>? {
progressTracker.setChildProgressTracker(FIXING, TwoPartyDealProtocol.Primary.tracker())
progressTracker.currentStep = FIXING
val myName = serviceHub.storageService.myLegalIdentity.name
val deal: InterestRateSwap.State = dealStateAndRef.state.data
val myOldParty = deal.parties.single { it.name == myName }
val keyPair = serviceHub.keyManagementService.toKeyPair(myOldParty.owningKey)
val participant = TwoPartyDealProtocol.Floater(party.address, sessionID, serviceHub.networkMapCache.notaryNodes[0], dealStateAndRef,
keyPair,
sessionID, progressTracker.getChildProgressTracker(FIXING)!!)
val result = subProtocol(participant)
return result.tx.outRef(0)
}
@Suspendable
private fun nextFixingFixedLeg(dealStateAndRef: StateAndRef<InterestRateSwap.State>, party: NodeInfo, sessionID: Long): StateAndRef<InterestRateSwap.State>? {
progressTracker.setChildProgressTracker(FIXING, TwoPartyDealProtocol.Secondary.tracker())
progressTracker.currentStep = FIXING
val participant = TwoPartyDealProtocol.Fixer(
party.address,
serviceHub.networkMapCache.notaryNodes[0].identity,
dealStateAndRef,
sessionID,
progressTracker.getChildProgressTracker(FIXING)!!)
val result = subProtocol(participant)
return result.tx.outRef(0)
}
}
data class UpdateBusinessDayMessage(val date: LocalDate, val sessionID: Long)
data class UpdateBusinessDayMessage(val date: LocalDate)
object Handler {
fun register(node: Node) {
node.net.addMessageHandler("$TOPIC.0") { msg, registration ->
// Just to validate we got the message
node.net.addMessageHandler("${TOPIC}.0") { msg, registration ->
val updateBusinessDayMessage = msg.data.deserialize<UpdateBusinessDayMessage>()
if ((node.services.clock as DemoClock).updateDate(updateBusinessDayMessage.date)) {
val participant = Updater(updateBusinessDayMessage.date, updateBusinessDayMessage.sessionID)
node.smm.add("update.business.day", participant)
}
(node.services.clock as DemoClock).updateDate(updateBusinessDayMessage.date)
}
}
}
class Broadcast(val date: LocalDate,
override val progressTracker: ProgressTracker = Broadcast.tracker()) : ProtocolLogic<Boolean>() {
override val progressTracker: ProgressTracker = Broadcast.tracker()) : ProtocolLogic<Unit>() {
companion object {
object NOTIFYING : ProgressTracker.Step("Notifying peer")
object LOCAL : ProgressTracker.Step("Updating locally") {
override fun childProgressTracker(): ProgressTracker = Updater.tracker()
}
object NOTIFYING : ProgressTracker.Step("Notifying peers")
fun tracker() = ProgressTracker(NOTIFYING, LOCAL)
fun tracker() = ProgressTracker(NOTIFYING)
}
@Suspendable
override fun call(): Boolean {
val message = UpdateBusinessDayMessage(date, random63BitValue())
override fun call(): Unit {
progressTracker.currentStep = NOTIFYING
val message = UpdateBusinessDayMessage(date)
for (recipient in serviceHub.networkMapCache.partyNodes) {
progressTracker.currentStep = NOTIFYING
doNextRecipient(recipient, message)
}
if ((serviceHub.clock as DemoClock).updateDate(message.date)) {
progressTracker.currentStep = LOCAL
subProtocol(Updater(message.date, message.sessionID, progressTracker.getChildProgressTracker(LOCAL)!!))
}
return true
}
@Suspendable
@ -173,10 +52,7 @@ object UpdateBusinessDayProtocol {
if (recipient.address is MockNetworkMapCache.MockAddress) {
// Ignore
} else {
// TODO: messaging ourselves seems to trigger a bug for the time being and we continuously receive messages
if (recipient.identity != serviceHub.storageService.myLegalIdentity) {
send(TOPIC, recipient.address, 0, message)
}
send(TOPIC, recipient.address, 0, message)
}
}
}