diff --git a/build.gradle b/build.gradle index 18c9761d89..26fe8700a2 100644 --- a/build.gradle +++ b/build.gradle @@ -55,7 +55,9 @@ dependencies { compile "com.google.code.findbugs:jsr305:3.0.1" compile "org.slf4j:slf4j-jdk14:1.7.13" - compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" + compile("org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version") { + force = true + } compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-test:$kotlin_version" @@ -63,9 +65,11 @@ dependencies { force = true // Conflict between Quasar and Artemis } + // JOpt: for command line flags. compile "net.sf.jopt-simple:jopt-simple:4.9" - compile("com.esotericsoftware:kryo:3.0.3") + // Kryo: object graph serialization. + compile "com.esotericsoftware:kryo:3.0.3" compile "de.javakaffee:kryo-serializers:0.37" // Quasar: for the bytecode rewriting for state machines. @@ -78,7 +82,10 @@ dependencies { } compile "org.apache.activemq:artemis-core-client:${artemis_version}" - // For visualisation + // JAnsi: for drawing things to the terminal in nicely coloured ways. + compile "org.fusesource.jansi:jansi:1.11" + + // GraphStream: For visualisation compile "org.graphstream:gs-core:1.3" compile "org.graphstream:gs-ui:1.3" compile("com.intellij:forms_rt:7.0.3") { @@ -91,6 +98,10 @@ dependencies { // slower but you get a much better error message if you forget to annotate a method with @Suspendable that needs it. // // In Java 9 (hopefully) the requirement to annotate methods as @Suspendable will go away. + +applicationDefaultJvmArgs = ["-javaagent:${configurations.quasar.singleFile}"] +mainClassName = 'core.node.TraderDemoKt' + tasks.withType(Test) { jvmArgs "-javaagent:${configurations.quasar.singleFile}" jvmArgs "-Dco.paralleluniverse.fibers.verifyInstrumentation" @@ -100,7 +111,6 @@ tasks.withType(JavaExec) { jvmArgs "-Dco.paralleluniverse.fibers.verifyInstrumentation" } -mainClassName = 'core.node.TraderDemoKt' task runDemoBuyer(type: JavaExec, dependsOn: ':classes') { classpath = sourceSets.main.runtimeClasspath @@ -111,6 +121,5 @@ task runDemoBuyer(type: JavaExec, dependsOn: ':classes') { task runDemoSeller(type: JavaExec, dependsOn: ':classes') { classpath = sourceSets.main.runtimeClasspath main = 'core.node.TraderDemoKt' - args = ['--dir=seller', '--fake-trade-with=localhost', '--network-address=localhost:31338', - '--timestamper-identity-file=buyer/identity-public', '--timestamper-address=localhost'] + args = ['--dir=seller', '--fake-trade-with=localhost', '--network-address=localhost:31338', '--timestamper-identity-file=buyer/identity-public', '--timestamper-address=localhost'] } diff --git a/core/build.gradle b/core/build.gradle index 3db0c676be..1b8b2ef8a1 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -26,13 +26,20 @@ repositories { } dependencies { + compile 'junit:junit:4.12' + compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-test:$kotlin_version" + // SLF4J: Logging framework. compile "org.slf4j:slf4j-jdk14:1.7.13" - compile("com.google.guava:guava:19.0") + // Guava: Google utilities library. + compile "com.google.guava:guava:19.0" + + // RxJava: observable streams of events. + compile "io.reactivex:rxkotlin:0.40.1" // Quasar: for the bytecode rewriting for state machines. compile("co.paralleluniverse:quasar-core:${quasar_version}:jdk8") diff --git a/core/src/main/kotlin/core/utilities/ProgressTracker.kt b/core/src/main/kotlin/core/utilities/ProgressTracker.kt new file mode 100644 index 0000000000..6b24a29105 --- /dev/null +++ b/core/src/main/kotlin/core/utilities/ProgressTracker.kt @@ -0,0 +1,170 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package core.utilities + +import core.TransientProperty +import rx.Observable +import rx.Subscription +import rx.lang.kotlin.BehaviourSubject +import rx.subjects.BehaviorSubject +import java.util.* + +// TODO: Expose the concept of errors. +// TODO: It'd be helpful if this class was at least partly thread safe. + +/** + * A progress tracker helps surface information about the progress of an operation to a user interface or API of some + * kind. It lets you define a set of _steps_ that represent an operation. A step is represented by an object (typically + * a singleton). + * + * Steps may logically be children of other steps, which models the case where a large top level operation involves + * sub-operations which may also have a notion of progress. If a step has children, then the tracker will report the + * steps children as the "next step" after the parent. In other words, a parent step is considered to involve actual + * reportable work and is a thing. If the parent step simply groups other steps, then you'll have to step over it + * manually. + * + * Each step has a label. It is assumed by default that the label does not change. If you want a label to change, then + * you can emit a [ProgressTracker.Change.Rendering] object on the [ProgressTracker.Step.changes] observable stream + * after it changes. That object will propagate through to the top level trackers [changes] stream, which renderers can + * subscribe to in order to learn about progress. + * + * An operation can move both forwards and backwards through steps, thus, a [ProgressTracker] can represent operations + * that include loops. + * + * A progress tracker is *not* thread safe. You may move events from the thread making progress to another thread by + * using the [Observable] subscribeOn call. + */ +class ProgressTracker(vararg steps: Step) { + sealed class Change { + class Position(val newStep: Step) : Change() + class Rendering(val ofStep: Step) : Change() + class Structural(val parent: Step) : Change() + } + + /** The superclass of all step objects. */ + open class Step(open val label: String) { + open val changes: Observable = Observable.empty() + } + + /** This class makes it easier to relabel a step on the fly, to provide transient information. */ + open class RelabelableStep(currentLabel: String) : Step(currentLabel) { + override val changes = BehaviourSubject() + + var currentLabel: String = currentLabel + set(value) { + field = value + changes.onNext(ProgressTracker.Change.Rendering(this)) + } + + override val label: String get() = currentLabel + } + + object UNSTARTED : Step("Unstarted") + object DONE : Step("Done") + + /** The steps in this tracker, same as the steps passed to the constructor but with UNSTARTED and DONE inserted. */ + val steps = arrayOf(UNSTARTED, *steps, DONE) + + /** The zero-based index of the current step in the [steps] array (i.e. with UNSTARTED and DONE) */ + var stepIndex: Int = 0 + private set + + /** + * Reading returns the value of steps[stepIndex], writing moves the position of the current tracker. Once moved to + * the [DONE] state, this tracker is finished and the current step cannot be moved again. + */ + var currentStep: Step + get() = steps[stepIndex] + set(value) { + if (currentStep != value) { + check(currentStep != DONE) { "Cannot rewind a progress tracker once it reaches the done state" } + + val index = steps.indexOf(value) + require(index != -1) + + if (index < stepIndex) { + // We are going backwards: unlink and unsubscribe from any child nodes that we're rolling back + // through, in preparation for moving through them again. + for (i in stepIndex downTo index) { + childrenFor.remove(steps[i]) + } + } + + curChangeSubscription?.unsubscribe() + stepIndex = index + _changes.onNext(Change.Position(steps[index])) + curChangeSubscription = currentStep.changes.subscribe { _changes.onNext(it) } + + if (currentStep == DONE) _changes.onCompleted() + } + } + + /** Returns the current step, descending into children to find the deepest step we are up to. */ + val currentStepRecursive: Step + get() = childrenFor[currentStep]?.currentStepRecursive ?: currentStep + + /** + * Writable map that lets you insert child [ProgressTracker]s for particular steps. It's OK to edit this even + * after a progress tracker has been started. + */ + var childrenFor = object : HashMap() { + override fun put(key: Step, value: ProgressTracker): ProgressTracker? { + val r = super.put(key, value) + childSubscriptions[value] = value.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) }) + _changes.onNext(Change.Structural(key)) + return r + } + + override fun remove(key: Step): ProgressTracker? { + if (containsKey(key)) + childSubscriptions[this[key]]?.let { it.unsubscribe(); childSubscriptions.remove(this[key]) } + _changes.onNext(Change.Structural(key)) + return super.remove(key) + } + } + + private val childSubscriptions = HashMap() + + private fun _allSteps(level: Int = 0): List> { + val result = ArrayList>() + for (step in steps) { + if (step == UNSTARTED) continue + if (level > 0 && step == DONE) continue + result += Pair(level, step) + childrenFor[step]?.let { result += it._allSteps(level + 1) } + } + return result + } + + /** + * A list of all steps in this ProgressTracker and the children, with the indent level provided starting at zero. + * Note that UNSTARTED is never counted, and DONE is only counted at the calling level. + */ + val allSteps: List> get() = _allSteps() + + private var curChangeSubscription: Subscription? = null + + /** + * Iterates the progress tracker. If the current step has a child, the child is iterated instead (recursively). + * Returns the latest step at the bottom of the step tree. + */ + fun nextStep(): Step { + currentStep = steps[steps.indexOf(currentStep) + 1] + return currentStep + } + + // These two fields won't be serialized. + private val _changes by TransientProperty { BehaviorSubject.create(Change.Position(UNSTARTED)) } + + /** + * An observable stream of changes: includes child steps, resets and any changes emitted by individual steps (e.g. + * if a step changed its label or rendering). + */ + val changes: Observable get() = _changes +} \ No newline at end of file diff --git a/core/src/test/kotlin/core/utilities/ProgressTrackerTest.kt b/core/src/test/kotlin/core/utilities/ProgressTrackerTest.kt new file mode 100644 index 0000000000..4e51672582 --- /dev/null +++ b/core/src/test/kotlin/core/utilities/ProgressTrackerTest.kt @@ -0,0 +1,91 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package core.utilities + +import org.junit.Before +import org.junit.Test +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertFails + +class ProgressTrackerTest { + object SimpleSteps { + object ONE : ProgressTracker.Step("one") + object TWO : ProgressTracker.Step("two") + object THREE : ProgressTracker.Step("three") + object FOUR : ProgressTracker.Step("four") + + fun tracker() = ProgressTracker(ONE, TWO, THREE, FOUR) + } + object ChildSteps { + object AYY : ProgressTracker.Step("ayy") + object BEE : ProgressTracker.Step("bee") + object SEA : ProgressTracker.Step("sea") + + fun tracker() = ProgressTracker(AYY, BEE, SEA) + } + + lateinit var pt: ProgressTracker + + @Before + fun before() { + pt = SimpleSteps.tracker() + } + + @Test + fun `check basic steps`() { + assertEquals(ProgressTracker.UNSTARTED, pt.currentStep) + assertEquals(0, pt.stepIndex) + var stepNotification: ProgressTracker.Step? = null + pt.changes.subscribe { stepNotification = (it as? ProgressTracker.Change.Position)?.newStep } + + assertEquals(SimpleSteps.ONE, pt.nextStep()) + assertEquals(1, pt.stepIndex) + assertEquals(SimpleSteps.ONE, stepNotification) + + assertEquals(SimpleSteps.TWO, pt.nextStep()) + assertEquals(SimpleSteps.THREE, pt.nextStep()) + assertEquals(SimpleSteps.FOUR, pt.nextStep()) + assertEquals(ProgressTracker.DONE, pt.nextStep()) + } + + @Test + fun `cannot go beyond end`() { + pt.currentStep = SimpleSteps.FOUR + assertFails { pt.nextStep() } + } + + @Test + fun `nested children are stepped correctly`() { + val pt2 = ChildSteps.tracker() + + val stepNotification = LinkedList() + pt.changes.subscribe { + stepNotification += it + } + + pt.currentStep = SimpleSteps.ONE + pt.childrenFor[SimpleSteps.TWO] = pt2 + pt.nextStep() + + assertEquals(ChildSteps.AYY, pt.nextStep()) + assertEquals(ChildSteps.AYY, (stepNotification.pollFirst() as ProgressTracker.Change.Position).newStep) + assertEquals(SimpleSteps.TWO, (stepNotification.pollFirst() as ProgressTracker.Change.Structural).parent) + assertEquals(ChildSteps.BEE, pt2.nextStep()) + } + + @Test + fun `can be rewound`() { + val pt2 = ChildSteps.tracker() + pt.childrenFor[SimpleSteps.TWO] = pt2 + repeat(4) { pt.nextStep() } + pt.currentStep = SimpleSteps.ONE + assertEquals(SimpleSteps.TWO, pt.nextStep()) + } +} \ No newline at end of file diff --git a/docs/source/protocol-state-machines.rst b/docs/source/protocol-state-machines.rst index 293fd6d4a7..8667a91c80 100644 --- a/docs/source/protocol-state-machines.rst +++ b/docs/source/protocol-state-machines.rst @@ -511,3 +511,52 @@ the fact that it takes minimal resources and can survive node restarts. this problem doesn't occur. It's also restored for you when a protocol state machine is restored after a node restart. +Progress tracking +----------------- + +Not shown in the code snippets above is the usage of the ``ProgressTracker`` API. Progress tracking exports information +from a protocol about where it's got up to in such a way that observers can render it in a useful manner to humans who +may need to be informed. It may be rendered via an API, in a GUI, onto a terminal window, etc. + +A ``ProgressTracker`` is constructed with a series of ``Step`` objects, where each step is an object representing a +stage in a piece of work. It is therefore typical to use singletons that subclass ``Step``, which may be defined easily +in one line when using Kotlin. Typical steps might be "Waiting for response from peer", "Waiting for signature to be +approved", "Downloading and verifying data" etc. + +Each step exposes a label. By default labels are fixed, but by subclassing ``RelabelableStep`` +you can make a step that can update its label on the fly. That's useful for steps that want to expose non-structured +progress information like the current file being downloaded. By defining your own step types, you can export progress +in a way that's both human readable and machine readable. + +Progress trackers are hierarchical. Each step can be the parent for another tracker. By altering the +``ProgressTracker.childrenFor[step] = tracker`` map, a tree of steps can be created. It's allowed to alter the hierarchy +at runtime, on the fly, and the progress renderers will adapt to that properly. This can be helpful when you don't +fully know ahead of time what steps will be required. If you _do_ know what is required, configuring as much of the +hierarchy ahead of time is a good idea, as that will help the users see what is coming up. + +Every tracker has not only the steps given to it at construction time, but also the singleton +``ProgressTracker.UNSTARTED`` step and the ``ProgressTracker.DONE`` step. Once a tracker has become ``DONE`` its +position may not be modified again (because e.g. the UI may have been removed/cleaned up), but until that point, the +position can be set to any arbitrary set both forwards and backwards. Steps may be skipped, repeated, etc. Note that +rolling the current step backwards will delete any progress trackers that are children of the steps being reversed, on +the assumption that those subtasks will have to be repeated. + +Trackers provide an `Rx observable `_ which streams changes to the hierarchy. The top level +observable exposes all the events generated by its children as well. The changes are represented by objects indicating +whether the change is one of position (i.e. progress), structure (i.e. new subtasks being added/removed) or some other +aspect of rendering (i.e. a step has changed in some way and is requesting a re-render). + +The protocol framework is somewhat integrated with this API. Each ``ProtocolLogic`` may optionally provide a tracker by +overriding the ``protocolTracker`` property (``getProtocolTracker`` method in Java). If the +``ProtocolLogic.subProtocol`` method is used, then the tracker of the sub-protocol will be made a child of the current +step in the parent protocol automatically, if the parent is using tracking in the first place. The framework will also +automatically set the current step to ``DONE`` for you, when the protocol is finished. + +Because a protocol may sometimes wish to configure the children in its progress hierarchy _before_ the sub-protocol +is constructed, for sub-protocols that always follow the same outline regardless of their parameters it's conventional +to define a companion object/static method (for Kotlin/Java respectively) that constructs a tracker, and then allow +the sub-protocol to have the tracker it will use be passed in as a parameter. This allows all trackers to be built +and linked ahead of time. + +In future, the progress tracking framework will become a vital part of how exceptions, errors, and other faults are +surfaced to human operators for investigation and resolution. \ No newline at end of file diff --git a/src/main/kotlin/contracts/protocols/TwoPartyTradeProtocol.kt b/src/main/kotlin/contracts/protocols/TwoPartyTradeProtocol.kt index a7af9c2441..d368e7ef95 100644 --- a/src/main/kotlin/contracts/protocols/TwoPartyTradeProtocol.kt +++ b/src/main/kotlin/contracts/protocols/TwoPartyTradeProtocol.kt @@ -16,10 +16,11 @@ import core.* import core.crypto.DigitalSignature import core.crypto.signWithECDSA import core.messaging.LegallyIdentifiableNode -import core.protocols.ProtocolLogic import core.messaging.SingleMessageRecipient import core.messaging.StateMachineManager import core.node.TimestampingProtocol +import core.protocols.ProtocolLogic +import core.utilities.ProgressTracker import core.utilities.trace import java.security.KeyPair import java.security.PublicKey @@ -86,22 +87,40 @@ object TwoPartyTradeProtocol { val assetToSell: StateAndRef, val price: Amount, val myKeyPair: KeyPair, - val buyerSessionID: Long) : ProtocolLogic() { + val buyerSessionID: Long, + override val progressTracker: ProgressTracker = Seller.tracker()) : ProtocolLogic() { + + companion object { + object AWAITING_PROPOSAL : ProgressTracker.Step("Awaiting transaction proposal") + object VERIFYING : ProgressTracker.Step("Verifying transaction proposal") + object SIGNING : ProgressTracker.Step("Signing transaction") + object TIMESTAMPING : ProgressTracker.Step("Timestamping transaction") + object SENDING_SIGS : ProgressTracker.Step("Sending transaction signatures to buyer") + + fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, TIMESTAMPING, SENDING_SIGS) + } + @Suspendable override fun call(): SignedTransaction { val partialTX: SignedTransaction = receiveAndCheckProposedTransaction() // These two steps could be done in parallel, in theory. Our framework doesn't support that yet though. val ourSignature = signWithOurKey(partialTX) - val tsaSig = subProtocol(TimestampingProtocol(timestampingAuthority, partialTX.txBits)) + val tsaSig = timestamp(partialTX) - val signedTransaction = sendSignatures(partialTX, ourSignature, tsaSig) + return sendSignatures(partialTX, ourSignature, tsaSig) + } - return signedTransaction + @Suspendable + private fun timestamp(partialTX: SignedTransaction): DigitalSignature.LegallyIdentifiable { + progressTracker.currentStep = TIMESTAMPING + return subProtocol(TimestampingProtocol(timestampingAuthority, partialTX.txBits)) } @Suspendable private fun receiveAndCheckProposedTransaction(): SignedTransaction { + progressTracker.currentStep = AWAITING_PROPOSAL + val sessionID = random63BitValue() // Make the first message we'll send to kick off the protocol. @@ -109,7 +128,11 @@ object TwoPartyTradeProtocol { val maybeSTX = sendAndReceive(TRADE_TOPIC, otherSide, buyerSessionID, sessionID, hello) + progressTracker.currentStep = VERIFYING + maybeSTX.validate { + progressTracker.nextStep() + // Check that the tx proposed by the buyer is valid. val missingSigs = it.verify(throwIfSignaturesAreMissing = false) if (missingSigs != setOf(myKeyPair.public, timestampingAuthority.identity.owningKey)) @@ -148,11 +171,16 @@ object TwoPartyTradeProtocol { subProtocol(ResolveTransactionsProtocol(dependencyTxIDs, otherSide)) } - private fun signWithOurKey(partialTX: SignedTransaction) = myKeyPair.signWithECDSA(partialTX.txBits) + @Suspendable + open fun signWithOurKey(partialTX: SignedTransaction): DigitalSignature.WithKey { + progressTracker.currentStep = SIGNING + return myKeyPair.signWithECDSA(partialTX.txBits) + } @Suspendable private fun sendSignatures(partialTX: SignedTransaction, ourSignature: DigitalSignature.WithKey, tsaSig: DigitalSignature.LegallyIdentifiable): SignedTransaction { + progressTracker.currentStep = SENDING_SIGS val fullySigned = partialTX + tsaSig + ourSignature logger.trace { "Built finished transaction, sending back to secondary!" } @@ -168,14 +196,24 @@ object TwoPartyTradeProtocol { val typeToBuy: Class, val sessionID: Long) : ProtocolLogic() { + object RECEIVING : ProgressTracker.Step("Waiting for seller trading info") + object VERIFYING : ProgressTracker.Step("Verifying seller assets") + object SIGNING : ProgressTracker.Step("Generating and signing transaction proposal") + object SWAPPING_SIGNATURES : ProgressTracker.Step("Swapping signatures with the seller") + + override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES) + @Suspendable override fun call(): SignedTransaction { val tradeRequest = receiveAndValidateTradeRequest() + + progressTracker.currentStep = SIGNING val (ptx, cashSigningPubKeys) = assembleSharedTX(tradeRequest) val stx = signWithOurKeys(cashSigningPubKeys, ptx) + val signatures = swapSignaturesWithSeller(stx, tradeRequest.sessionID) - logger.trace { "Got signatures from seller, verifying ... "} + logger.trace { "Got signatures from seller, verifying ... " } val fullySigned = stx + signatures.timestampAuthoritySig + signatures.sellerSig fullySigned.verify() @@ -185,9 +223,11 @@ object TwoPartyTradeProtocol { @Suspendable private fun receiveAndValidateTradeRequest(): SellerTradeInfo { + progressTracker.currentStep = RECEIVING // Wait for a trade request to come in on our pre-provided session ID. val maybeTradeRequest = receive(TRADE_TOPIC, sessionID) + progressTracker.currentStep = VERIFYING maybeTradeRequest.validate { // What is the seller trying to sell us? val asset = it.assetForSale.state @@ -211,6 +251,7 @@ object TwoPartyTradeProtocol { @Suspendable private fun swapSignaturesWithSeller(stx: SignedTransaction, theirSessionID: Long): SignaturesFromSeller { + progressTracker.currentStep = SWAPPING_SIGNATURES logger.trace { "Sending partially signed transaction to seller" } // TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx. diff --git a/src/main/kotlin/core/messaging/StateMachineManager.kt b/src/main/kotlin/core/messaging/StateMachineManager.kt index 19aad1f202..2d7136373d 100644 --- a/src/main/kotlin/core/messaging/StateMachineManager.kt +++ b/src/main/kotlin/core/messaging/StateMachineManager.kt @@ -24,6 +24,7 @@ import core.serialization.THREAD_LOCAL_KRYO import core.serialization.createKryo import core.serialization.deserialize import core.serialization.serialize +import core.utilities.ProgressTracker import core.utilities.trace import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -51,6 +52,8 @@ import javax.annotation.concurrent.ThreadSafe * TODO: Timeouts * TODO: Surfacing of exceptions via an API and/or management UI * TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt + * TODO: Make Kryo (de)serialize markers for heavy objects that are currently in the service hub. This avoids mistakes + * where services are temporarily put on the stack. */ @ThreadSafe class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) { @@ -65,7 +68,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) // class that inserts itself into a ThreadLocal. That then gets caught in fiber serialisation, which we don't // want because it can't get recreated properly. It turns out there's no good workaround for this! All the obvious // approaches fail. Pending resolution of https://github.com/puniverse/quasar/issues/153 we just disable - // checkpointing when unit tests are run inside Gradle. The right fix is probably to make Quasar's + // checkpointing when unit tests are run inside Gradle. The right fix is probably to stop Quasar's // bit-too-clever-for-its-own-good ThreadLocal serialisation trick. It already wasted far more time than it can // ever recover. val checkpointing: Boolean get() = !System.err.javaClass.name.contains("LinePerThreadBufferingOutputStream") @@ -205,6 +208,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) // We're back! Check if the fiber is finished and if so, clean up. if (psm.isTerminated) { + psm.logic.progressTracker?.currentStep = ProgressTracker.DONE _stateMachines.remove(psm.logic) checkpointsMap.remove(prevCheckpointKey) } diff --git a/src/main/kotlin/core/node/TraderDemo.kt b/src/main/kotlin/core/node/TraderDemo.kt index 575da25730..25f2cf8c0a 100644 --- a/src/main/kotlin/core/node/TraderDemo.kt +++ b/src/main/kotlin/core/node/TraderDemo.kt @@ -13,13 +13,16 @@ import com.google.common.net.HostAndPort import contracts.CommercialPaper import contracts.protocols.TwoPartyTradeProtocol import core.* +import core.crypto.DigitalSignature import core.crypto.generateKeyPair import core.messaging.LegallyIdentifiableNode -import core.protocols.ProtocolLogic import core.messaging.SingleMessageRecipient +import core.protocols.ProtocolLogic import core.serialization.deserialize +import core.utilities.ANSIProgressRenderer import core.utilities.BriefLogFormatter import core.utilities.Emoji +import core.utilities.ProgressTracker import joptsimple.OptionParser import java.nio.file.Files import java.nio.file.Path @@ -33,7 +36,6 @@ import kotlin.system.exitProcess // // Please see docs/build/html/running-the-trading-demo.html - fun main(args: Array) { val parser = OptionParser() val networkAddressArg = parser.accepts("network-address").withRequiredArg().required() @@ -60,7 +62,8 @@ fun main(args: Array) { exitProcess(1) } - BriefLogFormatter.initVerbose("platform.trade") + // Suppress the Artemis MQ noise, and activate the demo logging. + BriefLogFormatter.initVerbose("+demo.buyer", "+demo.seller", "-org.apache.activemq") val dir = Paths.get(options.valueOf(dirArg)) val configFile = dir.resolve("config") @@ -90,7 +93,9 @@ fun main(args: Array) { val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId) } if (listening) { - node.smm.add("demo.buyer", TraderDemoProtocolBuyer()).get() // This thread will halt forever here. + val buyer = TraderDemoProtocolBuyer() + ANSIProgressRenderer.progressTracker = buyer.progressTracker + node.smm.add("demo.buyer", buyer).get() // This thread will halt forever here. } else { if (!options.has(fakeTradeWithArg)) { println("Need the --fake-trade-with command line argument") @@ -98,7 +103,9 @@ fun main(args: Array) { } val peerAddr = HostAndPort.fromString(options.valuesOf(fakeTradeWithArg).single()).withDefaultPort(Node.DEFAULT_PORT) val otherSide = ArtemisMessagingService.makeRecipient(peerAddr) - node.smm.add("demo.seller", TraderDemoProtocolSeller(myNetAddr, otherSide)).get() + val seller = TraderDemoProtocolSeller(myNetAddr, otherSide) + ANSIProgressRenderer.progressTracker = seller.progressTracker + node.smm.add("demo.seller", seller).get() node.stop() } } @@ -106,6 +113,12 @@ fun main(args: Array) { // We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic. class TraderDemoProtocolBuyer() : ProtocolLogic() { + companion object { + object WAITING_FOR_SELLER_TO_CONNECT : ProgressTracker.Step("Waiting for seller to connect to us") + object STARTING_BUY : ProgressTracker.Step("Seller connected, purchasing commercial paper asset") + } + override val progressTracker = ProgressTracker(WAITING_FOR_SELLER_TO_CONNECT, STARTING_BUY) + @Suspendable override fun call() { // Give us some cash. Note that as nodes do not currently track forward pointers, we can spend the same cash over @@ -119,15 +132,11 @@ class TraderDemoProtocolBuyer() : ProtocolLogic() { // // As the seller initiates the DVP/two-party trade protocol, here, we will be the buyer. try { - println() - println("Waiting for a seller to connect to us!") - + progressTracker.currentStep = WAITING_FOR_SELLER_TO_CONNECT val hostname = receive("test.junktrade", 0).validate { it.withDefaultPort(Node.DEFAULT_PORT) } val newPartnerAddr = ArtemisMessagingService.makeRecipient(hostname) val sessionID = random63BitValue() - println() - println("Got a new junk trade request from $newPartnerAddr, sending back a fresh session ID and starting buy protocol") - println() + progressTracker.currentStep = STARTING_BUY send("test.junktrade", newPartnerAddr, 0, sessionID) val tsa = serviceHub.networkMapService.timestampingNodes[0] @@ -135,52 +144,58 @@ class TraderDemoProtocolBuyer() : ProtocolLogic() { CommercialPaper.State::class.java, sessionID) val tradeTX: SignedTransaction = subProtocol(buyer) - println() - println("Purchase complete - we are a happy customer! Final transaction is:") - println() - println(Emoji.renderIfSupported(tradeTX.tx)) - println() - println("Waiting for another seller to connect. Or press Ctrl-C to shut me down.") + logger.info("Purchase complete - we are a happy customer! Final transaction is: " + + "\n\n${Emoji.renderIfSupported(tradeTX.tx)}") } catch(e: Exception) { - println() - println("Something went wrong whilst trading!") - println() - e.printStackTrace() + logger.error("Something went wrong whilst trading!", e) } } } } class TraderDemoProtocolSeller(val myAddress: HostAndPort, - val otherSide: SingleMessageRecipient) : ProtocolLogic() { + val otherSide: SingleMessageRecipient, + override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic() { + companion object { + object ANNOUNCING : ProgressTracker.Step("Announcing to the buyer node") + object SELF_ISSUING : ProgressTracker.Step("Got session ID back, issuing and timestamping some commercial paper") + object TRADING : ProgressTracker.Step("Starting the trade protocol") + + // We vend a progress tracker that already knows there's going to be a TwoPartyTradingProtocol involved at some + // point: by setting up the tracker in advance, the user can see what's coming in more detail, instead of being + // surprised when it appears as a new set of tasks below the current one. + fun tracker() = ProgressTracker(ANNOUNCING, SELF_ISSUING, TRADING).apply { + childrenFor[TRADING] = TwoPartyTradeProtocol.Seller.tracker() + } + } + @Suspendable override fun call() { - println() - println("Announcing ourselves to the buyer node!") - println() + progressTracker.currentStep = ANNOUNCING val sessionID = sendAndReceive("test.junktrade", otherSide, 0, 0, myAddress).validate { it } - println() - println("Got session ID back, issuing and timestamping some commercial paper") + progressTracker.currentStep = SELF_ISSUING val tsa = serviceHub.networkMapService.timestampingNodes[0] val cpOwnerKey = serviceHub.keyManagementService.freshKey() val commercialPaper = makeFakeCommercialPaper(cpOwnerKey.public, tsa) - println() - println("Timestamped my commercial paper issuance, starting the trade protocol.") + progressTracker.currentStep = TRADING - val seller = TwoPartyTradeProtocol.Seller(otherSide, tsa, commercialPaper, 1000.DOLLARS, cpOwnerKey, sessionID) + val seller = object : TwoPartyTradeProtocol.Seller(otherSide, tsa, commercialPaper, 1000.DOLLARS, + cpOwnerKey, sessionID, progressTracker.childrenFor[TRADING]!!) { + override fun signWithOurKey(partialTX: SignedTransaction): DigitalSignature.WithKey { + val s = super.signWithOurKey(partialTX) + // Fake delay to make it look like we're doing something more intensive than we really are, to show + // the progress tracking framework. + Thread.sleep(2000) + return s + } + } val tradeTX: SignedTransaction = subProtocol(seller) - println() - println("Sale completed - we have a happy customer!") - println() - println("Final transaction is") - println() - println(Emoji.renderIfSupported(tradeTX.tx)) - println() + logger.info("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(tradeTX.tx)}") } @Suspendable diff --git a/src/main/kotlin/core/protocols/ProtocolLogic.kt b/src/main/kotlin/core/protocols/ProtocolLogic.kt index 8c9b031e47..0d9bbc78a0 100644 --- a/src/main/kotlin/core/protocols/ProtocolLogic.kt +++ b/src/main/kotlin/core/protocols/ProtocolLogic.kt @@ -11,6 +11,7 @@ package core.protocols import co.paralleluniverse.fibers.Suspendable import core.ServiceHub import core.messaging.MessageRecipients +import core.utilities.ProgressTracker import core.utilities.UntrustworthyData import org.slf4j.Logger @@ -37,6 +38,7 @@ abstract class ProtocolLogic { /** This is where you should log things to. */ val logger: Logger get() = psm.logger + /** Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts */ val serviceHub: ServiceHub get() = psm.serviceHub @@ -58,9 +60,32 @@ abstract class ProtocolLogic { */ @Suspendable fun subProtocol(subLogic: ProtocolLogic): R { subLogic.psm = psm - return subLogic.call() + maybeWireUpProgressTracking(subLogic) + val r = subLogic.call() + // It's easy to forget this when writing protocols so we just step it to the DONE state when it completes. + subLogic.progressTracker?.currentStep = ProgressTracker.DONE + return r } + private fun maybeWireUpProgressTracking(subLogic: ProtocolLogic<*>) { + val ours = progressTracker + val theirs = subLogic.progressTracker + if (ours != null && theirs != null) + ours.childrenFor[ours.currentStep] = theirs + } + + /** + * Override this to provide a [ProgressTracker]. If one is provided and stepped, the framework will do something + * helpful with the progress reports. If this protocol is invoked as a sub-protocol of another, then the + * tracker will be made a child of the current step in the parent. If it's null, this protocol doesn't track + * progress. + * + * Note that this has to return a tracker before the protocol is invoked. You can't change your mind half way + * through. + */ + open val progressTracker: ProgressTracker? = null + + /** This is where you fill out your business logic. */ @Suspendable abstract fun call(): T } \ No newline at end of file diff --git a/src/main/kotlin/core/utilities/ANSIProgressRenderer.kt b/src/main/kotlin/core/utilities/ANSIProgressRenderer.kt new file mode 100644 index 0000000000..e493f91f76 --- /dev/null +++ b/src/main/kotlin/core/utilities/ANSIProgressRenderer.kt @@ -0,0 +1,157 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package core.utilities + +import org.fusesource.jansi.Ansi +import org.fusesource.jansi.AnsiConsole +import org.fusesource.jansi.AnsiOutputStream +import rx.Subscription +import java.util.logging.ConsoleHandler +import java.util.logging.Formatter +import java.util.logging.LogRecord +import java.util.logging.Logger + +/** + * Knows how to render a [ProgressTracker] to the terminal using coloured, emoji-fied output. Useful when writing small + * command line tools, demos, tests etc. Just set the [progressTracker] field and it will go ahead and start drawing + * if the terminal supports it. Otherwise it just prints out the name of the step whenever it changes. + * + * TODO: Thread safety + */ +object ANSIProgressRenderer { + private var installedYet = false + private var subscription: Subscription? = null + + private class LineBumpingConsoleHandler : ConsoleHandler() { + override fun getFormatter(): Formatter = BriefLogFormatter() + + override fun publish(r: LogRecord?) { + if (progressTracker != null) { + val ansi = Ansi.ansi() + repeat(prevLinesDrawn) { ansi.eraseLine().cursorUp(1).eraseLine() } + System.out.print(ansi) + System.out.flush() + } + + super.publish(r) + + if (progressTracker != null) + draw(false) + } + } + + private var usingANSI = false + private var loggerRef: Logger? = null + + var progressTracker: ProgressTracker? = null + set(value) { + subscription?.unsubscribe() + + field = value + if (!installedYet) { + AnsiConsole.systemInstall() + + // This line looks weird as hell because the magic code to decide if we really have a TTY or not isn't + // actually exposed anywhere as a function (weak sauce). So we have to rely on our knowledge of jansi + // implementation details. + usingANSI = AnsiConsole.wrapOutputStream(System.out) !is AnsiOutputStream + + if (usingANSI) { + loggerRef = Logger.getLogger("").apply { + val current = handlers[0] + removeHandler(current) + val new = LineBumpingConsoleHandler() + new.level = current.level + addHandler(new) + } + } + + installedYet = true + } + + subscription = value?.changes?.subscribe { draw(true) } + } + + // prevMessagePrinted is just for non-ANSI mode. + private var prevMessagePrinted: String? = null + // prevLinesDraw is just for ANSI mode. + private var prevLinesDrawn = 0 + + private fun draw(moveUp: Boolean) { + val pt = progressTracker!! + + if (!usingANSI) { + val currentMessage = pt.currentStepRecursive.label + if (currentMessage != prevMessagePrinted) { + println(currentMessage) + prevMessagePrinted = currentMessage + } + return + } + + // Handle the case where the number of steps in a progress tracker is changed during execution. + val ansi = Ansi.ansi() + if (prevLinesDrawn > 0 && moveUp) + ansi.cursorUp(prevLinesDrawn) + + // Put a blank line between any logging and us. + ansi.eraseLine() + ansi.newline() + val newLinesDrawn = 1 + pt.renderLevel(ansi, 0, pt.allSteps) + if (newLinesDrawn < prevLinesDrawn) { + // If some steps were removed from the progress tracker, we don't want to leave junk hanging around below. + val linesToClear = prevLinesDrawn - newLinesDrawn + repeat(linesToClear) { + ansi.eraseLine() + ansi.newline() + } + ansi.cursorUp(linesToClear) + } + prevLinesDrawn = newLinesDrawn + + // Need to force a flush here in order to ensure stderr/stdout sync up properly. + System.out.print(ansi) + System.out.flush() + } + + // Returns number of lines rendered. + private fun ProgressTracker.renderLevel(ansi: Ansi, indent: Int, allSteps: List>): Int { + with(ansi) { + var lines = 0 + for ((index, step) in steps.withIndex()) { + // Don't bother rendering these special steps in some cases. + if (step == ProgressTracker.UNSTARTED) continue + if (indent > 0 && step == ProgressTracker.DONE) continue + + val marker = when { + index < stepIndex -> Emoji.CODE_GREEN_TICK + " " + index == stepIndex && step == ProgressTracker.DONE -> Emoji.CODE_GREEN_TICK + " " + index == stepIndex -> Emoji.CODE_RIGHT_ARROW + " " + else -> " " + } + a(" ".repeat(indent)) + a(marker) + + val active = index == stepIndex && step != ProgressTracker.DONE + if (active) bold() + a(step.label) + if (active) boldOff() + + eraseLine(Ansi.Erase.FORWARD) + newline() + lines++ + + val child = childrenFor[step] + if (child != null) + lines += child.renderLevel(ansi, indent + 1, allSteps) + } + return lines + } + } +} \ No newline at end of file