Pull out StateMachineManager observable into it's own branch

Review feedback

Review feedback

Review feedback
This commit is contained in:
rick.parker 2016-06-10 08:33:51 +01:00
parent ff467beaec
commit 82abc69046
9 changed files with 105 additions and 28 deletions

View File

@ -8,7 +8,6 @@ import com.r3corda.core.node.services.linearHeadsOfType
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.node.api.*
import com.r3corda.node.utilities.*
import java.time.LocalDateTime
import java.util.*
import kotlin.reflect.KParameter
@ -89,7 +88,6 @@ class APIServerImpl(val node: AbstractNode) : APIServer {
}
// If we get here then we matched every parameter
val protocol = constructor.callBy(params) as ProtocolLogic<*>
ANSIProgressRenderer.progressTracker = protocol.progressTracker
val future = node.smm.add("api-call", protocol)
return future
}

View File

@ -35,6 +35,7 @@ import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.services.transactions.ValidatingNotaryService
import com.r3corda.node.services.wallet.NodeWalletService
import com.r3corda.node.utilities.ANSIProgressObserver
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.node.utilities.AffinityExecutor
import org.slf4j.Logger
@ -136,6 +137,10 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
buildAdvertisedServices()
// TODO: this model might change but for now it provides some de-coupling
// Add SMM observers
ANSIProgressObserver(smm)
startMessagingService()
networkMapRegistrationFuture = registerWithNetworkMap()
isPreviousCheckpointsPresent = checkpointStorage.checkpoints.any()

View File

@ -19,7 +19,10 @@ import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.Checkpoint
import com.r3corda.node.services.api.CheckpointStorage
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.node.utilities.AffinityExecutor
import rx.Observable
import rx.subjects.PublishSubject
import java.io.PrintWriter
import java.io.StringWriter
import java.util.*
@ -57,7 +60,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
// property.
private val stateMachines = synchronizedMap(HashMap<ProtocolStateMachineImpl<*>, Checkpoint>())
private val stateMachines = synchronizedMap(LinkedHashMap<ProtocolStateMachineImpl<*>, Checkpoint>())
// Monitoring support.
private val metrics = serviceHub.monitoringService.metrics
@ -84,6 +87,18 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
}
}
val allStateMachines: List<ProtocolLogic<*>>
get() = stateMachines.keys.map { it.logic }
private val _changesPublisher = PublishSubject.create<Pair<ProtocolLogic<*>, AddOrRemove>>()
val changes: Observable<Pair<ProtocolLogic<*>, AddOrRemove>>
get() = _changesPublisher
private fun notifyChangeObservers(psm: ProtocolStateMachineImpl<*>, change: AddOrRemove) {
_changesPublisher.onNext(Pair(psm.logic, change))
}
// Used to work around a small limitation in Quasar.
private val QUASAR_UNBLOCKER = run {
val field = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER")
@ -130,6 +145,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
}
}
private fun serializeFiber(fiber: ProtocolStateMachineImpl<*>): SerializedBytes<ProtocolStateMachineImpl<*>> {
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
val kryo = quasarKryo()
// add the map of tokens -> tokenizedServices to the kyro context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
return fiber.serialize(kryo)
}
private fun deserializeFiber(serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>): ProtocolStateMachineImpl<*> {
val kryo = quasarKryo()
// put the map of token -> tokenized into the kryo context
@ -152,15 +175,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
}
}
private fun initFiber(fiber: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
stateMachines[fiber] = checkpoint
fiber.resultFuture.then(executor) {
fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE
val finalCheckpoint = stateMachines.remove(fiber)
private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
stateMachines[psm] = checkpoint
notifyChangeObservers(psm, AddOrRemove.ADD)
psm.resultFuture.then(executor) {
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
val finalCheckpoint = stateMachines.remove(psm)
if (finalCheckpoint != null) {
checkpointStorage.removeCheckpoint(finalCheckpoint)
}
totalFinishedProtocols.inc()
notifyChangeObservers(psm, AddOrRemove.REMOVE)
}
}
@ -173,6 +198,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
try {
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
// Need to add before iterating in case of immediate completion
// TODO: create an initial checkpoint here
initFiber(fiber, null)
executor.executeASAP {
iterateStateMachine(fiber, null) {
@ -218,13 +244,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
fiber: ProtocolStateMachineImpl<*>) {
// We have a request to do something: send, receive, or send-and-receive.
if (request is FiberRequest.ExpectingResponse<*>) {
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
val kryo = quasarKryo()
// add the map of tokens -> tokenizedServices to the kyro context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
val serialisedFiber = fiber.serialize(kryo)
// Prepare a listener on the network that runs in the background thread when we receive a message.
checkpointOnExpectingResponse(psm, request, serialisedFiber)
checkpointOnExpectingResponse(psm, request, serializeFiber(fiber))
}
// If a non-null payload to send was provided, send it now.
request.payload?.let {

View File

@ -0,0 +1,60 @@
package com.r3corda.node.utilities
import com.r3corda.core.ThreadBox
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.services.statemachine.StateMachineManager
import java.util.*
/**
* This observes the [StateMachineManager] and follows the progress of [ProtocolLogic]s until they complete in the order
* they are added to the [StateMachineManager].
*/
class ANSIProgressObserver(val smm: StateMachineManager) {
init {
smm.changes.subscribe { change: Pair<ProtocolLogic<*>, AddOrRemove> ->
when (change.second) {
AddOrRemove.ADD -> addProtocolLogic(change.first)
AddOrRemove.REMOVE -> removeProtocolLogic(change.first)
}
}
}
private class Content {
var currentlyRendering: ProtocolLogic<*>? = null
val pending = ArrayDeque<ProtocolLogic<*>>()
}
private val state = ThreadBox(Content())
private fun wireUpProgressRendering() {
state.locked {
// Repeat if the progress of the ones we pop from the queue are already done
do {
currentlyRendering = pending.poll()
if (currentlyRendering?.progressTracker != null) {
ANSIProgressRenderer.progressTracker = currentlyRendering!!.progressTracker
}
} while (currentlyRendering?.progressTracker?.currentStep == ProgressTracker.DONE)
}
}
private fun removeProtocolLogic(protocolLogic: ProtocolLogic<*>) {
state.locked {
protocolLogic.progressTracker?.currentStep = ProgressTracker.DONE
if (currentlyRendering == protocolLogic) {
wireUpProgressRendering()
}
}
}
private fun addProtocolLogic(protocolLogic: ProtocolLogic<*>) {
state.locked {
pending.add(protocolLogic)
if ((currentlyRendering?.progressTracker?.currentStep ?: ProgressTracker.DONE) == ProgressTracker.DONE) {
wireUpProgressRendering()
}
}
}
}

View File

@ -70,6 +70,10 @@ object ANSIProgressRenderer {
installedYet = true
}
// Reset the state when a new tracker is wired up.
prevMessagePrinted = null
prevLinesDrawn = 0
draw(true)
subscription = value?.changes?.subscribe { draw(true) }
}

View File

@ -12,13 +12,12 @@ import com.r3corda.core.serialization.deserialize
import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.core.utilities.Emoji
import com.r3corda.demos.api.InterestRateSwapAPI
import joptsimple.OptionParser
import com.r3corda.node.internal.Node
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.utilities.*
import com.r3corda.protocols.RatesFixProtocol
import joptsimple.OptionParser
import java.math.BigDecimal
import java.nio.file.Files
import java.nio.file.Paths
@ -89,7 +88,6 @@ fun main(args: Array<String>) {
val tx = TransactionBuilder()
tx.addOutputState(Cash.State(node.storage.myLegalIdentity.ref(1), 1500.DOLLARS, node.keyManagement.freshKey().public, notary.identity))
val protocol = RatesFixProtocol(tx, oracleNode, fixOf, expectedRate, rateTolerance)
ANSIProgressRenderer.progressTracker = protocol.progressTracker
node.smm.add("demo.ratefix", protocol).get()
node.stop()

View File

@ -26,7 +26,6 @@ import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.services.wallet.NodeWalletService
import com.r3corda.node.utilities.ANSIProgressRenderer
import com.r3corda.protocols.NotaryProtocol
import com.r3corda.protocols.TwoPartyTradeProtocol
import com.typesafe.config.ConfigFactory
@ -173,13 +172,11 @@ fun runSeller(myNetAddr: HostAndPort, node: Node, theirNetAddr: HostAndPort) {
if (node.isPreviousCheckpointsPresent) {
node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).forEach {
ANSIProgressRenderer.progressTracker = it.first.progressTracker
it.second.get()
}
} else {
val otherSide = ArtemisMessagingService.makeRecipient(theirNetAddr)
val seller = TraderDemoProtocolSeller(myNetAddr, otherSide)
ANSIProgressRenderer.progressTracker = seller.progressTracker
node.smm.add("demo.seller", seller).get()
}
@ -196,12 +193,10 @@ fun runBuyer(node: Node) {
val future = if (node.isPreviousCheckpointsPresent) {
val (buyer, future) = node.smm.findStateMachines(TraderDemoProtocolBuyer::class.java).single()
ANSIProgressRenderer.progressTracker = buyer.progressTracker //TODO the SMM will soon be able to wire up the ANSIProgressRenderer automatially
future
} else {
// We use a simple scenario-specific wrapper protocol to make things happen.
val buyer = TraderDemoProtocolBuyer(attachmentsPath, node.info.identity)
ANSIProgressRenderer.progressTracker = buyer.progressTracker
node.smm.add("demo.buyer", buyer)
}

View File

@ -4,15 +4,14 @@ import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.FutureCallback
import com.google.common.util.concurrent.Futures
import com.r3corda.core.contracts.DealState
import com.r3corda.core.crypto.Party
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.node.internal.Node
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
import com.r3corda.node.utilities.ANSIProgressRenderer
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.Node
import com.r3corda.protocols.TwoPartyDealProtocol
/**
@ -50,7 +49,6 @@ object AutoOfferProtocol {
fun register(node: Node) {
node.net.addMessageHandler("$TOPIC.0") { msg, registration ->
val progressTracker = tracker()
ANSIProgressRenderer.progressTracker = progressTracker
progressTracker.currentStep = RECEIVED
val autoOfferMessage = msg.data.deserialize<AutoOfferMessage>()
// Put the deal onto the ledger

View File

@ -10,7 +10,6 @@ import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.utilities.ANSIProgressRenderer
import com.r3corda.demos.DemoClock
import com.r3corda.node.internal.Node
import com.r3corda.node.services.network.MockNetworkMapCache
@ -129,7 +128,6 @@ object UpdateBusinessDayProtocol {
val updateBusinessDayMessage = msg.data.deserialize<UpdateBusinessDayMessage>()
if ((node.services.clock as DemoClock).updateDate(updateBusinessDayMessage.date)) {
val participant = Updater(updateBusinessDayMessage.date, updateBusinessDayMessage.sessionID)
ANSIProgressRenderer.progressTracker = participant.progressTracker
node.smm.add("update.business.day", participant)
}
}