mirror of
https://github.com/corda/corda.git
synced 2024-12-22 06:17:55 +00:00
Merged in cor-133-observable-smm (pull request #134)
Pull out StateMachineManager observable into it's own branch
This commit is contained in:
commit
d9ad1b78ef
@ -8,7 +8,6 @@ import com.r3corda.core.node.services.linearHeadsOfType
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.serialization.SerializedBytes
|
||||
import com.r3corda.node.api.*
|
||||
import com.r3corda.node.utilities.*
|
||||
import java.time.LocalDateTime
|
||||
import java.util.*
|
||||
import kotlin.reflect.KParameter
|
||||
@ -89,7 +88,6 @@ class APIServerImpl(val node: AbstractNode) : APIServer {
|
||||
}
|
||||
// If we get here then we matched every parameter
|
||||
val protocol = constructor.callBy(params) as ProtocolLogic<*>
|
||||
ANSIProgressRenderer.progressTracker = protocol.progressTracker
|
||||
val future = node.smm.add("api-call", protocol)
|
||||
return future
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import com.r3corda.node.services.transactions.NotaryService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.node.services.transactions.ValidatingNotaryService
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.ANSIProgressObserver
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import org.slf4j.Logger
|
||||
@ -136,6 +137,10 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
|
||||
buildAdvertisedServices()
|
||||
|
||||
// TODO: this model might change but for now it provides some de-coupling
|
||||
// Add SMM observers
|
||||
ANSIProgressObserver(smm)
|
||||
|
||||
startMessagingService()
|
||||
networkMapRegistrationFuture = registerWithNetworkMap()
|
||||
isPreviousCheckpointsPresent = checkpointStorage.checkpoints.any()
|
||||
|
@ -19,7 +19,10 @@ import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.Checkpoint
|
||||
import com.r3corda.node.services.api.CheckpointStorage
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.PrintWriter
|
||||
import java.io.StringWriter
|
||||
import java.util.*
|
||||
@ -57,7 +60,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
|
||||
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
|
||||
// property.
|
||||
private val stateMachines = synchronizedMap(HashMap<ProtocolStateMachineImpl<*>, Checkpoint>())
|
||||
private val stateMachines = synchronizedMap(LinkedHashMap<ProtocolStateMachineImpl<*>, Checkpoint>())
|
||||
|
||||
// Monitoring support.
|
||||
private val metrics = serviceHub.monitoringService.metrics
|
||||
@ -84,6 +87,18 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
}
|
||||
}
|
||||
|
||||
val allStateMachines: List<ProtocolLogic<*>>
|
||||
get() = stateMachines.keys.map { it.logic }
|
||||
|
||||
private val _changesPublisher = PublishSubject.create<Pair<ProtocolLogic<*>, AddOrRemove>>()
|
||||
|
||||
val changes: Observable<Pair<ProtocolLogic<*>, AddOrRemove>>
|
||||
get() = _changesPublisher
|
||||
|
||||
private fun notifyChangeObservers(psm: ProtocolStateMachineImpl<*>, change: AddOrRemove) {
|
||||
_changesPublisher.onNext(Pair(psm.logic, change))
|
||||
}
|
||||
|
||||
// Used to work around a small limitation in Quasar.
|
||||
private val QUASAR_UNBLOCKER = run {
|
||||
val field = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER")
|
||||
@ -130,6 +145,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
}
|
||||
}
|
||||
|
||||
private fun serializeFiber(fiber: ProtocolStateMachineImpl<*>): SerializedBytes<ProtocolStateMachineImpl<*>> {
|
||||
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
|
||||
val kryo = quasarKryo()
|
||||
// add the map of tokens -> tokenizedServices to the kyro context
|
||||
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
|
||||
return fiber.serialize(kryo)
|
||||
}
|
||||
|
||||
private fun deserializeFiber(serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>): ProtocolStateMachineImpl<*> {
|
||||
val kryo = quasarKryo()
|
||||
// put the map of token -> tokenized into the kryo context
|
||||
@ -152,15 +175,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
}
|
||||
}
|
||||
|
||||
private fun initFiber(fiber: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
|
||||
stateMachines[fiber] = checkpoint
|
||||
fiber.resultFuture.then(executor) {
|
||||
fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||
val finalCheckpoint = stateMachines.remove(fiber)
|
||||
private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
|
||||
stateMachines[psm] = checkpoint
|
||||
notifyChangeObservers(psm, AddOrRemove.ADD)
|
||||
psm.resultFuture.then(executor) {
|
||||
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||
val finalCheckpoint = stateMachines.remove(psm)
|
||||
if (finalCheckpoint != null) {
|
||||
checkpointStorage.removeCheckpoint(finalCheckpoint)
|
||||
}
|
||||
totalFinishedProtocols.inc()
|
||||
notifyChangeObservers(psm, AddOrRemove.REMOVE)
|
||||
}
|
||||
}
|
||||
|
||||
@ -173,6 +198,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
try {
|
||||
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
|
||||
// Need to add before iterating in case of immediate completion
|
||||
// TODO: create an initial checkpoint here
|
||||
initFiber(fiber, null)
|
||||
executor.executeASAP {
|
||||
iterateStateMachine(fiber, null) {
|
||||
@ -218,13 +244,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
fiber: ProtocolStateMachineImpl<*>) {
|
||||
// We have a request to do something: send, receive, or send-and-receive.
|
||||
if (request is FiberRequest.ExpectingResponse<*>) {
|
||||
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
|
||||
val kryo = quasarKryo()
|
||||
// add the map of tokens -> tokenizedServices to the kyro context
|
||||
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
|
||||
val serialisedFiber = fiber.serialize(kryo)
|
||||
// Prepare a listener on the network that runs in the background thread when we receive a message.
|
||||
checkpointOnExpectingResponse(psm, request, serialisedFiber)
|
||||
checkpointOnExpectingResponse(psm, request, serializeFiber(fiber))
|
||||
}
|
||||
// If a non-null payload to send was provided, send it now.
|
||||
request.payload?.let {
|
||||
|
@ -0,0 +1,60 @@
|
||||
package com.r3corda.node.utilities
|
||||
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* This observes the [StateMachineManager] and follows the progress of [ProtocolLogic]s until they complete in the order
|
||||
* they are added to the [StateMachineManager].
|
||||
*/
|
||||
class ANSIProgressObserver(val smm: StateMachineManager) {
|
||||
|
||||
init {
|
||||
smm.changes.subscribe { change: Pair<ProtocolLogic<*>, AddOrRemove> ->
|
||||
when (change.second) {
|
||||
AddOrRemove.ADD -> addProtocolLogic(change.first)
|
||||
AddOrRemove.REMOVE -> removeProtocolLogic(change.first)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class Content {
|
||||
var currentlyRendering: ProtocolLogic<*>? = null
|
||||
val pending = ArrayDeque<ProtocolLogic<*>>()
|
||||
}
|
||||
|
||||
private val state = ThreadBox(Content())
|
||||
|
||||
private fun wireUpProgressRendering() {
|
||||
state.locked {
|
||||
// Repeat if the progress of the ones we pop from the queue are already done
|
||||
do {
|
||||
currentlyRendering = pending.poll()
|
||||
if (currentlyRendering?.progressTracker != null) {
|
||||
ANSIProgressRenderer.progressTracker = currentlyRendering!!.progressTracker
|
||||
}
|
||||
} while (currentlyRendering?.progressTracker?.currentStep == ProgressTracker.DONE)
|
||||
}
|
||||
}
|
||||
|
||||
private fun removeProtocolLogic(protocolLogic: ProtocolLogic<*>) {
|
||||
state.locked {
|
||||
protocolLogic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||
if (currentlyRendering == protocolLogic) {
|
||||
wireUpProgressRendering()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun addProtocolLogic(protocolLogic: ProtocolLogic<*>) {
|
||||
state.locked {
|
||||
pending.add(protocolLogic)
|
||||
if ((currentlyRendering?.progressTracker?.currentStep ?: ProgressTracker.DONE) == ProgressTracker.DONE) {
|
||||
wireUpProgressRendering()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -70,6 +70,10 @@ object ANSIProgressRenderer {
|
||||
installedYet = true
|
||||
}
|
||||
|
||||
// Reset the state when a new tracker is wired up.
|
||||
prevMessagePrinted = null
|
||||
prevLinesDrawn = 0
|
||||
draw(true)
|
||||
subscription = value?.changes?.subscribe { draw(true) }
|
||||
}
|
||||
|
||||
|
@ -12,13 +12,12 @@ import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.utilities.BriefLogFormatter
|
||||
import com.r3corda.core.utilities.Emoji
|
||||
import com.r3corda.demos.api.InterestRateSwapAPI
|
||||
import joptsimple.OptionParser
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.clientapi.NodeInterestRates
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingService
|
||||
import com.r3corda.node.utilities.*
|
||||
import com.r3corda.protocols.RatesFixProtocol
|
||||
import joptsimple.OptionParser
|
||||
import java.math.BigDecimal
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
@ -89,7 +88,6 @@ fun main(args: Array<String>) {
|
||||
val tx = TransactionBuilder()
|
||||
tx.addOutputState(Cash.State(node.storage.myLegalIdentity.ref(1), 1500.DOLLARS, node.keyManagement.freshKey().public, notary.identity))
|
||||
val protocol = RatesFixProtocol(tx, oracleNode, fixOf, expectedRate, rateTolerance)
|
||||
ANSIProgressRenderer.progressTracker = protocol.progressTracker
|
||||
node.smm.add("demo.ratefix", protocol).get()
|
||||
node.stop()
|
||||
|
||||
|
@ -26,7 +26,6 @@ import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.persistence.NodeAttachmentService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.ANSIProgressRenderer
|
||||
import com.r3corda.protocols.NotaryProtocol
|
||||
import com.r3corda.protocols.TwoPartyTradeProtocol
|
||||
import com.typesafe.config.ConfigFactory
|
||||
@ -173,13 +172,11 @@ fun runSeller(myNetAddr: HostAndPort, node: Node, theirNetAddr: HostAndPort) {
|
||||
|
||||
if (node.isPreviousCheckpointsPresent) {
|
||||
node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).forEach {
|
||||
ANSIProgressRenderer.progressTracker = it.first.progressTracker
|
||||
it.second.get()
|
||||
}
|
||||
} else {
|
||||
val otherSide = ArtemisMessagingService.makeRecipient(theirNetAddr)
|
||||
val seller = TraderDemoProtocolSeller(myNetAddr, otherSide)
|
||||
ANSIProgressRenderer.progressTracker = seller.progressTracker
|
||||
node.smm.add("demo.seller", seller).get()
|
||||
}
|
||||
|
||||
@ -196,12 +193,10 @@ fun runBuyer(node: Node) {
|
||||
|
||||
val future = if (node.isPreviousCheckpointsPresent) {
|
||||
val (buyer, future) = node.smm.findStateMachines(TraderDemoProtocolBuyer::class.java).single()
|
||||
ANSIProgressRenderer.progressTracker = buyer.progressTracker //TODO the SMM will soon be able to wire up the ANSIProgressRenderer automatially
|
||||
future
|
||||
} else {
|
||||
// We use a simple scenario-specific wrapper protocol to make things happen.
|
||||
val buyer = TraderDemoProtocolBuyer(attachmentsPath, node.info.identity)
|
||||
ANSIProgressRenderer.progressTracker = buyer.progressTracker
|
||||
node.smm.add("demo.buyer", buyer)
|
||||
}
|
||||
|
||||
|
@ -4,15 +4,14 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.util.concurrent.FutureCallback
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.r3corda.core.contracts.DealState
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.node.utilities.ANSIProgressRenderer
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.protocols.TwoPartyDealProtocol
|
||||
|
||||
/**
|
||||
@ -50,7 +49,6 @@ object AutoOfferProtocol {
|
||||
fun register(node: Node) {
|
||||
node.net.addMessageHandler("$TOPIC.0") { msg, registration ->
|
||||
val progressTracker = tracker()
|
||||
ANSIProgressRenderer.progressTracker = progressTracker
|
||||
progressTracker.currentStep = RECEIVED
|
||||
val autoOfferMessage = msg.data.deserialize<AutoOfferMessage>()
|
||||
// Put the deal onto the ledger
|
||||
|
@ -10,7 +10,6 @@ import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
import com.r3corda.node.utilities.ANSIProgressRenderer
|
||||
import com.r3corda.demos.DemoClock
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.network.MockNetworkMapCache
|
||||
@ -129,7 +128,6 @@ object UpdateBusinessDayProtocol {
|
||||
val updateBusinessDayMessage = msg.data.deserialize<UpdateBusinessDayMessage>()
|
||||
if ((node.services.clock as DemoClock).updateDate(updateBusinessDayMessage.date)) {
|
||||
val participant = Updater(updateBusinessDayMessage.date, updateBusinessDayMessage.sessionID)
|
||||
ANSIProgressRenderer.progressTracker = participant.progressTracker
|
||||
node.smm.add("update.business.day", participant)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user