Persistent scheduler, with race condition removed.

This commit is contained in:
rick.parker 2016-10-07 14:26:57 +01:00
parent d7ca215f7d
commit 6a25fcfe8c
5 changed files with 183 additions and 79 deletions

View File

@ -153,7 +153,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
lateinit var net: MessagingServiceInternal
lateinit var netMapCache: NetworkMapCache
lateinit var api: APIServer
lateinit var scheduler: SchedulerService
lateinit var scheduler: NodeSchedulerService
lateinit var protocolLogicFactory: ProtocolLogicRefFactory
lateinit var schemas: SchemaService
val customServices: ArrayList<Any> = ArrayList()
@ -257,6 +257,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
runOnStop += Runnable { net.stop() }
_networkMapRegistrationFuture.setFuture(registerWithNetworkMap())
smm.start()
scheduler.start()
}
started = true
return this

View File

@ -1,21 +1,25 @@
package com.r3corda.node.services.events
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.ThreadBox
import com.r3corda.core.contracts.SchedulableState
import com.r3corda.core.contracts.ScheduledStateRef
import com.r3corda.core.contracts.StateRef
import com.r3corda.core.node.services.SchedulerService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.utilities.awaitWithDeadline
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.node.utilities.*
import kotlinx.support.jdk8.collections.compute
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.time.Instant
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import javax.annotation.concurrent.ThreadSafe
@ -27,8 +31,6 @@ import javax.annotation.concurrent.ThreadSafe
* This will observe transactions as they are stored and schedule and unschedule activities based on the States consumed
* or produced.
*
* TODO: Needs extensive support from persistence and protocol frameworks to be truly reliable and atomic.
*
* Currently does not provide any system state other than the ContractState so the expectation is that a transaction
* is the outcome of the activity in order to schedule another activity. Once we have implemented more persistence
* in the nodes, maybe we can consider multiple activities and whether the activities have been completed or not,
@ -48,24 +50,50 @@ class NodeSchedulerService(private val database: Database,
private val log = loggerFor<NodeSchedulerService>()
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}scheduled_states") {
val output = stateRef("transaction_id", "output_index")
val scheduledAt = instant("scheduled_at")
}
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
// to somewhere.
private class InnerState {
// TODO: This has no persistence, and we don't consider initialising from non-empty map if we add persistence.
// If we were to rebuild the vault at start up by replaying transactions and re-calculating, then
// persistence here would be unnecessary.
var scheduledStates = HashMap<StateRef, ScheduledStateRef>()
var scheduledStates = object : AbstractJDBCHashMap<StateRef, ScheduledStateRef, Table>(Table, loadOnInit = true) {
override fun keyFromRow(row: ResultRow): StateRef = StateRef(row[table.output.txId], row[table.output.index])
override fun valueFromRow(row: ResultRow): ScheduledStateRef {
return ScheduledStateRef(StateRef(row[table.output.txId], row[table.output.index]), row[table.scheduledAt])
}
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<StateRef, ScheduledStateRef>, finalizables: MutableList<() -> Unit>) {
insert[table.output.txId] = entry.key.txhash
insert[table.output.index] = entry.key.index
}
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<StateRef, ScheduledStateRef>, finalizables: MutableList<() -> Unit>) {
insert[table.scheduledAt] = entry.value.scheduledAt
}
}
var earliestState: ScheduledStateRef? = null
var rescheduled: SettableFuture<Boolean>? = null
internal fun recomputeEarliest() {
earliestState = scheduledStates.map { it.value }.sortedBy { it.scheduledAt }.firstOrNull()
earliestState = scheduledStates.values.sortedBy { it.scheduledAt }.firstOrNull()
}
}
private val mutex = ThreadBox(InnerState())
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a protocol.
fun start() {
mutex.locked {
recomputeEarliest()
rescheduleWakeUp()
}
}
override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" }
mutex.locked {
@ -100,7 +128,7 @@ class NodeSchedulerService(private val database: Database,
* without the [Future] being cancelled then we run the scheduled action. Finally we remove that action from the
* scheduled actions and recompute the next scheduled action.
*/
private fun rescheduleWakeUp() {
internal fun rescheduleWakeUp() {
// Note, we already have the mutex but we need the scope again here
val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked {
rescheduled?.cancel(false)
@ -123,60 +151,64 @@ class NodeSchedulerService(private val database: Database,
}
private fun onTimeReached(scheduledState: ScheduledStateRef) {
try {
databaseTransaction(database) {
runScheduledActionForState(scheduledState)
services.startProtocol(RunScheduled(scheduledState, this@NodeSchedulerService))
}
class RunScheduled(val scheduledState: ScheduledStateRef, val scheduler: NodeSchedulerService) : ProtocolLogic<Unit>() {
companion object {
object RUNNING : ProgressTracker.Step("Running scheduled...")
fun tracker() = ProgressTracker(RUNNING)
}
override val progressTracker = tracker()
@Suspendable
override fun call(): Unit {
progressTracker.currentStep = RUNNING
val txState = serviceHub.loadState(scheduledState.ref)
val state = txState.data as SchedulableState
val scheduledActivity = try {
// This can throw as running contract code.
state.nextScheduledActivity(scheduledState.ref, scheduler.protocolLogicRefFactory)
} catch(e: Exception) {
logger.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
null
}
} finally {
// Unschedule once complete (or checkpointed)
mutex.locked {
// Ensure we are still scheduled.
var scheduledLogic: ProtocolLogic<*>? = null
scheduler.mutex.locked {
// need to remove us from those scheduled, but only if we are still next
scheduledStates.compute(scheduledState.ref) { ref, value ->
if (value === scheduledState) null else value
if (value === scheduledState) {
if (scheduledActivity == null) {
logger.info("Scheduled state $scheduledState has rescheduled to never.")
null
} else if (scheduledActivity.scheduledAt.isAfter(serviceHub.clock.instant())) {
logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
} else {
// TODO: ProtocolLogicRefFactory needs to sort out the class loader etc
val logic = scheduler.protocolLogicRefFactory.toProtocolLogic(scheduledActivity.logicRef)
logger.trace { "Scheduler starting ProtocolLogic $logic" }
// ProtocolLogic will be checkpointed by the time this returns.
//scheduler.services.startProtocolAndForget(logic)
scheduledLogic = logic
null
}
} else {
value
}
}
// and schedule the next one
recomputeEarliest()
rescheduleWakeUp()
scheduler.rescheduleWakeUp()
}
if(scheduledLogic != null) {
subProtocol(scheduledLogic!!)
}
}
}
private fun runScheduledActionForState(scheduledState: ScheduledStateRef) {
val txState = services.loadState(scheduledState.ref)
// It's OK to return if it's null as there's nothing scheduled
// TODO: implement sandboxing as necessary
val scheduledActivity = sandbox {
val state = txState.data as SchedulableState
state.nextScheduledActivity(scheduledState.ref, protocolLogicRefFactory)
} ?: return
if (scheduledActivity.scheduledAt.isAfter(services.clock.instant())) {
// I suppose it might turn out that the action is no longer due (a bug, maybe), so we need to defend against that and re-schedule
// TODO: warn etc
mutex.locked {
// Replace with updated instant
scheduledStates.compute(scheduledState.ref) { ref, value ->
if (value === scheduledState) ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt) else value
}
}
} else {
/**
* TODO: align with protocol invocation via API... make it the same code
* TODO: Persistence and durability issues:
* a) Need to consider potential to run activity twice if restart between here and removing from map if we add persistence
* b) But if remove from map first, there's potential to run zero times if restart
* c) Address by switch to 3rd party scheduler? Only benefit of this impl. is support for DemoClock or other MutableClocks (e.g. for testing)
* TODO: ProtocolLogicRefFactory needs to sort out the class loader etc
*/
val logic = protocolLogicRefFactory.toProtocolLogic(scheduledActivity.logicRef)
log.trace { "Firing ProtocolLogic $logic" }
// TODO: ProtocolLogic should be checkpointed by the time this returns
services.startProtocol(logic)
}
}
// TODO: Does nothing right now, but beware we are calling dynamically loaded code in the contract inside here.
private inline fun <T : Any> sandbox(code: () -> T?): T? {
return code()
}
}

View File

@ -13,18 +13,10 @@ import com.r3corda.node.services.api.ServiceHubInternal
*/
class ScheduledActivityObserver(val services: ServiceHubInternal) {
init {
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
services.vaultService.updates.subscribe { update ->
update.consumed.forEach { services.schedulerService.unscheduleStateActivity(it) }
update.produced.forEach { scheduleStateActivity(it, services.protocolLogicRefFactory) }
}
// In the short term, to get restart-able IRS demo, re-initialise from vault state
// TODO: there's a race condition here. We need to move persistence into the scheduler but that is a bigger
// change so I want to revisit as a distinct branch/PR.
for (state in services.vaultService.currentVault.statesOfType<SchedulableState>()) {
scheduleStateActivity(state, services.protocolLogicRefFactory)
}
}
private fun scheduleStateActivity(produced: StateAndRef<ContractState>, protocolLogicRefFactory: ProtocolLogicRefFactory) {

View File

@ -14,6 +14,7 @@ import java.security.PublicKey
import java.sql.Connection
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.*
@ -147,6 +148,8 @@ fun Table.secureHash(name: String) = this.registerColumn<SecureHash>(name, Secur
fun Table.party(nameColumnName: String, keyColumnName: String) = PartyColumns(this.varchar(nameColumnName, length = 255), this.publicKey(keyColumnName))
fun Table.uuidString(name: String) = this.registerColumn<UUID>(name, UUIDStringColumnType)
fun Table.localDate(name: String) = this.registerColumn<LocalDate>(name, LocalDateColumnType)
fun Table.localDateTime(name: String) = this.registerColumn<LocalDateTime>(name, LocalDateTimeColumnType)
fun Table.instant(name: String) = this.registerColumn<Instant>(name, InstantColumnType)
fun Table.stateRef(txIdColumnName: String, indexColumnName: String) = StateRefColumns(this.secureHash(txIdColumnName), this.integer(indexColumnName))
/**
@ -210,4 +213,68 @@ object LocalDateColumnType : ColumnType() {
override fun notNullValueToDB(value: Any): Any = if (value is LocalDate) {
java.sql.Date(value.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli())
} else value
}
/**
* [ColumnType] for marshalling to/from database on behalf of [java.time.LocalDateTime].
*/
object LocalDateTimeColumnType : ColumnType() {
private val sqlType = DateColumnType(time = true).sqlType()
override fun sqlType(): String = sqlType
override fun nonNullValueToString(value: Any): String {
if (value is String) return value
val localDateTime = when (value) {
is LocalDateTime -> value
is java.sql.Date -> value.toLocalDate().atStartOfDay()
is java.sql.Timestamp -> value.toLocalDateTime()
else -> error("Unexpected value: $value")
}
return "'$localDateTime'"
}
override fun valueFromDB(value: Any): Any = when (value) {
is java.sql.Date -> value.toLocalDate().atStartOfDay()
is java.sql.Timestamp -> value.toLocalDateTime()
is Long -> LocalDateTime.from(Instant.ofEpochMilli(value))
else -> value
}
override fun notNullValueToDB(value: Any): Any = if (value is LocalDateTime) {
java.sql.Timestamp(value.toInstant(ZoneOffset.UTC).toEpochMilli())
} else value
}
/**
* [ColumnType] for marshalling to/from database on behalf of [java.time.Instant].
*/
object InstantColumnType : ColumnType() {
private val sqlType = DateColumnType(time = true).sqlType()
override fun sqlType(): String = sqlType
override fun nonNullValueToString(value: Any): String {
if (value is String) return value
val localDateTime = when (value) {
is Instant -> value
is java.sql.Date -> value.toLocalDate().atStartOfDay().toInstant(ZoneOffset.UTC)
is java.sql.Timestamp -> value.toLocalDateTime().toInstant(ZoneOffset.UTC)
else -> error("Unexpected value: $value")
}
return "'$localDateTime'"
}
override fun valueFromDB(value: Any): Any = when (value) {
is java.sql.Date -> value.toLocalDate().atStartOfDay().toInstant(ZoneOffset.UTC)
is java.sql.Timestamp -> value.toLocalDateTime().toInstant(ZoneOffset.UTC)
is Long -> LocalDateTime.from(Instant.ofEpochMilli(value)).toInstant(ZoneOffset.UTC)
else -> value
}
override fun notNullValueToDB(value: Any): Any = if (value is Instant) {
java.sql.Timestamp(value.toEpochMilli())
} else value
}

View File

@ -17,12 +17,14 @@ import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.configureDatabase
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.testing.ALICE_KEY
import com.r3corda.testing.node.InMemoryMessagingNetwork
import com.r3corda.testing.node.MockKeyManagementService
import com.r3corda.testing.node.TestClock
import com.r3corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.exposed.sql.Database
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -55,6 +57,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
lateinit var scheduler: NodeSchedulerService
lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor
lateinit var dataSource: Closeable
lateinit var database: Database
lateinit var countDown: CountDownLatch
lateinit var smmHasRemovedAllProtocols: CountDownLatch
@ -84,17 +87,20 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
calls = 0
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
dataSource = dataSourceAndDatabase.first
val database = dataSourceAndDatabase.second
scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor)
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
val mockSMM = StateMachineManager(services, listOf(services), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database)
mockSMM.changes.subscribe { change ->
if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) {
smmHasRemovedAllProtocols.countDown()
database = dataSourceAndDatabase.second
databaseTransaction(database) {
scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor)
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
val mockSMM = StateMachineManager(services, listOf(services, scheduler), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database)
mockSMM.changes.subscribe { change ->
if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) {
smmHasRemovedAllProtocols.countDown()
}
}
mockSMM.start()
services.smm = mockSMM
scheduler.start()
}
mockSMM.start()
services.smm = mockSMM
}
@After
@ -233,7 +239,9 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
scheduleTX(time, 3)
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
databaseTransaction(database) {
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
}
testClock.advanceBy(1.days)
countDown.await()
assertThat(calls).isEqualTo(3)
@ -249,7 +257,9 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
assertThat(calls).isEqualTo(0)
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
databaseTransaction(database) {
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
}
testClock.advanceBy(1.days)
assertThat(calls).isEqualTo(0)
backgroundExecutor.shutdown()
@ -270,7 +280,9 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
services.recordTransactions(usefulTX)
scheduledRef = ScheduledStateRef(StateRef(txHash, 0), state.instant)
scheduler.scheduleStateActivity(scheduledRef!!)
databaseTransaction(database) {
scheduler.scheduleStateActivity(scheduledRef!!)
}
}
return scheduledRef
}