mirror of
https://github.com/corda/corda.git
synced 2025-06-22 17:09:00 +00:00
Add StateMachineRunId, type for SMM Changes
This commit is contained in:
@ -36,10 +36,7 @@ import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
|
||||
import com.r3corda.node.services.network.NodeRegistration
|
||||
import com.r3corda.node.services.network.PersistentNetworkMapService
|
||||
import com.r3corda.node.services.persistence.NodeAttachmentService
|
||||
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
|
||||
import com.r3corda.node.services.persistence.PerFileTransactionStorage
|
||||
import com.r3corda.node.services.persistence.StorageServiceImpl
|
||||
import com.r3corda.node.services.persistence.*
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import com.r3corda.node.services.transactions.NotaryService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
|
@ -7,6 +7,7 @@ import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.TxWritableStorageService
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolLogicRefFactory
|
||||
import com.r3corda.core.protocols.StateMachineRunId
|
||||
|
||||
interface MessagingServiceInternal : MessagingService {
|
||||
/**
|
||||
|
@ -2,6 +2,7 @@ package com.r3corda.node.services.monitor
|
||||
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.transactions.LedgerTransaction
|
||||
import com.r3corda.core.protocols.StateMachineRunId
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
@ -22,13 +23,13 @@ sealed class ServiceToClientEvent(val time: Instant) {
|
||||
}
|
||||
class StateMachine(
|
||||
time: Instant,
|
||||
val fiberId: Long,
|
||||
val stateMachineRunId: StateMachineRunId,
|
||||
val label: String,
|
||||
val addOrRemove: AddOrRemove
|
||||
) : ServiceToClientEvent(time) {
|
||||
override fun toString() = "StateMachine($label, ${addOrRemove.name})"
|
||||
}
|
||||
class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time) {
|
||||
class Progress(time: Instant, val stateMachineRunId: StateMachineRunId, val message: String) : ServiceToClientEvent(time) {
|
||||
override fun toString() = "Progress($message)"
|
||||
}
|
||||
class TransactionBuild(time: Instant, val id: UUID, val state: TransactionBuildResult) : ServiceToClientEvent(time) {
|
||||
@ -46,7 +47,7 @@ sealed class TransactionBuildResult {
|
||||
*
|
||||
* @param transaction the transaction created as a result, in the case where the protocol has completed.
|
||||
*/
|
||||
class ProtocolStarted(val fiberId: Long, val transaction: LedgerTransaction?, val message: String?) : TransactionBuildResult() {
|
||||
class ProtocolStarted(val stateMachineId: StateMachineRunId, val transaction: LedgerTransaction?, val message: String?) : TransactionBuildResult() {
|
||||
override fun toString() = "Started($message)"
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@ import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.Vault
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.StateMachineRunId
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.core.transactions.LedgerTransaction
|
||||
import com.r3corda.core.transactions.TransactionBuilder
|
||||
@ -63,15 +64,15 @@ class NodeMonitorService(services: ServiceHubInternal, val smm: StateMachineMana
|
||||
services.storageService.validatedTransactions.updates.subscribe { tx -> notifyTransaction(tx.tx.toLedgerTransaction(services)) }
|
||||
services.vaultService.updates.subscribe { update -> notifyVaultUpdate(update) }
|
||||
smm.changes.subscribe { change ->
|
||||
val fiberId: Long = change.third
|
||||
val logic: ProtocolLogic<*> = change.first
|
||||
val stateMachineRunId: StateMachineRunId = change.stateMachineRunId
|
||||
val logic: ProtocolLogic<*> = change.logic
|
||||
val progressTracker = logic.progressTracker
|
||||
|
||||
notifyEvent(ServiceToClientEvent.StateMachine(Instant.now(), fiberId, logic.javaClass.name, change.second))
|
||||
notifyEvent(ServiceToClientEvent.StateMachine(Instant.now(), stateMachineRunId, logic.javaClass.name, change.addOrRemove))
|
||||
if (progressTracker != null) {
|
||||
when (change.second) {
|
||||
when (change.addOrRemove) {
|
||||
AddOrRemove.ADD -> progressTracker.changes.subscribe { progress ->
|
||||
notifyEvent(ServiceToClientEvent.Progress(Instant.now(), fiberId, progress.toString()))
|
||||
notifyEvent(ServiceToClientEvent.Progress(Instant.now(), stateMachineRunId, progress.toString()))
|
||||
}
|
||||
AddOrRemove.REMOVE -> {
|
||||
// Nothing to do
|
||||
@ -168,7 +169,7 @@ class NodeMonitorService(services: ServiceHubInternal, val smm: StateMachineMana
|
||||
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
|
||||
val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient))
|
||||
return TransactionBuildResult.ProtocolStarted(
|
||||
smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId,
|
||||
smm.add(BroadcastTransactionProtocol.TOPIC, protocol).stateMachineRunId,
|
||||
tx.tx.toLedgerTransaction(services),
|
||||
"Cash payment transaction generated"
|
||||
)
|
||||
@ -202,7 +203,7 @@ class NodeMonitorService(services: ServiceHubInternal, val smm: StateMachineMana
|
||||
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
|
||||
val protocol = FinalityProtocol(tx, setOf(req), participants)
|
||||
return TransactionBuildResult.ProtocolStarted(
|
||||
smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId,
|
||||
smm.add(BroadcastTransactionProtocol.TOPIC, protocol).stateMachineRunId,
|
||||
tx.tx.toLedgerTransaction(services),
|
||||
"Cash destruction transaction generated"
|
||||
)
|
||||
@ -221,7 +222,7 @@ class NodeMonitorService(services: ServiceHubInternal, val smm: StateMachineMana
|
||||
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
|
||||
val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient))
|
||||
return TransactionBuildResult.ProtocolStarted(
|
||||
smm.add(BroadcastTransactionProtocol.TOPIC, protocol).machineId,
|
||||
smm.add(BroadcastTransactionProtocol.TOPIC, protocol).stateMachineRunId,
|
||||
tx.tx.toLedgerTransaction(services),
|
||||
"Cash issuance completed"
|
||||
)
|
||||
|
@ -9,6 +9,7 @@ import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolStateMachine
|
||||
import com.r3corda.core.protocols.StateMachineRunId
|
||||
import com.r3corda.core.utilities.UntrustworthyData
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
@ -28,7 +29,8 @@ import java.util.concurrent.ExecutionException
|
||||
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
|
||||
* logic element gets to return the value that the entire state machine resolves to.
|
||||
*/
|
||||
class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
|
||||
class ProtocolStateMachineImpl<R>(override val stateMachineRunId: StateMachineRunId,
|
||||
val logic: ProtocolLogic<R>,
|
||||
scheduler: FiberScheduler,
|
||||
private val loggerName: String)
|
||||
: Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
|
||||
|
@ -7,12 +7,14 @@ import com.codahale.metrics.Gauge
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.google.common.base.Throwables
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.abbreviate
|
||||
import com.r3corda.core.messaging.TopicSession
|
||||
import com.r3corda.core.messaging.runOnNextMessage
|
||||
import com.r3corda.core.messaging.send
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolStateMachine
|
||||
import com.r3corda.core.protocols.StateMachineRunId
|
||||
import com.r3corda.core.serialization.*
|
||||
import com.r3corda.core.then
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
@ -24,10 +26,10 @@ import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import rx.subjects.UnicastSubject
|
||||
import java.io.PrintWriter
|
||||
import java.io.StringWriter
|
||||
import java.util.*
|
||||
import java.util.Collections.synchronizedMap
|
||||
import java.util.concurrent.ExecutionException
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
@ -60,30 +62,43 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
|
||||
val scheduler = FiberScheduler()
|
||||
|
||||
data class Change(
|
||||
val logic: ProtocolLogic<*>,
|
||||
val addOrRemove: AddOrRemove,
|
||||
val stateMachineRunId: StateMachineRunId
|
||||
)
|
||||
|
||||
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
|
||||
// property.
|
||||
private val stateMachines = synchronizedMap(LinkedHashMap<ProtocolStateMachineImpl<*>, Checkpoint>())
|
||||
private val mutex = ThreadBox(object {
|
||||
var started = false
|
||||
val stateMachines = LinkedHashMap<ProtocolStateMachineImpl<*>, Checkpoint>()
|
||||
val changesPublisher = PublishSubject.create<Change>()
|
||||
|
||||
fun notifyChangeObservers(psm: ProtocolStateMachineImpl<*>, addOrRemove: AddOrRemove) {
|
||||
changesPublisher.onNext(Change(psm.logic, addOrRemove, psm.stateMachineRunId))
|
||||
}
|
||||
})
|
||||
|
||||
// Monitoring support.
|
||||
private val metrics = serviceHub.monitoringService.metrics
|
||||
|
||||
init {
|
||||
metrics.register("Protocols.InFlight", Gauge<Int> { stateMachines.size })
|
||||
metrics.register("Protocols.InFlight", Gauge<Int> { mutex.content.stateMachines.size })
|
||||
}
|
||||
|
||||
private val checkpointingMeter = metrics.meter("Protocols.Checkpointing Rate")
|
||||
private val totalStartedProtocols = metrics.counter("Protocols.Started")
|
||||
private val totalFinishedProtocols = metrics.counter("Protocols.Finished")
|
||||
private var started = false
|
||||
|
||||
// Context for tokenized services in checkpoints
|
||||
private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo())
|
||||
|
||||
/** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */
|
||||
fun <P : ProtocolLogic<T>, T> findStateMachines(protocolClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
|
||||
synchronized(stateMachines) {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return stateMachines.keys
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return mutex.locked {
|
||||
stateMachines.keys
|
||||
.map { it.logic }
|
||||
.filterIsInstance(protocolClass)
|
||||
.map { it to (it.psm as ProtocolStateMachineImpl<T>).resultFuture }
|
||||
@ -91,19 +106,25 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
}
|
||||
|
||||
val allStateMachines: List<ProtocolLogic<*>>
|
||||
get() = stateMachines.keys.map { it.logic }
|
||||
|
||||
private val _changesPublisher = PublishSubject.create<Triple<ProtocolLogic<*>, AddOrRemove, Long>>()
|
||||
get() = mutex.locked { stateMachines.keys.map { it.logic } }
|
||||
|
||||
/**
|
||||
* An observable that emits triples of the changing protocol, the type of change, and a process-specific ID number
|
||||
* which may change across restarts.
|
||||
*/
|
||||
val changes: Observable<Triple<ProtocolLogic<*>, AddOrRemove, Long>>
|
||||
get() = _changesPublisher
|
||||
val changes: Observable<Change>
|
||||
get() = mutex.content.changesPublisher
|
||||
|
||||
private fun notifyChangeObservers(psm: ProtocolStateMachineImpl<*>, change: AddOrRemove) {
|
||||
_changesPublisher.onNext(Triple(psm.logic, change, psm.id))
|
||||
/**
|
||||
* Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to [changes] and
|
||||
* calls to [allStateMachines]
|
||||
*/
|
||||
fun getAllStateMachinesAndChanges(): Pair<List<ProtocolStateMachineImpl<*>>, Observable<Change>> {
|
||||
return mutex.locked {
|
||||
val bufferedChanges = UnicastSubject.create<Change>()
|
||||
changesPublisher.subscribe(bufferedChanges)
|
||||
Pair(stateMachines.keys.toList(), bufferedChanges)
|
||||
}
|
||||
}
|
||||
|
||||
// Used to work around a small limitation in Quasar.
|
||||
@ -122,7 +143,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
fun start() {
|
||||
checkpointStorage.checkpoints.forEach { createFiberForCheckpoint(it) }
|
||||
serviceHub.networkMapCache.mapServiceRegistered.then(executor) {
|
||||
synchronized(started) {
|
||||
mutex.locked {
|
||||
started = true
|
||||
stateMachines.forEach { restartFiber(it.key, it.value) }
|
||||
}
|
||||
@ -205,18 +226,22 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
}
|
||||
psm.actionOnEnd = {
|
||||
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||
val finalCheckpoint = stateMachines.remove(psm)
|
||||
if (finalCheckpoint != null) {
|
||||
checkpointStorage.removeCheckpoint(finalCheckpoint)
|
||||
mutex.locked {
|
||||
val finalCheckpoint = stateMachines.remove(psm)
|
||||
if (finalCheckpoint != null) {
|
||||
checkpointStorage.removeCheckpoint(finalCheckpoint)
|
||||
}
|
||||
totalFinishedProtocols.inc()
|
||||
notifyChangeObservers(psm, AddOrRemove.REMOVE)
|
||||
}
|
||||
totalFinishedProtocols.inc()
|
||||
notifyChangeObservers(psm, AddOrRemove.REMOVE)
|
||||
}
|
||||
val checkpoint = startingCheckpoint()
|
||||
checkpoint.fiberCreated = true
|
||||
totalStartedProtocols.inc()
|
||||
stateMachines[psm] = checkpoint
|
||||
notifyChangeObservers(psm, AddOrRemove.ADD)
|
||||
mutex.locked {
|
||||
stateMachines[psm] = checkpoint
|
||||
notifyChangeObservers(psm, AddOrRemove.ADD)
|
||||
}
|
||||
return checkpoint
|
||||
}
|
||||
|
||||
@ -226,14 +251,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
* restarted with checkpointed state machines in the storage service.
|
||||
*/
|
||||
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ProtocolStateMachine<T> {
|
||||
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
|
||||
val id = StateMachineRunId.createRandom()
|
||||
val fiber = ProtocolStateMachineImpl(id, logic, scheduler, loggerName)
|
||||
// Need to add before iterating in case of immediate completion
|
||||
val checkpoint = initFiber(fiber) {
|
||||
val checkpoint = Checkpoint(serializeFiber(fiber), null, null)
|
||||
checkpoint
|
||||
}
|
||||
checkpointStorage.addCheckpoint(checkpoint)
|
||||
synchronized(started) { // If we are not started then our checkpoint will be picked up during start
|
||||
mutex.locked { // If we are not started then our checkpoint will be picked up during start
|
||||
if (!started) {
|
||||
return fiber
|
||||
}
|
||||
@ -267,7 +293,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
request: ProtocolIORequest?,
|
||||
receivedPayload: Any?) {
|
||||
val newCheckpoint = Checkpoint(serialisedFiber, request, receivedPayload)
|
||||
val previousCheckpoint = stateMachines.put(psm, newCheckpoint)
|
||||
val previousCheckpoint = mutex.locked {
|
||||
stateMachines.put(psm, newCheckpoint)
|
||||
}
|
||||
if (previousCheckpoint != null) {
|
||||
checkpointStorage.removeCheckpoint(previousCheckpoint)
|
||||
}
|
||||
|
@ -13,10 +13,10 @@ import java.util.*
|
||||
class ANSIProgressObserver(val smm: StateMachineManager) {
|
||||
|
||||
init {
|
||||
smm.changes.subscribe { change: Triple<ProtocolLogic<*>, AddOrRemove, Long> ->
|
||||
when (change.second) {
|
||||
AddOrRemove.ADD -> addProtocolLogic(change.first)
|
||||
AddOrRemove.REMOVE -> removeProtocolLogic(change.first)
|
||||
smm.changes.subscribe { change ->
|
||||
when (change.addOrRemove) {
|
||||
AddOrRemove.ADD -> addProtocolLogic(change.logic)
|
||||
AddOrRemove.REMOVE -> removeProtocolLogic(change.logic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -86,8 +86,8 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
scheduler = NodeSchedulerService(services, factory, schedulerGatedExecutor)
|
||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
||||
val mockSMM = StateMachineManager(services, listOf(services), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor)
|
||||
mockSMM.changes.subscribe { change:Triple<ProtocolLogic<*>, AddOrRemove, Long> ->
|
||||
if(change.second==AddOrRemove.REMOVE && mockSMM.allStateMachines.size==0) {
|
||||
mockSMM.changes.subscribe { change ->
|
||||
if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) {
|
||||
smmHasRemovedAllProtocols.countDown()
|
||||
}
|
||||
}
|
||||
@ -273,4 +273,4 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
}
|
||||
return scheduledRef
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user