diff --git a/node/src/main/kotlin/com/r3corda/node/internal/APIServerImpl.kt b/node/src/main/kotlin/com/r3corda/node/internal/APIServerImpl.kt index 602717ca5e..bf9075f67e 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/APIServerImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/APIServerImpl.kt @@ -8,7 +8,6 @@ import com.r3corda.core.node.services.linearHeadsOfType import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.serialization.SerializedBytes import com.r3corda.node.api.* -import com.r3corda.node.utilities.* import java.time.LocalDateTime import java.util.* import kotlin.reflect.KParameter @@ -89,7 +88,6 @@ class APIServerImpl(val node: AbstractNode) : APIServer { } // If we get here then we matched every parameter val protocol = constructor.callBy(params) as ProtocolLogic<*> - ANSIProgressRenderer.progressTracker = protocol.progressTracker val future = node.smm.add("api-call", protocol) return future } diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index bc00eb2b94..383f80c565 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -35,6 +35,7 @@ import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.ValidatingNotaryService import com.r3corda.node.services.wallet.NodeWalletService +import com.r3corda.node.utilities.ANSIProgressObserver import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AffinityExecutor import org.slf4j.Logger @@ -136,6 +137,10 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, buildAdvertisedServices() + // TODO: this model might change but for now it provides some de-coupling + // Add SMM observers + ANSIProgressObserver(smm) + startMessagingService() networkMapRegistrationFuture = registerWithNetworkMap() isPreviousCheckpointsPresent = checkpointStorage.checkpoints.any() diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt index e1b4b24513..1d2ad91afa 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt @@ -19,7 +19,10 @@ import com.r3corda.core.utilities.trace import com.r3corda.node.services.api.Checkpoint import com.r3corda.node.services.api.CheckpointStorage import com.r3corda.node.services.api.ServiceHubInternal +import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AffinityExecutor +import rx.Observable +import rx.subjects.PublishSubject import java.io.PrintWriter import java.io.StringWriter import java.util.* @@ -57,7 +60,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService // A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines // property. - private val stateMachines = synchronizedMap(HashMap, Checkpoint>()) + private val stateMachines = synchronizedMap(LinkedHashMap, Checkpoint>()) // Monitoring support. private val metrics = serviceHub.monitoringService.metrics @@ -84,6 +87,18 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService } } + val allStateMachines: List> + get() = stateMachines.keys.map { it.logic } + + private val _changesPublisher = PublishSubject.create, AddOrRemove>>() + + val changes: Observable, AddOrRemove>> + get() = _changesPublisher + + private fun notifyChangeObservers(psm: ProtocolStateMachineImpl<*>, change: AddOrRemove) { + _changesPublisher.onNext(Pair(psm.logic, change)) + } + // Used to work around a small limitation in Quasar. private val QUASAR_UNBLOCKER = run { val field = Fiber::class.java.getDeclaredField("SERIALIZER_BLOCKER") @@ -130,6 +145,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService } } + private fun serializeFiber(fiber: ProtocolStateMachineImpl<*>): SerializedBytes> { + // We don't use the passed-in serializer here, because we need to use our own augmented Kryo. + val kryo = quasarKryo() + // add the map of tokens -> tokenizedServices to the kyro context + SerializeAsTokenSerializer.setContext(kryo, serializationContext) + return fiber.serialize(kryo) + } + private fun deserializeFiber(serialisedFiber: SerializedBytes>): ProtocolStateMachineImpl<*> { val kryo = quasarKryo() // put the map of token -> tokenized into the kryo context @@ -152,15 +175,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService } } - private fun initFiber(fiber: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) { - stateMachines[fiber] = checkpoint - fiber.resultFuture.then(executor) { - fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE - val finalCheckpoint = stateMachines.remove(fiber) + private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) { + stateMachines[psm] = checkpoint + notifyChangeObservers(psm, AddOrRemove.ADD) + psm.resultFuture.then(executor) { + psm.logic.progressTracker?.currentStep = ProgressTracker.DONE + val finalCheckpoint = stateMachines.remove(psm) if (finalCheckpoint != null) { checkpointStorage.removeCheckpoint(finalCheckpoint) } totalFinishedProtocols.inc() + notifyChangeObservers(psm, AddOrRemove.REMOVE) } } @@ -173,6 +198,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService try { val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName) // Need to add before iterating in case of immediate completion + // TODO: create an initial checkpoint here initFiber(fiber, null) executor.executeASAP { iterateStateMachine(fiber, null) { @@ -218,13 +244,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService fiber: ProtocolStateMachineImpl<*>) { // We have a request to do something: send, receive, or send-and-receive. if (request is FiberRequest.ExpectingResponse<*>) { - // We don't use the passed-in serializer here, because we need to use our own augmented Kryo. - val kryo = quasarKryo() - // add the map of tokens -> tokenizedServices to the kyro context - SerializeAsTokenSerializer.setContext(kryo, serializationContext) - val serialisedFiber = fiber.serialize(kryo) // Prepare a listener on the network that runs in the background thread when we receive a message. - checkpointOnExpectingResponse(psm, request, serialisedFiber) + checkpointOnExpectingResponse(psm, request, serializeFiber(fiber)) } // If a non-null payload to send was provided, send it now. request.payload?.let { diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/ANSIProgressObserver.kt b/node/src/main/kotlin/com/r3corda/node/utilities/ANSIProgressObserver.kt new file mode 100644 index 0000000000..50e5ac9d74 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/utilities/ANSIProgressObserver.kt @@ -0,0 +1,60 @@ +package com.r3corda.node.utilities + +import com.r3corda.core.ThreadBox +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.utilities.ProgressTracker +import com.r3corda.node.services.statemachine.StateMachineManager +import java.util.* + +/** + * This observes the [StateMachineManager] and follows the progress of [ProtocolLogic]s until they complete in the order + * they are added to the [StateMachineManager]. + */ +class ANSIProgressObserver(val smm: StateMachineManager) { + + init { + smm.changes.subscribe { change: Pair, AddOrRemove> -> + when (change.second) { + AddOrRemove.ADD -> addProtocolLogic(change.first) + AddOrRemove.REMOVE -> removeProtocolLogic(change.first) + } + } + } + + private class Content { + var currentlyRendering: ProtocolLogic<*>? = null + val pending = ArrayDeque>() + } + + private val state = ThreadBox(Content()) + + private fun wireUpProgressRendering() { + state.locked { + // Repeat if the progress of the ones we pop from the queue are already done + do { + currentlyRendering = pending.poll() + if (currentlyRendering?.progressTracker != null) { + ANSIProgressRenderer.progressTracker = currentlyRendering!!.progressTracker + } + } while (currentlyRendering?.progressTracker?.currentStep == ProgressTracker.DONE) + } + } + + private fun removeProtocolLogic(protocolLogic: ProtocolLogic<*>) { + state.locked { + protocolLogic.progressTracker?.currentStep = ProgressTracker.DONE + if (currentlyRendering == protocolLogic) { + wireUpProgressRendering() + } + } + } + + private fun addProtocolLogic(protocolLogic: ProtocolLogic<*>) { + state.locked { + pending.add(protocolLogic) + if ((currentlyRendering?.progressTracker?.currentStep ?: ProgressTracker.DONE) == ProgressTracker.DONE) { + wireUpProgressRendering() + } + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/ANSIProgressRenderer.kt b/node/src/main/kotlin/com/r3corda/node/utilities/ANSIProgressRenderer.kt index 5d51ffe4c7..dcbb7ccd18 100644 --- a/node/src/main/kotlin/com/r3corda/node/utilities/ANSIProgressRenderer.kt +++ b/node/src/main/kotlin/com/r3corda/node/utilities/ANSIProgressRenderer.kt @@ -70,6 +70,10 @@ object ANSIProgressRenderer { installedYet = true } + // Reset the state when a new tracker is wired up. + prevMessagePrinted = null + prevLinesDrawn = 0 + draw(true) subscription = value?.changes?.subscribe { draw(true) } } diff --git a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt index 3b3202f87c..74d57a34c4 100644 --- a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt @@ -12,13 +12,12 @@ import com.r3corda.core.serialization.deserialize import com.r3corda.core.utilities.BriefLogFormatter import com.r3corda.core.utilities.Emoji import com.r3corda.demos.api.InterestRateSwapAPI -import joptsimple.OptionParser import com.r3corda.node.internal.Node import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingService -import com.r3corda.node.utilities.* import com.r3corda.protocols.RatesFixProtocol +import joptsimple.OptionParser import java.math.BigDecimal import java.nio.file.Files import java.nio.file.Paths @@ -89,7 +88,6 @@ fun main(args: Array) { val tx = TransactionBuilder() tx.addOutputState(Cash.State(node.storage.myLegalIdentity.ref(1), 1500.DOLLARS, node.keyManagement.freshKey().public, notary.identity)) val protocol = RatesFixProtocol(tx, oracleNode, fixOf, expectedRate, rateTolerance) - ANSIProgressRenderer.progressTracker = protocol.progressTracker node.smm.add("demo.ratefix", protocol).get() node.stop() diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index 23cf8ee518..707386a722 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -26,7 +26,6 @@ import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.wallet.NodeWalletService -import com.r3corda.node.utilities.ANSIProgressRenderer import com.r3corda.protocols.NotaryProtocol import com.r3corda.protocols.TwoPartyTradeProtocol import com.typesafe.config.ConfigFactory @@ -173,13 +172,11 @@ fun runSeller(myNetAddr: HostAndPort, node: Node, theirNetAddr: HostAndPort) { if (node.isPreviousCheckpointsPresent) { node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).forEach { - ANSIProgressRenderer.progressTracker = it.first.progressTracker it.second.get() } } else { val otherSide = ArtemisMessagingService.makeRecipient(theirNetAddr) val seller = TraderDemoProtocolSeller(myNetAddr, otherSide) - ANSIProgressRenderer.progressTracker = seller.progressTracker node.smm.add("demo.seller", seller).get() } @@ -196,12 +193,10 @@ fun runBuyer(node: Node) { val future = if (node.isPreviousCheckpointsPresent) { val (buyer, future) = node.smm.findStateMachines(TraderDemoProtocolBuyer::class.java).single() - ANSIProgressRenderer.progressTracker = buyer.progressTracker //TODO the SMM will soon be able to wire up the ANSIProgressRenderer automatially future } else { // We use a simple scenario-specific wrapper protocol to make things happen. val buyer = TraderDemoProtocolBuyer(attachmentsPath, node.info.identity) - ANSIProgressRenderer.progressTracker = buyer.progressTracker node.smm.add("demo.buyer", buyer) } diff --git a/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt index e4b463d142..38cbbc29ae 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt @@ -4,15 +4,14 @@ import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.FutureCallback import com.google.common.util.concurrent.Futures import com.r3corda.core.contracts.DealState -import com.r3corda.core.crypto.Party import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.crypto.Party import com.r3corda.core.messaging.SingleMessageRecipient -import com.r3corda.node.internal.Node import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.deserialize -import com.r3corda.node.utilities.ANSIProgressRenderer import com.r3corda.core.utilities.ProgressTracker +import com.r3corda.node.internal.Node import com.r3corda.protocols.TwoPartyDealProtocol /** @@ -50,7 +49,6 @@ object AutoOfferProtocol { fun register(node: Node) { node.net.addMessageHandler("$TOPIC.0") { msg, registration -> val progressTracker = tracker() - ANSIProgressRenderer.progressTracker = progressTracker progressTracker.currentStep = RECEIVED val autoOfferMessage = msg.data.deserialize() // Put the deal onto the ledger diff --git a/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt index b59cc4a0aa..17788c6dc5 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt @@ -10,7 +10,6 @@ import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.deserialize import com.r3corda.core.utilities.ProgressTracker -import com.r3corda.node.utilities.ANSIProgressRenderer import com.r3corda.demos.DemoClock import com.r3corda.node.internal.Node import com.r3corda.node.services.network.MockNetworkMapCache @@ -129,7 +128,6 @@ object UpdateBusinessDayProtocol { val updateBusinessDayMessage = msg.data.deserialize() if ((node.services.clock as DemoClock).updateDate(updateBusinessDayMessage.date)) { val participant = Updater(updateBusinessDayMessage.date, updateBusinessDayMessage.sessionID) - ANSIProgressRenderer.progressTracker = participant.progressTracker node.smm.add("update.business.day", participant) } }