Add Fiber ID to state machine change events

Where state machine change events are to be serialized for relay to external systems, serializing the entire
protocol is excessive. Instead being able to identify changes based on the ID of the fiber is a lot simpler
for these use-cases.
This commit is contained in:
Ross Nicoll
2016-07-19 14:58:08 +01:00
parent c442cd01a7
commit f72d8ed9fb
2 changed files with 6 additions and 5 deletions

View File

@ -89,13 +89,13 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
val allStateMachines: List<ProtocolLogic<*>> val allStateMachines: List<ProtocolLogic<*>>
get() = stateMachines.keys.map { it.logic } get() = stateMachines.keys.map { it.logic }
private val _changesPublisher = PublishSubject.create<Pair<ProtocolLogic<*>, AddOrRemove>>() private val _changesPublisher = PublishSubject.create<Triple<ProtocolLogic<*>, AddOrRemove, Long>>()
val changes: Observable<Pair<ProtocolLogic<*>, AddOrRemove>> val changes: Observable<Triple<ProtocolLogic<*>, AddOrRemove, Long>>
get() = _changesPublisher get() = _changesPublisher
private fun notifyChangeObservers(psm: ProtocolStateMachineImpl<*>, change: AddOrRemove) { private fun notifyChangeObservers(psm: ProtocolStateMachineImpl<*>, change: AddOrRemove) {
_changesPublisher.onNext(Pair(psm.logic, change)) _changesPublisher.onNext(Triple(psm.logic, change, psm.id))
} }
// Used to work around a small limitation in Quasar. // Used to work around a small limitation in Quasar.

View File

@ -4,6 +4,7 @@ import com.r3corda.core.ThreadBox
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.utilities.ProgressTracker import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.protocols.TwoPartyDealProtocol
import java.util.* import java.util.*
/** /**
@ -13,7 +14,7 @@ import java.util.*
class ANSIProgressObserver(val smm: StateMachineManager) { class ANSIProgressObserver(val smm: StateMachineManager) {
init { init {
smm.changes.subscribe { change: Pair<ProtocolLogic<*>, AddOrRemove> -> smm.changes.subscribe { change: Triple<ProtocolLogic<*>, AddOrRemove, Long> ->
when (change.second) { when (change.second) {
AddOrRemove.ADD -> addProtocolLogic(change.first) AddOrRemove.ADD -> addProtocolLogic(change.first)
AddOrRemove.REMOVE -> removeProtocolLogic(change.first) AddOrRemove.REMOVE -> removeProtocolLogic(change.first)
@ -57,4 +58,4 @@ class ANSIProgressObserver(val smm: StateMachineManager) {
} }
} }
} }
} }