Integrate a simple progress tracking system into the protocol framework.

The progress tracker API lets you model a tree of steps, along the same structure as protocols and subprotocols. Each step has an (optionally changing) label, and thus progress trackers can be arranged in a tree. Updates to the progress at each level flow up the tree via an RxJava observable (I guess we will use this more in future).

A simple console renderer is provided that uses ANSI escapes and Emoji to show animated progress through a protocol.

The trader demo is enhanced to use this framework, when run outside of Gradle.
This commit is contained in:
Mike Hearn 2016-02-23 18:17:46 +01:00
parent ab2b447a5e
commit c3f86f6557
10 changed files with 621 additions and 53 deletions

View File

@ -55,7 +55,9 @@ dependencies {
compile "com.google.code.findbugs:jsr305:3.0.1"
compile "org.slf4j:slf4j-jdk14:1.7.13"
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile("org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version") {
force = true
}
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
@ -63,9 +65,11 @@ dependencies {
force = true // Conflict between Quasar and Artemis
}
// JOpt: for command line flags.
compile "net.sf.jopt-simple:jopt-simple:4.9"
compile("com.esotericsoftware:kryo:3.0.3")
// Kryo: object graph serialization.
compile "com.esotericsoftware:kryo:3.0.3"
compile "de.javakaffee:kryo-serializers:0.37"
// Quasar: for the bytecode rewriting for state machines.
@ -78,7 +82,10 @@ dependencies {
}
compile "org.apache.activemq:artemis-core-client:${artemis_version}"
// For visualisation
// JAnsi: for drawing things to the terminal in nicely coloured ways.
compile "org.fusesource.jansi:jansi:1.11"
// GraphStream: For visualisation
compile "org.graphstream:gs-core:1.3"
compile "org.graphstream:gs-ui:1.3"
compile("com.intellij:forms_rt:7.0.3") {
@ -91,6 +98,10 @@ dependencies {
// slower but you get a much better error message if you forget to annotate a method with @Suspendable that needs it.
//
// In Java 9 (hopefully) the requirement to annotate methods as @Suspendable will go away.
applicationDefaultJvmArgs = ["-javaagent:${configurations.quasar.singleFile}"]
mainClassName = 'core.node.TraderDemoKt'
tasks.withType(Test) {
jvmArgs "-javaagent:${configurations.quasar.singleFile}"
jvmArgs "-Dco.paralleluniverse.fibers.verifyInstrumentation"
@ -100,7 +111,6 @@ tasks.withType(JavaExec) {
jvmArgs "-Dco.paralleluniverse.fibers.verifyInstrumentation"
}
mainClassName = 'core.node.TraderDemoKt'
task runDemoBuyer(type: JavaExec, dependsOn: ':classes') {
classpath = sourceSets.main.runtimeClasspath
@ -111,6 +121,5 @@ task runDemoBuyer(type: JavaExec, dependsOn: ':classes') {
task runDemoSeller(type: JavaExec, dependsOn: ':classes') {
classpath = sourceSets.main.runtimeClasspath
main = 'core.node.TraderDemoKt'
args = ['--dir=seller', '--fake-trade-with=localhost', '--network-address=localhost:31338',
'--timestamper-identity-file=buyer/identity-public', '--timestamper-address=localhost']
args = ['--dir=seller', '--fake-trade-with=localhost', '--network-address=localhost:31338', '--timestamper-identity-file=buyer/identity-public', '--timestamper-address=localhost']
}

View File

@ -26,13 +26,20 @@ repositories {
}
dependencies {
compile 'junit:junit:4.12'
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
// SLF4J: Logging framework.
compile "org.slf4j:slf4j-jdk14:1.7.13"
compile("com.google.guava:guava:19.0")
// Guava: Google utilities library.
compile "com.google.guava:guava:19.0"
// RxJava: observable streams of events.
compile "io.reactivex:rxkotlin:0.40.1"
// Quasar: for the bytecode rewriting for state machines.
compile("co.paralleluniverse:quasar-core:${quasar_version}:jdk8")

View File

@ -0,0 +1,170 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.utilities
import core.TransientProperty
import rx.Observable
import rx.Subscription
import rx.lang.kotlin.BehaviourSubject
import rx.subjects.BehaviorSubject
import java.util.*
// TODO: Expose the concept of errors.
// TODO: It'd be helpful if this class was at least partly thread safe.
/**
* A progress tracker helps surface information about the progress of an operation to a user interface or API of some
* kind. It lets you define a set of _steps_ that represent an operation. A step is represented by an object (typically
* a singleton).
*
* Steps may logically be children of other steps, which models the case where a large top level operation involves
* sub-operations which may also have a notion of progress. If a step has children, then the tracker will report the
* steps children as the "next step" after the parent. In other words, a parent step is considered to involve actual
* reportable work and is a thing. If the parent step simply groups other steps, then you'll have to step over it
* manually.
*
* Each step has a label. It is assumed by default that the label does not change. If you want a label to change, then
* you can emit a [ProgressTracker.Change.Rendering] object on the [ProgressTracker.Step.changes] observable stream
* after it changes. That object will propagate through to the top level trackers [changes] stream, which renderers can
* subscribe to in order to learn about progress.
*
* An operation can move both forwards and backwards through steps, thus, a [ProgressTracker] can represent operations
* that include loops.
*
* A progress tracker is *not* thread safe. You may move events from the thread making progress to another thread by
* using the [Observable] subscribeOn call.
*/
class ProgressTracker(vararg steps: Step) {
sealed class Change {
class Position(val newStep: Step) : Change()
class Rendering(val ofStep: Step) : Change()
class Structural(val parent: Step) : Change()
}
/** The superclass of all step objects. */
open class Step(open val label: String) {
open val changes: Observable<Change> = Observable.empty()
}
/** This class makes it easier to relabel a step on the fly, to provide transient information. */
open class RelabelableStep(currentLabel: String) : Step(currentLabel) {
override val changes = BehaviourSubject<Change>()
var currentLabel: String = currentLabel
set(value) {
field = value
changes.onNext(ProgressTracker.Change.Rendering(this))
}
override val label: String get() = currentLabel
}
object UNSTARTED : Step("Unstarted")
object DONE : Step("Done")
/** The steps in this tracker, same as the steps passed to the constructor but with UNSTARTED and DONE inserted. */
val steps = arrayOf(UNSTARTED, *steps, DONE)
/** The zero-based index of the current step in the [steps] array (i.e. with UNSTARTED and DONE) */
var stepIndex: Int = 0
private set
/**
* Reading returns the value of steps[stepIndex], writing moves the position of the current tracker. Once moved to
* the [DONE] state, this tracker is finished and the current step cannot be moved again.
*/
var currentStep: Step
get() = steps[stepIndex]
set(value) {
if (currentStep != value) {
check(currentStep != DONE) { "Cannot rewind a progress tracker once it reaches the done state" }
val index = steps.indexOf(value)
require(index != -1)
if (index < stepIndex) {
// We are going backwards: unlink and unsubscribe from any child nodes that we're rolling back
// through, in preparation for moving through them again.
for (i in stepIndex downTo index) {
childrenFor.remove(steps[i])
}
}
curChangeSubscription?.unsubscribe()
stepIndex = index
_changes.onNext(Change.Position(steps[index]))
curChangeSubscription = currentStep.changes.subscribe { _changes.onNext(it) }
if (currentStep == DONE) _changes.onCompleted()
}
}
/** Returns the current step, descending into children to find the deepest step we are up to. */
val currentStepRecursive: Step
get() = childrenFor[currentStep]?.currentStepRecursive ?: currentStep
/**
* Writable map that lets you insert child [ProgressTracker]s for particular steps. It's OK to edit this even
* after a progress tracker has been started.
*/
var childrenFor = object : HashMap<Step, ProgressTracker>() {
override fun put(key: Step, value: ProgressTracker): ProgressTracker? {
val r = super.put(key, value)
childSubscriptions[value] = value.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) })
_changes.onNext(Change.Structural(key))
return r
}
override fun remove(key: Step): ProgressTracker? {
if (containsKey(key))
childSubscriptions[this[key]]?.let { it.unsubscribe(); childSubscriptions.remove(this[key]) }
_changes.onNext(Change.Structural(key))
return super.remove(key)
}
}
private val childSubscriptions = HashMap<ProgressTracker, Subscription>()
private fun _allSteps(level: Int = 0): List<Pair<Int, Step>> {
val result = ArrayList<Pair<Int, Step>>()
for (step in steps) {
if (step == UNSTARTED) continue
if (level > 0 && step == DONE) continue
result += Pair(level, step)
childrenFor[step]?.let { result += it._allSteps(level + 1) }
}
return result
}
/**
* A list of all steps in this ProgressTracker and the children, with the indent level provided starting at zero.
* Note that UNSTARTED is never counted, and DONE is only counted at the calling level.
*/
val allSteps: List<Pair<Int, Step>> get() = _allSteps()
private var curChangeSubscription: Subscription? = null
/**
* Iterates the progress tracker. If the current step has a child, the child is iterated instead (recursively).
* Returns the latest step at the bottom of the step tree.
*/
fun nextStep(): Step {
currentStep = steps[steps.indexOf(currentStep) + 1]
return currentStep
}
// These two fields won't be serialized.
private val _changes by TransientProperty { BehaviorSubject.create<Change>(Change.Position(UNSTARTED)) }
/**
* An observable stream of changes: includes child steps, resets and any changes emitted by individual steps (e.g.
* if a step changed its label or rendering).
*/
val changes: Observable<Change> get() = _changes
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.utilities
import org.junit.Before
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFails
class ProgressTrackerTest {
object SimpleSteps {
object ONE : ProgressTracker.Step("one")
object TWO : ProgressTracker.Step("two")
object THREE : ProgressTracker.Step("three")
object FOUR : ProgressTracker.Step("four")
fun tracker() = ProgressTracker(ONE, TWO, THREE, FOUR)
}
object ChildSteps {
object AYY : ProgressTracker.Step("ayy")
object BEE : ProgressTracker.Step("bee")
object SEA : ProgressTracker.Step("sea")
fun tracker() = ProgressTracker(AYY, BEE, SEA)
}
lateinit var pt: ProgressTracker
@Before
fun before() {
pt = SimpleSteps.tracker()
}
@Test
fun `check basic steps`() {
assertEquals(ProgressTracker.UNSTARTED, pt.currentStep)
assertEquals(0, pt.stepIndex)
var stepNotification: ProgressTracker.Step? = null
pt.changes.subscribe { stepNotification = (it as? ProgressTracker.Change.Position)?.newStep }
assertEquals(SimpleSteps.ONE, pt.nextStep())
assertEquals(1, pt.stepIndex)
assertEquals(SimpleSteps.ONE, stepNotification)
assertEquals(SimpleSteps.TWO, pt.nextStep())
assertEquals(SimpleSteps.THREE, pt.nextStep())
assertEquals(SimpleSteps.FOUR, pt.nextStep())
assertEquals(ProgressTracker.DONE, pt.nextStep())
}
@Test
fun `cannot go beyond end`() {
pt.currentStep = SimpleSteps.FOUR
assertFails { pt.nextStep() }
}
@Test
fun `nested children are stepped correctly`() {
val pt2 = ChildSteps.tracker()
val stepNotification = LinkedList<ProgressTracker.Change>()
pt.changes.subscribe {
stepNotification += it
}
pt.currentStep = SimpleSteps.ONE
pt.childrenFor[SimpleSteps.TWO] = pt2
pt.nextStep()
assertEquals(ChildSteps.AYY, pt.nextStep())
assertEquals(ChildSteps.AYY, (stepNotification.pollFirst() as ProgressTracker.Change.Position).newStep)
assertEquals(SimpleSteps.TWO, (stepNotification.pollFirst() as ProgressTracker.Change.Structural).parent)
assertEquals(ChildSteps.BEE, pt2.nextStep())
}
@Test
fun `can be rewound`() {
val pt2 = ChildSteps.tracker()
pt.childrenFor[SimpleSteps.TWO] = pt2
repeat(4) { pt.nextStep() }
pt.currentStep = SimpleSteps.ONE
assertEquals(SimpleSteps.TWO, pt.nextStep())
}
}

View File

@ -511,3 +511,52 @@ the fact that it takes minimal resources and can survive node restarts.
this problem doesn't occur. It's also restored for you when a protocol state machine is restored after a node
restart.
Progress tracking
-----------------
Not shown in the code snippets above is the usage of the ``ProgressTracker`` API. Progress tracking exports information
from a protocol about where it's got up to in such a way that observers can render it in a useful manner to humans who
may need to be informed. It may be rendered via an API, in a GUI, onto a terminal window, etc.
A ``ProgressTracker`` is constructed with a series of ``Step`` objects, where each step is an object representing a
stage in a piece of work. It is therefore typical to use singletons that subclass ``Step``, which may be defined easily
in one line when using Kotlin. Typical steps might be "Waiting for response from peer", "Waiting for signature to be
approved", "Downloading and verifying data" etc.
Each step exposes a label. By default labels are fixed, but by subclassing ``RelabelableStep``
you can make a step that can update its label on the fly. That's useful for steps that want to expose non-structured
progress information like the current file being downloaded. By defining your own step types, you can export progress
in a way that's both human readable and machine readable.
Progress trackers are hierarchical. Each step can be the parent for another tracker. By altering the
``ProgressTracker.childrenFor[step] = tracker`` map, a tree of steps can be created. It's allowed to alter the hierarchy
at runtime, on the fly, and the progress renderers will adapt to that properly. This can be helpful when you don't
fully know ahead of time what steps will be required. If you _do_ know what is required, configuring as much of the
hierarchy ahead of time is a good idea, as that will help the users see what is coming up.
Every tracker has not only the steps given to it at construction time, but also the singleton
``ProgressTracker.UNSTARTED`` step and the ``ProgressTracker.DONE`` step. Once a tracker has become ``DONE`` its
position may not be modified again (because e.g. the UI may have been removed/cleaned up), but until that point, the
position can be set to any arbitrary set both forwards and backwards. Steps may be skipped, repeated, etc. Note that
rolling the current step backwards will delete any progress trackers that are children of the steps being reversed, on
the assumption that those subtasks will have to be repeated.
Trackers provide an `Rx observable <http://reactivex.io/>`_ which streams changes to the hierarchy. The top level
observable exposes all the events generated by its children as well. The changes are represented by objects indicating
whether the change is one of position (i.e. progress), structure (i.e. new subtasks being added/removed) or some other
aspect of rendering (i.e. a step has changed in some way and is requesting a re-render).
The protocol framework is somewhat integrated with this API. Each ``ProtocolLogic`` may optionally provide a tracker by
overriding the ``protocolTracker`` property (``getProtocolTracker`` method in Java). If the
``ProtocolLogic.subProtocol`` method is used, then the tracker of the sub-protocol will be made a child of the current
step in the parent protocol automatically, if the parent is using tracking in the first place. The framework will also
automatically set the current step to ``DONE`` for you, when the protocol is finished.
Because a protocol may sometimes wish to configure the children in its progress hierarchy _before_ the sub-protocol
is constructed, for sub-protocols that always follow the same outline regardless of their parameters it's conventional
to define a companion object/static method (for Kotlin/Java respectively) that constructs a tracker, and then allow
the sub-protocol to have the tracker it will use be passed in as a parameter. This allows all trackers to be built
and linked ahead of time.
In future, the progress tracking framework will become a vital part of how exceptions, errors, and other faults are
surfaced to human operators for investigation and resolution.

View File

@ -16,10 +16,11 @@ import core.*
import core.crypto.DigitalSignature
import core.crypto.signWithECDSA
import core.messaging.LegallyIdentifiableNode
import core.protocols.ProtocolLogic
import core.messaging.SingleMessageRecipient
import core.messaging.StateMachineManager
import core.node.TimestampingProtocol
import core.protocols.ProtocolLogic
import core.utilities.ProgressTracker
import core.utilities.trace
import java.security.KeyPair
import java.security.PublicKey
@ -86,22 +87,40 @@ object TwoPartyTradeProtocol {
val assetToSell: StateAndRef<OwnableState>,
val price: Amount,
val myKeyPair: KeyPair,
val buyerSessionID: Long) : ProtocolLogic<SignedTransaction>() {
val buyerSessionID: Long,
override val progressTracker: ProgressTracker = Seller.tracker()) : ProtocolLogic<SignedTransaction>() {
companion object {
object AWAITING_PROPOSAL : ProgressTracker.Step("Awaiting transaction proposal")
object VERIFYING : ProgressTracker.Step("Verifying transaction proposal")
object SIGNING : ProgressTracker.Step("Signing transaction")
object TIMESTAMPING : ProgressTracker.Step("Timestamping transaction")
object SENDING_SIGS : ProgressTracker.Step("Sending transaction signatures to buyer")
fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, TIMESTAMPING, SENDING_SIGS)
}
@Suspendable
override fun call(): SignedTransaction {
val partialTX: SignedTransaction = receiveAndCheckProposedTransaction()
// These two steps could be done in parallel, in theory. Our framework doesn't support that yet though.
val ourSignature = signWithOurKey(partialTX)
val tsaSig = subProtocol(TimestampingProtocol(timestampingAuthority, partialTX.txBits))
val tsaSig = timestamp(partialTX)
val signedTransaction = sendSignatures(partialTX, ourSignature, tsaSig)
return sendSignatures(partialTX, ourSignature, tsaSig)
}
return signedTransaction
@Suspendable
private fun timestamp(partialTX: SignedTransaction): DigitalSignature.LegallyIdentifiable {
progressTracker.currentStep = TIMESTAMPING
return subProtocol(TimestampingProtocol(timestampingAuthority, partialTX.txBits))
}
@Suspendable
private fun receiveAndCheckProposedTransaction(): SignedTransaction {
progressTracker.currentStep = AWAITING_PROPOSAL
val sessionID = random63BitValue()
// Make the first message we'll send to kick off the protocol.
@ -109,7 +128,11 @@ object TwoPartyTradeProtocol {
val maybeSTX = sendAndReceive<SignedTransaction>(TRADE_TOPIC, otherSide, buyerSessionID, sessionID, hello)
progressTracker.currentStep = VERIFYING
maybeSTX.validate {
progressTracker.nextStep()
// Check that the tx proposed by the buyer is valid.
val missingSigs = it.verify(throwIfSignaturesAreMissing = false)
if (missingSigs != setOf(myKeyPair.public, timestampingAuthority.identity.owningKey))
@ -148,11 +171,16 @@ object TwoPartyTradeProtocol {
subProtocol(ResolveTransactionsProtocol(dependencyTxIDs, otherSide))
}
private fun signWithOurKey(partialTX: SignedTransaction) = myKeyPair.signWithECDSA(partialTX.txBits)
@Suspendable
open fun signWithOurKey(partialTX: SignedTransaction): DigitalSignature.WithKey {
progressTracker.currentStep = SIGNING
return myKeyPair.signWithECDSA(partialTX.txBits)
}
@Suspendable
private fun sendSignatures(partialTX: SignedTransaction, ourSignature: DigitalSignature.WithKey,
tsaSig: DigitalSignature.LegallyIdentifiable): SignedTransaction {
progressTracker.currentStep = SENDING_SIGS
val fullySigned = partialTX + tsaSig + ourSignature
logger.trace { "Built finished transaction, sending back to secondary!" }
@ -168,14 +196,24 @@ object TwoPartyTradeProtocol {
val typeToBuy: Class<out OwnableState>,
val sessionID: Long) : ProtocolLogic<SignedTransaction>() {
object RECEIVING : ProgressTracker.Step("Waiting for seller trading info")
object VERIFYING : ProgressTracker.Step("Verifying seller assets")
object SIGNING : ProgressTracker.Step("Generating and signing transaction proposal")
object SWAPPING_SIGNATURES : ProgressTracker.Step("Swapping signatures with the seller")
override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES)
@Suspendable
override fun call(): SignedTransaction {
val tradeRequest = receiveAndValidateTradeRequest()
progressTracker.currentStep = SIGNING
val (ptx, cashSigningPubKeys) = assembleSharedTX(tradeRequest)
val stx = signWithOurKeys(cashSigningPubKeys, ptx)
val signatures = swapSignaturesWithSeller(stx, tradeRequest.sessionID)
logger.trace { "Got signatures from seller, verifying ... "}
logger.trace { "Got signatures from seller, verifying ... " }
val fullySigned = stx + signatures.timestampAuthoritySig + signatures.sellerSig
fullySigned.verify()
@ -185,9 +223,11 @@ object TwoPartyTradeProtocol {
@Suspendable
private fun receiveAndValidateTradeRequest(): SellerTradeInfo {
progressTracker.currentStep = RECEIVING
// Wait for a trade request to come in on our pre-provided session ID.
val maybeTradeRequest = receive<SellerTradeInfo>(TRADE_TOPIC, sessionID)
progressTracker.currentStep = VERIFYING
maybeTradeRequest.validate {
// What is the seller trying to sell us?
val asset = it.assetForSale.state
@ -211,6 +251,7 @@ object TwoPartyTradeProtocol {
@Suspendable
private fun swapSignaturesWithSeller(stx: SignedTransaction, theirSessionID: Long): SignaturesFromSeller {
progressTracker.currentStep = SWAPPING_SIGNATURES
logger.trace { "Sending partially signed transaction to seller" }
// TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx.

View File

@ -24,6 +24,7 @@ import core.serialization.THREAD_LOCAL_KRYO
import core.serialization.createKryo
import core.serialization.deserialize
import core.serialization.serialize
import core.utilities.ProgressTracker
import core.utilities.trace
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@ -51,6 +52,8 @@ import javax.annotation.concurrent.ThreadSafe
* TODO: Timeouts
* TODO: Surfacing of exceptions via an API and/or management UI
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
* TODO: Make Kryo (de)serialize markers for heavy objects that are currently in the service hub. This avoids mistakes
* where services are temporarily put on the stack.
*/
@ThreadSafe
class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) {
@ -65,7 +68,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
// class that inserts itself into a ThreadLocal. That then gets caught in fiber serialisation, which we don't
// want because it can't get recreated properly. It turns out there's no good workaround for this! All the obvious
// approaches fail. Pending resolution of https://github.com/puniverse/quasar/issues/153 we just disable
// checkpointing when unit tests are run inside Gradle. The right fix is probably to make Quasar's
// checkpointing when unit tests are run inside Gradle. The right fix is probably to stop Quasar's
// bit-too-clever-for-its-own-good ThreadLocal serialisation trick. It already wasted far more time than it can
// ever recover.
val checkpointing: Boolean get() = !System.err.javaClass.name.contains("LinePerThreadBufferingOutputStream")
@ -205,6 +208,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
// We're back! Check if the fiber is finished and if so, clean up.
if (psm.isTerminated) {
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
_stateMachines.remove(psm.logic)
checkpointsMap.remove(prevCheckpointKey)
}

View File

@ -13,13 +13,16 @@ import com.google.common.net.HostAndPort
import contracts.CommercialPaper
import contracts.protocols.TwoPartyTradeProtocol
import core.*
import core.crypto.DigitalSignature
import core.crypto.generateKeyPair
import core.messaging.LegallyIdentifiableNode
import core.protocols.ProtocolLogic
import core.messaging.SingleMessageRecipient
import core.protocols.ProtocolLogic
import core.serialization.deserialize
import core.utilities.ANSIProgressRenderer
import core.utilities.BriefLogFormatter
import core.utilities.Emoji
import core.utilities.ProgressTracker
import joptsimple.OptionParser
import java.nio.file.Files
import java.nio.file.Path
@ -33,7 +36,6 @@ import kotlin.system.exitProcess
//
// Please see docs/build/html/running-the-trading-demo.html
fun main(args: Array<String>) {
val parser = OptionParser()
val networkAddressArg = parser.accepts("network-address").withRequiredArg().required()
@ -60,7 +62,8 @@ fun main(args: Array<String>) {
exitProcess(1)
}
BriefLogFormatter.initVerbose("platform.trade")
// Suppress the Artemis MQ noise, and activate the demo logging.
BriefLogFormatter.initVerbose("+demo.buyer", "+demo.seller", "-org.apache.activemq")
val dir = Paths.get(options.valueOf(dirArg))
val configFile = dir.resolve("config")
@ -90,7 +93,9 @@ fun main(args: Array<String>) {
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId) }
if (listening) {
node.smm.add("demo.buyer", TraderDemoProtocolBuyer()).get() // This thread will halt forever here.
val buyer = TraderDemoProtocolBuyer()
ANSIProgressRenderer.progressTracker = buyer.progressTracker
node.smm.add("demo.buyer", buyer).get() // This thread will halt forever here.
} else {
if (!options.has(fakeTradeWithArg)) {
println("Need the --fake-trade-with command line argument")
@ -98,7 +103,9 @@ fun main(args: Array<String>) {
}
val peerAddr = HostAndPort.fromString(options.valuesOf(fakeTradeWithArg).single()).withDefaultPort(Node.DEFAULT_PORT)
val otherSide = ArtemisMessagingService.makeRecipient(peerAddr)
node.smm.add("demo.seller", TraderDemoProtocolSeller(myNetAddr, otherSide)).get()
val seller = TraderDemoProtocolSeller(myNetAddr, otherSide)
ANSIProgressRenderer.progressTracker = seller.progressTracker
node.smm.add("demo.seller", seller).get()
node.stop()
}
}
@ -106,6 +113,12 @@ fun main(args: Array<String>) {
// We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic.
class TraderDemoProtocolBuyer() : ProtocolLogic<Unit>() {
companion object {
object WAITING_FOR_SELLER_TO_CONNECT : ProgressTracker.Step("Waiting for seller to connect to us")
object STARTING_BUY : ProgressTracker.Step("Seller connected, purchasing commercial paper asset")
}
override val progressTracker = ProgressTracker(WAITING_FOR_SELLER_TO_CONNECT, STARTING_BUY)
@Suspendable
override fun call() {
// Give us some cash. Note that as nodes do not currently track forward pointers, we can spend the same cash over
@ -119,15 +132,11 @@ class TraderDemoProtocolBuyer() : ProtocolLogic<Unit>() {
//
// As the seller initiates the DVP/two-party trade protocol, here, we will be the buyer.
try {
println()
println("Waiting for a seller to connect to us!")
progressTracker.currentStep = WAITING_FOR_SELLER_TO_CONNECT
val hostname = receive<HostAndPort>("test.junktrade", 0).validate { it.withDefaultPort(Node.DEFAULT_PORT) }
val newPartnerAddr = ArtemisMessagingService.makeRecipient(hostname)
val sessionID = random63BitValue()
println()
println("Got a new junk trade request from $newPartnerAddr, sending back a fresh session ID and starting buy protocol")
println()
progressTracker.currentStep = STARTING_BUY
send("test.junktrade", newPartnerAddr, 0, sessionID)
val tsa = serviceHub.networkMapService.timestampingNodes[0]
@ -135,52 +144,58 @@ class TraderDemoProtocolBuyer() : ProtocolLogic<Unit>() {
CommercialPaper.State::class.java, sessionID)
val tradeTX: SignedTransaction = subProtocol(buyer)
println()
println("Purchase complete - we are a happy customer! Final transaction is:")
println()
println(Emoji.renderIfSupported(tradeTX.tx))
println()
println("Waiting for another seller to connect. Or press Ctrl-C to shut me down.")
logger.info("Purchase complete - we are a happy customer! Final transaction is: " +
"\n\n${Emoji.renderIfSupported(tradeTX.tx)}")
} catch(e: Exception) {
println()
println("Something went wrong whilst trading!")
println()
e.printStackTrace()
logger.error("Something went wrong whilst trading!", e)
}
}
}
}
class TraderDemoProtocolSeller(val myAddress: HostAndPort,
val otherSide: SingleMessageRecipient) : ProtocolLogic<Unit>() {
val otherSide: SingleMessageRecipient,
override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic<Unit>() {
companion object {
object ANNOUNCING : ProgressTracker.Step("Announcing to the buyer node")
object SELF_ISSUING : ProgressTracker.Step("Got session ID back, issuing and timestamping some commercial paper")
object TRADING : ProgressTracker.Step("Starting the trade protocol")
// We vend a progress tracker that already knows there's going to be a TwoPartyTradingProtocol involved at some
// point: by setting up the tracker in advance, the user can see what's coming in more detail, instead of being
// surprised when it appears as a new set of tasks below the current one.
fun tracker() = ProgressTracker(ANNOUNCING, SELF_ISSUING, TRADING).apply {
childrenFor[TRADING] = TwoPartyTradeProtocol.Seller.tracker()
}
}
@Suspendable
override fun call() {
println()
println("Announcing ourselves to the buyer node!")
println()
progressTracker.currentStep = ANNOUNCING
val sessionID = sendAndReceive<Long>("test.junktrade", otherSide, 0, 0, myAddress).validate { it }
println()
println("Got session ID back, issuing and timestamping some commercial paper")
progressTracker.currentStep = SELF_ISSUING
val tsa = serviceHub.networkMapService.timestampingNodes[0]
val cpOwnerKey = serviceHub.keyManagementService.freshKey()
val commercialPaper = makeFakeCommercialPaper(cpOwnerKey.public, tsa)
println()
println("Timestamped my commercial paper issuance, starting the trade protocol.")
progressTracker.currentStep = TRADING
val seller = TwoPartyTradeProtocol.Seller(otherSide, tsa, commercialPaper, 1000.DOLLARS, cpOwnerKey, sessionID)
val seller = object : TwoPartyTradeProtocol.Seller(otherSide, tsa, commercialPaper, 1000.DOLLARS,
cpOwnerKey, sessionID, progressTracker.childrenFor[TRADING]!!) {
override fun signWithOurKey(partialTX: SignedTransaction): DigitalSignature.WithKey {
val s = super.signWithOurKey(partialTX)
// Fake delay to make it look like we're doing something more intensive than we really are, to show
// the progress tracking framework.
Thread.sleep(2000)
return s
}
}
val tradeTX: SignedTransaction = subProtocol(seller)
println()
println("Sale completed - we have a happy customer!")
println()
println("Final transaction is")
println()
println(Emoji.renderIfSupported(tradeTX.tx))
println()
logger.info("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(tradeTX.tx)}")
}
@Suspendable

View File

@ -11,6 +11,7 @@ package core.protocols
import co.paralleluniverse.fibers.Suspendable
import core.ServiceHub
import core.messaging.MessageRecipients
import core.utilities.ProgressTracker
import core.utilities.UntrustworthyData
import org.slf4j.Logger
@ -37,6 +38,7 @@ abstract class ProtocolLogic<T> {
/** This is where you should log things to. */
val logger: Logger get() = psm.logger
/** Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts */
val serviceHub: ServiceHub get() = psm.serviceHub
@ -58,9 +60,32 @@ abstract class ProtocolLogic<T> {
*/
@Suspendable fun <R> subProtocol(subLogic: ProtocolLogic<R>): R {
subLogic.psm = psm
return subLogic.call()
maybeWireUpProgressTracking(subLogic)
val r = subLogic.call()
// It's easy to forget this when writing protocols so we just step it to the DONE state when it completes.
subLogic.progressTracker?.currentStep = ProgressTracker.DONE
return r
}
private fun maybeWireUpProgressTracking(subLogic: ProtocolLogic<*>) {
val ours = progressTracker
val theirs = subLogic.progressTracker
if (ours != null && theirs != null)
ours.childrenFor[ours.currentStep] = theirs
}
/**
* Override this to provide a [ProgressTracker]. If one is provided and stepped, the framework will do something
* helpful with the progress reports. If this protocol is invoked as a sub-protocol of another, then the
* tracker will be made a child of the current step in the parent. If it's null, this protocol doesn't track
* progress.
*
* Note that this has to return a tracker before the protocol is invoked. You can't change your mind half way
* through.
*/
open val progressTracker: ProgressTracker? = null
/** This is where you fill out your business logic. */
@Suspendable
abstract fun call(): T
}

View File

@ -0,0 +1,157 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.utilities
import org.fusesource.jansi.Ansi
import org.fusesource.jansi.AnsiConsole
import org.fusesource.jansi.AnsiOutputStream
import rx.Subscription
import java.util.logging.ConsoleHandler
import java.util.logging.Formatter
import java.util.logging.LogRecord
import java.util.logging.Logger
/**
* Knows how to render a [ProgressTracker] to the terminal using coloured, emoji-fied output. Useful when writing small
* command line tools, demos, tests etc. Just set the [progressTracker] field and it will go ahead and start drawing
* if the terminal supports it. Otherwise it just prints out the name of the step whenever it changes.
*
* TODO: Thread safety
*/
object ANSIProgressRenderer {
private var installedYet = false
private var subscription: Subscription? = null
private class LineBumpingConsoleHandler : ConsoleHandler() {
override fun getFormatter(): Formatter = BriefLogFormatter()
override fun publish(r: LogRecord?) {
if (progressTracker != null) {
val ansi = Ansi.ansi()
repeat(prevLinesDrawn) { ansi.eraseLine().cursorUp(1).eraseLine() }
System.out.print(ansi)
System.out.flush()
}
super.publish(r)
if (progressTracker != null)
draw(false)
}
}
private var usingANSI = false
private var loggerRef: Logger? = null
var progressTracker: ProgressTracker? = null
set(value) {
subscription?.unsubscribe()
field = value
if (!installedYet) {
AnsiConsole.systemInstall()
// This line looks weird as hell because the magic code to decide if we really have a TTY or not isn't
// actually exposed anywhere as a function (weak sauce). So we have to rely on our knowledge of jansi
// implementation details.
usingANSI = AnsiConsole.wrapOutputStream(System.out) !is AnsiOutputStream
if (usingANSI) {
loggerRef = Logger.getLogger("").apply {
val current = handlers[0]
removeHandler(current)
val new = LineBumpingConsoleHandler()
new.level = current.level
addHandler(new)
}
}
installedYet = true
}
subscription = value?.changes?.subscribe { draw(true) }
}
// prevMessagePrinted is just for non-ANSI mode.
private var prevMessagePrinted: String? = null
// prevLinesDraw is just for ANSI mode.
private var prevLinesDrawn = 0
private fun draw(moveUp: Boolean) {
val pt = progressTracker!!
if (!usingANSI) {
val currentMessage = pt.currentStepRecursive.label
if (currentMessage != prevMessagePrinted) {
println(currentMessage)
prevMessagePrinted = currentMessage
}
return
}
// Handle the case where the number of steps in a progress tracker is changed during execution.
val ansi = Ansi.ansi()
if (prevLinesDrawn > 0 && moveUp)
ansi.cursorUp(prevLinesDrawn)
// Put a blank line between any logging and us.
ansi.eraseLine()
ansi.newline()
val newLinesDrawn = 1 + pt.renderLevel(ansi, 0, pt.allSteps)
if (newLinesDrawn < prevLinesDrawn) {
// If some steps were removed from the progress tracker, we don't want to leave junk hanging around below.
val linesToClear = prevLinesDrawn - newLinesDrawn
repeat(linesToClear) {
ansi.eraseLine()
ansi.newline()
}
ansi.cursorUp(linesToClear)
}
prevLinesDrawn = newLinesDrawn
// Need to force a flush here in order to ensure stderr/stdout sync up properly.
System.out.print(ansi)
System.out.flush()
}
// Returns number of lines rendered.
private fun ProgressTracker.renderLevel(ansi: Ansi, indent: Int, allSteps: List<Pair<Int, ProgressTracker.Step>>): Int {
with(ansi) {
var lines = 0
for ((index, step) in steps.withIndex()) {
// Don't bother rendering these special steps in some cases.
if (step == ProgressTracker.UNSTARTED) continue
if (indent > 0 && step == ProgressTracker.DONE) continue
val marker = when {
index < stepIndex -> Emoji.CODE_GREEN_TICK + " "
index == stepIndex && step == ProgressTracker.DONE -> Emoji.CODE_GREEN_TICK + " "
index == stepIndex -> Emoji.CODE_RIGHT_ARROW + " "
else -> " "
}
a(" ".repeat(indent))
a(marker)
val active = index == stepIndex && step != ProgressTracker.DONE
if (active) bold()
a(step.label)
if (active) boldOff()
eraseLine(Ansi.Erase.FORWARD)
newline()
lines++
val child = childrenFor[step]
if (child != null)
lines += child.renderLevel(ansi, indent + 1, allSteps)
}
return lines
}
}
}