From 5271882dcdfdf1e1eadc66d20e2acf8555c2f9c7 Mon Sep 17 00:00:00 2001 From: "rick.parker" Date: Tue, 24 May 2016 10:03:29 +0100 Subject: [PATCH] Event scheduling and docs for event scheduling --- .../main/kotlin/com/r3corda/contracts/IRS.kt | 13 +- .../src/main/kotlin/com/r3corda/core/Utils.kt | 7 +- .../com/r3corda/core/contracts/Structures.kt | 45 +++- .../com/r3corda/core/node/ServiceHub.kt | 1 + .../r3corda/core/node/services/Services.kt | 33 +++ .../r3corda/core/protocols/ProtocolLogic.kt | 8 +- .../core/protocols/ProtocolLogicRef.kt | 163 ++++++++++++ .../com/r3corda/core/utilities/OracleUtils.kt | 19 ++ .../com/r3corda/protocols/RatesFixProtocol.kt | 12 +- .../r3corda/protocols/TwoPartyDealProtocol.kt | 217 +++++++++------ .../ProtocolLogicRefFromJavaTest.java | 42 +++ .../core/protocols/ProtocolLogicRefTest.kt | 80 ++++++ docs/source/event-scheduling.rst | 102 ++++++++ docs/source/index.rst | 1 + .../com/r3corda/node/internal/AbstractNode.kt | 27 +- .../node/internal/testing/IRSSimulation.kt | 54 ++-- .../node/internal/testing/Simulation.kt | 39 ++- .../node/internal/testing/TestClock.kt | 6 + .../node/internal/testing/TradeSimulation.kt | 5 +- .../FixingSessionInitiationHandler.kt | 22 ++ .../node/services/api/ServiceHubInternal.kt | 11 + .../services/clientapi/NodeInterestRates.kt | 96 +++++-- .../services/events/NodeSchedulerService.kt | 177 +++++++++++++ .../events/ScheduledActivityObserver.kt | 35 +++ .../com/r3corda/node/services/MockServices.kt | 20 +- .../node/services/NodeInterestRatesTest.kt | 19 +- .../node/services/NodeSchedulerServiceTest.kt | 247 ++++++++++++++++++ src/main/kotlin/com/r3corda/demos/IRSDemo.kt | 20 +- .../kotlin/com/r3corda/demos/RateFixDemo.kt | 3 +- .../demos/protocols/AutoOfferProtocol.kt | 11 +- .../protocols/UpdateBusinessDayProtocol.kt | 148 +---------- 31 files changed, 1375 insertions(+), 308 deletions(-) create mode 100644 core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogicRef.kt create mode 100644 core/src/main/kotlin/com/r3corda/core/utilities/OracleUtils.kt create mode 100644 core/src/test/java/com/r3corda/core/protocols/ProtocolLogicRefFromJavaTest.java create mode 100644 core/src/test/kotlin/com/r3corda/core/protocols/ProtocolLogicRefTest.kt create mode 100644 docs/source/event-scheduling.rst create mode 100644 node/src/main/kotlin/com/r3corda/node/services/FixingSessionInitiationHandler.kt create mode 100644 node/src/main/kotlin/com/r3corda/node/services/events/NodeSchedulerService.kt create mode 100644 node/src/main/kotlin/com/r3corda/node/services/events/ScheduledActivityObserver.kt create mode 100644 node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt diff --git a/contracts/src/main/kotlin/com/r3corda/contracts/IRS.kt b/contracts/src/main/kotlin/com/r3corda/contracts/IRS.kt index cc09c7d657..33c2c0bb2d 100644 --- a/contracts/src/main/kotlin/com/r3corda/contracts/IRS.kt +++ b/contracts/src/main/kotlin/com/r3corda/contracts/IRS.kt @@ -3,6 +3,9 @@ package com.r3corda.contracts import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.protocols.ProtocolLogicRefFactory +import com.r3corda.core.utilities.suggestInterestRateAnnouncementTimeWindow +import com.r3corda.protocols.TwoPartyDealProtocol import org.apache.commons.jexl3.JexlBuilder import org.apache.commons.jexl3.MapContext import java.math.BigDecimal @@ -588,7 +591,7 @@ class InterestRateSwap() : Contract { val floatingLeg: FloatingLeg, val calculation: Calculation, val common: Common - ) : FixableDealState { + ) : FixableDealState, SchedulableState { override val contract = IRS_PROGRAM_ID override val thread = SecureHash.sha256(common.tradeID) @@ -604,6 +607,14 @@ class InterestRateSwap() : Contract { override val parties: Array get() = arrayOf(fixedLeg.fixedRatePayer, floatingLeg.floatingRatePayer) + override fun nextScheduledActivity(thisStateRef: StateRef, protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity? { + val nextFixingOf = nextFixingOf() ?: return null + + // This is perhaps not how we should determine the time point in the business day, but instead expect the schedule to detail some of these aspects + val (instant, duration) = suggestInterestRateAnnouncementTimeWindow(index = nextFixingOf.name, source = floatingLeg.indexSource, date = nextFixingOf.forDay) + return ScheduledActivity(protocolLogicRefFactory.create(TwoPartyDealProtocol.FixingRoleDecider::class.java, thisStateRef, duration), instant) + } + // TODO: This changing of the public key violates the assumption that Party is a fixed identity key. override fun withPublicKey(before: Party, after: PublicKey): DealState { val newParty = Party(before.name, after) diff --git a/core/src/main/kotlin/com/r3corda/core/Utils.kt b/core/src/main/kotlin/com/r3corda/core/Utils.kt index 73b7f87599..4598e15a2a 100644 --- a/core/src/main/kotlin/com/r3corda/core/Utils.kt +++ b/core/src/main/kotlin/com/r3corda/core/Utils.kt @@ -14,7 +14,6 @@ import java.nio.file.Path import java.time.Duration import java.time.temporal.Temporal import java.util.concurrent.Executor -import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock import java.util.zip.ZipInputStream import kotlin.concurrent.withLock @@ -143,9 +142,13 @@ inline fun logElapsedTime(label: String, logger: Logger? = null, body: () -> * * val ii = state.locked { i } */ -class ThreadBox(content: T, val lock: Lock = ReentrantLock()) { +class ThreadBox(content: T, val lock: ReentrantLock = ReentrantLock()) { val content = content inline fun locked(body: T.() -> R): R = lock.withLock { body(content) } + inline fun alreadyLocked(body: T.() -> R): R { + check(lock.isHeldByCurrentThread, { "Expected $lock to already be locked." }) + return body(content) + } } /** diff --git a/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt b/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt index c68e0012d4..2048474434 100644 --- a/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt +++ b/core/src/main/kotlin/com/r3corda/core/contracts/Structures.kt @@ -3,6 +3,8 @@ package com.r3corda.core.contracts import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.toStringShort +import com.r3corda.core.protocols.ProtocolLogicRef +import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.serialization.OpaqueBytes import com.r3corda.core.serialization.serialize import java.io.FileNotFoundException @@ -141,6 +143,33 @@ interface OwnableState : ContractState { fun withNewOwner(newOwner: PublicKey): Pair } +/** Something which is scheduled to happen at a point in time */ +interface Scheduled { + val scheduledAt: Instant +} + +/** + * Represents a contract state (unconsumed output) of type [LinearState] and a point in time that a lifecycle event is expected to take place + * for that contract state. + * + * This is effectively the input to a scheduler, which wakes up at that point in time and asks the contract state what + * lifecycle processing needs to take place. e.g. a fixing or a late payment etc. + */ +data class ScheduledStateRef(val ref: StateRef, override val scheduledAt: Instant) : Scheduled + +/** + * This class represents the lifecycle activity that a contract state of type [LinearState] would like to perform at a given point in time. + * e.g. run a fixing protocol + * + * Note the use of [ProtocolLogicRef] to represent a safe way to transport a [ProtocolLogic] out of the contract sandbox. + * + * Currently we support only protocol based activities as we expect there to be a transaction generated off the back of + * the activity, otherwise we have to start tracking secondary state on the platform of which scheduled activities + * for a particular [ContractState] have been processed/fired etc. If the activity is not "on ledger" then the + * scheduled activity shouldn't be either. + */ +data class ScheduledActivity(val logicRef: ProtocolLogicRef, override val scheduledAt: Instant) : Scheduled + /** * A state that evolves by superseding itself, all of which share the common "thread" * @@ -154,12 +183,24 @@ interface LinearState : ContractState { fun isRelevant(ourKeys: Set): Boolean } +interface SchedulableState : ContractState { + /** + * Indicate whether there is some activity to be performed at some future point in time with respect to this + * [ContractState], what that activity is and at what point in time it should be initiated. + * This can be used to implement deadlines for payment or processing of financial instruments according to a schedule. + * + * The state has no reference to it's own StateRef, so supply that for use as input to any ProtocolLogic constructed. + * + * @return null if there is no activity to schedule + */ + fun nextScheduledActivity(thisStateRef: StateRef, protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity? +} + /** * Interface representing an agreement that exposes various attributes that are common. Implementing it simplifies * implementation of general protocols that manipulate many agreement types. */ interface DealState : LinearState { - /** Human readable well known reference (e.g. trade reference) */ val ref: String @@ -187,8 +228,6 @@ interface DealState : LinearState { interface FixableDealState : DealState { /** * When is the next fixing and what is the fixing for? - * - * TODO: In future we would use this to register for an event to trigger a/the fixing protocol */ fun nextFixingOf(): FixOf? diff --git a/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt b/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt index 3e80aa0644..893f83a868 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt @@ -20,6 +20,7 @@ interface ServiceHub { val storageService: StorageService val networkService: MessagingService val networkMapCache: NetworkMapCache + val schedulerService: SchedulerService val clock: Clock /** diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt index 8bb335b309..7596967949 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt @@ -1,5 +1,7 @@ package com.r3corda.core.node.services +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash @@ -113,6 +115,17 @@ interface WalletService { * the update. */ val updates: rx.Observable + + /** + * Provide a [Future] for when a [StateRef] is consumed, which can be very useful in building tests. + */ + fun whenConsumed(ref: StateRef): ListenableFuture { + val future = SettableFuture.create() + updates.filter { ref in it.consumed }.first().subscribe { + future.set(it) + } + return future + } } inline fun WalletService.linearHeadsOfType() = linearHeadsOfType_(T::class.java) @@ -172,4 +185,24 @@ interface TxWritableStorageService : StorageService { override val validatedTransactions: TransactionStorage } +/** + * Provides access to schedule activity at some point in time. This interface might well be expanded to + * increase the feature set in the future. + * + * If the point in time is in the past, the expectation is that the activity will happen shortly after it is scheduled. + * + * The main consumer initially is an observer of the wallet to schedule activities based on transactions as they are + * recorded. + */ +interface SchedulerService { + /** + * Schedule a new activity for a TX output, probably because it was just produced. + * + * Only one activity can be scheduled for a particular [StateRef] at any one time. Scheduling a [ScheduledStateRef] + * replaces any previously scheduled [ScheduledStateRef] for any one [StateRef]. + */ + fun scheduleStateActivity(action: ScheduledStateRef) + /** Unschedule all activity for a TX output, probably because it was consumed. */ + fun unscheduleStateActivity(ref: StateRef) +} \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogic.kt b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogic.kt index d602e040ef..0ab15a3be2 100644 --- a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogic.kt +++ b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogic.kt @@ -71,9 +71,15 @@ abstract class ProtocolLogic { private fun maybeWireUpProgressTracking(subLogic: ProtocolLogic<*>) { val ours = progressTracker + val theirs = subLogic.progressTracker - if (ours != null && theirs != null) + if (ours != null && theirs != null) { + if (ours.currentStep == ProgressTracker.UNSTARTED) { + logger.warn("ProgressTracker has not been started for $this") + ours.nextStep() + } ours.setChildProgressTracker(ours.currentStep, theirs) + } } /** diff --git a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogicRef.kt b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogicRef.kt new file mode 100644 index 0000000000..2e8bffb531 --- /dev/null +++ b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogicRef.kt @@ -0,0 +1,163 @@ +package com.r3corda.core.protocols + +import com.google.common.primitives.Primitives +import com.r3corda.core.contracts.StateRef +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.serialization.SingletonSerializeAsToken +import com.r3corda.protocols.TwoPartyDealProtocol +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import java.time.Duration +import java.util.* +import kotlin.reflect.KFunction +import kotlin.reflect.KParameter +import kotlin.reflect.jvm.javaType +import kotlin.reflect.primaryConstructor + +/** + * A class for conversion to and from [ProtocolLogic] and [ProtocolLogicRef] instances + * + * Validation of types is performed on the way in and way out in case this object is passed between JVMs which might have differing + * whitelists. + * + * TODO: Ways to populate whitelist of "blessed" protocols per node/party + * TODO: Ways to populate argument types whitelist. Per node/party or global? + * TODO: Align with API related logic for passing in ProtocolLogic references (ProtocolRef) + * TODO: Actual support for AppContext / AttachmentsClassLoader + */ +class ProtocolLogicRefFactory(private val protocolLogicClassNameWhitelist: Set, private val argsClassNameWhitelist: Set) : SingletonSerializeAsToken() { + + constructor() : this(setOf(TwoPartyDealProtocol.FixingRoleDecider::class.java.name), setOf(StateRef::class.java.name, Duration::class.java.name)) + + // Pending real dependence on AppContext for class loading etc + @Suppress("UNUSED_PARAMETER") + private fun validateProtocolClassName(className: String, appContext: AppContext) { + // TODO: make this specific to the attachments in the [AppContext] by including [SecureHash] in whitelist check + require(className in protocolLogicClassNameWhitelist) { "${ProtocolLogic::class.java.simpleName} of ${ProtocolLogicRef::class.java.simpleName} must have type on the whitelist: $className" } + } + + // Pending real dependence on AppContext for class loading etc + @Suppress("UNUSED_PARAMETER") + private fun validateArgClassName(className: String, appContext: AppContext) { + // TODO: make this specific to the attachments in the [AppContext] by including [SecureHash] in whitelist check + require(className in argsClassNameWhitelist) { "Args to ${ProtocolLogicRef::class.java.simpleName} must have types on the args whitelist: $className" } + } + + /** + * Create a [ProtocolLogicRef] for the Kotlin primary constructor or Java constructor and the given args. + */ + fun create(type: Class>, vararg args: Any?): ProtocolLogicRef { + val constructor = type.kotlin.primaryConstructor ?: return createJava(type, *args) + if (constructor.parameters.size < args.size) { + throw IllegalProtocolLogicException(type, "due to too many arguments supplied to kotlin primary constructor") + } + // Build map of args from array + val argsMap = args.zip(constructor.parameters).map { Pair(it.second.name!!, it.first) }.toMap() + return createKotlin(type, argsMap) + } + + /** + * Create a [ProtocolLogicRef] by trying to find a Kotlin constructor that matches the given args. + * + * TODO: Rethink language specific naming. + */ + fun createKotlin(type: Class>, args: Map): ProtocolLogicRef { + // TODO: we need to capture something about the class loader or "application context" into the ref, + // perhaps as some sort of ThreadLocal style object. For now, just create an empty one. + val appContext = AppContext(emptyList()) + validateProtocolClassName(type.name, appContext) + // Check we can find a constructor and populate the args to it, but don't call it + createConstructor(appContext, type, args) + return ProtocolLogicRef(type.name, appContext, args) + } + + /** + * Create a [ProtocolLogicRef] by trying to find a Java constructor that matches the given args. + */ + private fun createJava(type: Class>, vararg args: Any?): ProtocolLogicRef { + // Build map for each + val argsMap = HashMap(args.size) + var index = 0 + args.forEach { argsMap["arg${index++}"] = it } + return createKotlin(type, argsMap) + } + + fun toProtocolLogic(ref: ProtocolLogicRef): ProtocolLogic<*> { + validateProtocolClassName(ref.protocolLogicClassName, ref.appContext) + val klass = Class.forName(ref.protocolLogicClassName, true, ref.appContext.classLoader).asSubclass(ProtocolLogic::class.java) + return createConstructor(ref.appContext, klass, ref.args)() + } + + private fun createConstructor(appContext: AppContext, clazz: Class>, args: Map): () -> ProtocolLogic<*> { + for (constructor in clazz.kotlin.constructors) { + val params = buildParams(appContext, constructor, args) ?: continue + // If we get here then we matched every parameter + return { constructor.callBy(params) } + } + throw IllegalProtocolLogicException(clazz, "as could not find matching constructor for: $args") + } + + private fun buildParams(appContext: AppContext, constructor: KFunction>, args: Map): HashMap? { + val params = hashMapOf() + val usedKeys = hashSetOf() + for (parameter in constructor.parameters) { + if (!tryBuildParam(args, parameter, params)) { + return null + } else { + usedKeys += parameter.name!! + } + } + if ((args.keys - usedKeys).isNotEmpty()) { + // Not all args were used + return null + } + params.values.forEach { if (it is Any) validateArgClassName(it.javaClass.name, appContext) } + return params + } + + private fun tryBuildParam(args: Map, parameter: KParameter, params: HashMap): Boolean { + val containsKey = parameter.name in args + // OK to be missing if optional + return (parameter.isOptional && !containsKey) || (containsKey && paramCanBeBuilt(args, parameter, params)) + } + + private fun paramCanBeBuilt(args: Map, parameter: KParameter, params: HashMap): Boolean { + val value = args[parameter.name] + params[parameter] = value + return (value is Any && parameterAssignableFrom(parameter.type.javaType, value)) || parameter.type.isMarkedNullable + } + + private fun parameterAssignableFrom(type: Type, value: Any): Boolean { + if (type is Class<*>) { + if (type.isPrimitive) { + return Primitives.unwrap(value.javaClass) == type + } else { + return type.isAssignableFrom(value.javaClass) + } + } else if (type is ParameterizedType) { + return parameterAssignableFrom(type.rawType, value) + } else { + return false + } + } +} + +class IllegalProtocolLogicException(type: Class<*>, msg: String) : IllegalArgumentException("${ProtocolLogicRef::class.java.simpleName} cannot be constructed for ${ProtocolLogic::class.java.simpleName} of type ${type.name} $msg") + +/** + * A class representing a [ProtocolLogic] instance which would be possible to safely pass out of the contract sandbox + * + * Only allows a String reference to the ProtocolLogic class, and only allows restricted argument types as per [ProtocolLogicRefFactory] + */ +// TODO: align this with the existing [ProtocolRef] in the bank-side API (probably replace some of the API classes) +data class ProtocolLogicRef internal constructor(val protocolLogicClassName: String, val appContext: AppContext, val args: Map) + +/** + * This is just some way to track what attachments need to be in the class loader, but may later include some app + * properties loaded from the attachments. And perhaps the authenticated user for an API call? + */ +data class AppContext(val attachments: List) { + // TODO: build a real [AttachmentsClassLoader] etc + val classLoader: ClassLoader + get() = this.javaClass.classLoader +} \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/core/utilities/OracleUtils.kt b/core/src/main/kotlin/com/r3corda/core/utilities/OracleUtils.kt new file mode 100644 index 0000000000..6414b29272 --- /dev/null +++ b/core/src/main/kotlin/com/r3corda/core/utilities/OracleUtils.kt @@ -0,0 +1,19 @@ +package com.r3corda.core.utilities + +import java.time.* + +/** + * This whole file exists as short cuts to get demos working. In reality we'd have static data and/or rules engine + * defining things like this. It currently resides in the core module because it needs to be visible to the IRS + * contract. + */ +// We at some future point may implement more than just this constant announcement window and thus use the params. +@Suppress("UNUSED_PARAMETER") +fun suggestInterestRateAnnouncementTimeWindow(index: String, source: String, date: LocalDate): TimeWindow { + // TODO: we would ordinarily convert clock to same time zone as the index/source would announce in + // and suggest an announcement time for the interest rate + // Here we apply a blanket announcement time of 11:45 London irrespective of source or index + val time = LocalTime.of(11, 45) + val zoneId = ZoneId.of("Europe/London") + return TimeWindow(ZonedDateTime.of(date, time, zoneId).toInstant(), Duration.ofHours(24)) +} diff --git a/core/src/main/kotlin/com/r3corda/protocols/RatesFixProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/RatesFixProtocol.kt index 62f0e020cc..1b910c8698 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/RatesFixProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/RatesFixProtocol.kt @@ -1,7 +1,6 @@ package com.r3corda.protocols import co.paralleluniverse.fibers.Suspendable -import com.r3corda.core.* import com.r3corda.core.contracts.Fix import com.r3corda.core.contracts.FixOf import com.r3corda.core.contracts.TransactionBuilder @@ -10,8 +9,12 @@ import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.random63BitValue import com.r3corda.core.utilities.ProgressTracker +import com.r3corda.core.utilities.suggestInterestRateAnnouncementTimeWindow import java.math.BigDecimal +import java.time.Duration +import java.time.Instant import java.util.* // This code is unit tested in NodeInterestRates.kt @@ -29,6 +32,7 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder, private val fixOf: FixOf, private val expectedRate: BigDecimal, private val rateTolerance: BigDecimal, + private val timeOut: Duration, override val progressTracker: ProgressTracker = RatesFixProtocol.tracker(fixOf.name)) : ProtocolLogic() { companion object { val TOPIC = "platform.rates.interest.fix" @@ -44,7 +48,7 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder, class FixOutOfRange(val byAmount: BigDecimal) : Exception() - class QueryRequest(val queries: List, replyTo: SingleMessageRecipient, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + class QueryRequest(val queries: List, replyTo: SingleMessageRecipient, sessionID: Long, val deadline: Instant) : AbstractRequestMessage(replyTo, sessionID) class SignRequest(val tx: WireTransaction, replyTo: SingleMessageRecipient, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) @Suspendable @@ -92,7 +96,9 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder, @Suspendable fun query(): Fix { val sessionID = random63BitValue() - val req = QueryRequest(listOf(fixOf), serviceHub.networkService.myAddress, sessionID) + val deadline = suggestInterestRateAnnouncementTimeWindow(fixOf.name, oracle.identity.name, fixOf.forDay).end + val req = QueryRequest(listOf(fixOf), serviceHub.networkService.myAddress, sessionID, deadline) + // TODO: add deadline to receive val resp = sendAndReceive>(TOPIC_QUERY, oracle.address, 0, sessionID, req) return resp.validate { diff --git a/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt index 8e0bf8ecd9..d78fe77fef 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt @@ -1,6 +1,7 @@ package com.r3corda.protocols import co.paralleluniverse.fibers.Suspendable +import com.r3corda.core.TransientProperty import com.r3corda.core.contracts.* import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party @@ -17,6 +18,7 @@ import java.math.BigDecimal import java.security.KeyPair import java.security.PublicKey import java.security.SignatureException +import java.time.Duration /** * Classes for manipulating a two party deal or agreement. @@ -25,6 +27,8 @@ import java.security.SignatureException * * TODO: Also, the term Deal is used here where we might prefer Agreement. * + * TODO: Consider whether we can merge this with [TwoPartyTradeProtocol] + * */ object TwoPartyDealProtocol { val DEAL_TOPIC = "platform.deal" @@ -52,12 +56,7 @@ object TwoPartyDealProtocol { * There's a good chance we can push at least some of this logic down into core protocol logic * and helper methods etc. */ - abstract class Primary(val payload: U, - val otherSide: SingleMessageRecipient, - val otherSessionID: Long, - val myKeyPair: KeyPair, - val notaryNode: NodeInfo, - override val progressTracker: ProgressTracker = Primary.tracker()) : ProtocolLogic() { + abstract class Primary(override val progressTracker: ProgressTracker = Primary.tracker()) : ProtocolLogic() { companion object { object AWAITING_PROPOSAL : ProgressTracker.Step("Handshaking and awaiting transaction proposal") @@ -71,6 +70,12 @@ object TwoPartyDealProtocol { fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, NOTARY, SENDING_SIGS, RECORDING, COPYING_TO_REGULATOR) } + abstract val payload: U + abstract val notaryNode: NodeInfo + abstract val otherSide: SingleMessageRecipient + abstract val otherSessionID: Long + abstract val myKeyPair: KeyPair + @Suspendable fun getPartialTransaction(): UntrustworthyData { progressTracker.currentStep = AWAITING_PROPOSAL @@ -79,7 +84,6 @@ object TwoPartyDealProtocol { // Make the first message we'll send to kick off the protocol. val hello = Handshake(payload, myKeyPair.public, sessionID) - val maybeSTX = sendAndReceive(DEAL_TOPIC, otherSide, otherSessionID, sessionID, hello) return maybeSTX @@ -143,6 +147,7 @@ object TwoPartyDealProtocol { logger.trace { "Deal stored" } + progressTracker.currentStep = COPYING_TO_REGULATOR val regulators = serviceHub.networkMapCache.regulators if (regulators.isNotEmpty()) { // Copy the transaction to every regulator in the network. This is obviously completely bogus, it's @@ -186,10 +191,7 @@ object TwoPartyDealProtocol { * There's a good chance we can push at least some of this logic down into core protocol logic * and helper methods etc. */ - abstract class Secondary(val otherSide: SingleMessageRecipient, - val notary: Party, - val sessionID: Long, - override val progressTracker: ProgressTracker = Secondary.tracker()) : ProtocolLogic() { + abstract class Secondary(override val progressTracker: ProgressTracker = Secondary.tracker()) : ProtocolLogic() { companion object { object RECEIVING : ProgressTracker.Step("Waiting for deal info") @@ -201,6 +203,9 @@ object TwoPartyDealProtocol { fun tracker() = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES, RECORDING) } + abstract val otherSide: SingleMessageRecipient + abstract val sessionID: Long + @Suspendable override fun call(): SignedTransaction { val handshake = receiveAndValidateHandshake() @@ -241,7 +246,7 @@ object TwoPartyDealProtocol { @Suspendable private fun swapSignaturesWithPrimary(stx: SignedTransaction, theirSessionID: Long): SignaturesFromPrimary { progressTracker.currentStep = SWAPPING_SIGNATURES - logger.trace { "Sending partially signed transaction to seller" } + logger.trace { "Sending partially signed transaction to other party" } // TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx. @@ -271,45 +276,47 @@ object TwoPartyDealProtocol { /** * One side of the protocol for inserting a pre-agreed deal. */ - open class Instigator(otherSide: SingleMessageRecipient, - notaryNode: NodeInfo, - dealBeingOffered: T, - myKeyPair: KeyPair, - buyerSessionID: Long, - override val progressTracker: ProgressTracker = Primary.tracker()) : Primary(dealBeingOffered, otherSide, buyerSessionID, myKeyPair, notaryNode) + open class Instigator(override val otherSide: SingleMessageRecipient, + val notary: Party, + override val payload: T, + override val myKeyPair: KeyPair, + override val otherSessionID: Long, + override val progressTracker: ProgressTracker = Primary.tracker()) : Primary() { + + override val notaryNode: NodeInfo get() = + serviceHub.networkMapCache.notaryNodes.filter { it.identity == notary }.single() + } /** * One side of the protocol for inserting a pre-agreed deal. */ - open class Acceptor(otherSide: SingleMessageRecipient, - notary: Party, + open class Acceptor(override val otherSide: SingleMessageRecipient, + val notary: Party, val dealToBuy: T, - sessionID: Long, - override val progressTracker: ProgressTracker = Secondary.tracker()) : Secondary(otherSide, notary, sessionID) { + override val sessionID: Long, + override val progressTracker: ProgressTracker = Secondary.tracker()) : Secondary() { + override fun validateHandshake(handshake: Handshake): Handshake { - with(handshake) { - // What is the seller trying to sell us? - val deal: T = handshake.payload - val otherKey = handshake.publicKey - logger.trace { "Got deal request for: ${handshake.payload}" } + // What is the seller trying to sell us? + val deal: T = handshake.payload + val otherKey = handshake.publicKey + logger.trace { "Got deal request for: ${handshake.payload.ref}" } - // Check the start message for acceptability. - check(handshake.sessionID > 0) - if (dealToBuy != deal) - throw DealMismatchException(dealToBuy, deal) + // Check the start message for acceptability. + check(handshake.sessionID > 0) + check(dealToBuy == deal) - // We need to substitute in the new public keys for the Parties - val myName = serviceHub.storageService.myLegalIdentity.name - val myOldParty = deal.parties.single { it.name == myName } - val theirOldParty = deal.parties.single { it.name != myName } + // We need to substitute in the new public keys for the Parties + val myName = serviceHub.storageService.myLegalIdentity.name + val myOldParty = deal.parties.single { it.name == myName } + val theirOldParty = deal.parties.single { it.name != myName } - @Suppress("UNCHECKED_CAST") - val newDeal = deal. - withPublicKey(myOldParty, serviceHub.keyManagementService.freshKey().public). - withPublicKey(theirOldParty, otherKey) as T + @Suppress("UNCHECKED_CAST") + val newDeal = deal. + withPublicKey(myOldParty, serviceHub.keyManagementService.freshKey().public). + withPublicKey(theirOldParty, otherKey) as T - return handshake.copy(payload = newDeal) - } + return handshake.copy(payload = newDeal) } @@ -328,59 +335,62 @@ object TwoPartyDealProtocol { * One side of the fixing protocol for an interest rate swap, but could easily be generalised further. * * Do not infer too much from the name of the class. This is just to indicate that it is the "side" - * of the protocol that is run by the party with the fixed leg of swap deal, which is the basis for decided + * of the protocol that is run by the party with the fixed leg of swap deal, which is the basis for deciding * who does what in the protocol. */ - open class Fixer(otherSide: SingleMessageRecipient, - notary: Party, - val dealToFix: StateAndRef, - sessionID: Long, - val replacementProgressTracker: ProgressTracker? = null) : Secondary(otherSide, notary, sessionID) { - private val ratesFixTracker = RatesFixProtocol.tracker(dealToFix.state.data.nextFixingOf()!!.name) + class Fixer(val initiation: FixingSessionInitiation, override val progressTracker: ProgressTracker = Secondary.tracker()) : Secondary() { - override val progressTracker: ProgressTracker = replacementProgressTracker ?: createTracker() + override val sessionID: Long get() = initiation.sessionID - fun createTracker(): ProgressTracker = Secondary.tracker().apply { - setChildProgressTracker(SIGNING, ratesFixTracker) - } + override val otherSide: SingleMessageRecipient get() = initiation.sender + + private lateinit var txState: TransactionState<*> + private lateinit var deal: FixableDealState override fun validateHandshake(handshake: Handshake): Handshake { - with(handshake) { - logger.trace { "Got fixing request for: ${dealToFix.state}" } + logger.trace { "Got fixing request for: ${handshake.payload}" } - // Check the start message for acceptability. - if (dealToFix.ref != handshake.payload) - throw DealRefMismatchException(dealToFix.ref, handshake.payload) + // Check the handshake and initiation for acceptability. + check(handshake.sessionID > 0) + txState = serviceHub.loadState(handshake.payload) + deal = txState.data as FixableDealState - return handshake - } + // validate the party that initiated is the one on the deal and that the recipient corresponds with it. + // TODO: this is in no way secure and will be replaced by general session initiation logic in the future + val myName = serviceHub.storageService.myLegalIdentity.name + val otherParty = deal.parties.filter { it.name != myName }.single() + check(otherParty == initiation.party) + val otherPartyAddress = serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)!!.address + check(otherPartyAddress == otherSide) + // Also check we are one of the parties + deal.parties.filter { it.name == myName }.single() + + return handshake } @Suspendable override fun assembleSharedTX(handshake: Handshake): Pair> { - val fixOf = dealToFix.state.data.nextFixingOf()!! + @Suppress("UNCHECKED_CAST") + val fixOf = deal.nextFixingOf()!! // TODO Do we need/want to substitute in new public keys for the Parties? val myName = serviceHub.storageService.myLegalIdentity.name - val deal: T = dealToFix.state.data val myOldParty = deal.parties.single { it.name == myName } - @Suppress("UNCHECKED_CAST") val newDeal = deal val ptx = TransactionType.General.Builder() - val addFixing = object : RatesFixProtocol(ptx, serviceHub.networkMapCache.ratesOracleNodes[0], fixOf, BigDecimal.ZERO, BigDecimal.ONE) { + val addFixing = object : RatesFixProtocol(ptx, serviceHub.networkMapCache.ratesOracleNodes[0], fixOf, BigDecimal.ZERO, BigDecimal.ONE, initiation.timeout) { @Suspendable override fun beforeSigning(fix: Fix) { - newDeal.generateFix(ptx, dealToFix, fix) + newDeal.generateFix(ptx, StateAndRef(txState, handshake.payload), fix) // And add a request for timestamping: it may be that none of the contracts need this! But it can't hurt // to have one. - ptx.setTime(serviceHub.clock.instant(), notary, 30.seconds) + ptx.setTime(serviceHub.clock.instant(), txState.notary, 30.seconds) } } subProtocol(addFixing) - return Pair(ptx, arrayListOf(myOldParty.owningKey)) } } @@ -392,11 +402,72 @@ object TwoPartyDealProtocol { * is just the "side" of the protocol run by the party with the floating leg as a way of deciding who * does what in the protocol. */ - open class Floater(otherSide: SingleMessageRecipient, - otherSessionID: Long, - notary: NodeInfo, - dealToFix: StateAndRef, - myKeyPair: KeyPair, - val sessionID: Long, - override val progressTracker: ProgressTracker = Primary.tracker()) : Primary(dealToFix.ref, otherSide, otherSessionID, myKeyPair, notary) + class Floater(override val payload: StateRef, + override val otherSessionID: Long, + override val progressTracker: ProgressTracker = Primary.tracker()) : Primary() { + @Suppress("UNCHECKED_CAST") + internal val dealToFix: StateAndRef by TransientProperty { + val state = serviceHub.loadState(payload) as TransactionState + StateAndRef(state, payload) + } + + override val myKeyPair: KeyPair get() { + val myName = serviceHub.storageService.myLegalIdentity.name + val publicKey = dealToFix.state.data.parties.filter { it.name == myName }.single().owningKey + return serviceHub.keyManagementService.toKeyPair(publicKey) + } + + override val otherSide: SingleMessageRecipient get() { + // TODO: what happens if there's no node? Move to messaging taking Party and then handled in messaging layer + val myName = serviceHub.storageService.myLegalIdentity.name + val otherParty = dealToFix.state.data.parties.filter { it.name != myName }.single() + return serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)!!.address + } + + override val notaryNode: NodeInfo get() = + serviceHub.networkMapCache.notaryNodes.filter { it.identity == dealToFix.state.notary }.single() + } + + /** This topic exists purely for [FixingSessionInitiation] to be sent from [FixingRoleDecider] to [FixingSessionInitiationHandler] */ + val FIX_INITIATE_TOPIC = "platform.fix.initiate" + + /** Used to set up the session between [Floater] and [Fixer] */ + data class FixingSessionInitiation(val sessionID: Long, val party: Party, val sender: SingleMessageRecipient, val timeout: Duration) + + /** + * This protocol looks at the deal and decides whether to be the Fixer or Floater role in agreeing a fixing. + * + * It is kicked off as an activity on both participant nodes by the scheduler when it's time for a fixing. If the + * Fixer role is chosen, then that will be initiated by the [FixingSessionInitiation] message sent from the other party and + * handled by the [FixingSessionInitiationHandler]. + * + * TODO: Replace [FixingSessionInitiation] and [FixingSessionInitiationHandler] with generic session initiation logic once it exists. + */ + class FixingRoleDecider(val ref: StateRef, val timeout: Duration, override val progressTracker: ProgressTracker = tracker(ref.toString())) : ProtocolLogic() { + companion object { + class LOADING(ref: String) : ProgressTracker.Step("Loading state $ref to decide fixing role") + + fun tracker(ref: String) = ProgressTracker(LOADING(ref)) + } + + @Suspendable + override fun call(): Unit { + progressTracker.nextStep() + val dealToFix = serviceHub.loadState(ref) + // TODO: this is not the eventual mechanism for identifying the parties + val sortedParties = (dealToFix.data as FixableDealState).parties.sortedBy { it.name } + if (sortedParties[0].name == serviceHub.storageService.myLegalIdentity.name) { + // Generate sessionID + val sessionID = random63BitValue() + val initation = FixingSessionInitiation(sessionID, sortedParties[0], serviceHub.networkService.myAddress, timeout) + + // Send initiation to other side to launch one side of the fixing protocol (the Fixer). + send(FIX_INITIATE_TOPIC, serviceHub.networkMapCache.getNodeByLegalName(sortedParties[1].name)!!.address, 0, initation) + + // Then start the other side of the fixing protocol. + val protocol = Floater(ref, sessionID) + subProtocol(protocol) + } + } + } } \ No newline at end of file diff --git a/core/src/test/java/com/r3corda/core/protocols/ProtocolLogicRefFromJavaTest.java b/core/src/test/java/com/r3corda/core/protocols/ProtocolLogicRefFromJavaTest.java new file mode 100644 index 0000000000..4e7a3a707f --- /dev/null +++ b/core/src/test/java/com/r3corda/core/protocols/ProtocolLogicRefFromJavaTest.java @@ -0,0 +1,42 @@ +package com.r3corda.core.protocols; + + +import com.google.common.collect.Sets; +import org.junit.Test; + +public class ProtocolLogicRefFromJavaTest { + + public static class JavaProtocolLogic extends ProtocolLogic { + + public JavaProtocolLogic(int A, String b) { + } + + @Override + public Void call() { + return null; + } + } + + public static class JavaNoArgProtocolLogic extends ProtocolLogic { + + public JavaNoArgProtocolLogic() { + } + + @Override + public Void call() { + return null; + } + } + + @Test + public void test() { + ProtocolLogicRefFactory factory = new ProtocolLogicRefFactory(Sets.newHashSet(JavaProtocolLogic.class.getName()), Sets.newHashSet(Integer.class.getName(), String.class.getName())); + factory.create(JavaProtocolLogic.class, 1, "Hello Jack"); + } + + @Test + public void testNoArg() { + ProtocolLogicRefFactory factory = new ProtocolLogicRefFactory(Sets.newHashSet(JavaNoArgProtocolLogic.class.getName()), Sets.newHashSet(Integer.class.getName(), String.class.getName())); + factory.create(JavaNoArgProtocolLogic.class); + } +} diff --git a/core/src/test/kotlin/com/r3corda/core/protocols/ProtocolLogicRefTest.kt b/core/src/test/kotlin/com/r3corda/core/protocols/ProtocolLogicRefTest.kt new file mode 100644 index 0000000000..a3ad42d0de --- /dev/null +++ b/core/src/test/kotlin/com/r3corda/core/protocols/ProtocolLogicRefTest.kt @@ -0,0 +1,80 @@ +package com.r3corda.core.protocols + +import com.google.common.collect.Sets +import com.r3corda.core.days +import org.junit.Before +import org.junit.Test +import java.time.Duration + +class ProtocolLogicRefTest { + + @Suppress("UNUSED_PARAMETER") // We will never use A or b + class KotlinProtocolLogic(A: Int, b: String) : ProtocolLogic() { + constructor() : this(1, "2") + + constructor(C: String) : this(1, C) + + constructor(illegal: Duration) : this(1, illegal.toString()) + + override fun call(): Unit { + } + } + + class KotlinNoArgProtocolLogic : ProtocolLogic() { + override fun call(): Unit { + } + } + + @Suppress("UNUSED_PARAMETER") // We will never use A or b + class NotWhiteListedKotlinProtocolLogic(A: Int, b: String) : ProtocolLogic() { + override fun call(): Unit { + } + } + + lateinit var factory: ProtocolLogicRefFactory + + @Before + fun setup() { + // We have to allow Java boxed primitives but Kotlin warns we shouldn't be using them + factory = ProtocolLogicRefFactory(Sets.newHashSet(KotlinProtocolLogic::class.java.name, KotlinNoArgProtocolLogic::class.java.name), + Sets.newHashSet(@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") Integer::class.java.name, String::class.java.name)) + } + + @Test + fun testCreateKotlinNoArg() { + factory.create(KotlinNoArgProtocolLogic::class.java) + } + + @Test + fun testCreateKotlin() { + val args = mapOf(Pair("A", 1), Pair("b", "Hello Jack")) + factory.createKotlin(KotlinProtocolLogic::class.java, args) + } + + @Test + fun testCreatePrimary() { + factory.create(KotlinProtocolLogic::class.java, 1, "Hello Jack") + } + + @Test(expected = IllegalArgumentException::class) + fun testCreateNotWhiteListed() { + factory.create(NotWhiteListedKotlinProtocolLogic::class.java, 1, "Hello Jack") + } + + @Test + fun testCreateKotlinVoid() { + factory.createKotlin(KotlinProtocolLogic::class.java, emptyMap()) + } + + @Test + fun testCreateKotlinNonPrimary() { + val args = mapOf(Pair("C", "Hello Jack")) + factory.createKotlin(KotlinProtocolLogic::class.java, args) + } + + @Test(expected = IllegalArgumentException::class) + fun testCreateArgNotWhiteListed() { + val args = mapOf(Pair("illegal", 1.days)) + factory.createKotlin(KotlinProtocolLogic::class.java, args) + } +} diff --git a/docs/source/event-scheduling.rst b/docs/source/event-scheduling.rst new file mode 100644 index 0000000000..9c2cacfd92 --- /dev/null +++ b/docs/source/event-scheduling.rst @@ -0,0 +1,102 @@ +.. highlight:: kotlin +.. raw:: html + + + + +Event scheduling +================ + +This article explains our experimental approach to modelling time based events in code. It explains how a contract +state can expose an upcoming event and what action to take if the scheduled time for that event is reached. + +Introduction +------------ + +Many financial instruments have time sensitive components to them. For example, an Interest Rate Swap has a schedule +for when: + +* Interest rate fixings should take place for floating legs, so that the interest rate used as the basis for payments + can be agreed. +* Any payments between the parties are expected to take place. +* Any payments between the parties become overdue. + +Each of these is dependent on the current state of the financial instrument. What payments and interest rate fixings +have already happened should already be recorded in the state, for example. This means that the *next* time sensitive +event is thus a property of the current contract state. By next, we mean earliest in chronological terms, that is still +due. If a contract state is consumed in the UTXO model, then what *was* the next event becomes irrelevant and obsolete +and the next time sensitive event is determined by any successor contract state. + +Knowing when the next time sensitive event is due to occur is useful, but typically some *activity* is expected to take +place when this event occurs. We already have a model for business processes in the form of the protocol state machines, +so in the platform we have introduced the concept of *scheduled activities* that can invoke protocol state machines +at a scheduled time. A contract state can optionally described the next scheduled activity for itself. If it omits +to do so, then nothing will be scheduled. + +How to implement scheduled events +--------------------------------- + +There are two main steps to implementing scheduled events: + +* Have your ``ContractState`` implementation also implement ``SchedulableState``. This requires a method named + ``nextScheduledActivity`` to be implemented which returns an optional ``ScheduledActivity`` instance. + ``ScheduledActivity`` captures what ``ProtocolLogic`` instance each node will run, to perform the activity, and when it + will run is described by a ``java.time.Instant``. Once your state implements this interface and is tracked by the + wallet, it can expect to be queried for the next activity when recorded via the ``ServiceHub.recordTransactions`` + method during protocols execution. +* If nothing suitable exists, implement a ``ProtocolLogic`` to be executed by each node as the activity itself. + The important thing to remember is that each node that is party to the transaction, in the current implementation, + will execute the same ``ProtocolLogic`` so that needs to establish roles in the business process based on the contract + state and the node it is running on, and follow different but complementary paths through the business logic. + +.. note:: The scheduler's clock always operates in the UTC time zone for uniformity, so any time zone logic must be + performed by the contract, using ``ZonedDateTime``. + +In the short term, until we have automatic protocol session set up, you will also likely need to install a network +handler to help with obtaining a unqiue and secure random session. An example is described below. + +The production and consumption of ``ContractStates`` is observed by the scheduler and the activities associated with +any consumed states are unscheduled. Any newly produced states are then queried via the ``nextScheduledActivity`` +method and if they do not return ``null`` then that activity is scheduled based on the content of the +``ScheduledActivity`` object returned. + +An example +---------- + +Let's take an example of the Interest Rate Swap fixings for our scheduled events. The first task is to implement the +``nextScheduledActivity`` method on the ``State``. + + +.. container:: codeset + + .. sourcecode:: kotlin + + override fun nextScheduledActivity(thisStateRef: StateRef, + protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity? { + val nextFixingOf = nextFixingOf() ?: return null + + // This is perhaps not how we should determine the time point in the business day, but instead expect the + // schedule to detail some of these aspects. + val (instant, duration) = suggestInterestRateAnnouncementTimeWindow(index = nextFixingOf.name, + source = floatingLeg.indexSource, + date = nextFixingOf.forDay) + return ScheduledActivity(protocolLogicRefFactory.create(TwoPartyDealProtocol.FixingRoleDecider::class.java, + thisStateRef, duration), instant) + } + +The first thing this does is establish if there are any remaining fixings. If there are none, then it returns ``null`` +to indicate that there is no activity to schedule. Otherwise it calculates the ``Instant`` at which the interest rate +should become available and schedules an activity at that time to work out what roles each node will take in the fixing +business process and to take on those roles. That ``ProtocolLogic`` will be handed the ``StateRef`` for the interest +rate swap ``State`` in question, as well as a tolerance ``Duration`` of how long to wait after the activity is triggered +for the interest rate before indicating an error. + +.. note:: The use of the factory to create a ``ProtocolLogicRef`` instance to embed in the ``ScheduledActivity``. This is a + way to create a reference to the ``ProtocolLogic`` class and it's constructor parameters to instantiate that can be + checked against a per node whitelist of approved and allowable types as part of our overall security sandboxing. + +As previously mentioned, we currently need a small network handler to assist with session setup until the work to +automate that is complete. See the interest rate swap specific implementation ``FixingSessionInitiationHandler`` which +is responsible for starting a ``ProtocolLogic`` to perform one role in the fixing protocol with the ``sessionID`` sent +by the ``FixingRoleDecider`` on the other node which then launches the other role in the fixing protocol. Currently +the handler needs to be manually installed in the node. diff --git a/docs/source/index.rst b/docs/source/index.rst index f18a72c5a4..df3e876da5 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -41,6 +41,7 @@ Read on to learn: tutorial-contract protocol-state-machines oracles + event-scheduling .. toctree:: :maxdepth: 2 diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 7d5b00a219..06abdc96b1 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -12,6 +12,8 @@ import com.r3corda.core.node.CityDatabase import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.PhysicalLocation import com.r3corda.core.node.services.* +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.random63BitValue import com.r3corda.core.seconds import com.r3corda.core.serialization.deserialize @@ -24,6 +26,8 @@ import com.r3corda.node.services.api.MonitoringService import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.config.NodeConfiguration +import com.r3corda.node.services.events.NodeSchedulerService +import com.r3corda.node.services.events.ScheduledActivityObserver import com.r3corda.node.services.identity.InMemoryIdentityService import com.r3corda.node.services.keys.E2ETestKeyManagementService import com.r3corda.node.services.network.InMemoryNetworkMapCache @@ -48,7 +52,6 @@ import java.nio.file.Path import java.security.KeyPair import java.security.Security import java.time.Clock -import java.time.Instant import java.util.* /** @@ -89,9 +92,17 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, override val walletService: WalletService get() = wallet override val keyManagementService: KeyManagementService get() = keyManagement override val identityService: IdentityService get() = identity - override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) + override val schedulerService: SchedulerService get() = scheduler override val clock: Clock = platformClock + // Internal only + override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) + override val protocolLogicRefFactory = ProtocolLogicRefFactory() + + override fun startProtocol(loggerName: String, logic: ProtocolLogic): ListenableFuture { + return smm.add(loggerName, logic) + } + override fun recordTransactions(txs: Iterable) = recordTransactionsInternal(storage, txs) } @@ -112,6 +123,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, lateinit var identity: IdentityService lateinit var net: MessagingService lateinit var api: APIServer + lateinit var scheduler: SchedulerService var isPreviousCheckpointsPresent = false private set @@ -140,7 +152,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, // the identity key. But the infrastructure to make that easy isn't here yet. keyManagement = E2ETestKeyManagementService(setOf(storage.myLegalIdentityKey)) api = APIServerImpl(this) - smm = StateMachineManager(services, listOf(storage, net, wallet, keyManagement, identity, platformClock), checkpointStorage, serverThread) + scheduler = NodeSchedulerService(services) + smm = StateMachineManager(services, + listOf(storage, net, wallet, keyManagement, identity, platformClock, scheduler, interestRatesService), + checkpointStorage, + serverThread) // This object doesn't need to be referenced from this class because it registers handlers on the network // service and so that keeps it from being collected. @@ -154,6 +170,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, ANSIProgressObserver(smm) // Add wallet observers CashBalanceAsMetricsObserver(services) + ScheduledActivityObserver(services) startMessagingService() _networkMapRegistrationFuture.setFuture(registerWithNetworkMap()) @@ -210,7 +227,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, private fun updateRegistration(serviceInfo: NodeInfo, type: AddOrRemove): ListenableFuture { // Register this node against the network - val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD + val expires = platformClock.instant() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD val reg = NodeRegistration(info, networkMapSeq++, type, expires) val sessionID = random63BitValue() val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID) @@ -227,7 +244,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, } open protected fun makeNetworkMapService() { - val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD + val expires = platformClock.instant() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD val reg = NodeRegistration(info, Long.MAX_VALUE, AddOrRemove.ADD, expires) inNodeNetworkMapService = InMemoryNetworkMapService(net, reg, services.networkMapCache) } diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt index 6c25d57130..d3bf112590 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt @@ -15,6 +15,7 @@ import com.r3corda.core.node.services.linearHeadsOfType import com.r3corda.core.node.services.testing.MockIdentityService import com.r3corda.core.random63BitValue import com.r3corda.core.success +import com.r3corda.node.services.FixingSessionInitiationHandler import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.utilities.JsonSupport import com.r3corda.protocols.TwoPartyDealProtocol @@ -30,7 +31,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten val om = JsonSupport.createDefaultMapper(MockIdentityService(network.identities)) init { - currentDay = LocalDate.of(2016, 3, 10) // Should be 12th but the actual first fixing date gets rolled backwards. + currentDateAndTime = LocalDate.of(2016, 3, 8).atStartOfDay() } private var nodeAKey: KeyPair? = null @@ -39,6 +40,11 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>()) override fun startMainSimulation(): ListenableFuture { + + // TODO: until we have general session initiation + FixingSessionInitiationHandler.register(banks[0]) + FixingSessionInitiationHandler.register(banks[1]) + val future = SettableFuture.create() nodeAKey = banks[0].keyManagement.freshKey() @@ -80,7 +86,6 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten val node1: SimulatedNode = banks[i] val node2: SimulatedNode = banks[j] - val sessionID = random63BitValue() val swaps: Map> = node1.services.walletService.linearHeadsOfType() val theDealRef: StateAndRef = swaps.values.single() @@ -89,30 +94,23 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten extraNodeLabels[node1] = "Fixing event on $nextFixingDate" extraNodeLabels[node2] = "Fixing event on $nextFixingDate" - // For some reason the first fix is always before the effective date. - if (nextFixingDate > currentDay) - currentDay = nextFixingDate - - val sideA = TwoPartyDealProtocol.Floater(node2.net.myAddress, sessionID, notary.info, theDealRef, nodeAKey!!, sessionID) - val sideB = TwoPartyDealProtocol.Fixer(node1.net.myAddress, notary.info.identity, theDealRef, sessionID) - - linkConsensus(listOf(node1, node2, regulators[0]), sideB) - linkProtocolProgress(node1, sideA) - linkProtocolProgress(node2, sideB) - - // We have to start the protocols in separate iterations, as adding to the SMM effectively 'iterates' that node - // in the simulation, so if we don't do this then the two sides seem to act simultaneously. - val retFuture = SettableFuture.create() - val futA = node1.smm.add("floater", sideA) - val futB = node2.smm.add("fixer", sideB) - executeOnNextIteration += { - Futures.allAsList(futA, futB) success { - retFuture.set(null) - } failure { throwable -> - retFuture.setException(throwable) - } + // Complete the future when the state has been consumed on both nodes + val futA = node1.services.walletService.whenConsumed(theDealRef.ref) + val futB = node2.services.walletService.whenConsumed(theDealRef.ref) + Futures.allAsList(futA, futB) success { + retFuture.set(null) + } failure { throwable -> + retFuture.setException(throwable) } + + showProgressFor(listOf(node1, node2)) + showConsensusFor(listOf(node1, node2, regulators[0])) + + // For some reason the first fix is always before the effective date. + if (nextFixingDate > currentDateAndTime.toLocalDate()) + currentDateAndTime = nextFixingDate.atTime(15, 0) + return retFuture } @@ -132,13 +130,11 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten val sessionID = random63BitValue() - val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info, irs, nodeAKey!!, sessionID) + val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info.identity, irs, nodeAKey!!, sessionID) val acceptor = TwoPartyDealProtocol.Acceptor(node1.net.myAddress, notary.info.identity, irs, sessionID) - // TODO: Eliminate the need for linkProtocolProgress - linkConsensus(listOf(node1, node2, regulators[0]), acceptor) - linkProtocolProgress(node1, instigator) - linkProtocolProgress(node2, acceptor) + showProgressFor(listOf(node1, node2)) + showConsensusFor(listOf(node1, node2, regulators[0])) val instigatorFuture: ListenableFuture = node1.smm.add("instigator", instigator) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt index 2ab72e33b6..8227c1b885 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt @@ -1,6 +1,5 @@ package com.r3corda.node.internal.testing -import com.google.common.base.Function import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.node.CityDatabase @@ -15,11 +14,14 @@ import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.SimpleNotaryService +import com.r3corda.node.utilities.AddOrRemove import rx.Observable import rx.subjects.PublishSubject import java.nio.file.Path import java.security.KeyPair import java.time.LocalDate +import java.time.LocalDateTime +import java.time.ZoneOffset import java.util.* /** @@ -145,6 +147,8 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, val serviceProviders: List = listOf(notary, ratesOracle, networkMap) val banks: List = bankFactory.createAll() + val clocks = (serviceProviders + regulators + banks).map { it.services.clock as TestClock } + private val _allProtocolSteps = PublishSubject.create>() private val _doneSteps = PublishSubject.create>() val allProtocolSteps: Observable> = _allProtocolSteps @@ -156,14 +160,21 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, * The current simulated date. By default this never changes. If you want it to change, you should do so from * within your overridden [iterate] call. Changes in the current day surface in the [dateChanges] observable. */ - var currentDay: LocalDate = LocalDate.now() + var currentDateAndTime: LocalDateTime = LocalDate.now().atStartOfDay() protected set(value) { field = value _dateChanges.onNext(value) } - private val _dateChanges = PublishSubject.create() - val dateChanges: Observable = _dateChanges + private val _dateChanges = PublishSubject.create() + val dateChanges: Observable get() = _dateChanges + + init { + // Advance node clocks when current time is changed + dateChanges.subscribe { + clocks.setTo(currentDateAndTime.toInstant(ZoneOffset.UTC)) + } + } /** * A place for simulations to stash human meaningful text about what the node is "thinking", which might appear @@ -201,7 +212,15 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, return null } - protected fun linkProtocolProgress(node: SimulatedNode, protocol: ProtocolLogic<*>) { + protected fun showProgressFor(nodes: List) { + nodes.forEach { node -> + node.smm.changes.filter { it.second == AddOrRemove.ADD }.first().subscribe { + linkProtocolProgress(node, it.first) + } + } + } + + private fun linkProtocolProgress(node: SimulatedNode, protocol: ProtocolLogic<*>) { val pt = protocol.progressTracker ?: return pt.changes.subscribe { change: ProgressTracker.Change -> // Runs on node thread. @@ -211,7 +230,15 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, _allProtocolSteps.onNext(Pair(node, ProgressTracker.Change.Position(pt, pt.steps[1]))) } - protected fun linkConsensus(nodes: Collection, protocol: ProtocolLogic<*>) { + + protected fun showConsensusFor(nodes: List) { + val node = nodes.first() + node.smm.changes.filter { it.second == AddOrRemove.ADD }.first().subscribe { + linkConsensus(nodes, it.first) + } + } + + private fun linkConsensus(nodes: Collection, protocol: ProtocolLogic<*>) { protocol.progressTracker?.changes?.subscribe { change: ProgressTracker.Change -> // Runs on node thread. if (protocol.progressTracker!!.currentStep == ProgressTracker.DONE) { diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/TestClock.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/TestClock.kt index 009c4f0d3c..baa7085c21 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/TestClock.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/TestClock.kt @@ -49,3 +49,9 @@ class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableC return delegateClock.zone } } + +/** + * A helper method to set several [TestClock]s to approximately the same time. The clocks may drift by the time it + * takes for each [TestClock] to have it's time set and any observers to execute. + */ +fun Iterable.setTo(instant: Instant) = this.forEach { it.setTo(instant) } diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt index 571eb13252..1acf61bc08 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt @@ -49,9 +49,8 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo val sellerProtocol = TwoPartyTradeProtocol.Seller(buyer.net.myAddress, notary.info, issuance.tx.outRef(0), amount, seller.storage.myLegalIdentityKey, sessionID) - linkConsensus(listOf(buyer, seller, notary), sellerProtocol) - linkProtocolProgress(buyer, buyerProtocol) - linkProtocolProgress(seller, sellerProtocol) + showConsensusFor(listOf(buyer, seller, notary)) + showProgressFor(listOf(buyer, seller)) val buyerFuture = buyer.smm.add("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TRADE_TOPIC}.buyer", buyerProtocol) val sellerFuture = seller.smm.add("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TRADE_TOPIC}.seller", sellerProtocol) diff --git a/node/src/main/kotlin/com/r3corda/node/services/FixingSessionInitiationHandler.kt b/node/src/main/kotlin/com/r3corda/node/services/FixingSessionInitiationHandler.kt new file mode 100644 index 0000000000..102ded3db8 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/FixingSessionInitiationHandler.kt @@ -0,0 +1,22 @@ +package com.r3corda.node.services + +import com.r3corda.core.serialization.deserialize +import com.r3corda.node.internal.AbstractNode +import com.r3corda.protocols.TwoPartyDealProtocol + +/** + * This is a temporary handler required for establishing random sessionIDs for the [Fixer] and [Floater] as part of + * running scheduled fixings for the [InterestRateSwap] contract. + * + * TODO: This will be replaced with the automatic sessionID / session setup work. + */ +object FixingSessionInitiationHandler { + + fun register(node: AbstractNode) { + node.net.addMessageHandler("${TwoPartyDealProtocol.FIX_INITIATE_TOPIC}.0") { msg, registration -> + val initiation = msg.data.deserialize() + val protocol = TwoPartyDealProtocol.Fixer(initiation) + node.smm.add("fixings", protocol) + } + } +} diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt index 8fc9ce4d8e..e206a28644 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt @@ -1,11 +1,15 @@ package com.r3corda.node.services.api +import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.TxWritableStorageService +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.protocols.ProtocolLogicRefFactory abstract class ServiceHubInternal : ServiceHub { abstract val monitoringService: MonitoringService + abstract val protocolLogicRefFactory: ProtocolLogicRefFactory /** * Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then @@ -18,4 +22,11 @@ abstract class ServiceHubInternal : ServiceHub { txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) } walletService.notifyAll(txs.map { it.tx }) } + + /** + * TODO: borrowing this method from service manager work in another branch. It's required to avoid circular dependency + * between SMM and the scheduler. That particular problem should also be resolved by the service manager work + * itself, at which point this method would not be needed (by the scheduler) + */ + abstract fun startProtocol(loggerName: String, logic: ProtocolLogic): ListenableFuture } \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/clientapi/NodeInterestRates.kt b/node/src/main/kotlin/com/r3corda/node/services/clientapi/NodeInterestRates.kt index 19f7b9449b..83102bd4ff 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/clientapi/NodeInterestRates.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/clientapi/NodeInterestRates.kt @@ -1,5 +1,7 @@ package com.r3corda.node.services.clientapi +import co.paralleluniverse.fibers.Suspendable +import com.r3corda.core.RetryableException import com.r3corda.core.contracts.* import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party @@ -8,14 +10,19 @@ import com.r3corda.core.math.CubicSplineInterpolator import com.r3corda.core.math.Interpolator import com.r3corda.core.math.InterpolatorFactory import com.r3corda.core.node.services.ServiceType +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.utilities.ProgressTracker import com.r3corda.node.internal.AbstractNode import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.node.services.api.AcceptsFileUpload -import org.slf4j.LoggerFactory +import com.r3corda.node.utilities.FiberBox import com.r3corda.protocols.RatesFixProtocol +import org.slf4j.LoggerFactory import java.io.InputStream import java.math.BigDecimal import java.security.KeyPair +import java.time.Clock +import java.time.Instant import java.time.LocalDate import java.util.* import javax.annotation.concurrent.ThreadSafe @@ -36,7 +43,7 @@ object NodeInterestRates { */ class Service(node: AbstractNode) : AcceptsFileUpload, AbstractNodeService(node.services.networkService) { val ss = node.services.storageService - val oracle = NodeInterestRates.Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey) + val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, node.services.clock) private val logger = LoggerFactory.getLogger(Service::class.java) @@ -46,11 +53,42 @@ object NodeInterestRates { { message, e -> logger.error("Exception during interest rate oracle request processing", e) } ) addMessageHandler(RatesFixProtocol.TOPIC_QUERY, - { req: RatesFixProtocol.QueryRequest -> oracle.query(req.queries) }, + { req: RatesFixProtocol.QueryRequest -> + /** + * We put this into a protocol so that if it blocks waiting for the interest rate to become + * available, we a) don't block this thread and b) allow the fact we are waiting + * to be persisted/checkpointed. + * Interest rates become available when they are uploaded via the web as per [DataUploadServlet], + * if they haven't already been uploaded that way. + */ + node.smm.add("fixing", FixQueryHandler(this, req)) + return@addMessageHandler + }, { message, e -> logger.error("Exception during interest rate oracle request processing", e) } ) } + private class FixQueryHandler(val service: Service, val request: RatesFixProtocol.QueryRequest) : ProtocolLogic() { + companion object { + object RECEIVED : ProgressTracker.Step("Received fix request") + + object SENDING : ProgressTracker.Step("Sending fix response") + } + override val progressTracker = ProgressTracker(RECEIVED, SENDING) + + init { + progressTracker.currentStep = RECEIVED + } + + @Suspendable + override fun call(): Unit { + val answers = service.oracle.query(request.queries, request.deadline) + progressTracker.currentStep = SENDING + send("${RatesFixProtocol.TOPIC}.query", request.replyTo, request.sessionID!!, answers) + } + + } + // File upload support override val dataTypePrefix = "interest-rates" override val acceptableFileExtensions = listOf(".rates", ".txt") @@ -73,25 +111,44 @@ object NodeInterestRates { * The oracle will try to interpolate the missing value of a tenor for the given fix name and date. */ @ThreadSafe - class Oracle(val identity: Party, private val signingKey: KeyPair) { + class Oracle(val identity: Party, private val signingKey: KeyPair, val clock: Clock) { + private class InnerState { + var container: FixContainer = FixContainer(emptyList()) + + } + private val mutex = FiberBox(InnerState()) + + var knownFixes: FixContainer + set(value) { + require(value.size > 0) + mutex.write { + container = value + } + } + get() = mutex.read { container } + + // Make this the last bit of initialisation logic so fully constructed when entered into instances map init { require(signingKey.public == identity.owningKey) } - @Volatile var knownFixes = FixContainer(emptyList()) - set(value) { - require(value.size > 0) - field = value - } - - fun query(queries: List): List { + /** + * This method will now wait until the given deadline if the fix for the given [FixOf] is not immediately + * available. To implement this, [readWithDeadline] will loop if the deadline is not reached and we throw + * [UnknownFix] as it implements [RetryableException] which has special meaning to this function. + */ + @Suspendable + fun query(queries: List, deadline: Instant): List { require(queries.isNotEmpty()) - val knownFixes = knownFixes // Snapshot - val answers: List = queries.map { knownFixes[it] } - val firstNull = answers.indexOf(null) - if (firstNull != -1) - throw UnknownFix(queries[firstNull]) - return answers.filterNotNull() + return mutex.readWithDeadline(clock, deadline) { + val answers: List = queries.map { container[it] } + val firstNull = answers.indexOf(null) + if (firstNull != -1) { + throw UnknownFix(queries[firstNull]) + } else { + answers.filterNotNull() + } + } } fun sign(wtx: WireTransaction): DigitalSignature.LegallyIdentifiable { @@ -120,9 +177,8 @@ object NodeInterestRates { } } - class UnknownFix(val fix: FixOf) : Exception() { - override fun toString() = "Unknown fix: $fix" - } + // TODO: can we split into two? Fix not available (retryable/transient) and unknown (permanent) + class UnknownFix(val fix: FixOf) : RetryableException("Unknown fix: $fix") /** Fix container, for every fix name & date pair stores a tenor to interest rate map - [InterpolatingRateMap] */ class FixContainer(val fixes: List, val factory: InterpolatorFactory = CubicSplineInterpolator) { diff --git a/node/src/main/kotlin/com/r3corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/com/r3corda/node/services/events/NodeSchedulerService.kt new file mode 100644 index 0000000000..dd6849d942 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/events/NodeSchedulerService.kt @@ -0,0 +1,177 @@ +package com.r3corda.node.services.events + +import com.google.common.util.concurrent.SettableFuture +import com.r3corda.core.ThreadBox +import com.r3corda.core.contracts.SchedulableState +import com.r3corda.core.contracts.ScheduledStateRef +import com.r3corda.core.contracts.StateRef +import com.r3corda.core.node.services.SchedulerService +import com.r3corda.core.protocols.ProtocolLogicRefFactory +import com.r3corda.core.serialization.SingletonSerializeAsToken +import com.r3corda.core.utilities.loggerFor +import com.r3corda.core.utilities.trace +import com.r3corda.node.services.api.ServiceHubInternal +import com.r3corda.node.utilities.awaitWithDeadline +import java.time.Instant +import java.util.* +import java.util.concurrent.Executor +import java.util.concurrent.Executors +import javax.annotation.concurrent.ThreadSafe + +/** + * A first pass of a simple [SchedulerService] that works with [MutableClock]s for testing, demonstrations and simulations + * that also encompasses the [Wallet] observer for processing transactions. + * + * This will observe transactions as they are stored and schedule and unschedule activities based on the States consumed + * or produced. + * + * TODO: Needs extensive support from persistence and protocol frameworks to be truly reliable and atomic. + * + * Currently does not provide any system state other than the ContractState so the expectation is that a transaction + * is the outcome of the activity in order to schedule another activity. Once we have implemented more persistence + * in the nodes, maybe we can consider multiple activities and whether the activities have been completed or not, + * but that starts to sound a lot like off-ledger state. + * + * @param services Core node services. + * @param protocolLogicRefFactory Factory for restoring [ProtocolLogic] instances from references. + * @param schedulerTimerExecutor The executor the scheduler blocks on waiting for the clock to advance to the next + * activity. Only replace this for unit testing purposes. This is not the executor the [ProtocolLogic] is launched on. + */ +@ThreadSafe +class NodeSchedulerService(private val services: ServiceHubInternal, + private val protocolLogicRefFactory: ProtocolLogicRefFactory = ProtocolLogicRefFactory(), + private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor()) +: SchedulerService, SingletonSerializeAsToken() { + + private val log = loggerFor() + + // Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're + // inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference + // to somewhere. + private class InnerState { + // TODO: This has no persistence, and we don't consider initialising from non-empty map if we add persistence. + // If we were to rebuild the wallet at start up by replaying transactions and re-calculating, then + // persistence here would be unnecessary. + var scheduledStates = HashMap() + var earliestState: ScheduledStateRef? = null + var rescheduled: SettableFuture? = null + + internal fun recomputeEarliest() { + earliestState = scheduledStates.map { it.value }.sortedBy { it.scheduledAt }.firstOrNull() + } + } + + private val mutex = ThreadBox(InnerState()) + + override fun scheduleStateActivity(action: ScheduledStateRef) { + log.trace { "Schedule $action" } + mutex.locked { + scheduledStates[action.ref] = action + if (action.scheduledAt.isBefore(earliestState?.scheduledAt ?: Instant.MAX)) { + // We are earliest + earliestState = action + rescheduleWakeUp() + } else if (earliestState?.ref == action.ref && earliestState!!.scheduledAt != action.scheduledAt) { + // We were earliest but might not be any more + recomputeEarliest() + rescheduleWakeUp() + } + } + } + + override fun unscheduleStateActivity(ref: StateRef) { + log.trace { "Unschedule $ref" } + mutex.locked { + val removedAction = scheduledStates.remove(ref) + if (removedAction == earliestState && removedAction != null) { + recomputeEarliest() + rescheduleWakeUp() + } + } + } + + /** + * This method first cancels the [Future] for any pending action so that the [awaitWithDeadline] used below + * drops through without running the action. We then create a new [Future] for the new action (so it too can be + * cancelled), and then await the arrival of the scheduled time. If we reach the scheduled time (the deadline) + * without the [Future] being cancelled then we run the scheduled action. Finally we remove that action from the + * scheduled actions and recompute the next scheduled action. + */ + private fun rescheduleWakeUp() { + // Note, we already have the mutex but we need the scope again here + val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked { + rescheduled?.cancel(false) + rescheduled = SettableFuture.create() + Pair(earliestState, rescheduled!!) + } + if (scheduledState != null) { + schedulerTimerExecutor.execute() { + log.trace { "Scheduling as next $scheduledState" } + // This will block the scheduler single thread until the scheduled time (returns false) OR + // the Future is cancelled due to rescheduling (returns true). + if (!services.clock.awaitWithDeadline(scheduledState.scheduledAt, ourRescheduledFuture)) { + log.trace { "Invoking as next $scheduledState" } + onTimeReached(scheduledState) + } else { + log.trace { "Recheduled $scheduledState" } + } + } + } + } + + private fun onTimeReached(scheduledState: ScheduledStateRef) { + try { + runScheduledActionForState(scheduledState) + } finally { + // Unschedule once complete (or checkpointed) + mutex.locked { + // need to remove us from those scheduled, but only if we are still next + scheduledStates.compute(scheduledState.ref) { ref, value -> + if (value === scheduledState) null else value + } + // and schedule the next one + recomputeEarliest() + rescheduleWakeUp() + } + } + } + + private fun runScheduledActionForState(scheduledState: ScheduledStateRef) { + val txState = services.loadState(scheduledState.ref) + // It's OK to return if it's null as there's nothing scheduled + // TODO: implement sandboxing as necessary + val scheduledActivity = sandbox { + val state = txState.data as SchedulableState + state.nextScheduledActivity(scheduledState.ref, protocolLogicRefFactory) + } ?: return + + if (scheduledActivity.scheduledAt.isAfter(services.clock.instant())) { + // I suppose it might turn out that the action is no longer due (a bug, maybe), so we need to defend against that and re-schedule + // TODO: warn etc + mutex.locked { + // Replace with updated instant + scheduledStates.compute(scheduledState.ref) { ref, value -> + if (value === scheduledState) ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt) else value + } + } + } else { + /** + * TODO: align with protocol invocation via API... make it the same code + * TODO: Persistence and durability issues: + * a) Need to consider potential to run activity twice if restart between here and removing from map if we add persistence + * b) But if remove from map first, there's potential to run zero times if restart + * c) Address by switch to 3rd party scheduler? Only benefit of this impl. is support for DemoClock or other MutableClocks (e.g. for testing) + * TODO: ProtocolLogicRefFactory needs to sort out the class loader etc + */ + val logic = protocolLogicRefFactory.toProtocolLogic(scheduledActivity.logicRef) + log.trace { "Firing ProtocolLogic $logic" } + // TODO: ProtocolLogic should be checkpointed by the time this returns + services.startProtocol("scheduled", logic) + } + } + + // TODO: Does nothing right now, but beware we are calling dynamically loaded code in the contract inside here. + private inline fun sandbox(code: () -> T?): T? { + return code() + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/events/ScheduledActivityObserver.kt b/node/src/main/kotlin/com/r3corda/node/services/events/ScheduledActivityObserver.kt new file mode 100644 index 0000000000..be75393bed --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/events/ScheduledActivityObserver.kt @@ -0,0 +1,35 @@ +package com.r3corda.node.services.events + +import com.r3corda.core.contracts.ContractState +import com.r3corda.core.contracts.SchedulableState +import com.r3corda.core.contracts.ScheduledStateRef +import com.r3corda.core.contracts.StateAndRef +import com.r3corda.core.protocols.ProtocolLogicRefFactory +import com.r3corda.node.services.api.ServiceHubInternal + +/** + * This observes the wallet and schedules and unschedules activities appropriately based on state production and + * consumption. + */ +class ScheduledActivityObserver(val services: ServiceHubInternal) { + init { + // TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded + services.walletService.updates.subscribe { update -> + update.consumed.forEach { services.schedulerService.unscheduleStateActivity(it) } + update.produced.forEach { scheduleStateActivity(it, services.protocolLogicRefFactory) } + } + } + + private fun scheduleStateActivity(produced: StateAndRef, protocolLogicRefFactory: ProtocolLogicRefFactory) { + val producedState = produced.state.data + if (producedState is SchedulableState) { + val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, protocolLogicRefFactory)?.scheduledAt } ?: return + services.schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt)) + } + } + + // TODO: Beware we are calling dynamically loaded contract code inside here. + private inline fun sandbox(code: () -> T?): T? { + return code() + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt index b600350ad8..72bd57df44 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt @@ -2,9 +2,12 @@ package com.r3corda.node.services import com.codahale.metrics.MetricRegistry import com.r3corda.core.contracts.SignedTransaction +import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.services.* import com.r3corda.core.node.services.testing.MockStorageService +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.api.MonitoringService @@ -12,6 +15,7 @@ import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.network.MockNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.persistence.DataVendingService +import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.wallet.NodeWalletService import java.time.Clock @@ -23,10 +27,11 @@ open class MockServices( val storage: TxWritableStorageService? = MockStorageService(), val mapCache: NetworkMapCache? = MockNetworkMapCache(), val mapService: NetworkMapService? = null, - val overrideClock: Clock? = NodeClock() + val scheduler: SchedulerService? = null, + val overrideClock: Clock? = NodeClock(), + val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory() ) : ServiceHubInternal() { override val walletService: WalletService = customWallet ?: NodeWalletService(this) - override val keyManagementService: KeyManagementService get() = keyManagement ?: throw UnsupportedOperationException() override val identityService: IdentityService @@ -37,10 +42,15 @@ open class MockServices( get() = mapCache ?: throw UnsupportedOperationException() override val storageService: StorageService get() = storage ?: throw UnsupportedOperationException() + override val schedulerService: SchedulerService + get() = scheduler ?: throw UnsupportedOperationException() override val clock: Clock get() = overrideClock ?: throw UnsupportedOperationException() override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) + override val protocolLogicRefFactory: ProtocolLogicRefFactory + get() = protocolFactory ?: throw UnsupportedOperationException() + // We isolate the storage service with writable TXes so that it can't be accessed except via recordTransactions() private val txStorageService: TxWritableStorageService get() = storage ?: throw UnsupportedOperationException() @@ -48,6 +58,12 @@ open class MockServices( override fun recordTransactions(txs: Iterable) = recordTransactionsInternal(txStorageService, txs) + lateinit var smm: StateMachineManager + + override fun startProtocol(loggerName: String, logic: ProtocolLogic): ListenableFuture { + return smm.add(loggerName, logic) + } + init { if (net != null && storage != null) { // Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt index 4062170da1..983db252d8 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt @@ -21,6 +21,8 @@ import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.protocols.RatesFixProtocol import org.junit.Assert import org.junit.Test +import java.time.Clock +import java.time.Duration import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -37,11 +39,12 @@ class NodeInterestRatesTest { val DUMMY_CASH_ISSUER_KEY = generateKeyPair() val DUMMY_CASH_ISSUER = Party("Cash issuer", DUMMY_CASH_ISSUER_KEY.public) - val oracle = NodeInterestRates.Oracle(MEGA_CORP, MEGA_CORP_KEY).apply { knownFixes = TEST_DATA } + val clock = Clock.systemUTC() + val oracle = NodeInterestRates.Oracle(MEGA_CORP, MEGA_CORP_KEY, clock).apply { knownFixes = TEST_DATA } @Test fun `query successfully`() { val q = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M") - val res = oracle.query(listOf(q)) + val res = oracle.query(listOf(q), clock.instant()) assertEquals(1, res.size) assertEquals("0.678".bd, res[0].value) assertEquals(q, res[0].of) @@ -50,13 +53,13 @@ class NodeInterestRatesTest { @Test fun `query with one success and one missing`() { val q1 = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M") val q2 = NodeInterestRates.parseFixOf("LIBOR 2016-03-15 1M") - val e = assertFailsWith { oracle.query(listOf(q1, q2)) } + val e = assertFailsWith { oracle.query(listOf(q1, q2), clock.instant()) } assertEquals(e.fix, q2) } @Test fun `query successfully with interpolated rate`() { val q = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 5M") - val res = oracle.query(listOf(q)) + val res = oracle.query(listOf(q), clock.instant()) assertEquals(1, res.size) Assert.assertEquals(0.7316228, res[0].value.toDouble(), 0.0000001) assertEquals(q, res[0].of) @@ -64,11 +67,11 @@ class NodeInterestRatesTest { @Test fun `rate missing and unable to interpolate`() { val q = NodeInterestRates.parseFixOf("EURIBOR 2016-03-15 3M") - assertFailsWith { oracle.query(listOf(q)) } + assertFailsWith { oracle.query(listOf(q), clock.instant()) } } @Test fun `empty query`() { - assertFailsWith { oracle.query(emptyList()) } + assertFailsWith { oracle.query(emptyList(), clock.instant()) } } @Test fun `refuse to sign with no relevant commands`() { @@ -80,7 +83,7 @@ class NodeInterestRatesTest { @Test fun `sign successfully`() { val tx = makeTX() - val fix = oracle.query(listOf(NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M"))).first() + val fix = oracle.query(listOf(NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")), clock.instant()).first() tx.addCommand(fix, oracle.identity.owningKey) // Sign successfully. @@ -106,7 +109,7 @@ class NodeInterestRatesTest { val tx = TransactionType.General.Builder() val fixOf = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M") - val protocol = RatesFixProtocol(tx, n2.info, fixOf, "0.675".bd, "0.1".bd) + val protocol = RatesFixProtocol(tx, n2.info, fixOf, "0.675".bd, "0.1".bd, Duration.ofNanos(1)) BriefLogFormatter.initVerbose("rates") val future = n1.smm.add("rates", protocol) diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt new file mode 100644 index 0000000000..20a7d14edb --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt @@ -0,0 +1,247 @@ +package com.r3corda.node.services + +import com.google.common.jimfs.Configuration +import com.google.common.jimfs.Jimfs +import com.r3corda.core.contracts.* +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.days +import com.r3corda.core.node.ServiceHub +import com.r3corda.core.node.services.testing.MockKeyManagementService +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.protocols.ProtocolLogicRef +import com.r3corda.core.protocols.ProtocolLogicRefFactory +import com.r3corda.core.serialization.SingletonSerializeAsToken +import com.r3corda.core.testing.ALICE_KEY +import com.r3corda.core.testing.DUMMY_NOTARY +import com.r3corda.node.internal.testing.TestClock +import com.r3corda.node.services.events.NodeSchedulerService +import com.r3corda.node.services.network.InMemoryMessagingNetwork +import com.r3corda.node.services.persistence.PerFileCheckpointStorage +import com.r3corda.node.services.statemachine.StateMachineManager +import com.r3corda.node.utilities.AffinityExecutor +import org.assertj.core.api.Assertions.assertThat +import org.junit.Before +import org.junit.Test +import java.nio.file.FileSystem +import java.security.PublicKey +import java.time.Clock +import java.time.Instant +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +class NodeSchedulerServiceTest : SingletonSerializeAsToken() { + // Use an in memory file system for testing attachment storage. + val fs: FileSystem = Jimfs.newFileSystem(Configuration.unix()) + + val realClock: Clock = Clock.systemUTC() + val stoppedClock = Clock.fixed(realClock.instant(), realClock.zone) + val testClock = TestClock(stoppedClock) + + val smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) + val schedulerGatedExecutor = AffinityExecutor.Gate(true) + + // We have to allow Java boxed primitives but Kotlin warns we shouldn't be using them + @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") + val factory = ProtocolLogicRefFactory(setOf(TestProtocolLogic::class.java.name), setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name)) + + val scheduler: NodeSchedulerService + val services: ServiceHub + + /** + * Have a reference to this test added to [ServiceHub] so that when the [ProtocolLogic] runs it can access the test instance. + * The [TestState] is serialized and deserialized so attempting to use a transient field won't work, as it just + * results in NPE. + */ + interface TestReference { + val testReference: NodeSchedulerServiceTest + } + + init { + val kms = MockKeyManagementService(ALICE_KEY) + val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None")) + val mockServices = object : MockServices(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference { + override val testReference = this@NodeSchedulerServiceTest + } + services = mockServices + scheduler = NodeSchedulerService(mockServices, factory, schedulerGatedExecutor) + val mockSMM = StateMachineManager(mockServices, listOf(mockServices), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor) + mockServices.smm = mockSMM + } + + lateinit var countDown: CountDownLatch + var calls: Int = 0 + + @Before + fun setup() { + countDown = CountDownLatch(1) + calls = 0 + } + + class TestState(val protocolLogicRef: ProtocolLogicRef, val instant: Instant) : LinearState, SchedulableState { + override val participants: List + get() = throw UnsupportedOperationException() + + override val thread = SecureHash.sha256("does not matter but we need it to be unique ${Math.random()}") + + override fun isRelevant(ourKeys: Set): Boolean = true + + override fun nextScheduledActivity(thisStateRef: StateRef, protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity? = ScheduledActivity(protocolLogicRef, instant) + + override val contract: Contract + get() = throw UnsupportedOperationException() + } + + class TestProtocolLogic(val increment: Int = 1) : ProtocolLogic() { + override fun call() { + (serviceHub as TestReference).testReference.calls += increment + (serviceHub as TestReference).testReference.countDown.countDown() + } + } + + class Command : TypeOnlyCommandData() + + @Test + fun `test activity due now`() { + val time = stoppedClock.instant() + scheduleTX(time) + + assertThat(calls).isEqualTo(0) + schedulerGatedExecutor.waitAndRun() + countDown.await(60, TimeUnit.SECONDS) + assertThat(calls).isEqualTo(1) + } + + @Test + fun `test activity due in the past`() { + val time = stoppedClock.instant() - 1.days + scheduleTX(time) + + assertThat(calls).isEqualTo(0) + schedulerGatedExecutor.waitAndRun() + countDown.await(60, TimeUnit.SECONDS) + assertThat(calls).isEqualTo(1) + } + + @Test + fun `test activity due in the future`() { + val time = stoppedClock.instant() + 1.days + scheduleTX(time) + + val backgroundExecutor = Executors.newSingleThreadExecutor() + backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + assertThat(calls).isEqualTo(0) + testClock.advanceBy(1.days) + backgroundExecutor.shutdown() + backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS) + countDown.await(60, TimeUnit.SECONDS) + assertThat(calls).isEqualTo(1) + } + + @Test + fun `test activity due in the future and schedule another earlier`() { + val time = stoppedClock.instant() + 1.days + scheduleTX(time + 1.days) + + val backgroundExecutor = Executors.newSingleThreadExecutor() + backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + assertThat(calls).isEqualTo(0) + scheduleTX(time, 3) + + backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + testClock.advanceBy(1.days) + countDown.await(60, TimeUnit.SECONDS) + assertThat(calls).isEqualTo(3) + backgroundExecutor.shutdown() + backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS) + } + + @Test + fun `test activity due in the future and schedule another later`() { + val time = stoppedClock.instant() + 1.days + scheduleTX(time) + + val backgroundExecutor = Executors.newSingleThreadExecutor() + backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + assertThat(calls).isEqualTo(0) + scheduleTX(time + 1.days, 3) + + backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + testClock.advanceBy(1.days) + countDown.await(60, TimeUnit.SECONDS) + assertThat(calls).isEqualTo(1) + testClock.advanceBy(1.days) + backgroundExecutor.shutdown() + backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS) + } + + @Test + fun `test activity due in the future and schedule another for same time`() { + val time = stoppedClock.instant() + 1.days + scheduleTX(time) + + val backgroundExecutor = Executors.newSingleThreadExecutor() + backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + assertThat(calls).isEqualTo(0) + scheduleTX(time, 3) + + testClock.advanceBy(1.days) + countDown.await(60, TimeUnit.SECONDS) + assertThat(calls).isEqualTo(1) + backgroundExecutor.shutdown() + backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS) + } + + @Test + fun `test activity due in the future and schedule another for same time then unschedule original`() { + val time = stoppedClock.instant() + 1.days + var scheduledRef1 = scheduleTX(time) + + val backgroundExecutor = Executors.newSingleThreadExecutor() + backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + assertThat(calls).isEqualTo(0) + scheduleTX(time, 3) + + backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + scheduler.unscheduleStateActivity(scheduledRef1!!.ref) + testClock.advanceBy(1.days) + countDown.await(60, TimeUnit.SECONDS) + assertThat(calls).isEqualTo(3) + backgroundExecutor.shutdown() + backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS) + } + + @Test + fun `test activity due in the future then unschedule`() { + var scheduledRef1 = scheduleTX(stoppedClock.instant() + 1.days) + + val backgroundExecutor = Executors.newSingleThreadExecutor() + backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + assertThat(calls).isEqualTo(0) + + scheduler.unscheduleStateActivity(scheduledRef1!!.ref) + testClock.advanceBy(1.days) + assertThat(calls).isEqualTo(0) + backgroundExecutor.shutdown() + backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS) + } + + private fun scheduleTX(instant: Instant, increment: Int = 1): ScheduledStateRef? { + var scheduledRef: ScheduledStateRef? = null + apply { + val freshKey = services.keyManagementService.freshKey() + val state = TestState(factory.create(TestProtocolLogic::class.java, increment), instant) + val usefulTX = TransactionType.General.Builder(DUMMY_NOTARY).apply { + addOutputState(state) + addCommand(Command(), freshKey.public) + signWith(freshKey) + }.toSignedTransaction() + val txHash = usefulTX.id + + services.recordTransactions(usefulTX) + scheduledRef = ScheduledStateRef(StateRef(txHash, 0), state.instant) + scheduler.scheduleStateActivity(scheduledRef!!) + } + return scheduledRef + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt index e1d18d390a..57a9bc63cf 100644 --- a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt @@ -1,18 +1,11 @@ package com.r3corda.demos import com.google.common.net.HostAndPort -import com.typesafe.config.ConfigFactory import com.r3corda.core.crypto.Party import com.r3corda.core.logElapsedTime import com.r3corda.core.messaging.SingleMessageRecipient -import com.r3corda.node.internal.Node -import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.core.node.NodeInfo -import com.r3corda.node.services.network.NetworkMapService -import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.core.node.services.ServiceType -import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.core.serialization.deserialize import com.r3corda.core.utilities.BriefLogFormatter import com.r3corda.demos.api.InterestRateSwapAPI @@ -20,13 +13,23 @@ import com.r3corda.demos.protocols.AutoOfferProtocol import com.r3corda.demos.protocols.ExitServerProtocol import com.r3corda.demos.protocols.UpdateBusinessDayProtocol import com.r3corda.node.internal.AbstractNode +import com.r3corda.node.internal.Node import com.r3corda.node.internal.testing.MockNetwork +import com.r3corda.node.services.FixingSessionInitiationHandler +import com.r3corda.node.services.clientapi.NodeInterestRates +import com.r3corda.node.services.config.NodeConfiguration +import com.r3corda.node.services.config.NodeConfigurationFromConfig +import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.SimpleNotaryService +import com.typesafe.config.ConfigFactory import joptsimple.OptionParser import joptsimple.OptionSet +import org.apache.commons.io.IOUtils import java.io.DataOutputStream import java.io.File import java.net.HttpURLConnection +import java.net.SocketTimeoutException import java.net.URL import java.nio.file.Files import java.nio.file.Path @@ -34,8 +37,6 @@ import java.nio.file.Paths import java.util.* import kotlin.concurrent.fixedRateTimer import kotlin.system.exitProcess -import org.apache.commons.io.IOUtils -import java.net.SocketTimeoutException // IRS DEMO // @@ -304,6 +305,7 @@ private fun runNode(cliParams: CliParams.RunNode): Int { AutoOfferProtocol.Handler.register(node) UpdateBusinessDayProtocol.Handler.register(node) ExitServerProtocol.Handler.register(node) + FixingSessionInitiationHandler.register(node) if (cliParams.uploadRates) { runUploadRates(cliParams.apiAddress) diff --git a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt index 97bbbf2e4c..007cc51e36 100644 --- a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt @@ -4,6 +4,7 @@ import com.google.common.net.HostAndPort import com.r3corda.contracts.cash.Cash import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party +import com.r3corda.core.hours import com.r3corda.core.logElapsedTime import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType @@ -83,7 +84,7 @@ fun main(args: Array) { // Make a garbage transaction that includes a rate fix. val tx = TransactionType.General.Builder() tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notary.identity)) - val protocol = RatesFixProtocol(tx, oracleNode, fixOf, expectedRate, rateTolerance) + val protocol = RatesFixProtocol(tx, oracleNode, fixOf, expectedRate, rateTolerance, 24.hours) node.smm.add("demo.ratefix", protocol).get() node.stop() diff --git a/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt index c90561608c..2be25b79b4 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt @@ -25,6 +25,7 @@ object AutoOfferProtocol { val TOPIC = "autooffer.topic" data class AutoOfferMessage(val otherSide: SingleMessageRecipient, + val notary: Party, val otherSessionID: Long, val dealBeingOffered: DealState) object Handler { @@ -53,9 +54,7 @@ object AutoOfferProtocol { val autoOfferMessage = msg.data.deserialize() // Put the deal onto the ledger progressTracker.currentStep = DEALING - // TODO required as messaging layer does not currently queue messages that arrive before we expect them - Thread.sleep(100) - val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, node.services.networkMapCache.notaryNodes.first(), + val seller = TwoPartyDealProtocol.Instigator(autoOfferMessage.otherSide, autoOfferMessage.notary, autoOfferMessage.dealBeingOffered, node.services.keyManagementService.freshKey(), autoOfferMessage.otherSessionID, progressTracker.getChildProgressTracker(DEALING)!!) val future = node.smm.add("${TwoPartyDealProtocol.DEAL_TOPIC}.seller", seller) // This is required because we are doing child progress outside of a subprotocol. In future, we should just wrap things like this in a protocol to avoid it @@ -94,16 +93,16 @@ object AutoOfferProtocol { require(serviceHub.networkMapCache.notaryNodes.isNotEmpty()) { "No notary nodes registered" } val ourSessionID = random63BitValue() - val notary = serviceHub.networkMapCache.notaryNodes.first() + val notary = serviceHub.networkMapCache.notaryNodes.first().identity // need to pick which ever party is not us val otherParty = notUs(*dealToBeOffered.parties).single() val otherNode = (serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)) requireNotNull(otherNode) { "Cannot identify other party " + otherParty.name + ", know about: " + serviceHub.networkMapCache.partyNodes.map { it.identity } } val otherSide = otherNode!!.address progressTracker.currentStep = ANNOUNCING - send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.networkService.myAddress, ourSessionID, dealToBeOffered)) + send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.networkService.myAddress, notary, ourSessionID, dealToBeOffered)) progressTracker.currentStep = DEALING - val stx = subProtocol(TwoPartyDealProtocol.Acceptor(otherSide, notary.identity, dealToBeOffered, ourSessionID, progressTracker.getChildProgressTracker(DEALING)!!)) + val stx = subProtocol(TwoPartyDealProtocol.Acceptor(otherSide, notary, dealToBeOffered, ourSessionID, progressTracker.getChildProgressTracker(DEALING)!!)) return stx } diff --git a/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt index 452e315871..c702be19fc 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt @@ -1,171 +1,50 @@ package com.r3corda.demos.protocols import co.paralleluniverse.fibers.Suspendable -import com.r3corda.contracts.InterestRateSwap -import com.r3corda.core.contracts.DealState -import com.r3corda.core.contracts.StateAndRef -import com.r3corda.core.contracts.TransactionState import com.r3corda.core.node.NodeInfo -import com.r3corda.core.node.services.linearHeadsOfType import com.r3corda.core.protocols.ProtocolLogic -import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.deserialize import com.r3corda.core.utilities.ProgressTracker import com.r3corda.demos.DemoClock import com.r3corda.node.internal.Node import com.r3corda.node.services.network.MockNetworkMapCache -import com.r3corda.node.utilities.ANSIProgressRenderer -import com.r3corda.protocols.TwoPartyDealProtocol import java.time.LocalDate /** - * This is a very temporary, demo-oriented way of initiating processing of temporal events and is not - * intended as the way things will necessarily be done longer term + * This is a less temporary, demo-oriented way of initiating processing of temporal events */ object UpdateBusinessDayProtocol { val TOPIC = "businessday.topic" - class Updater(val date: LocalDate, val sessionID: Long, - override val progressTracker: ProgressTracker = Updater.tracker()) : ProtocolLogic() { - - companion object { - object FETCHING : ProgressTracker.Step("Fetching deals") - object ITERATING_DEALS : ProgressTracker.Step("Interating over deals") - object ITERATING_FIXINGS : ProgressTracker.Step("Iterating over fixings") - object FIXING : ProgressTracker.Step("Fixing deal") - - fun tracker() = ProgressTracker(FETCHING, ITERATING_DEALS, ITERATING_FIXINGS, FIXING) - } - - @Suspendable - override fun call(): Boolean { - // Get deals - progressTracker.currentStep = FETCHING - val dealStateRefs = serviceHub.walletService.linearHeadsOfType() - val otherPartyToDeals = dealStateRefs.values.groupBy { otherParty(it.state.data) } - - // TODO we need to process these in parallel to stop there being an ordering problem across more than two nodes - val sortedParties = otherPartyToDeals.keys.sortedBy { it.identity.name } - for (party in sortedParties) { - val sortedDeals = otherPartyToDeals[party]!!.sortedBy { it.state.data.ref } - for (deal in sortedDeals) { - progressTracker.currentStep = ITERATING_DEALS - processDeal(party, deal, date, sessionID) - } - } - return false - } - - // This assumes we definitely have one key or the other - fun otherParty(deal: DealState): NodeInfo { - val ourKeys = serviceHub.keyManagementService.keys.keys - return serviceHub.networkMapCache.getNodeByLegalName(deal.parties.single { !ourKeys.contains(it.owningKey) }.name)!! - } - - // TODO we should make this more object oriented when we can ask a state for it's contract - @Suspendable - fun processDeal(party: NodeInfo, deal: StateAndRef, date: LocalDate, sessionID: Long) { - val s = deal.state.data - when (s) { - is InterestRateSwap.State -> processInterestRateSwap(party, StateAndRef(TransactionState(s, deal.state.notary), deal.ref), date, sessionID) - } - } - - // TODO and this would move to the InterestRateSwap and cope with permutations of Fixed/Floating and Floating/Floating etc - @Suspendable - fun processInterestRateSwap(party: NodeInfo, deal: StateAndRef, date: LocalDate, sessionID: Long) { - var dealStateAndRef: StateAndRef? = deal - var nextFixingDate = deal.state.data.calculation.nextFixingDate() - while (nextFixingDate != null && !nextFixingDate.isAfter(date)) { - progressTracker.currentStep = ITERATING_FIXINGS - /* - * Note that this choice of fixed versus floating leg is simply to assign roles in - * the fixing protocol and doesn't infer roles or responsibilities in a business sense. - * One of the parties needs to take the lead in the coordination and this is a reliable deterministic way - * to do it. - */ - if (party.identity.name == deal.state.data.fixedLeg.fixedRatePayer.name) { - dealStateAndRef = nextFixingFloatingLeg(dealStateAndRef!!, party, sessionID) - } else { - dealStateAndRef = nextFixingFixedLeg(dealStateAndRef!!, party, sessionID) - } - nextFixingDate = dealStateAndRef?.state?.data?.calculation?.nextFixingDate() - } - } - - @Suspendable - private fun nextFixingFloatingLeg(dealStateAndRef: StateAndRef, party: NodeInfo, sessionID: Long): StateAndRef? { - progressTracker.setChildProgressTracker(FIXING, TwoPartyDealProtocol.Primary.tracker()) - progressTracker.currentStep = FIXING - - val myName = serviceHub.storageService.myLegalIdentity.name - val deal: InterestRateSwap.State = dealStateAndRef.state.data - val myOldParty = deal.parties.single { it.name == myName } - val keyPair = serviceHub.keyManagementService.toKeyPair(myOldParty.owningKey) - val participant = TwoPartyDealProtocol.Floater(party.address, sessionID, serviceHub.networkMapCache.notaryNodes[0], dealStateAndRef, - keyPair, - sessionID, progressTracker.getChildProgressTracker(FIXING)!!) - val result = subProtocol(participant) - return result.tx.outRef(0) - } - - @Suspendable - private fun nextFixingFixedLeg(dealStateAndRef: StateAndRef, party: NodeInfo, sessionID: Long): StateAndRef? { - progressTracker.setChildProgressTracker(FIXING, TwoPartyDealProtocol.Secondary.tracker()) - progressTracker.currentStep = FIXING - - val participant = TwoPartyDealProtocol.Fixer( - party.address, - serviceHub.networkMapCache.notaryNodes[0].identity, - dealStateAndRef, - sessionID, - progressTracker.getChildProgressTracker(FIXING)!!) - val result = subProtocol(participant) - return result.tx.outRef(0) - } - } - - data class UpdateBusinessDayMessage(val date: LocalDate, val sessionID: Long) + data class UpdateBusinessDayMessage(val date: LocalDate) object Handler { + fun register(node: Node) { - node.net.addMessageHandler("$TOPIC.0") { msg, registration -> - // Just to validate we got the message + node.net.addMessageHandler("${TOPIC}.0") { msg, registration -> val updateBusinessDayMessage = msg.data.deserialize() - if ((node.services.clock as DemoClock).updateDate(updateBusinessDayMessage.date)) { - val participant = Updater(updateBusinessDayMessage.date, updateBusinessDayMessage.sessionID) - node.smm.add("update.business.day", participant) - } + (node.services.clock as DemoClock).updateDate(updateBusinessDayMessage.date) } } } class Broadcast(val date: LocalDate, - override val progressTracker: ProgressTracker = Broadcast.tracker()) : ProtocolLogic() { + override val progressTracker: ProgressTracker = Broadcast.tracker()) : ProtocolLogic() { companion object { - object NOTIFYING : ProgressTracker.Step("Notifying peer") - object LOCAL : ProgressTracker.Step("Updating locally") { - override fun childProgressTracker(): ProgressTracker = Updater.tracker() - } + object NOTIFYING : ProgressTracker.Step("Notifying peers") - fun tracker() = ProgressTracker(NOTIFYING, LOCAL) + fun tracker() = ProgressTracker(NOTIFYING) } @Suspendable - override fun call(): Boolean { - val message = UpdateBusinessDayMessage(date, random63BitValue()) - + override fun call(): Unit { + progressTracker.currentStep = NOTIFYING + val message = UpdateBusinessDayMessage(date) for (recipient in serviceHub.networkMapCache.partyNodes) { - progressTracker.currentStep = NOTIFYING doNextRecipient(recipient, message) } - if ((serviceHub.clock as DemoClock).updateDate(message.date)) { - progressTracker.currentStep = LOCAL - subProtocol(Updater(message.date, message.sessionID, progressTracker.getChildProgressTracker(LOCAL)!!)) - } - return true } @Suspendable @@ -173,10 +52,7 @@ object UpdateBusinessDayProtocol { if (recipient.address is MockNetworkMapCache.MockAddress) { // Ignore } else { - // TODO: messaging ourselves seems to trigger a bug for the time being and we continuously receive messages - if (recipient.identity != serviceHub.storageService.myLegalIdentity) { - send(TOPIC, recipient.address, 0, message) - } + send(TOPIC, recipient.address, 0, message) } } }