<configuration default="false" name="Node: buyer" type="JetRunConfigurationType" factoryName="Kotlin">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<option name="MAIN_CLASS_NAME" value="core.node.TraderDemoKt" />
<option name="VM_PARAMETERS" value="-ea -javaagent:lib/quasar.jar" />
<option name="PROGRAM_PARAMETERS" value="--dir=buyer --service-fake-trades --network-address=localhost" />
<option name="WORKING_DIRECTORY" value="" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
<option name="ALTERNATIVE_JRE_PATH" />
<option name="PASS_PARENT_ENVS" value="true" />
<module name="r3prototyping" />
<envs />
<method />
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Node: seller" type="JetRunConfigurationType" factoryName="Kotlin">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<option name="MAIN_CLASS_NAME" value="core.node.TraderDemoKt" />
<option name="VM_PARAMETERS" value="-ea -javaagent:lib/quasar.jar" />
<option name="PROGRAM_PARAMETERS" value="--dir=seller --fake-trade-with=localhost --network-address=localhost:31338 --timestamper-identity-file=buyer/identity-public --timestamper-address=localhost" />
<option name="WORKING_DIRECTORY" value="" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
<option name="ALTERNATIVE_JRE_PATH" />
<option name="PASS_PARENT_ENVS" value="true" />
<module name="r3prototyping" />
<envs />
<method />
apply plugin: 'java'
apply plugin: 'kotlin'
//apply plugin: 'org.jetbrains.dokka'
apply plugin: 'application'
// apply plugin: 'org.jetbrains.dokka'
allprojects {
sourceCompatibility = 1.8
ext.kotlin_version = '1.0.0-beta-4584'
ext.quasar_version = '0.7.4'
ext.asm_version = '0.5.3'
ext.artemis_version = '1.2.0'
repositories {
@ -49,28 +52,32 @@ configurations.all() {
dependencies {
testCompile 'junit:junit:4.12'
compile "com.google.code.findbugs:jsr305:3.0.1"
compile "org.slf4j:slf4j-jdk14:1.7.13"
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
compile "com.google.guava:guava:19.0"
compile("com.esotericsoftware:kryo:3.0.3") {
force = true
compile "de.javakaffee:kryo-serializers:0.37"
compile "com.google.code.findbugs:jsr305:3.0.1"
// Logging
compile "org.slf4j:slf4j-jdk14:1.7.13"
compile("com.google.guava:guava:19.0") {
force = true // Conflict between Quasar and Artemis
compile "net.sf.jopt-simple:jopt-simple:4.9"
compile "de.javakaffee:kryo-serializers:0.37"
// Quasar: for the bytecode rewriting for state machines.
compile("co.paralleluniverse:quasar-core:${quasar_version}:jdk8") {
// Quasar currently depends on an old version of Kryo, but it works fine with the newer version, so exclude it
// here so the newer version is picked up.
exclude group: "com.esotericsoftware.kryo", module: "kryo"
quasar("co.paralleluniverse:quasar-core:${quasar_version}:jdk8@jar") {
exclude group: "com.esotericsoftware.kryo", module: "kryo"
// Artemis: for reliable p2p message queues.
compile("org.apache.activemq:artemis-server:${artemis_version}") {
exclude group: "com.google.guava", module: "guava" // Artemis is on Guava 18
compile "org.apache.activemq:artemis-core-client:${artemis_version}"
// For visualisation
compile "org.graphstream:gs-core:1.3"
task runDemoBuyer(type: JavaExec, dependsOn: ':classes') {
classpath = sourceSets.main.runtimeClasspath
main = 'core.node.TraderDemoKt'
args = ['--dir=buyer', '--service-fake-trades', '--network-address=localhost']
task runDemoSeller(type: JavaExec, dependsOn: ':classes') {
classpath = sourceSets.main.runtimeClasspath
main = 'core.node.TraderDemoKt'
args = ['--dir=seller', '--fake-trade-with=localhost', '--network-address=localhost:31338',
'--timestamper-identity-file=buyer/identity-public', '--timestamper-address=localhost']
@ -9,6 +9,9 @@ The current prototype consists of a small amount of code that defines:
These are simplified versions of the real things.
* Unit tests that check the algorithms do what is expected, and which verify the behaviour of the smart contracts.
* API documentation and tutorials (what you're reading)
* A simple standalone node that uses an embedded message queue broker as its P2P messaging layer
* A trading demo that runs the node in either a listening/buying mode, or a connecting/selling mode, and swaps some
fake commercial paper assets for some self-issued IOU cash.
Some things it does not currently include but should gain later are:
Running the trading demo
The repository contains a program that implements a demo of two nodes running the two-party trading protocol, which you
can learn about in :doc:`protocol-state-machines`.
The node has only currently been tested on MacOS X. If you have success on other platforms, please let us know.
To run the demo, firstly edit your /etc/hosts file or Windows equivalent to add two aliases for localhost: alpha and
beta. This is necessary for now because parts of the code use the DNS hostname to identify nodes and thus defining two
nodes both called localhost won't work. We might fix this in future to include the port number everywhere, so making
this easier.
You should now be able to run ``ping alpha`` and ``ping beta`` and not see errors.
Now, open two terminals, and in the first run:::
./gradlew runDemoBuyer
It will create a directory named "alpha" and ask you to edit the configuration file inside. Open up ``alpha/config``
in your favourite text editor and give the node a legal identity of "Alpha Corp, Inc" or whatever else you feel like.
The actual text string is not important. Now run the gradle command again, and it should start up and wait for
a seller to connect.
In the second terminal, run::
./gradlew runDemoSeller
and repeat the process, this time calling the node ... something else.
You should see some log lines scroll past, and within a few seconds the messages "Purchase complete - we are a
happy customer!" and "Sale completed - we have a happy customer!" should be printed.
If it doesn't work, jump on the mailing list and let us know.
@ -218,7 +218,7 @@ Let's define a few commands now:
public static class Issue extends Commands {
public boolean equals(Object obj) {
return obj instanceof Redeem;
return obj instanceof Issue;
@ -4,10 +4,8 @@
.highlight .err { border: 1px solid #FF0000 } /* Error */
.highlight .k { color: #007020; font-weight: bold } /* Keyword */
.highlight .o { color: #666666 } /* Operator */
.highlight .ch { color: #408090; font-style: italic } /* Comment.Hashbang */
.highlight .cm { color: #408090; font-style: italic } /* Comment.Multiline */
.highlight .cp { color: #007020 } /* Comment.Preproc */
.highlight .cpf { color: #408090; font-style: italic } /* Comment.PreprocFile */
.highlight .c1 { color: #408090; font-style: italic } /* Comment.Single */
.highlight .cs { color: #408090; background-color: #fff0f0 } /* Comment.Special */
.highlight .gd { color: #A00000 } /* Generic.Deleted */
* Sphinx JavaScript utilties for the full-text search.
* :copyright: Copyright 2007-2016 by the Sphinx team, see AUTHORS.
* :copyright: Copyright 2007-2015 by the Sphinx team, see AUTHORS.
* :license: BSD, see LICENSE for details.
/* Non-minified version JS is _stemmer.js if file is provided */
* Porter Stemmer
@ -374,7 +373,8 @@ var Search = {
// lookup as search terms in fulltext
results = results.concat(this.performTermsSearch(searchterms, excluded, terms, titleterms));
results = results.concat(this.performTermsSearch(searchterms, excluded, terms, Scorer.term))
.concat(this.performTermsSearch(searchterms, excluded, titleterms, Scorer.title));
// let the scorer override scores with a custom scoring function
if (Scorer.score) {
@ -538,47 +538,23 @@ var Search = {
* search for full-text terms in the index
performTermsSearch : function(searchterms, excluded, terms, titleterms) {
performTermsSearch : function(searchterms, excluded, terms, score) {
var filenames = this._index.filenames;
var titles = this._index.titles;
var i, j, file;
var i, j, file, files;
var fileMap = {};
var scoreMap = {};
var results = [];
// perform the search on the required terms
for (i = 0; i < searchterms.length; i++) {
var word = searchterms[i];
var files = [];
var _o = [
{files: terms[word], score: Scorer.term},
{files: titleterms[word], score: Scorer.title}
// no match but word was a required one
if ($u.every(_o, function(o){return o.files === undefined;})) {
if ((files = terms[word]) === undefined)
if (files.length === undefined) {
files = [files];
// found search word in contents
$u.each(_o, function(o) {
var _files = o.files;
if (_files === undefined)
if (_files.length === undefined)
_files = [_files];
files = files.concat(_files);
// set score for the word in each file to Scorer.term
for (j = 0; j < _files.length; j++) {
file = _files[j];
if (!(file in scoreMap))
scoreMap[file] = {}
scoreMap[file][word] = o.score;
// create the mapping
for (j = 0; j < files.length; j++) {
file = files[j];
@ -600,9 +576,7 @@ var Search = {
// ensure that none of the excluded terms is in the search result
for (i = 0; i < excluded.length; i++) {
if (terms[excluded[i]] == file ||
titleterms[excluded[i]] == file ||
$u.contains(terms[excluded[i]] || [], file) ||
$u.contains(titleterms[excluded[i]] || [], file)) {
$u.contains(terms[excluded[i]] || [], file)) {
valid = false;
@ -610,9 +584,6 @@ var Search = {
// if we have still a valid result we can add it to the result list
if (valid) {
// select one (max) score for the file.
// for better ranking, we should calculate ranking by using words statistics like basic tf-idf...
var score = $u.max($u.map(fileMap[file], function(w){return scoreMap[file][w]}));
results.push([filenames[file], titles[file], '', null, score]);
@ -4,7 +4,7 @@
* sphinx.websupport utilties for all documentation.
* :copyright: Copyright 2007-2016 by the Sphinx team, see AUTHORS.
* :copyright: Copyright 2007-2015 by the Sphinx team, see AUTHORS.
* :license: BSD, see LICENSE for details.
These are simplified versions of the real things.</li>
@ -9,6 +9,9 @@ The current prototype consists of a small amount of code that defines:
These are simplified versions of the real things.
* Unit tests that check the algorithms do what is expected, and which verify the behaviour of the smart contracts.
* API documentation and tutorials (what you're reading)
* A simple standalone node that uses an embedded message queue broker as its P2P messaging layer
* A trading demo that runs the node in either a listening/buying mode, or a connecting/selling mode, and swaps some
fake commercial paper assets for some self-issued IOU cash.
Some things it does not currently include but should gain later are:
Running the trading demo
The repository contains a program that implements a demo of two nodes running the two-party trading protocol, which you
can learn about in :doc:`protocol-state-machines`.
The node has only currently been tested on MacOS X and Ubuntu Linux. If you have success on other platforms, please
let us know.
Now, open two terminals, and in the first run:::
./gradlew runDemoBuyer
It will create a directory named "buyer" and ask you to edit the configuration file inside. Open up ``buyer/config``
in your favourite text editor and give the node a legal identity of "Big Buyer Corp, Inc" or whatever else you feel like.
The actual text string is not important. Now run the gradle command again, and it should start up and wait for
a seller to connect.
In the second terminal, run::
./gradlew runDemoSeller
and repeat the process, this time calling the node ... something else.
You should see some log lines scroll past, and within a few seconds the messages "Purchase complete - we are a
happy customer!" and "Sale completed - we have a happy customer!" should be printed.
If it doesn't work, jump on the mailing list and let us know.
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.crypto;
public class AddressFormatException extends IllegalArgumentException {
public AddressFormatException() {
public AddressFormatException(String message) {
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.crypto;
import core.*;
import java.math.*;
import java.util.*;
* Base58 is a way to encode Bitcoin addresses (or arbitrary data) as alphanumeric strings.
* <p>
* Note that this is not the same base58 as used by Flickr, which you may find referenced around the Internet.
* <p>
* Satoshi explains: why base-58 instead of standard base-64 encoding?
* <ul>
* <li>Don't want 0OIl characters that look the same in some fonts and
* could be used to create visually identical looking account numbers.</li>
* <li>A string with non-alphanumeric characters is not as easily accepted as an account number.</li>
* <li>E-mail usually won't line-break if there's no punctuation to break at.</li>
* <li>Doubleclicking selects the whole number as one word if it's all alphanumeric.</li>
* </ul>
* <p>
* However, note that the encoding/decoding runs in O(n²) time, so it is not useful for large data.
* <p>
* The basic idea of the encoding is to treat the data bytes as a large number represented using
* base-256 digits, convert the number to be represented using base-58 digits, preserve the exact
* number of leading zeros (which are otherwise lost during the mathematical operations on the
* numbers), and finally represent the resulting base-58 digits as alphanumeric ASCII characters.
* NB: This class originally comes from the Apache licensed bitcoinj library. The original author of this code is the
* same as the original author of the R3 repository.
public class Base58 {
public static final char[] ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz".toCharArray();
private static final char ENCODED_ZERO = ALPHABET[0];
private static final int[] INDEXES = new int[128];
static {
Arrays.fill(INDEXES, -1);
for (int i = 0; i < ALPHABET.length; i++) {
* Encodes the given bytes as a base58 string (no checksum is appended).
* @param input the bytes to encode
* @return the base58-encoded string
public static String encode(byte[] input) {
if (input.length == 0) {
return "";
// Count leading zeros.
int zeros = 0;
while (zeros < input.length && input[zeros] == 0) {
// Convert base-256 digits to base-58 digits (plus conversion to ASCII characters)
input = Arrays.copyOf(input, input.length); // since we modify it in-place
char[] encoded = new char[input.length * 2]; // upper bound
int outputStart = encoded.length;
for (int inputStart = zeros; inputStart < input.length; ) {
encoded[--outputStart] = ALPHABET[divmod(input, inputStart, 256, 58)];
if (input[inputStart] == 0) {
++inputStart; // optimization - skip leading zeros
// Preserve exactly as many leading encoded zeros in output as there were leading zeros in input.
while (outputStart < encoded.length && encoded[outputStart] == ENCODED_ZERO) {
while (--zeros >= 0) {
encoded[--outputStart] = ENCODED_ZERO;
// Return encoded string (including encoded leading zeros).
return new String(encoded, outputStart, encoded.length - outputStart);
* Decodes the given base58 string into the original data bytes.
* @param input the base58-encoded string to decode
* @return the decoded data bytes
* @throws AddressFormatException if the given string is not a valid base58 string
public static byte[] decode(String input) throws AddressFormatException {
if (input.length() == 0) {
return new byte[0];
// Convert the base58-encoded ASCII chars to a base58 byte sequence (base58 digits).
byte[] input58 = new byte[input.length()];
for (int i = 0; i < input.length(); ++i) {
char c = input.charAt(i);
int digit = c < 128 ? INDEXES[c] : -1;
if (digit < 0) {
throw new AddressFormatException("Illegal character " + c + " at position " + i);
input58[i] = (byte) digit;
// Count leading zeros.
int zeros = 0;
while (zeros < input58.length && input58[zeros] == 0) {
// Convert base-58 digits to base-256 digits.
byte[] decoded = new byte[input.length()];
int outputStart = decoded.length;
for (int inputStart = zeros; inputStart < input58.length; ) {
decoded[--outputStart] = divmod(input58, inputStart, 58, 256);
if (input58[inputStart] == 0) {
++inputStart; // optimization - skip leading zeros
// Ignore extra leading zeroes that were added during the calculation.
while (outputStart < decoded.length && decoded[outputStart] == 0) {
// Return decoded data (including original number of leading zeros).
return Arrays.copyOfRange(decoded, outputStart - zeros, decoded.length);
public static BigInteger decodeToBigInteger(String input) throws AddressFormatException {
return new BigInteger(1, decode(input));
* Decodes the given base58 string into the original data bytes, using the checksum in the
* last 4 bytes of the decoded data to verify that the rest are correct. The checksum is
* removed from the returned data.
* @param input the base58-encoded string to decode (which should include the checksum)
* @throws AddressFormatException if the input is not base 58 or the checksum does not validate.
public static byte[] decodeChecked(String input) throws AddressFormatException {
byte[] decoded = decode(input);
if (decoded.length < 4)
throw new AddressFormatException("Input too short");
byte[] data = Arrays.copyOfRange(decoded, 0, decoded.length - 4);
byte[] checksum = Arrays.copyOfRange(decoded, decoded.length - 4, decoded.length);
byte[] actualChecksum = Arrays.copyOfRange(SecureHash.Companion.sha256Twice(data).getBits(), 0, 4);
if (!Arrays.equals(checksum, actualChecksum))
throw new AddressFormatException("Checksum does not validate");
return data;
* Divides a number, represented as an array of bytes each containing a single digit
* in the specified base, by the given divisor. The given number is modified in-place
* to contain the quotient, and the return value is the remainder.
* @param number the number to divide
* @param firstDigit the index within the array of the first non-zero digit
* (this is used for optimization by skipping the leading zeros)
* @param base the base in which the number's digits are represented (up to 256)
* @param divisor the number to divide by (up to 256)
* @return the remainder of the division operation
private static byte divmod(byte[] number, int firstDigit, int base, int divisor) {
// this is just long division which accounts for the base of the input digits
int remainder = 0;
for (int i = firstDigit; i < number.length; i++) {
int digit = (int) number[i] & 0xFF;
int temp = remainder * base + digit;
number[i] = (byte) (temp / divisor);
remainder = temp % divisor;
return (byte) remainder;
package contracts
import core.*
import core.utilities.Emoji
import java.security.PublicKey
import java.security.SecureRandom
import java.util.*
@ -60,7 +61,7 @@ class Cash : Contract {
override val owner: PublicKey
) : OwnableState {
override val programRef = CASH_PROGRAM_ID
override fun toString() = "Cash($amount at $deposit owned by $owner)"
override fun toString() = "${Emoji.bagOfCash}Cash($amount at $deposit owned by ${owner.toStringShort()})"
override fun withNewOwner(newOwner: PublicKey) = Pair(Commands.Move(), copy(owner = newOwner))
import core.*
import core.utilities.Emoji
import java.security.PublicKey
import java.time.Instant
@ -51,6 +52,7 @@ class CommercialPaper : Contract {
fun withoutOwner() = copy(owner = NullPublicKey)
override fun withNewOwner(newOwner: PublicKey) = Pair(Commands.Move(), copy(owner = newOwner))
override fun toString() = "${Emoji.newspaper}CommercialPaper(of $faceValue redeemable on $maturityDate by '$issuance', owned by ${owner.toStringShort()})"
interface Commands : CommandData {
@ -105,6 +105,7 @@ object TwoPartyTradeProtocol {
val ourSignature = myKeyPair.signWithECDSA(partialTX.txBits)
val tsaSig = TimestamperClient(this, timestampingAuthority).timestamp(partialTX.txBits)
val fullySigned = partialTX.withAdditionalSignature(tsaSig).withAdditionalSignature(ourSignature)
val ltx = fullySigned.verifyToLedgerTransaction(serviceHub.identityService)
// We should run it through our full TransactionGroup of all transactions here.
@ -112,7 +113,7 @@ object TwoPartyTradeProtocol {
send(TRADE_TOPIC, otherSide, buyerSessionID, fullySigned)
return Pair(wtx, fullySigned.verifyToLedgerTransaction(serviceHub.identityService))
return Pair(wtx, ltx)
package core
import com.google.common.io.BaseEncoding
import core.crypto.Base58
import core.serialization.OpaqueBytes
import java.math.BigInteger
import java.security.*
import java.security.interfaces.ECPublicKey
// "sealed" here means there can't be any subclasses other than the ones defined here.
sealed class SecureHash(bits: ByteArray) : OpaqueBytes(bits) {
@ -34,6 +36,7 @@ sealed class SecureHash(bits: ByteArray) : OpaqueBytes(bits) {
fun sha256(bits: ByteArray) = SHA256(MessageDigest.getInstance("SHA-256").digest(bits))
fun sha256Twice(bits: ByteArray) = sha256(sha256(bits).bits)
fun sha256(str: String) = sha256(str.toByteArray())
fun randomSHA256() = sha256(SecureRandom.getInstanceStrong().generateSeed(32))
@ -108,4 +111,15 @@ fun PublicKey.verifyWithECDSA(content: ByteArray, signature: DigitalSignature) {
if (verifier.verify(signature.bits) == false)
throw SignatureException("Signature did not match")
/** Render a public key to a string, using a short form if it's an elliptic curve public key */
fun PublicKey.toStringShort(): String {
return (this as? ECPublicKey)?.let { key ->
"DL" + Base58.encode(key.w.affineX.toByteArray()) // DL -> Distributed Ledger
} ?: toString()
// Allow Kotlin destructuring: val (private, public) = keypair
operator fun KeyPair.component1() = this.private
operator fun KeyPair.component2() = this.public
import co.paralleluniverse.fibers.Suspendable
import core.messaging.MessagingService
import core.messaging.NetworkMap
import core.serialization.SerializedBytes
import java.security.KeyPair
import java.security.KeyPairGenerator
@ -121,4 +122,5 @@ interface ServiceHub {
val identityService: IdentityService
val storageService: StorageService
val networkService: MessagingService
val networkMapService: NetworkMap
@ -39,11 +39,14 @@ interface OwnableState : ContractState {
/** Returns the SHA-256 hash of the serialised contents of this state (not cached!) */
fun ContractState.hash(): SecureHash = SecureHash.sha256(serialize().bits)
// TODO: Give this a shorter name.
* A stateref is a pointer to a state, this is an equivalent of an "outpoint" in Bitcoin. It records which transaction
* defined the state and where in that transaction it was.
data class ContractStateRef(val txhash: SecureHash, val index: Int)
data class ContractStateRef(val txhash: SecureHash, val index: Int) {
override fun toString() = "$txhash($index)"
/** A StateAndRef is simply a (state, ref) pair. For instance, a wallet (which holds available assets) contains these. */
data class StateAndRef<out T : ContractState>(val state: T, val ref: ContractStateRef)
@ -79,6 +82,9 @@ data class Command(val data: CommandData, val pubkeys: List<PublicKey>) {
constructor(data: CommandData, key: PublicKey) : this(data, listOf(key))
private fun commandDataToString() = data.toString().let { if (it.contains("@")) it.replace('$', '.').split("@")[0] else it }
override fun toString() = "${commandDataToString()} with pubkeys ${pubkeys.map { it.toStringShort() }}"
/** Wraps an object that was signed by a public key, which may be a well known/recognised institutional key. */
@ -126,4 +132,4 @@ interface Contract {
* the contract's contents).
val legalContractReference: SecureHash
import core.serialization.SerializedBytes
import core.serialization.deserialize
import core.serialization.serialize
import core.utilities.Emoji
import java.security.KeyPair
import java.security.PublicKey
import java.security.SignatureException
@ -60,6 +61,15 @@ data class WireTransaction(val inputStates: List<ContractStateRef>,
return LedgerTransaction(inputStates, outputStates, authenticatedArgs, originalHash)
override fun toString(): String {
val buf = StringBuilder()
for (input in inputStates) buf.appendln("${Emoji.rightArrow}INPUT: $input")
for (output in outputStates) buf.appendln("${Emoji.leftArrow}OUTPUT: $output")
for (command in commands) buf.appendln("${Emoji.diamond}COMMAND: $command")
return buf.toString()
/** Container for a [WireTransaction] and attached signatures. */
@ -16,6 +16,9 @@ import java.security.SecureRandom
import java.time.Duration
import java.time.temporal.Temporal
import java.util.concurrent.Executor
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
val Int.days: Duration get() = Duration.ofDays(this.toLong())
val Int.hours: Duration get() = Duration.ofHours(this.toLong())
@ -28,8 +31,8 @@ val Int.seconds: Duration get() = Duration.ofSeconds(this.toLong())
fun random63BitValue(): Long = Math.abs(SecureRandom.getInstanceStrong().nextLong())
fun <T> ListenableFuture<T>.whenComplete(executor: Executor? = null, body: () -> Unit) {
addListener(Runnable { body() }, executor ?: MoreExecutors.directExecutor())
fun <T> ListenableFuture<T>.whenComplete(executor: Executor? = null, body: (T) -> Unit) {
addListener(Runnable { body(get()) }, executor ?: RunOnCallerThread)
/** Executes the given block and sets the future to either the result, or any exception that was thrown. */
@ -46,4 +49,38 @@ fun <T> SettableFuture<T>.setFrom(logger: Logger? = null, block: () -> T): Setta
// Simple infix function to add back null safety that the JDK lacks: timeA until timeB
infix fun Temporal.until(endExclusive: Temporal) = Duration.between(this, endExclusive)
val RunOnCallerThread = MoreExecutors.directExecutor()
// An alias that can sometimes make code clearer to read.
val RunOnCallerThread = MoreExecutors.directExecutor()
inline fun <T> logElapsedTime(label: String, logger: Logger? = null, body: () -> T): T {
val now = System.currentTimeMillis()
val r = body()
val elapsed = System.currentTimeMillis() - now
if (logger != null)
logger.info("$label took $elapsed msec")
println("$label took $elapsed msec")
return r
* A threadbox is a simple utility that makes it harder to forget to take a lock before accessing some shared state.
* Simply define a private class to hold the data that must be grouped under the same lock, and then pass the only
* instance to the ThreadBox constructor. You can now use the [locked] method with a lambda to take the lock in a
* way that ensures it'll be released if there's an exception.
* Note that this technique is not infallible: if you capture a reference to the fields in another lambda which then
* gets stored and invoked later, there may still be unsafe multi-threaded access going on, so watch out for that.
* This is just a simple guard rail that makes it harder to slip up.
* Example:
* private class MutableState { var i = 5 }
* private val state = ThreadBox(MutableState())
* val ii = state.locked { i }
class ThreadBox<T>(content: T, private val lock: Lock = ReentrantLock()) {
private val content = content
fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
override val myAddress: SingleMessageRecipient = handle
override val networkMap: NetworkMap get() = object : NetworkMap {
override val timestampingNodes = if (timestampingAdvert != null) listOf(timestampingAdvert!!) else emptyList()
protected val backgroundThread = if (manuallyPumped) null else
thread(isDaemon = true, name = "In-memory message dispatcher ") {
while (!currentThread.isInterrupted) {
@ -68,9 +68,6 @@ interface MessagingService {
/** Returns an address that refers to this node. */
val myAddress: SingleMessageRecipient
/** Allows you to look up services and nodes that are available on the network. */
val networkMap: NetworkMap
@ -85,7 +82,9 @@ fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? =
fun MessagingService.send(topic: String, to: MessageRecipients, obj: Any) = send(createMessage(topic, obj.serialize().bits), to)
fun MessagingService.send(topic: String, to: MessageRecipients, obj: Any, includeClassName: Boolean = false) {
send(createMessage(topic, obj.serialize(includeClassName = includeClassName).bits), to)
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
@ -134,4 +133,4 @@ interface SingleMessageRecipient : MessageRecipients
/** A base class for a set of recipients specifically identified by the sender. */
interface MessageRecipientGroup : MessageRecipients
/** A special base class for the set of all possible recipients, without having to identify who they all are. */
interface AllPossibleRecipients : MessageRecipients
interface AllPossibleRecipients : MessageRecipients
package core.messaging
import core.Party
import java.util.*
/** Info about a network node that has is operated by some sort of verified identity. */
data class LegallyIdentifiableNode(val address: SingleMessageRecipient, val identity: Party)
* A NetworkMap allows you to look up various types of services provided by nodes on the network, and find node
* addresses given legal identities (NB: not all nodes may have legal identities).
* A network map contains lists of nodes on the network along with information about their identity keys, services
* they provide and host names or IP addresses where they can be connected to. A reasonable architecture for the
* network map service might be one like the Tor directory authorities, where several nodes linked by RAFT or Paxos
* elect a leader and that leader distributes signed documents describing the network layout. Those documents can
* then be cached by every node and thus a network map can be retrieved given only a single successful peer connection.
* A real implementation would probably do RPCs to a lookup service which might in turn be backed by a ZooKeeper
* cluster or equivalent.
* For now, this class is truly minimal.
* This interface assumes fast, synchronous access to an in-memory map.
interface NetworkMap {
val timestampingNodes: List<LegallyIdentifiableNode>
// TODO: Move this to the test tree once a real network map is implemented and this scaffolding is no longer needed.
class MockNetworkMap : NetworkMap {
override val timestampingNodes = Collections.synchronizedList(ArrayList<LegallyIdentifiableNode>())
@ -176,6 +176,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
} catch (t: Throwable) {
// TODO: Quasar is logging exceptions by itself too, find out where and stop it.
logger.error("Caught error whilst invoking protocol state machine", t)
throw t
@ -312,4 +313,4 @@ open class FiberRequest(val topic: String, val destination: MessageRecipients?,
class NotExpectingResponse(topic: String, destination: MessageRecipients, sessionIDForSend: Long, obj: Any?)
: FiberRequest(topic, destination, sessionIDForSend, -1, obj)
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.node
import com.google.common.net.HostAndPort
import core.RunOnCallerThread
import core.ThreadBox
import core.messaging.*
import core.utilities.loggerFor
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule
import java.math.BigInteger
import java.nio.file.Path
import java.security.SecureRandom
import java.time.Instant
import java.util.*
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executor
import javax.annotation.concurrent.ThreadSafe
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later)
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
* Artemis is a message queue broker and here, we embed the entire server inside our own process. Nodes communicate
* with each other using (by default) an Artemis specific protocol, but it supports other protocols like AQMP/1.0
* as well.
* The current implementation is skeletal and lacks features like security or firewall tunnelling (that is, you must
* be able to receive TCP connections in order to receive messages). It is good enough for local communication within
* a fully connected network, trusted network or on localhost.
class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort) : MessagingService {
// In future: can contain onion routing info, etc.
private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
companion object {
val log = loggerFor<ArtemisMessagingService>()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
// confusion.
val TOPIC_PROPERTY = "platform-topic"
/** Temp helper until network map is established. */
fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort)
private lateinit var mq: EmbeddedActiveMQ
private lateinit var clientFactory: ClientSessionFactory
private lateinit var session: ClientSession
private lateinit var inboundConsumer: ClientConsumer
private class InnerState {
var running = false
val sendClients = HashMap<Address, ClientProducer>()
private val mutex = ThreadBox(InnerState())
/** A registration to handle messages of different types */
inner class Handler(val executor: Executor?, val topic: String,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
private val handlers = CopyOnWriteArrayList<Handler>()
private fun getSendClient(addr: Address): ClientProducer {
return mutex.locked {
sendClients.getOrPut(addr) {
val qName = addr.hostAndPort.toString()
fun start() {
// Wire up various bits of configuration. This is so complicated because Artemis is an embedded message queue
// server. Thus we're running both a "server" and a "client" in the same JVM process. A future node might be
// able to use an external MQ server instead, for instance, if a bank already has an MQ setup and wishes to
// reuse it, or if it makes sense for scaling to split the functionality out, or if it makes sense for security.
// But for now, we bundle it all up into one thing.
mq = EmbeddedActiveMQ()
val config = createArtemisConfig(directory, myHostPort)
val secConfig = SecurityConfiguration()
val password = BigInteger(128, SecureRandom.getInstanceStrong()).toString(16)
secConfig.addUser("internal", password)
secConfig.addRole("internal", "internal")
secConfig.defaultUser = "internal"
"#" to setOf(Role("internal", true, true, true, true, true, true, true))
val secManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, secConfig)
// Connect to our in-memory server.
clientFactory = ActiveMQClient.createServerLocatorWithoutHA(
// Create a queue on which to receive messages and set up the handler.
session = clientFactory.createSession()
session.createQueue(myHostPort.toString(), "inbound", false)
inboundConsumer = session.createConsumer("inbound").setMessageHandler { message: ClientMessage ->
// This code runs for every inbound message.
if (!message.containsProperty(TOPIC_PROPERTY)) {
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
// TODO: Figure out whether we always need to acknowledge messages, even when invalid.
val topic = message.getStringProperty(TOPIC_PROPERTY)
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { if (it.topic.isBlank()) true else it.topic == topic }
if (deliverTo.isEmpty()) {
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
// without causing log spam.
log.warn("Received message for $topic that doesn't have any registered handlers.")
val bits = ByteArray(message.bodySize)
val msg = object : Message {
override val topic = topic
override val data: ByteArray = bits
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
override val debugMessageID: String = message.messageID.toString()
override fun serialise(): ByteArray = bits
for (handler in deliverTo) {
(handler.executor ?: RunOnCallerThread).execute {
try {
handler.callback(msg, handler)
} catch(e: Exception) {
log.error("Caught exception whilst executing message handler for $topic", e)
mutex.locked { running = true }
override fun stop() {
mutex.locked {
for (producer in sendClients.values)
// We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here.
running = false
override fun send(message: Message, target: MessageRecipients) {
if (target !is Address)
TODO("Only simple sends to single recipients are currently implemented")
val artemisMessage = session.createMessage(true).putStringProperty("platform-topic", message.topic).writeBodyBufferBytes(message.data)
override fun addMessageHandler(topic: String, executor: Executor?,
callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
val handler = Handler(executor, topic, callback)
return handler
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
override fun createMessage(topic: String, data: ByteArray): Message {
// TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying.
return object : Message {
override val topic: String get() = topic
override val data: ByteArray get() = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val debugMessageID: String get() = Instant.now().toEpochMilli().toString()
override fun toString() = topic + "#" + String(data)
override val myAddress: SingleMessageRecipient = Address(myHostPort)
private enum class ConnectionDirection { INBOUND, OUTBOUND }
private fun maybeSetupConnection(hostAndPort: HostAndPort) {
val name = hostAndPort.toString()
// To make ourselves talk to a remote server, we need a "bridge". Bridges are things inside Artemis that know how
// to handle remote machines going away temporarily, retry connections, etc. They're the bit that handles
// unreliable peers. Thus, we need one bridge per node we are talking to.
// Each bridge consumes from a queue on our end and forwards messages to a queue on their end. So for each node
// we must create a queue, then create and configure a bridge.
// Note that bridges are not two way. A having a bridge to B does not imply that B can connect back to A. This
// becomes important for cases like firewall tunnelling and connection proxying where connectivity is not
// entirely duplex. The Artemis team may add this functionality in future:
// https://issues.apache.org/jira/browse/ARTEMIS-355
if (!session.queueQuery(SimpleString(name)).isExists) {
session.createQueue(name, name, true /* durable */)
if (!mq.activeMQServer.configuration.connectorConfigurations.containsKey(name)) {
mq.activeMQServer.configuration.addConnectorConfiguration(name, tcpTransport(ConnectionDirection.OUTBOUND,
hostAndPort.hostText, hostAndPort.port))
mq.activeMQServer.deployBridge(BridgeConfiguration().apply {
setConfirmationWindowSize(100000) // a guess
private fun setConfigDirectories(config: Configuration, dir: Path) {
config.apply {
private fun createArtemisConfig(directory: Path, hp: HostAndPort): Configuration {
val config = ConfigurationImpl()
setConfigDirectories(config, directory)
// We will be talking to our server purely in memory.
tcpTransport(ConnectionDirection.INBOUND, "", hp.port),
return config
private fun tcpTransport(direction: ConnectionDirection, host: String, port: Int) =
when (direction) {
ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
TransportConstants.HOST_PROP_NAME to host,
TransportConstants.PORT_PROP_NAME to port.toInt()
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.node
import core.KeyManagementService
import core.ThreadBox
import java.security.KeyPair
import java.security.KeyPairGenerator
import java.security.PrivateKey
import java.security.PublicKey
import java.util.*
import javax.annotation.concurrent.ThreadSafe
* A simple in-memory KMS that doesn't bother saving keys to disk. A real implementation would:
* - Probably be accessed via the network layer as an internal node service i.e. via a message queue, so it can run
* on a separate/firewalled service.
* - Use the protocol framework so requests to fetch keys can be suspended whilst a human signs off on the request.
* - Use deterministic key derivation.
* - Possibly have some sort of TREZOR-like two-factor authentication ability
* etc
class E2ETestKeyManagementService : KeyManagementService {
private class InnerState {
val keys = HashMap<PublicKey, PrivateKey>()
private val mutex = ThreadBox(InnerState())
// Accessing this map clones it.
override val keys: Map<PublicKey, PrivateKey> get() = mutex.locked { HashMap(keys) }
override fun freshKey(): KeyPair {
val keypair = KeyPairGenerator.getInstance("EC").genKeyPair()
mutex.locked {
keys[keypair.public] = keypair.private
return keypair
Normal file
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.node
import contracts.Cash
import core.*
import java.util.*
import javax.annotation.concurrent.ThreadSafe
* This class implements a simple, in memory wallet that tracks states that are owned by us, and also has a convenience
* method to auto-generate some self-issued cash states that can be used for test trading. A real wallet would persist
* states relevant to us into a database and once such a wallet is implemented, this scaffolding can be removed.
class E2ETestWalletService(private val services: ServiceHub) : WalletService {
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
// to wallet somewhere.
private class InnerState {
var wallet: Wallet = Wallet(emptyList<StateAndRef<OwnableState>>())
private val mutex = ThreadBox(InnerState())
override val currentWallet: Wallet get() = mutex.locked { wallet }
* Creates a random set of between (by default) 3 and 10 cash states that add up to the given amount and adds them
* to the wallet.
* The cash is self issued with the current nodes identity, as fetched from the storage service. Thus it
* would not be trusted by any sensible market participant and is effectively an IOU. If it had been issued by
* the central bank, well ... that'd be a different story altogether.
fun fillWithSomeTestCash(howMuch: Amount, atLeastThisManyStates: Int = 3, atMostThisManyStates: Int = 10, rng: Random = Random()) {
val amounts = calculateRandomlySizedAmounts(howMuch, atLeastThisManyStates, atMostThisManyStates, rng)
val myIdentity = services.storageService.myLegalIdentity
val myKey = services.storageService.myLegalIdentityKey
// We will allocate one state to one transaction, for simplicities sake.
val cash = Cash()
val transactions = amounts.map { pennies ->
// This line is what makes the cash self issued. We just use zero as our deposit reference: we don't need
// this field as there's no other database or source of truth we need to sync with.
val depositRef = myIdentity.ref(0)
val issuance = TransactionBuilder()
val freshKey = services.keyManagementService.freshKey()
cash.craftIssue(issuance, Amount(pennies, howMuch.currency), depositRef, freshKey.public)
return@map issuance.toSignedTransaction(true)
val statesAndRefs = transactions.map {
StateAndRef(it.tx.outputStates[0] as OwnableState, ContractStateRef(it.id, 0))
mutex.locked {
wallet = wallet.copy(wallet.states + statesAndRefs)
private fun calculateRandomlySizedAmounts(howMuch: Amount, min: Int, max: Int, rng: Random): LongArray {
val numStates = min + Math.floor(rng.nextDouble() * (max - min)).toInt()
val amounts = LongArray(numStates)
val baseSize = howMuch.pennies / numStates
var filledSoFar = 0L
for (i in 0..numStates - 1) {
if (i < numStates - 1) {
// Adjust the amount a bit up or down, to give more realistic amounts (not all identical).
amounts[i] = baseSize + (baseSize / 2 * (rng.nextDouble() - 0.5)).toLong()
filledSoFar += baseSize
} else {
// Handle inexact rounding.
amounts[i] = howMuch.pennies - filledSoFar
return amounts
Normal file
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.node
import core.IdentityService
import core.Party
import java.security.PublicKey
* Scaffolding: a dummy identity service that just expects to have identities loaded off disk or found elsewhere.
class FixedIdentityService(private val identities: List<Party>) : IdentityService {
private val keyToParties = identities.toMapBy { it.owningKey }
override fun partyFromKey(key: PublicKey): Party? = keyToParties[key]
Normal file
Normal file
import com.google.common.net.HostAndPort
import core.*
import core.messaging.*
import core.serialization.deserialize
import core.serialization.serialize
import core.utilities.loggerFor
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyPair
import java.security.KeyPairGenerator
import java.util.*
import java.util.concurrent.Executors
val DEFAULT_PORT = 31337
class ConfigurationException(message: String) : Exception(message)
// TODO: Split this into a regression testing environment
* A simple wrapper around a plain old Java .properties file. The keys have the same name as in the source code.
* TODO: Replace Java properties file with a better config file format (maybe yaml).
* We want to be able to configure via a GUI too, so an ability to round-trip whitespace, comments etc when machine
* editing the file is a must-have.
class NodeConfiguration(private val properties: Properties) {
val myLegalName: String by properties
* A Node manages a standalone server that takes part in the P2P network. It creates the services found in [ServiceHub],
* loads important data off disk and starts listening for connections.
* @param dir A [Path] to a location on disk where working files can be found or stored.
* @param myNetAddr The host and port that this server will use. It can't find out its own external hostname, so you
* have to specify that yourself.
* @param configuration This is typically loaded from a .properties file
* @param timestamperAddress If null, this node will become a timestamping node, otherwise, it will use that one.
class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeConfiguration,
timestamperAddress: LegallyIdentifiableNode?) {
private val log = loggerFor<Node>()
// We will run as much stuff in this thread as possible to keep the risk of thread safety bugs low during the
// low-performance prototyping period.
val serverThread = Executors.newSingleThreadExecutor()
val services = object : ServiceHub {
override val networkService: MessagingService get() = net
override val networkMapService: NetworkMap = MockNetworkMap()
override val storageService: StorageService get() = storage
override val walletService: WalletService get() = wallet
override val keyManagementService: KeyManagementService get() = keyManagement
override val identityService: IdentityService get() = identity
// TODO: Implement mutual exclusion so we can't start the node twice by accident.
val storage = makeStorageService(dir)
val smm = StateMachineManager(services, serverThread)
val net = ArtemisMessagingService(dir, myNetAddr)
val wallet: WalletService = E2ETestWalletService(services)
val keyManagement = E2ETestKeyManagementService()
val inNodeTimestampingService: TimestamperNodeService?
val identity: IdentityService
init {
// Insert a network map entry for the timestamper: this is all temp scaffolding and will go away. If we are
// given the details, the timestamping node is somewhere else. Otherwise, we do our own timestamping.
val tsid = if (timestamperAddress != null) {
inNodeTimestampingService = null
} else {
inNodeTimestampingService = TimestamperNodeService(net, storage.myLegalIdentity, storage.myLegalIdentityKey)
LegallyIdentifiableNode(net.myAddress, storage.myLegalIdentity)
(services.networkMapService as MockNetworkMap).timestampingNodes.add(tsid)
// We don't have any identity infrastructure right now, so we just throw together the only two identities we
// know about: our own, and the identity of the remote timestamper node (if any).
val knownIdentities = if (timestamperAddress != null)
listOf(storage.myLegalIdentity, timestamperAddress.identity)
identity = FixedIdentityService(knownIdentities)
fun stop() {
fun makeStorageService(dir: Path): StorageService {
// Load the private identity key, creating it if necessary. The identity key is a long term well known key that
// is distributed to other peers and we use it (or a key signed by it) when we need to do something
// "permissioned". The identity file is what gets distributed and contains the node's legal name along with
// the public key. Obviously in a real system this would need to be a certificate chain of some kind to ensure
// the legal name is actually validated in some way.
val privKeyFile = dir.resolve(PRIVATE_KEY_FILE_NAME)
val pubIdentityFile = dir.resolve(PUBLIC_IDENTITY_FILE_NAME)
val (identity, keypair) = if (!Files.exists(privKeyFile)) {
log.info("Identity key not found, generating fresh key!")
val keypair: KeyPair = KeyPairGenerator.getInstance("EC").genKeyPair()
val myIdentity = Party(configuration.myLegalName, keypair.public)
// We include the Party class with the file here to help catch mixups when admins provide files of the
// wrong type by mistake.
myIdentity.serialize(includeClassName = true).writeToFile(pubIdentityFile)
Pair(myIdentity, keypair)
} else {
// Check that the identity in the config file matches the identity file we have stored to disk.
// This is just a sanity check. It shouldn't fail unless the admin has fiddled with the files and messed
// things up for us.
val myIdentity = Files.readAllBytes(pubIdentityFile).deserialize<Party>(includeClassName = true)
if (myIdentity.name != configuration.myLegalName)
throw ConfigurationException("The legal name in the config file doesn't match the stored identity file:" +
"${configuration.myLegalName} vs ${myIdentity.name}")
// Load the private key.
val keypair = Files.readAllBytes(privKeyFile).deserialize<KeyPair>()
Pair(myIdentity, keypair)
log.info("Node owned by ${identity.name} starting up ...")
return object : StorageService {
private val tables = HashMap<String, MutableMap<Any, Any>>()
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
// TODO: This should become a database.
synchronized(tables) {
return tables.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
override val myLegalIdentity = identity
override val myLegalIdentityKey = keypair
companion object {
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.node
import com.google.common.net.HostAndPort
import contracts.CommercialPaper
import contracts.protocols.TwoPartyTradeProtocol
import core.*
import core.messaging.LegallyIdentifiableNode
import core.messaging.SingleMessageRecipient
import core.messaging.runOnNextMessage
import core.messaging.send
import core.serialization.deserialize
import core.utilities.BriefLogFormatter
import core.utilities.Emoji
import joptsimple.OptionParser
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.security.KeyPairGenerator
import java.security.PublicKey
import java.time.Instant
import java.util.*
// This demo app can be run in one of two modes. In the listening mode it will buy commercial paper from a selling node
// that connects to it, using IOU cash it issued to itself. It also runs a timestamping service in this mode. In the
// non-listening mode, it will connect to the specified listening node and sell some commercial paper in return for
// cash. There's currently no UI so all you can see is log messages.
// Please note that the software currently assumes every node has a unique DNS name. Thus you cannot name both nodes
// "localhost". This might get fixed in future, but for now to run the listening node, alias "alpha" to "localhost"
// in your /etc/hosts file and then try a command line like this:
// --dir=alpha --service-fake-trades --network-address=alpha
// To run the node that initiates a trade, alias "beta" to "localhost" in your /etc/hosts file and then try a command
// line like this:
// --dir=beta --fake-trade-with=alpha --network-address=beta:31338
// --timestamper-identity-file=alpha/identity-public --timestamper-address=alpha
// Alternatively,
fun main(args: Array<String>) {
val parser = OptionParser()
val networkAddressArg = parser.accepts("network-address").withRequiredArg().required()
val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("nodedata")
// Some dummy functionality that won't last long ...
// Mode flags for the first demo.
val serviceFakeTradesArg = parser.accepts("service-fake-trades")
val fakeTradeWithArg = parser.accepts("fake-trade-with").requiredUnless(serviceFakeTradesArg).withRequiredArg()
// Temporary flags until network map and service discovery is fleshed out. The identity file does NOT contain the
// network address because all this stuff is meant to come from a dynamic discovery service anyway, and the identity
// is meant to be long-term stable. It could contain a domain name, but we may end up not routing messages directly
// to DNS-identified endpoints anyway (e.g. consider onion routing as a possibility).
val timestamperIdentityFile = parser.accepts("timestamper-identity-file").requiredIf(fakeTradeWithArg).withRequiredArg()
val timestamperNetAddr = parser.accepts("timestamper-address").requiredIf(timestamperIdentityFile).withRequiredArg()
val options = try {
} catch (e: Exception) {
throw Exception() // TODO: Remove when upgrading to Kotlin 1.0 RC
val dir = Paths.get(options.valueOf(dirArg))
val configFile = dir.resolve("config")
if (!Files.exists(dir)) {
val config = loadConfigFile(configFile)
val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg)).withDefaultPort(DEFAULT_PORT)
val listening = options.has(serviceFakeTradesArg)
val timestamperId = if (options.has(timestamperIdentityFile)) {
val addr = HostAndPort.fromString(options.valueOf(timestamperNetAddr)).withDefaultPort(DEFAULT_PORT)
val path = Paths.get(options.valueOf(timestamperIdentityFile))
val party = Files.readAllBytes(path).deserialize<Party>(includeClassName = true)
LegallyIdentifiableNode(ArtemisMessagingService.makeRecipient(addr), party)
} else null
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId) }
// Now do some fake nonsense just to give us some activity.
(node.services.walletService as E2ETestWalletService).fillWithSomeTestCash(1000.DOLLARS)
val timestampingAuthority = node.services.networkMapService.timestampingNodes.first()
if (listening) {
// Wait around until a node asks to start a trade with us. In a real system, this part would happen out of band
// via some other system like an exchange or maybe even a manual messaging system like Bloomberg. But for the
// next stage in our building site, we will just auto-generate fake trades to give our nodes something to do.
// Note that currently, the two-party trade protocol doesn't actually resolve dependencies of transactions!
// Thus, we can make up whatever junk we like and trade non-existent cash/assets: the other side won't notice.
// Obviously, fixing that is the next step.
// As the seller initiates the DVP/two-party trade protocol, here, we will be the buyer.
node.net.addMessageHandler("test.junktrade") { msg, handlerRegistration ->
val replyTo = msg.data.deserialize<SingleMessageRecipient>(includeClassName = true)
val buyerSessionID = random63BitValue()
println("Got a new junk trade request, sending back session ID and starting buy protocol")
val future = TwoPartyTradeProtocol.runBuyer(node.smm, timestampingAuthority, replyTo, 100.DOLLARS,
CommercialPaper.State::class.java, buyerSessionID)
future.whenComplete {
println("Purchase complete - we are a happy customer! Final transaction is:")
println("Waiting for another seller to connect. Or press Ctrl-C to shut me down.")
node.net.send("test.junktrade.initiate", replyTo, buyerSessionID)
println("Waiting for a seller to connect to us (run the other node) ...")
} else {
// Grab a session ID for the fake trade from the other side, then kick off the seller and sell them some junk.
if (!options.has(fakeTradeWithArg)) {
println("Need the --fake-trade-with command line argument")
val peerAddr = HostAndPort.fromString(options.valuesOf(fakeTradeWithArg).single()).withDefaultPort(DEFAULT_PORT)
val otherSide = ArtemisMessagingService.makeRecipient(peerAddr)
node.net.runOnNextMessage("test.junktrade.initiate") { msg ->
val sessionID = msg.data.deserialize<Long>()
println("Got session ID back, now starting the sell protocol")
val cpOwnerKey = node.keyManagement.freshKey()
val commercialPaper = makeFakeCommercialPaper(cpOwnerKey.public)
val future = TwoPartyTradeProtocol.runSeller(node.smm, timestampingAuthority,
otherSide, commercialPaper, 100.DOLLARS, cpOwnerKey, sessionID)
future.whenComplete {
println("Sale completed - we have a happy customer!")
println("Final transaction is")
println("Sending a message to the listening/buying node ...")
node.net.send("test.junktrade", otherSide, node.net.myAddress, includeClassName = true)
fun makeFakeCommercialPaper(ownedBy: PublicKey): StateAndRef<CommercialPaper.State> {
// Make a fake company that's issued its own paper.
val party = Party("MegaCorp, Inc", KeyPairGenerator.getInstance("EC").genKeyPair().public)
// ownedBy here is the random key that gives us control over it.
val paper = CommercialPaper.State(party.ref(1,2,3), ownedBy, 1100.DOLLARS, Instant.now() + 10.days)
val randomRef = ContractStateRef(SecureHash.randomSHA256(), 0)
return StateAndRef(paper, randomRef)
private fun loadConfigFile(configFile: Path): NodeConfiguration {
fun askAdminToEditConfig(configFile: Path?) {
println("This is the first run, so you should edit the config file in $configFile and then start the node again.")
val defaultLegalName = "Global MegaCorp, Ltd."
if (!Files.exists(configFile)) {
createDefaultConfigFile(configFile, defaultLegalName)
val configProps = configFile.toFile().reader().use {
Properties().apply { load(it) }
val config = NodeConfiguration(configProps)
// Make sure admin did actually edit at least the legal name.
if (config.myLegalName == defaultLegalName)
return config
private fun createDefaultConfigFile(configFile: Path?, defaultLegalName: String) {
# Node configuration: adjust below as needed, then delete this comment.
myLegalName = $defaultLegalName
private fun printHelp() {
To run the listening node, alias "alpha" to "localhost" in your
/etc/hosts file and then try a command line like this:
--dir=alpha --service-fake-trades --network-address=alpha
To run the node that initiates a trade, alias "beta" to "localhost"
in your /etc/hosts file and then try a command line like this:
--dir=beta --fake-trade-with=alpha --network-address=beta:31338 --timestamper-identity-file=alpha/identity-public --timestamper-address=alpha
import org.objenesis.strategy.StdInstantiatorStrategy
import java.io.ByteArrayOutputStream
import java.lang.reflect.InvocationTargetException
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyPairGenerator
import java.time.Instant
import java.util.*
@ -66,14 +68,19 @@ val THREAD_LOCAL_KRYO = ThreadLocal.withInitial { createKryo() }
class SerializedBytes<T : Any>(bits: ByteArray) : OpaqueBytes(bits) {
val hash: SecureHash by lazy { bits.sha256() }
fun writeToFile(path: Path) = Files.write(path, bits)
// Some extension functions that make deserialisation convenient and provide auto-casting of the result.
inline fun <reified T : Any> ByteArray.deserialize(kryo: Kryo = THREAD_LOCAL_KRYO.get()): T {
return kryo.readObject(Input(this), T::class.java)
inline fun <reified T : Any> ByteArray.deserialize(kryo: Kryo = THREAD_LOCAL_KRYO.get(), includeClassName: Boolean = false): T {
if (includeClassName)
return kryo.readClassAndObject(Input(this)) as T
return kryo.readObject(Input(this), T::class.java)
inline fun <reified T : Any> OpaqueBytes.deserialize(kryo: Kryo = THREAD_LOCAL_KRYO.get()): T {
return kryo.readObject(Input(this.bits), T::class.java)
inline fun <reified T : Any> OpaqueBytes.deserialize(kryo: Kryo = THREAD_LOCAL_KRYO.get(), includeClassName: Boolean = false): T {
return this.bits.deserialize(kryo, includeClassName)
inline fun <reified T : Any> SerializedBytes<T>.deserialize(): T = bits.deserialize()
@ -81,10 +88,13 @@ inline fun <reified T : Any> SerializedBytes<T>.deserialize(): T = bits.deserial
* Can be called on any object to convert it to a byte array (wrapped by [SerializedBytes]), regardless of whether
* the type is marked as serializable or was designed for it (so be careful!)
fun <T : Any> T.serialize(kryo: Kryo = THREAD_LOCAL_KRYO.get()): SerializedBytes<T> {
fun <T : Any> T.serialize(kryo: Kryo = THREAD_LOCAL_KRYO.get(), includeClassName: Boolean = false): SerializedBytes<T> {
val stream = ByteArrayOutputStream()
Output(stream).use {
kryo.writeObject(it, this)
if (includeClassName)
kryo.writeClassAndObject(it, this)
kryo.writeObject(it, this)
return SerializedBytes(stream.toByteArray())
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.utilities
* A simple wrapper class that contains icons and support for printing them only when we're connected to a terminal.
object Emoji {
val hasTerminal by lazy { System.getenv("TERM") != null && System.getenv("LANG").contains("UTF-8") }
const val CODE_DIAMOND = "\ud83d\udd37"
const val CODE_BAG_OF_CASH = "\ud83d\udcb0"
const val CODE_NEWSPAPER = "\ud83d\udcf0"
const val CODE_RIGHT_ARROW = "\u27a1\ufe0f"
const val CODE_LEFT_ARROW = "\u2b05\ufe0f"
* When non-null, toString() methods are allowed to use emoji in the output as we're going to render them to a
* sufficiently capable text surface.
private val emojiMode = ThreadLocal<Any>()
val diamond: String get() = if (emojiMode.get() != null) "$CODE_DIAMOND " else ""
val bagOfCash: String get() = if (emojiMode.get() != null) "$CODE_BAG_OF_CASH " else ""
val newspaper: String get() = if (emojiMode.get() != null) "$CODE_NEWSPAPER " else ""
val rightArrow: String get() = if (emojiMode.get() != null) "$CODE_RIGHT_ARROW " else ""
val leftArrow: String get() = if (emojiMode.get() != null) "$CODE_LEFT_ARROW " else ""
fun renderIfSupported(obj: Any): String {
if (!hasTerminal)
return obj.toString()
if (emojiMode.get() != null)
return obj.toString()
emojiMode.set(this) // Could be any object.
try {
return obj.toString()
} finally {
class BriefLogFormatter : Formatter() {
override fun format(logRecord: LogRecord): String {
val arguments = arrayOfNulls<Any>(6)
val arguments = arrayOfNulls<Any>(7)
arguments[0] = logRecord.threadID
arguments[1] = when (logRecord.level) {
Level.SEVERE -> " **ERROR** "
Level.WARNING -> " (warning) "
else -> ""
val fullClassName = logRecord.sourceClassName
val dollarIndex = fullClassName.indexOf('$')
val className = fullClassName.substring(fullClassName.lastIndexOf('.') + 1, if (dollarIndex == -1) fullClassName.length else dollarIndex)
arguments[1] = className
arguments[2] = logRecord.sourceMethodName
arguments[3] = Date(logRecord.millis)
arguments[4] = if (logRecord.parameters != null) MessageFormat.format(logRecord.message, *logRecord.parameters) else logRecord.message
arguments[2] = className
arguments[3] = logRecord.sourceMethodName
arguments[4] = Date(logRecord.millis)
arguments[5] = if (logRecord.parameters != null) MessageFormat.format(logRecord.message, *logRecord.parameters) else logRecord.message
if (logRecord.thrown != null) {
val result = StringWriter()
arguments[5] = result.toString()
arguments[6] = result.toString()
} else {
arguments[5] = ""
arguments[6] = ""
return messageFormat.format(arguments)
companion object {
private val messageFormat = MessageFormat("{3,date,HH:mm:ss} {0} {1}.{2}: {4}\n{5}")
private val messageFormat = MessageFormat("{4,date,HH:mm:ss} {0} {1}{2}.{3}: {5}\n{6}")
// OpenJDK made a questionable, backwards incompatible change to the Logger implementation. It internally uses
// weak references now which means simply fetching the logger and changing its configuration won't work. We must
Normal file
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.crypto;
import org.junit.*;
import java.math.*;
import java.util.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** From the bitcoinj library */
public class Base58Test {
public void testEncode() throws Exception {
byte[] testbytes = "Hello World".getBytes();
assertEquals("JxF12TrwUP45BMd", Base58.encode(testbytes));
BigInteger bi = BigInteger.valueOf(3471844090L);
assertEquals("16Ho7Hs", Base58.encode(bi.toByteArray()));
byte[] zeroBytes1 = new byte[1];
assertEquals("1", Base58.encode(zeroBytes1));
byte[] zeroBytes7 = new byte[7];
assertEquals("1111111", Base58.encode(zeroBytes7));
// test empty encode
assertEquals("", Base58.encode(new byte[0]));
public void testDecode() throws Exception {
byte[] testbytes = "Hello World".getBytes();
byte[] actualbytes = Base58.decode("JxF12TrwUP45BMd");
assertTrue(new String(actualbytes), Arrays.equals(testbytes, actualbytes));
assertTrue("1", Arrays.equals(Base58.decode("1"), new byte[1]));
assertTrue("1111", Arrays.equals(Base58.decode("1111"), new byte[4]));
try {
Base58.decode("This isn't valid base58");
} catch (AddressFormatException e) {
// expected
// Checksum should fail.
try {
} catch (AddressFormatException e) {
// expected
// Input is too short.
try {
} catch (AddressFormatException e) {
// expected
// Test decode of empty String.
assertEquals(0, Base58.decode("").length);
// Now check we can correctly decode the case where the high bit of the first byte is not zero, so BigInteger
// sign extends. Fix for a bug that stopped us parsing keys exported using sipas patch.
public void testDecodeToBigInteger() {
byte[] input = Base58.decode("129");
assertEquals(new BigInteger(1, input), Base58.decodeToBigInteger("129"));
package core
import core.messaging.MessagingService
import core.messaging.MockNetworkMap
import core.messaging.NetworkMap
import core.node.TimestampingError
import core.serialization.SerializedBytes
import core.serialization.deserialize
@ -63,11 +65,13 @@ class MockStorageService : StorageService {
override val myLegalIdentityKey: KeyPair = KeyPairGenerator.getInstance("EC").genKeyPair()
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)
private val mapOfMaps = HashMap<String, MutableMap<Any, Any>>()
private val tables = HashMap<String, MutableMap<Any, Any>>()
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
return mapOfMaps.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
synchronized(tables) {
return tables.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
@ -76,7 +80,8 @@ class MockServices(
val keyManagement: KeyManagementService? = null,
val net: MessagingService? = null,
val identity: IdentityService? = MockIdentityService,
val storage: StorageService? = MockStorageService()
val storage: StorageService? = MockStorageService(),
val networkMap: NetworkMap? = MockNetworkMap()
) : ServiceHub {
override val walletService: WalletService
get() = wallet ?: throw UnsupportedOperationException()
@ -86,6 +91,8 @@ class MockServices(
get() = identity ?: throw UnsupportedOperationException()
override val networkService: MessagingService
get() = net ?: throw UnsupportedOperationException()
override val networkMapService: NetworkMap
get() = networkMap ?: throw UnsupportedOperationException()
override val storageService: StorageService
get() = storage ?: throw UnsupportedOperationException()
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
* All other rights reserved.
package core.node
import contracts.Cash
import core.DOLLARS
import core.MockKeyManagementService
import core.MockServices
import core.ServiceHub
import core.testutils.ALICE
import core.testutils.ALICE_KEY
import org.junit.Test
import java.security.KeyPair
import java.util.*
import kotlin.test.assertEquals
class E2ETestWalletServiceTest {
val services: ServiceHub = MockServices(
keyManagement = MockKeyManagementService(emptyMap(), arrayListOf<KeyPair>(ALICE_KEY, ALICE_KEY, ALICE_KEY))
@Test fun splits() {
val wallet = E2ETestWalletService(services)
// Fix the PRNG so that we get the same splits every time.
wallet.fillWithSomeTestCash(100.DOLLARS, 3, 3, Random(0L))
val w = wallet.currentWallet
assertEquals(3, w.states.size)
val state = w.states[0].state as Cash.State
assertEquals(services.storageService.myLegalIdentity, state.deposit.party)
assertEquals(services.storageService.myLegalIdentityKey.public, state.deposit.party.owningKey)
assertEquals(29.01.DOLLARS, state.amount)
assertEquals(ALICE, state.owner)
assertEquals(33.34.DOLLARS, (w.states[2].state as Cash.State).amount)
assertEquals(35.61.DOLLARS, (w.states[1].state as Cash.State).amount)
myNode = makeNode()
serviceNode = makeNode()
mockServices = MockServices(net = serviceNode.second, storage = MockStorageService())
serverKey = network.setupTimestampingNode(true).first.identity.owningKey
val timestampingNodeID = network.setupTimestampingNode(true).first
(mockServices.networkMapService as MockNetworkMap).timestampingNodes.add(timestampingNodeID)
serverKey = timestampingNodeID.identity.owningKey
// And a separate one to be tested directly, to make the unit tests a bit faster.
service = TimestamperNodeService(serviceNode.second, Party("Unit test suite", ALICE), ALICE_KEY)
@ -76,7 +79,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
val psm = runNetwork {
val smm = StateMachineManager(MockServices(net = myNode.second), RunOnCallerThread)
val logName = TimestamperNodeService.TIMESTAMPING_PROTOCOL_TOPIC
val psm = TestPSM(myNode.second.networkMap.timestampingNodes[0], clock.instant())
val psm = TestPSM(mockServices.networkMapService.timestampingNodes[0], clock.instant())
smm.add(logName, psm)
@ -128,4 +131,4 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
Reference in New Issue
Block a user