Event scheduling and docs for event scheduling

This commit is contained in:
rick.parker
2016-05-24 10:03:29 +01:00
parent b3af0ce218
commit 5271882dcd
31 changed files with 1375 additions and 308 deletions

View File

@ -12,6 +12,8 @@ import com.r3corda.core.node.CityDatabase
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.PhysicalLocation
import com.r3corda.core.node.services.*
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.random63BitValue
import com.r3corda.core.seconds
import com.r3corda.core.serialization.deserialize
@ -24,6 +26,8 @@ import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.events.NodeSchedulerService
import com.r3corda.node.services.events.ScheduledActivityObserver
import com.r3corda.node.services.identity.InMemoryIdentityService
import com.r3corda.node.services.keys.E2ETestKeyManagementService
import com.r3corda.node.services.network.InMemoryNetworkMapCache
@ -48,7 +52,6 @@ import java.nio.file.Path
import java.security.KeyPair
import java.security.Security
import java.time.Clock
import java.time.Instant
import java.util.*
/**
@ -89,9 +92,17 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
override val walletService: WalletService get() = wallet
override val keyManagementService: KeyManagementService get() = keyManagement
override val identityService: IdentityService get() = identity
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val schedulerService: SchedulerService get() = scheduler
override val clock: Clock = platformClock
// Internal only
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val protocolLogicRefFactory = ProtocolLogicRefFactory()
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
return smm.add(loggerName, logic)
}
override fun recordTransactions(txs: Iterable<SignedTransaction>) =
recordTransactionsInternal(storage, txs)
}
@ -112,6 +123,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
lateinit var identity: IdentityService
lateinit var net: MessagingService
lateinit var api: APIServer
lateinit var scheduler: SchedulerService
var isPreviousCheckpointsPresent = false
private set
@ -140,7 +152,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// the identity key. But the infrastructure to make that easy isn't here yet.
keyManagement = E2ETestKeyManagementService(setOf(storage.myLegalIdentityKey))
api = APIServerImpl(this)
smm = StateMachineManager(services, listOf(storage, net, wallet, keyManagement, identity, platformClock), checkpointStorage, serverThread)
scheduler = NodeSchedulerService(services)
smm = StateMachineManager(services,
listOf(storage, net, wallet, keyManagement, identity, platformClock, scheduler, interestRatesService),
checkpointStorage,
serverThread)
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
@ -154,6 +170,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
ANSIProgressObserver(smm)
// Add wallet observers
CashBalanceAsMetricsObserver(services)
ScheduledActivityObserver(services)
startMessagingService()
_networkMapRegistrationFuture.setFuture(registerWithNetworkMap())
@ -210,7 +227,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
private fun updateRegistration(serviceInfo: NodeInfo, type: AddOrRemove): ListenableFuture<NetworkMapService.RegistrationResponse> {
// Register this node against the network
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val expires = platformClock.instant() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, networkMapSeq++, type, expires)
val sessionID = random63BitValue()
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID)
@ -227,7 +244,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
}
open protected fun makeNetworkMapService() {
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val expires = platformClock.instant() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, Long.MAX_VALUE, AddOrRemove.ADD, expires)
inNodeNetworkMapService = InMemoryNetworkMapService(net, reg, services.networkMapCache)
}

View File

@ -15,6 +15,7 @@ import com.r3corda.core.node.services.linearHeadsOfType
import com.r3corda.core.node.services.testing.MockIdentityService
import com.r3corda.core.random63BitValue
import com.r3corda.core.success
import com.r3corda.node.services.FixingSessionInitiationHandler
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.utilities.JsonSupport
import com.r3corda.protocols.TwoPartyDealProtocol
@ -30,7 +31,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val om = JsonSupport.createDefaultMapper(MockIdentityService(network.identities))
init {
currentDay = LocalDate.of(2016, 3, 10) // Should be 12th but the actual first fixing date gets rolled backwards.
currentDateAndTime = LocalDate.of(2016, 3, 8).atStartOfDay()
}
private var nodeAKey: KeyPair? = null
@ -39,6 +40,11 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>())
override fun startMainSimulation(): ListenableFuture<Unit> {
// TODO: until we have general session initiation
FixingSessionInitiationHandler.register(banks[0])
FixingSessionInitiationHandler.register(banks[1])
val future = SettableFuture.create<Unit>()
nodeAKey = banks[0].keyManagement.freshKey()
@ -80,7 +86,6 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val node1: SimulatedNode = banks[i]
val node2: SimulatedNode = banks[j]
val sessionID = random63BitValue()
val swaps: Map<SecureHash, StateAndRef<InterestRateSwap.State>> = node1.services.walletService.linearHeadsOfType<InterestRateSwap.State>()
val theDealRef: StateAndRef<InterestRateSwap.State> = swaps.values.single()
@ -89,30 +94,23 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
extraNodeLabels[node1] = "Fixing event on $nextFixingDate"
extraNodeLabels[node2] = "Fixing event on $nextFixingDate"
// For some reason the first fix is always before the effective date.
if (nextFixingDate > currentDay)
currentDay = nextFixingDate
val sideA = TwoPartyDealProtocol.Floater(node2.net.myAddress, sessionID, notary.info, theDealRef, nodeAKey!!, sessionID)
val sideB = TwoPartyDealProtocol.Fixer(node1.net.myAddress, notary.info.identity, theDealRef, sessionID)
linkConsensus(listOf(node1, node2, regulators[0]), sideB)
linkProtocolProgress(node1, sideA)
linkProtocolProgress(node2, sideB)
// We have to start the protocols in separate iterations, as adding to the SMM effectively 'iterates' that node
// in the simulation, so if we don't do this then the two sides seem to act simultaneously.
val retFuture = SettableFuture.create<Unit>()
val futA = node1.smm.add("floater", sideA)
val futB = node2.smm.add("fixer", sideB)
executeOnNextIteration += {
Futures.allAsList(futA, futB) success {
retFuture.set(null)
} failure { throwable ->
retFuture.setException(throwable)
}
// Complete the future when the state has been consumed on both nodes
val futA = node1.services.walletService.whenConsumed(theDealRef.ref)
val futB = node2.services.walletService.whenConsumed(theDealRef.ref)
Futures.allAsList(futA, futB) success {
retFuture.set(null)
} failure { throwable ->
retFuture.setException(throwable)
}
showProgressFor(listOf(node1, node2))
showConsensusFor(listOf(node1, node2, regulators[0]))
// For some reason the first fix is always before the effective date.
if (nextFixingDate > currentDateAndTime.toLocalDate())
currentDateAndTime = nextFixingDate.atTime(15, 0)
return retFuture
}
@ -132,13 +130,11 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val sessionID = random63BitValue()
val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info, irs, nodeAKey!!, sessionID)
val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info.identity, irs, nodeAKey!!, sessionID)
val acceptor = TwoPartyDealProtocol.Acceptor(node1.net.myAddress, notary.info.identity, irs, sessionID)
// TODO: Eliminate the need for linkProtocolProgress
linkConsensus(listOf(node1, node2, regulators[0]), acceptor)
linkProtocolProgress(node1, instigator)
linkProtocolProgress(node2, acceptor)
showProgressFor(listOf(node1, node2))
showConsensusFor(listOf(node1, node2, regulators[0]))
val instigatorFuture: ListenableFuture<SignedTransaction> = node1.smm.add("instigator", instigator)

View File

@ -1,6 +1,5 @@
package com.r3corda.node.internal.testing
import com.google.common.base.Function
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.node.CityDatabase
@ -15,11 +14,14 @@ import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.utilities.AddOrRemove
import rx.Observable
import rx.subjects.PublishSubject
import java.nio.file.Path
import java.security.KeyPair
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.*
/**
@ -145,6 +147,8 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
val serviceProviders: List<SimulatedNode> = listOf(notary, ratesOracle, networkMap)
val banks: List<SimulatedNode> = bankFactory.createAll()
val clocks = (serviceProviders + regulators + banks).map { it.services.clock as TestClock }
private val _allProtocolSteps = PublishSubject.create<Pair<SimulatedNode, ProgressTracker.Change>>()
private val _doneSteps = PublishSubject.create<Collection<SimulatedNode>>()
val allProtocolSteps: Observable<Pair<SimulatedNode, ProgressTracker.Change>> = _allProtocolSteps
@ -156,14 +160,21 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
* The current simulated date. By default this never changes. If you want it to change, you should do so from
* within your overridden [iterate] call. Changes in the current day surface in the [dateChanges] observable.
*/
var currentDay: LocalDate = LocalDate.now()
var currentDateAndTime: LocalDateTime = LocalDate.now().atStartOfDay()
protected set(value) {
field = value
_dateChanges.onNext(value)
}
private val _dateChanges = PublishSubject.create<LocalDate>()
val dateChanges: Observable<LocalDate> = _dateChanges
private val _dateChanges = PublishSubject.create<LocalDateTime>()
val dateChanges: Observable<LocalDateTime> get() = _dateChanges
init {
// Advance node clocks when current time is changed
dateChanges.subscribe {
clocks.setTo(currentDateAndTime.toInstant(ZoneOffset.UTC))
}
}
/**
* A place for simulations to stash human meaningful text about what the node is "thinking", which might appear
@ -201,7 +212,15 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
return null
}
protected fun linkProtocolProgress(node: SimulatedNode, protocol: ProtocolLogic<*>) {
protected fun showProgressFor(nodes: List<SimulatedNode>) {
nodes.forEach { node ->
node.smm.changes.filter { it.second == AddOrRemove.ADD }.first().subscribe {
linkProtocolProgress(node, it.first)
}
}
}
private fun linkProtocolProgress(node: SimulatedNode, protocol: ProtocolLogic<*>) {
val pt = protocol.progressTracker ?: return
pt.changes.subscribe { change: ProgressTracker.Change ->
// Runs on node thread.
@ -211,7 +230,15 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
_allProtocolSteps.onNext(Pair(node, ProgressTracker.Change.Position(pt, pt.steps[1])))
}
protected fun linkConsensus(nodes: Collection<SimulatedNode>, protocol: ProtocolLogic<*>) {
protected fun showConsensusFor(nodes: List<SimulatedNode>) {
val node = nodes.first()
node.smm.changes.filter { it.second == AddOrRemove.ADD }.first().subscribe {
linkConsensus(nodes, it.first)
}
}
private fun linkConsensus(nodes: Collection<SimulatedNode>, protocol: ProtocolLogic<*>) {
protocol.progressTracker?.changes?.subscribe { change: ProgressTracker.Change ->
// Runs on node thread.
if (protocol.progressTracker!!.currentStep == ProgressTracker.DONE) {

View File

@ -49,3 +49,9 @@ class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableC
return delegateClock.zone
}
}
/**
* A helper method to set several [TestClock]s to approximately the same time. The clocks may drift by the time it
* takes for each [TestClock] to have it's time set and any observers to execute.
*/
fun Iterable<TestClock>.setTo(instant: Instant) = this.forEach { it.setTo(instant) }

View File

@ -49,9 +49,8 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
val sellerProtocol = TwoPartyTradeProtocol.Seller(buyer.net.myAddress, notary.info,
issuance.tx.outRef(0), amount, seller.storage.myLegalIdentityKey, sessionID)
linkConsensus(listOf(buyer, seller, notary), sellerProtocol)
linkProtocolProgress(buyer, buyerProtocol)
linkProtocolProgress(seller, sellerProtocol)
showConsensusFor(listOf(buyer, seller, notary))
showProgressFor(listOf(buyer, seller))
val buyerFuture = buyer.smm.add("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TRADE_TOPIC}.buyer", buyerProtocol)
val sellerFuture = seller.smm.add("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TRADE_TOPIC}.seller", sellerProtocol)

View File

@ -0,0 +1,22 @@
package com.r3corda.node.services
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.internal.AbstractNode
import com.r3corda.protocols.TwoPartyDealProtocol
/**
* This is a temporary handler required for establishing random sessionIDs for the [Fixer] and [Floater] as part of
* running scheduled fixings for the [InterestRateSwap] contract.
*
* TODO: This will be replaced with the automatic sessionID / session setup work.
*/
object FixingSessionInitiationHandler {
fun register(node: AbstractNode) {
node.net.addMessageHandler("${TwoPartyDealProtocol.FIX_INITIATE_TOPIC}.0") { msg, registration ->
val initiation = msg.data.deserialize<TwoPartyDealProtocol.FixingSessionInitiation>()
val protocol = TwoPartyDealProtocol.Fixer(initiation)
node.smm.add("fixings", protocol)
}
}
}

View File

@ -1,11 +1,15 @@
package com.r3corda.node.services.api
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.TxWritableStorageService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
abstract class ServiceHubInternal : ServiceHub {
abstract val monitoringService: MonitoringService
abstract val protocolLogicRefFactory: ProtocolLogicRefFactory
/**
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
@ -18,4 +22,11 @@ abstract class ServiceHubInternal : ServiceHub {
txs.forEach { writableStorageService.validatedTransactions.addTransaction(it) }
walletService.notifyAll(txs.map { it.tx })
}
/**
* TODO: borrowing this method from service manager work in another branch. It's required to avoid circular dependency
* between SMM and the scheduler. That particular problem should also be resolved by the service manager work
* itself, at which point this method would not be needed (by the scheduler)
*/
abstract fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T>
}

View File

@ -1,5 +1,7 @@
package com.r3corda.node.services.clientapi
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.RetryableException
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
@ -8,14 +10,19 @@ import com.r3corda.core.math.CubicSplineInterpolator
import com.r3corda.core.math.Interpolator
import com.r3corda.core.math.InterpolatorFactory
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.AcceptsFileUpload
import org.slf4j.LoggerFactory
import com.r3corda.node.utilities.FiberBox
import com.r3corda.protocols.RatesFixProtocol
import org.slf4j.LoggerFactory
import java.io.InputStream
import java.math.BigDecimal
import java.security.KeyPair
import java.time.Clock
import java.time.Instant
import java.time.LocalDate
import java.util.*
import javax.annotation.concurrent.ThreadSafe
@ -36,7 +43,7 @@ object NodeInterestRates {
*/
class Service(node: AbstractNode) : AcceptsFileUpload, AbstractNodeService(node.services.networkService) {
val ss = node.services.storageService
val oracle = NodeInterestRates.Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey)
val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, node.services.clock)
private val logger = LoggerFactory.getLogger(Service::class.java)
@ -46,11 +53,42 @@ object NodeInterestRates {
{ message, e -> logger.error("Exception during interest rate oracle request processing", e) }
)
addMessageHandler(RatesFixProtocol.TOPIC_QUERY,
{ req: RatesFixProtocol.QueryRequest -> oracle.query(req.queries) },
{ req: RatesFixProtocol.QueryRequest ->
/**
* We put this into a protocol so that if it blocks waiting for the interest rate to become
* available, we a) don't block this thread and b) allow the fact we are waiting
* to be persisted/checkpointed.
* Interest rates become available when they are uploaded via the web as per [DataUploadServlet],
* if they haven't already been uploaded that way.
*/
node.smm.add("fixing", FixQueryHandler(this, req))
return@addMessageHandler
},
{ message, e -> logger.error("Exception during interest rate oracle request processing", e) }
)
}
private class FixQueryHandler(val service: Service, val request: RatesFixProtocol.QueryRequest) : ProtocolLogic<Unit>() {
companion object {
object RECEIVED : ProgressTracker.Step("Received fix request")
object SENDING : ProgressTracker.Step("Sending fix response")
}
override val progressTracker = ProgressTracker(RECEIVED, SENDING)
init {
progressTracker.currentStep = RECEIVED
}
@Suspendable
override fun call(): Unit {
val answers = service.oracle.query(request.queries, request.deadline)
progressTracker.currentStep = SENDING
send("${RatesFixProtocol.TOPIC}.query", request.replyTo, request.sessionID!!, answers)
}
}
// File upload support
override val dataTypePrefix = "interest-rates"
override val acceptableFileExtensions = listOf(".rates", ".txt")
@ -73,25 +111,44 @@ object NodeInterestRates {
* The oracle will try to interpolate the missing value of a tenor for the given fix name and date.
*/
@ThreadSafe
class Oracle(val identity: Party, private val signingKey: KeyPair) {
class Oracle(val identity: Party, private val signingKey: KeyPair, val clock: Clock) {
private class InnerState {
var container: FixContainer = FixContainer(emptyList<Fix>())
}
private val mutex = FiberBox(InnerState())
var knownFixes: FixContainer
set(value) {
require(value.size > 0)
mutex.write {
container = value
}
}
get() = mutex.read { container }
// Make this the last bit of initialisation logic so fully constructed when entered into instances map
init {
require(signingKey.public == identity.owningKey)
}
@Volatile var knownFixes = FixContainer(emptyList<Fix>())
set(value) {
require(value.size > 0)
field = value
}
fun query(queries: List<FixOf>): List<Fix> {
/**
* This method will now wait until the given deadline if the fix for the given [FixOf] is not immediately
* available. To implement this, [readWithDeadline] will loop if the deadline is not reached and we throw
* [UnknownFix] as it implements [RetryableException] which has special meaning to this function.
*/
@Suspendable
fun query(queries: List<FixOf>, deadline: Instant): List<Fix> {
require(queries.isNotEmpty())
val knownFixes = knownFixes // Snapshot
val answers: List<Fix?> = queries.map { knownFixes[it] }
val firstNull = answers.indexOf(null)
if (firstNull != -1)
throw UnknownFix(queries[firstNull])
return answers.filterNotNull()
return mutex.readWithDeadline(clock, deadline) {
val answers: List<Fix?> = queries.map { container[it] }
val firstNull = answers.indexOf(null)
if (firstNull != -1) {
throw UnknownFix(queries[firstNull])
} else {
answers.filterNotNull()
}
}
}
fun sign(wtx: WireTransaction): DigitalSignature.LegallyIdentifiable {
@ -120,9 +177,8 @@ object NodeInterestRates {
}
}
class UnknownFix(val fix: FixOf) : Exception() {
override fun toString() = "Unknown fix: $fix"
}
// TODO: can we split into two? Fix not available (retryable/transient) and unknown (permanent)
class UnknownFix(val fix: FixOf) : RetryableException("Unknown fix: $fix")
/** Fix container, for every fix name & date pair stores a tenor to interest rate map - [InterpolatingRateMap] */
class FixContainer(val fixes: List<Fix>, val factory: InterpolatorFactory = CubicSplineInterpolator) {

View File

@ -0,0 +1,177 @@
package com.r3corda.node.services.events
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.ThreadBox
import com.r3corda.core.contracts.SchedulableState
import com.r3corda.core.contracts.ScheduledStateRef
import com.r3corda.core.contracts.StateRef
import com.r3corda.core.node.services.SchedulerService
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.utilities.awaitWithDeadline
import java.time.Instant
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import javax.annotation.concurrent.ThreadSafe
/**
* A first pass of a simple [SchedulerService] that works with [MutableClock]s for testing, demonstrations and simulations
* that also encompasses the [Wallet] observer for processing transactions.
*
* This will observe transactions as they are stored and schedule and unschedule activities based on the States consumed
* or produced.
*
* TODO: Needs extensive support from persistence and protocol frameworks to be truly reliable and atomic.
*
* Currently does not provide any system state other than the ContractState so the expectation is that a transaction
* is the outcome of the activity in order to schedule another activity. Once we have implemented more persistence
* in the nodes, maybe we can consider multiple activities and whether the activities have been completed or not,
* but that starts to sound a lot like off-ledger state.
*
* @param services Core node services.
* @param protocolLogicRefFactory Factory for restoring [ProtocolLogic] instances from references.
* @param schedulerTimerExecutor The executor the scheduler blocks on waiting for the clock to advance to the next
* activity. Only replace this for unit testing purposes. This is not the executor the [ProtocolLogic] is launched on.
*/
@ThreadSafe
class NodeSchedulerService(private val services: ServiceHubInternal,
private val protocolLogicRefFactory: ProtocolLogicRefFactory = ProtocolLogicRefFactory(),
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor())
: SchedulerService, SingletonSerializeAsToken() {
private val log = loggerFor<NodeSchedulerService>()
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
// to somewhere.
private class InnerState {
// TODO: This has no persistence, and we don't consider initialising from non-empty map if we add persistence.
// If we were to rebuild the wallet at start up by replaying transactions and re-calculating, then
// persistence here would be unnecessary.
var scheduledStates = HashMap<StateRef, ScheduledStateRef>()
var earliestState: ScheduledStateRef? = null
var rescheduled: SettableFuture<Boolean>? = null
internal fun recomputeEarliest() {
earliestState = scheduledStates.map { it.value }.sortedBy { it.scheduledAt }.firstOrNull()
}
}
private val mutex = ThreadBox(InnerState())
override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" }
mutex.locked {
scheduledStates[action.ref] = action
if (action.scheduledAt.isBefore(earliestState?.scheduledAt ?: Instant.MAX)) {
// We are earliest
earliestState = action
rescheduleWakeUp()
} else if (earliestState?.ref == action.ref && earliestState!!.scheduledAt != action.scheduledAt) {
// We were earliest but might not be any more
recomputeEarliest()
rescheduleWakeUp()
}
}
}
override fun unscheduleStateActivity(ref: StateRef) {
log.trace { "Unschedule $ref" }
mutex.locked {
val removedAction = scheduledStates.remove(ref)
if (removedAction == earliestState && removedAction != null) {
recomputeEarliest()
rescheduleWakeUp()
}
}
}
/**
* This method first cancels the [Future] for any pending action so that the [awaitWithDeadline] used below
* drops through without running the action. We then create a new [Future] for the new action (so it too can be
* cancelled), and then await the arrival of the scheduled time. If we reach the scheduled time (the deadline)
* without the [Future] being cancelled then we run the scheduled action. Finally we remove that action from the
* scheduled actions and recompute the next scheduled action.
*/
private fun rescheduleWakeUp() {
// Note, we already have the mutex but we need the scope again here
val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked {
rescheduled?.cancel(false)
rescheduled = SettableFuture.create()
Pair(earliestState, rescheduled!!)
}
if (scheduledState != null) {
schedulerTimerExecutor.execute() {
log.trace { "Scheduling as next $scheduledState" }
// This will block the scheduler single thread until the scheduled time (returns false) OR
// the Future is cancelled due to rescheduling (returns true).
if (!services.clock.awaitWithDeadline(scheduledState.scheduledAt, ourRescheduledFuture)) {
log.trace { "Invoking as next $scheduledState" }
onTimeReached(scheduledState)
} else {
log.trace { "Recheduled $scheduledState" }
}
}
}
}
private fun onTimeReached(scheduledState: ScheduledStateRef) {
try {
runScheduledActionForState(scheduledState)
} finally {
// Unschedule once complete (or checkpointed)
mutex.locked {
// need to remove us from those scheduled, but only if we are still next
scheduledStates.compute(scheduledState.ref) { ref, value ->
if (value === scheduledState) null else value
}
// and schedule the next one
recomputeEarliest()
rescheduleWakeUp()
}
}
}
private fun runScheduledActionForState(scheduledState: ScheduledStateRef) {
val txState = services.loadState(scheduledState.ref)
// It's OK to return if it's null as there's nothing scheduled
// TODO: implement sandboxing as necessary
val scheduledActivity = sandbox {
val state = txState.data as SchedulableState
state.nextScheduledActivity(scheduledState.ref, protocolLogicRefFactory)
} ?: return
if (scheduledActivity.scheduledAt.isAfter(services.clock.instant())) {
// I suppose it might turn out that the action is no longer due (a bug, maybe), so we need to defend against that and re-schedule
// TODO: warn etc
mutex.locked {
// Replace with updated instant
scheduledStates.compute(scheduledState.ref) { ref, value ->
if (value === scheduledState) ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt) else value
}
}
} else {
/**
* TODO: align with protocol invocation via API... make it the same code
* TODO: Persistence and durability issues:
* a) Need to consider potential to run activity twice if restart between here and removing from map if we add persistence
* b) But if remove from map first, there's potential to run zero times if restart
* c) Address by switch to 3rd party scheduler? Only benefit of this impl. is support for DemoClock or other MutableClocks (e.g. for testing)
* TODO: ProtocolLogicRefFactory needs to sort out the class loader etc
*/
val logic = protocolLogicRefFactory.toProtocolLogic(scheduledActivity.logicRef)
log.trace { "Firing ProtocolLogic $logic" }
// TODO: ProtocolLogic should be checkpointed by the time this returns
services.startProtocol("scheduled", logic)
}
}
// TODO: Does nothing right now, but beware we are calling dynamically loaded code in the contract inside here.
private inline fun <T : Any> sandbox(code: () -> T?): T? {
return code()
}
}

View File

@ -0,0 +1,35 @@
package com.r3corda.node.services.events
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.SchedulableState
import com.r3corda.core.contracts.ScheduledStateRef
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.node.services.api.ServiceHubInternal
/**
* This observes the wallet and schedules and unschedules activities appropriately based on state production and
* consumption.
*/
class ScheduledActivityObserver(val services: ServiceHubInternal) {
init {
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
services.walletService.updates.subscribe { update ->
update.consumed.forEach { services.schedulerService.unscheduleStateActivity(it) }
update.produced.forEach { scheduleStateActivity(it, services.protocolLogicRefFactory) }
}
}
private fun scheduleStateActivity(produced: StateAndRef<ContractState>, protocolLogicRefFactory: ProtocolLogicRefFactory) {
val producedState = produced.state.data
if (producedState is SchedulableState) {
val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, protocolLogicRefFactory)?.scheduledAt } ?: return
services.schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt))
}
}
// TODO: Beware we are calling dynamically loaded contract code inside here.
private inline fun <T : Any> sandbox(code: () -> T?): T? {
return code()
}
}

View File

@ -2,9 +2,12 @@ package com.r3corda.node.services
import com.codahale.metrics.MetricRegistry
import com.r3corda.core.contracts.SignedTransaction
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.testing.MockStorageService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MonitoringService
@ -12,6 +15,7 @@ import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.network.MockNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.DataVendingService
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.wallet.NodeWalletService
import java.time.Clock
@ -23,10 +27,11 @@ open class MockServices(
val storage: TxWritableStorageService? = MockStorageService(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val mapService: NetworkMapService? = null,
val overrideClock: Clock? = NodeClock()
val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(),
val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory()
) : ServiceHubInternal() {
override val walletService: WalletService = customWallet ?: NodeWalletService(this)
override val keyManagementService: KeyManagementService
get() = keyManagement ?: throw UnsupportedOperationException()
override val identityService: IdentityService
@ -37,10 +42,15 @@ open class MockServices(
get() = mapCache ?: throw UnsupportedOperationException()
override val storageService: StorageService
get() = storage ?: throw UnsupportedOperationException()
override val schedulerService: SchedulerService
get() = scheduler ?: throw UnsupportedOperationException()
override val clock: Clock
get() = overrideClock ?: throw UnsupportedOperationException()
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val protocolLogicRefFactory: ProtocolLogicRefFactory
get() = protocolFactory ?: throw UnsupportedOperationException()
// We isolate the storage service with writable TXes so that it can't be accessed except via recordTransactions()
private val txStorageService: TxWritableStorageService
get() = storage ?: throw UnsupportedOperationException()
@ -48,6 +58,12 @@ open class MockServices(
override fun recordTransactions(txs: Iterable<SignedTransaction>) =
recordTransactionsInternal(txStorageService, txs)
lateinit var smm: StateMachineManager
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
return smm.add(loggerName, logic)
}
init {
if (net != null && storage != null) {
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener

View File

@ -21,6 +21,8 @@ import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.protocols.RatesFixProtocol
import org.junit.Assert
import org.junit.Test
import java.time.Clock
import java.time.Duration
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@ -37,11 +39,12 @@ class NodeInterestRatesTest {
val DUMMY_CASH_ISSUER_KEY = generateKeyPair()
val DUMMY_CASH_ISSUER = Party("Cash issuer", DUMMY_CASH_ISSUER_KEY.public)
val oracle = NodeInterestRates.Oracle(MEGA_CORP, MEGA_CORP_KEY).apply { knownFixes = TEST_DATA }
val clock = Clock.systemUTC()
val oracle = NodeInterestRates.Oracle(MEGA_CORP, MEGA_CORP_KEY, clock).apply { knownFixes = TEST_DATA }
@Test fun `query successfully`() {
val q = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")
val res = oracle.query(listOf(q))
val res = oracle.query(listOf(q), clock.instant())
assertEquals(1, res.size)
assertEquals("0.678".bd, res[0].value)
assertEquals(q, res[0].of)
@ -50,13 +53,13 @@ class NodeInterestRatesTest {
@Test fun `query with one success and one missing`() {
val q1 = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")
val q2 = NodeInterestRates.parseFixOf("LIBOR 2016-03-15 1M")
val e = assertFailsWith<NodeInterestRates.UnknownFix> { oracle.query(listOf(q1, q2)) }
val e = assertFailsWith<NodeInterestRates.UnknownFix> { oracle.query(listOf(q1, q2), clock.instant()) }
assertEquals(e.fix, q2)
}
@Test fun `query successfully with interpolated rate`() {
val q = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 5M")
val res = oracle.query(listOf(q))
val res = oracle.query(listOf(q), clock.instant())
assertEquals(1, res.size)
Assert.assertEquals(0.7316228, res[0].value.toDouble(), 0.0000001)
assertEquals(q, res[0].of)
@ -64,11 +67,11 @@ class NodeInterestRatesTest {
@Test fun `rate missing and unable to interpolate`() {
val q = NodeInterestRates.parseFixOf("EURIBOR 2016-03-15 3M")
assertFailsWith<NodeInterestRates.UnknownFix> { oracle.query(listOf(q)) }
assertFailsWith<NodeInterestRates.UnknownFix> { oracle.query(listOf(q), clock.instant()) }
}
@Test fun `empty query`() {
assertFailsWith<IllegalArgumentException> { oracle.query(emptyList()) }
assertFailsWith<IllegalArgumentException> { oracle.query(emptyList(), clock.instant()) }
}
@Test fun `refuse to sign with no relevant commands`() {
@ -80,7 +83,7 @@ class NodeInterestRatesTest {
@Test fun `sign successfully`() {
val tx = makeTX()
val fix = oracle.query(listOf(NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M"))).first()
val fix = oracle.query(listOf(NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")), clock.instant()).first()
tx.addCommand(fix, oracle.identity.owningKey)
// Sign successfully.
@ -106,7 +109,7 @@ class NodeInterestRatesTest {
val tx = TransactionType.General.Builder()
val fixOf = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")
val protocol = RatesFixProtocol(tx, n2.info, fixOf, "0.675".bd, "0.1".bd)
val protocol = RatesFixProtocol(tx, n2.info, fixOf, "0.675".bd, "0.1".bd, Duration.ofNanos(1))
BriefLogFormatter.initVerbose("rates")
val future = n1.smm.add("rates", protocol)

View File

@ -0,0 +1,247 @@
package com.r3corda.node.services
import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.days
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.testing.MockKeyManagementService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRef
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.testing.ALICE_KEY
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.node.internal.testing.TestClock
import com.r3corda.node.services.events.NodeSchedulerService
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.AffinityExecutor
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Test
import java.nio.file.FileSystem
import java.security.PublicKey
import java.time.Clock
import java.time.Instant
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
// Use an in memory file system for testing attachment storage.
val fs: FileSystem = Jimfs.newFileSystem(Configuration.unix())
val realClock: Clock = Clock.systemUTC()
val stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
val testClock = TestClock(stoppedClock)
val smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
val schedulerGatedExecutor = AffinityExecutor.Gate(true)
// We have to allow Java boxed primitives but Kotlin warns we shouldn't be using them
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
val factory = ProtocolLogicRefFactory(setOf(TestProtocolLogic::class.java.name), setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name))
val scheduler: NodeSchedulerService
val services: ServiceHub
/**
* Have a reference to this test added to [ServiceHub] so that when the [ProtocolLogic] runs it can access the test instance.
* The [TestState] is serialized and deserialized so attempting to use a transient field won't work, as it just
* results in NPE.
*/
interface TestReference {
val testReference: NodeSchedulerServiceTest
}
init {
val kms = MockKeyManagementService(ALICE_KEY)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"))
val mockServices = object : MockServices(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val testReference = this@NodeSchedulerServiceTest
}
services = mockServices
scheduler = NodeSchedulerService(mockServices, factory, schedulerGatedExecutor)
val mockSMM = StateMachineManager(mockServices, listOf(mockServices), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor)
mockServices.smm = mockSMM
}
lateinit var countDown: CountDownLatch
var calls: Int = 0
@Before
fun setup() {
countDown = CountDownLatch(1)
calls = 0
}
class TestState(val protocolLogicRef: ProtocolLogicRef, val instant: Instant) : LinearState, SchedulableState {
override val participants: List<PublicKey>
get() = throw UnsupportedOperationException()
override val thread = SecureHash.sha256("does not matter but we need it to be unique ${Math.random()}")
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean = true
override fun nextScheduledActivity(thisStateRef: StateRef, protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity? = ScheduledActivity(protocolLogicRef, instant)
override val contract: Contract
get() = throw UnsupportedOperationException()
}
class TestProtocolLogic(val increment: Int = 1) : ProtocolLogic<Unit>() {
override fun call() {
(serviceHub as TestReference).testReference.calls += increment
(serviceHub as TestReference).testReference.countDown.countDown()
}
}
class Command : TypeOnlyCommandData()
@Test
fun `test activity due now`() {
val time = stoppedClock.instant()
scheduleTX(time)
assertThat(calls).isEqualTo(0)
schedulerGatedExecutor.waitAndRun()
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
}
@Test
fun `test activity due in the past`() {
val time = stoppedClock.instant() - 1.days
scheduleTX(time)
assertThat(calls).isEqualTo(0)
schedulerGatedExecutor.waitAndRun()
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
}
@Test
fun `test activity due in the future`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
testClock.advanceBy(1.days)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
}
@Test
fun `test activity due in the future and schedule another earlier`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time + 1.days)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time, 3)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
testClock.advanceBy(1.days)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(3)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
@Test
fun `test activity due in the future and schedule another later`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time + 1.days, 3)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
testClock.advanceBy(1.days)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
testClock.advanceBy(1.days)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
@Test
fun `test activity due in the future and schedule another for same time`() {
val time = stoppedClock.instant() + 1.days
scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time, 3)
testClock.advanceBy(1.days)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(1)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
@Test
fun `test activity due in the future and schedule another for same time then unschedule original`() {
val time = stoppedClock.instant() + 1.days
var scheduledRef1 = scheduleTX(time)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduleTX(time, 3)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
testClock.advanceBy(1.days)
countDown.await(60, TimeUnit.SECONDS)
assertThat(calls).isEqualTo(3)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
@Test
fun `test activity due in the future then unschedule`() {
var scheduledRef1 = scheduleTX(stoppedClock.instant() + 1.days)
val backgroundExecutor = Executors.newSingleThreadExecutor()
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
testClock.advanceBy(1.days)
assertThat(calls).isEqualTo(0)
backgroundExecutor.shutdown()
backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)
}
private fun scheduleTX(instant: Instant, increment: Int = 1): ScheduledStateRef? {
var scheduledRef: ScheduledStateRef? = null
apply {
val freshKey = services.keyManagementService.freshKey()
val state = TestState(factory.create(TestProtocolLogic::class.java, increment), instant)
val usefulTX = TransactionType.General.Builder(DUMMY_NOTARY).apply {
addOutputState(state)
addCommand(Command(), freshKey.public)
signWith(freshKey)
}.toSignedTransaction()
val txHash = usefulTX.id
services.recordTransactions(usefulTX)
scheduledRef = ScheduledStateRef(StateRef(txHash, 0), state.instant)
scheduler.scheduleStateActivity(scheduledRef!!)
}
return scheduledRef
}
}