mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
Implement the beginnings of a serving node.
It uses Artemis (an embeddable MQ broker) and can run in either a 'serving' mode, in which case it will sit around waiting to sell fake commercial paper assets, or a 'buying' mode in which case it will connect to a specified serving node and run the two party trade protocol. Most services are either mocked out or too trivial to be useful at this point. They will be fleshed out in the future.
This commit is contained in:
parent
360d8ec7ad
commit
1e8ea8eb2c
3
.gitignore
vendored
3
.gitignore
vendored
@ -6,6 +6,9 @@ TODO
|
||||
/build/
|
||||
/docs/build/doctrees
|
||||
|
||||
alpha
|
||||
beta
|
||||
|
||||
### JetBrains template
|
||||
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio
|
||||
|
||||
|
15
.idea/runConfigurations/Node__alpha.xml
generated
Normal file
15
.idea/runConfigurations/Node__alpha.xml
generated
Normal file
@ -0,0 +1,15 @@
|
||||
<component name="ProjectRunConfigurationManager">
|
||||
<configuration default="false" name="Node: alpha" type="JetRunConfigurationType" factoryName="Kotlin">
|
||||
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
|
||||
<option name="MAIN_CLASS_NAME" value="core.node.TraderDemoKt" />
|
||||
<option name="VM_PARAMETERS" value="-ea -javaagent:lib/quasar.jar" />
|
||||
<option name="PROGRAM_PARAMETERS" value="--dir=alpha --service-fake-trades --network-address=alpha" />
|
||||
<option name="WORKING_DIRECTORY" value="" />
|
||||
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
|
||||
<option name="ALTERNATIVE_JRE_PATH" />
|
||||
<option name="PASS_PARENT_ENVS" value="true" />
|
||||
<module name="r3prototyping" />
|
||||
<envs />
|
||||
<method />
|
||||
</configuration>
|
||||
</component>
|
15
.idea/runConfigurations/Node__beta.xml
generated
Normal file
15
.idea/runConfigurations/Node__beta.xml
generated
Normal file
@ -0,0 +1,15 @@
|
||||
<component name="ProjectRunConfigurationManager">
|
||||
<configuration default="false" name="Node: beta" type="JetRunConfigurationType" factoryName="Kotlin">
|
||||
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
|
||||
<option name="MAIN_CLASS_NAME" value="core.node.TraderDemoKt" />
|
||||
<option name="VM_PARAMETERS" value="-ea -javaagent:lib/quasar.jar" />
|
||||
<option name="PROGRAM_PARAMETERS" value="--dir=beta --fake-trade-with=alpha --network-address=beta:31338 --timestamper-identity-file=alpha/identity-public --timestamper-address=alpha:31338" />
|
||||
<option name="WORKING_DIRECTORY" value="" />
|
||||
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
|
||||
<option name="ALTERNATIVE_JRE_PATH" />
|
||||
<option name="PASS_PARENT_ENVS" value="true" />
|
||||
<module name="r3prototyping" />
|
||||
<envs />
|
||||
<method />
|
||||
</configuration>
|
||||
</component>
|
21
build.gradle
21
build.gradle
@ -3,7 +3,9 @@ version '1.0-SNAPSHOT'
|
||||
|
||||
apply plugin: 'java'
|
||||
apply plugin: 'kotlin'
|
||||
//apply plugin: 'org.jetbrains.dokka'
|
||||
apply plugin: 'application'
|
||||
|
||||
// apply plugin: 'org.jetbrains.dokka'
|
||||
|
||||
allprojects {
|
||||
sourceCompatibility = 1.8
|
||||
@ -62,6 +64,8 @@ dependencies {
|
||||
force = true // Conflict between Quasar and Artemis
|
||||
}
|
||||
|
||||
compile "net.sf.jopt-simple:jopt-simple:4.9"
|
||||
|
||||
compile("com.esotericsoftware:kryo:3.0.3")
|
||||
compile "de.javakaffee:kryo-serializers:0.37"
|
||||
|
||||
@ -96,3 +100,18 @@ tasks.withType(JavaExec) {
|
||||
jvmArgs "-javaagent:${configurations.quasar.singleFile}"
|
||||
jvmArgs "-Dco.paralleluniverse.fibers.verifyInstrumentation"
|
||||
}
|
||||
|
||||
mainClassName = 'core.node.TraderDemoKt'
|
||||
|
||||
task runDemoBuyer(type: JavaExec, dependsOn: ':classes') {
|
||||
classpath = sourceSets.main.runtimeClasspath
|
||||
main = 'core.node.TraderDemoKt'
|
||||
args = ['--dir=alpha', '--service-fake-trades', '--network-address=alpha']
|
||||
}
|
||||
|
||||
task runDemoSeller(type: JavaExec, dependsOn: ':classes') {
|
||||
classpath = sourceSets.main.runtimeClasspath
|
||||
main = 'core.node.TraderDemoKt'
|
||||
args = ['--dir=beta', '--fake-trade-with=alpha', '--network-address=beta:31338',
|
||||
'--timestamper-identity-file=alpha/identity-public', '--timestamper-address=alpha']
|
||||
}
|
||||
|
@ -105,6 +105,7 @@ object TwoPartyTradeProtocol {
|
||||
val ourSignature = myKeyPair.signWithECDSA(partialTX.txBits)
|
||||
val tsaSig = TimestamperClient(this, timestampingAuthority).timestamp(partialTX.txBits)
|
||||
val fullySigned = partialTX.withAdditionalSignature(tsaSig).withAdditionalSignature(ourSignature)
|
||||
val ltx = fullySigned.verifyToLedgerTransaction(serviceHub.identityService)
|
||||
|
||||
// We should run it through our full TransactionGroup of all transactions here.
|
||||
|
||||
@ -112,7 +113,7 @@ object TwoPartyTradeProtocol {
|
||||
|
||||
send(TRADE_TOPIC, otherSide, buyerSessionID, fullySigned)
|
||||
|
||||
return Pair(wtx, fullySigned.verifyToLedgerTransaction(serviceHub.identityService))
|
||||
return Pair(wtx, ltx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -108,4 +108,8 @@ fun PublicKey.verifyWithECDSA(content: ByteArray, signature: DigitalSignature) {
|
||||
verifier.update(content)
|
||||
if (verifier.verify(signature.bits) == false)
|
||||
throw SignatureException("Signature did not match")
|
||||
}
|
||||
}
|
||||
|
||||
// Allow Kotlin destructuring: val (private, public) = keypair
|
||||
fun KeyPair.component1() = this.private
|
||||
fun KeyPair.component2() = this.public
|
@ -10,6 +10,7 @@ package core
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.NetworkMap
|
||||
import core.serialization.SerializedBytes
|
||||
import java.security.KeyPair
|
||||
import java.security.KeyPairGenerator
|
||||
@ -121,4 +122,5 @@ interface ServiceHub {
|
||||
val identityService: IdentityService
|
||||
val storageService: StorageService
|
||||
val networkService: MessagingService
|
||||
}
|
||||
val networkMapService: NetworkMap
|
||||
}
|
||||
|
@ -39,6 +39,7 @@ interface OwnableState : ContractState {
|
||||
/** Returns the SHA-256 hash of the serialised contents of this state (not cached!) */
|
||||
fun ContractState.hash(): SecureHash = SecureHash.sha256(serialize().bits)
|
||||
|
||||
// TODO: Give this a shorter name.
|
||||
/**
|
||||
* A stateref is a pointer to a state, this is an equivalent of an "outpoint" in Bitcoin. It records which transaction
|
||||
* defined the state and where in that transaction it was.
|
||||
@ -126,4 +127,4 @@ interface Contract {
|
||||
* the contract's contents).
|
||||
*/
|
||||
val legalContractReference: SecureHash
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,9 @@ import java.security.SecureRandom
|
||||
import java.time.Duration
|
||||
import java.time.temporal.Temporal
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.locks.Lock
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
val Int.days: Duration get() = Duration.ofDays(this.toLong())
|
||||
val Int.hours: Duration get() = Duration.ofHours(this.toLong())
|
||||
@ -46,4 +49,38 @@ fun <T> SettableFuture<T>.setFrom(logger: Logger? = null, block: () -> T): Setta
|
||||
// Simple infix function to add back null safety that the JDK lacks: timeA until timeB
|
||||
infix fun Temporal.until(endExclusive: Temporal) = Duration.between(this, endExclusive)
|
||||
|
||||
val RunOnCallerThread = MoreExecutors.directExecutor()
|
||||
// An alias that can sometimes make code clearer to read.
|
||||
val RunOnCallerThread = MoreExecutors.directExecutor()
|
||||
|
||||
inline fun <T> logElapsedTime(label: String, logger: Logger? = null, body: () -> T): T {
|
||||
val now = System.currentTimeMillis()
|
||||
val r = body()
|
||||
val elapsed = System.currentTimeMillis() - now
|
||||
if (logger != null)
|
||||
logger.info("$label took $elapsed msec")
|
||||
else
|
||||
println("$label took $elapsed msec")
|
||||
return r
|
||||
}
|
||||
|
||||
/**
|
||||
* A threadbox is a simple utility that makes it harder to forget to take a lock before accessing some shared state.
|
||||
* Simply define a private class to hold the data that must be grouped under the same lock, and then pass the only
|
||||
* instance to the ThreadBox constructor. You can now use the [locked] method with a lambda to take the lock in a
|
||||
* way that ensures it'll be released if there's an exception.
|
||||
*
|
||||
* Note that this technique is not infallible: if you capture a reference to the fields in another lambda which then
|
||||
* gets stored and invoked later, there may still be unsafe multi-threaded access going on, so watch out for that.
|
||||
* This is just a simple guard rail that makes it harder to slip up.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* private class MutableState { var i = 5 }
|
||||
* private val state = ThreadBox(MutableState())
|
||||
*
|
||||
* val ii = state.locked { i }
|
||||
*/
|
||||
class ThreadBox<T>(content: T, private val lock: Lock = ReentrantLock()) {
|
||||
private val content = content
|
||||
fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
|
||||
}
|
||||
|
@ -153,10 +153,6 @@ public class InMemoryNetwork {
|
||||
|
||||
override val myAddress: SingleMessageRecipient = handle
|
||||
|
||||
override val networkMap: NetworkMap get() = object : NetworkMap {
|
||||
override val timestampingNodes = if (timestampingAdvert != null) listOf(timestampingAdvert!!) else emptyList()
|
||||
}
|
||||
|
||||
protected val backgroundThread = if (manuallyPumped) null else
|
||||
thread(isDaemon = true, name = "In-memory message dispatcher ") {
|
||||
while (!currentThread.isInterrupted) {
|
||||
|
@ -68,9 +68,6 @@ interface MessagingService {
|
||||
|
||||
/** Returns an address that refers to this node. */
|
||||
val myAddress: SingleMessageRecipient
|
||||
|
||||
/** Allows you to look up services and nodes that are available on the network. */
|
||||
val networkMap: NetworkMap
|
||||
}
|
||||
|
||||
/**
|
||||
@ -85,7 +82,9 @@ fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? =
|
||||
}
|
||||
}
|
||||
|
||||
fun MessagingService.send(topic: String, to: MessageRecipients, obj: Any) = send(createMessage(topic, obj.serialize().bits), to)
|
||||
fun MessagingService.send(topic: String, to: MessageRecipients, obj: Any, includeClassName: Boolean = false) {
|
||||
send(createMessage(topic, obj.serialize(includeClassName = includeClassName).bits), to)
|
||||
}
|
||||
|
||||
/**
|
||||
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
|
||||
@ -134,4 +133,4 @@ interface SingleMessageRecipient : MessageRecipients
|
||||
/** A base class for a set of recipients specifically identified by the sender. */
|
||||
interface MessageRecipientGroup : MessageRecipients
|
||||
/** A special base class for the set of all possible recipients, without having to identify who they all are. */
|
||||
interface AllPossibleRecipients : MessageRecipients
|
||||
interface AllPossibleRecipients : MessageRecipients
|
||||
|
@ -9,19 +9,25 @@
|
||||
package core.messaging
|
||||
|
||||
import core.Party
|
||||
import java.util.*
|
||||
|
||||
/** Info about a network node that has is operated by some sort of verified identity. */
|
||||
data class LegallyIdentifiableNode(val address: SingleMessageRecipient, val identity: Party)
|
||||
|
||||
/**
|
||||
* A NetworkMap allows you to look up various types of services provided by nodes on the network, and find node
|
||||
* addresses given legal identities (NB: not all nodes may have legal identities).
|
||||
* A network map contains lists of nodes on the network along with information about their identity keys, services
|
||||
* they provide and host names or IP addresses where they can be connected to. A reasonable architecture for the
|
||||
* network map service might be one like the Tor directory authorities, where several nodes linked by RAFT or Paxos
|
||||
* elect a leader and that leader distributes signed documents describing the network layout. Those documents can
|
||||
* then be cached by every node and thus a network map can be retrieved given only a single successful peer connection.
|
||||
*
|
||||
* A real implementation would probably do RPCs to a lookup service which might in turn be backed by a ZooKeeper
|
||||
* cluster or equivalent.
|
||||
*
|
||||
* For now, this class is truly minimal.
|
||||
*/
|
||||
* This interface assumes fast, synchronous access to an in-memory map.
|
||||
*/
|
||||
interface NetworkMap {
|
||||
val timestampingNodes: List<LegallyIdentifiableNode>
|
||||
}
|
||||
|
||||
// TODO: Move this to the test tree once a real network map is implemented and this scaffolding is no longer needed.
|
||||
class MockNetworkMap : NetworkMap {
|
||||
override val timestampingNodes = Collections.synchronizedList(ArrayList<LegallyIdentifiableNode>())
|
||||
}
|
||||
|
@ -176,6 +176,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
checkpointsMap.remove(prevCheckpointKey)
|
||||
}
|
||||
} catch (t: Throwable) {
|
||||
// TODO: Quasar is logging exceptions by itself too, find out where and stop it.
|
||||
logger.error("Caught error whilst invoking protocol state machine", t)
|
||||
throw t
|
||||
}
|
||||
@ -312,4 +313,4 @@ open class FiberRequest(val topic: String, val destination: MessageRecipients?,
|
||||
|
||||
class NotExpectingResponse(topic: String, destination: MessageRecipients, sessionIDForSend: Long, obj: Any?)
|
||||
: FiberRequest(topic, destination, sessionIDForSend, -1, obj)
|
||||
}
|
||||
}
|
||||
|
286
src/main/kotlin/core/node/ArtemisMessagingService.kt
Normal file
286
src/main/kotlin/core/node/ArtemisMessagingService.kt
Normal file
@ -0,0 +1,286 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.node
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import core.ThreadBox
|
||||
import core.messaging.*
|
||||
import core.utilities.loggerFor
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration
|
||||
import org.apache.activemq.artemis.core.config.Configuration
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
import org.apache.activemq.artemis.core.security.Role
|
||||
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule
|
||||
import java.math.BigInteger
|
||||
import java.nio.file.Path
|
||||
import java.security.SecureRandom
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.Executor
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
|
||||
// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later)
|
||||
// TODO: SSL
|
||||
|
||||
/**
|
||||
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
|
||||
* Artemis is a message queue broker and here, we embed the entire server inside our own process. Nodes communicate
|
||||
* with each other using (by default) an Artemis specific protocol, but it supports other protocols like AQMP/1.0
|
||||
* as well.
|
||||
*
|
||||
* The current implementation is skeletal and lacks features like security or firewall tunnelling (that is, you must
|
||||
* be able to receive TCP connections in order to receive messages). It is good enough for local communication within
|
||||
* a fully connected network, trusted network or on localhost.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort) : MessagingService {
|
||||
// In future: can contain onion routing info, etc.
|
||||
private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
|
||||
|
||||
companion object {
|
||||
val log = loggerFor<ArtemisMessagingService>()
|
||||
|
||||
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
|
||||
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
|
||||
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
|
||||
// confusion.
|
||||
val TOPIC_PROPERTY = "platform-topic"
|
||||
|
||||
/** Temp helper until network map is established. */
|
||||
fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort)
|
||||
}
|
||||
|
||||
private lateinit var mq: EmbeddedActiveMQ
|
||||
private lateinit var clientFactory: ClientSessionFactory
|
||||
private lateinit var session: ClientSession
|
||||
private lateinit var inboundConsumer: ClientConsumer
|
||||
|
||||
private class InnerState {
|
||||
var running = false
|
||||
val sendClients = HashMap<Address, ClientProducer>()
|
||||
}
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
/** A registration to handle messages of different types */
|
||||
inner class Handler(val executor: Executor?, val topic: String,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
|
||||
private fun getSendClient(addr: Address): ClientProducer {
|
||||
return mutex.locked {
|
||||
sendClients.getOrPut(addr) {
|
||||
maybeSetupConnection(addr.hostAndPort)
|
||||
val qName = addr.hostAndPort.hostText
|
||||
session.createProducer(qName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun start() {
|
||||
// Wire up various bits of configuration. This is so complicated because Artemis is an embedded message queue
|
||||
// server. Thus we're running both a "server" and a "client" in the same JVM process. A future node might be
|
||||
// able to use an external MQ server instead, for instance, if a bank already has an MQ setup and wishes to
|
||||
// reuse it, or if it makes sense for scaling to split the functionality out, or if it makes sense for security.
|
||||
//
|
||||
// But for now, we bundle it all up into one thing.
|
||||
mq = EmbeddedActiveMQ()
|
||||
val config = createArtemisConfig(directory, myHostPort)
|
||||
mq.setConfiguration(config)
|
||||
val secConfig = SecurityConfiguration()
|
||||
val password = BigInteger(128, SecureRandom.getInstanceStrong()).toString(16)
|
||||
secConfig.addUser("internal", password)
|
||||
secConfig.addRole("internal", "internal")
|
||||
secConfig.defaultUser = "internal"
|
||||
config.setSecurityRoles(mapOf(
|
||||
"#" to setOf(Role("internal", true, true, true, true, true, true, true))
|
||||
))
|
||||
val secManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, secConfig)
|
||||
mq.setSecurityManager(secManager)
|
||||
mq.start()
|
||||
|
||||
// Connect to our in-memory server.
|
||||
clientFactory = ActiveMQClient.createServerLocatorWithoutHA(
|
||||
TransportConfiguration(InVMConnectorFactory::class.java.name)).createSessionFactory()
|
||||
|
||||
// Create a queue on which to receive messages and set up the handler.
|
||||
session = clientFactory.createSession()
|
||||
session.createQueue(myHostPort.hostText, "inbound", false)
|
||||
inboundConsumer = session.createConsumer("inbound").setMessageHandler { message: ClientMessage ->
|
||||
// This code runs for every inbound message.
|
||||
if (!message.containsProperty(TOPIC_PROPERTY)) {
|
||||
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
|
||||
// TODO: Figure out whether we always need to acknowledge messages, even when invalid.
|
||||
return@setMessageHandler
|
||||
}
|
||||
val topic = message.getStringProperty(TOPIC_PROPERTY)
|
||||
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
|
||||
// or removed whilst the filter is executing will not affect anything.
|
||||
val deliverTo = handlers.filter { if (it.topic.isBlank()) true else it.topic == topic }
|
||||
|
||||
if (deliverTo.isEmpty()) {
|
||||
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
|
||||
// without causing log spam.
|
||||
log.warn("Received message for $topic that doesn't have any registered handlers.")
|
||||
return@setMessageHandler
|
||||
}
|
||||
|
||||
val bits = ByteArray(message.bodySize)
|
||||
message.bodyBuffer.readBytes(bits)
|
||||
|
||||
val msg = object : Message {
|
||||
override val topic = topic
|
||||
override val data: ByteArray = bits
|
||||
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
|
||||
override val debugMessageID: String = message.messageID.toString()
|
||||
override fun serialise(): ByteArray = bits
|
||||
}
|
||||
for (handler in deliverTo) {
|
||||
(handler.executor ?: MoreExecutors.directExecutor()).execute {
|
||||
try {
|
||||
handler.callback(msg, handler)
|
||||
} catch(e: Exception) {
|
||||
log.error("Caught exception whilst executing message handler for $topic", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
message.acknowledge()
|
||||
}
|
||||
session.start()
|
||||
|
||||
mutex.locked { running = true }
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
mutex.locked {
|
||||
for (producer in sendClients.values)
|
||||
producer.close()
|
||||
sendClients.clear()
|
||||
inboundConsumer.close()
|
||||
session.close()
|
||||
mq.stop()
|
||||
|
||||
// We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here.
|
||||
|
||||
running = false
|
||||
}
|
||||
}
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
if (target !is Address)
|
||||
TODO("Only simple sends to single recipients are currently implemented")
|
||||
val artemisMessage = session.createMessage(true).putStringProperty("platform-topic", message.topic).writeBodyBufferBytes(message.data)
|
||||
getSendClient(target).send(artemisMessage)
|
||||
}
|
||||
|
||||
override fun addMessageHandler(topic: String, executor: Executor?,
|
||||
callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
|
||||
val handler = Handler(executor, topic, callback)
|
||||
handlers.add(handler)
|
||||
return handler
|
||||
}
|
||||
|
||||
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
|
||||
handlers.remove(registration)
|
||||
}
|
||||
|
||||
override fun createMessage(topic: String, data: ByteArray): Message {
|
||||
// TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying.
|
||||
return object : Message {
|
||||
override val topic: String get() = topic
|
||||
override val data: ByteArray get() = data
|
||||
override val debugTimestamp: Instant = Instant.now()
|
||||
override fun serialise(): ByteArray = this.serialise()
|
||||
override val debugMessageID: String get() = Instant.now().toEpochMilli().toString()
|
||||
override fun toString() = topic + "#" + String(data)
|
||||
}
|
||||
}
|
||||
|
||||
override val myAddress: SingleMessageRecipient = Address(myHostPort)
|
||||
|
||||
private enum class ConnectionDirection { INBOUND, OUTBOUND }
|
||||
|
||||
private fun maybeSetupConnection(hostAndPort: HostAndPort) {
|
||||
val name = hostAndPort.hostText
|
||||
|
||||
// To make ourselves talk to a remote server, we need a "bridge". Bridges are things inside Artemis that know how
|
||||
// to handle remote machines going away temporarily, retry connections, etc. They're the bit that handles
|
||||
// unreliable peers. Thus, we need one bridge per node we are talking to.
|
||||
//
|
||||
// Each bridge consumes from a queue on our end and forwards messages to a queue on their end. So for each node
|
||||
// we must create a queue, then create and configure a bridge.
|
||||
//
|
||||
// Note that bridges are not two way. A having a bridge to B does not imply that B can connect back to A. This
|
||||
// becomes important for cases like firewall tunnelling and connection proxying where connectivity is not
|
||||
// entirely duplex. The Artemis team may add this functionality in future:
|
||||
//
|
||||
// https://issues.apache.org/jira/browse/ARTEMIS-355
|
||||
if (!session.queueQuery(SimpleString(name)).isExists) {
|
||||
session.createQueue(name, name, true /* durable */)
|
||||
}
|
||||
if (!mq.activeMQServer.configuration.connectorConfigurations.containsKey(name)) {
|
||||
mq.activeMQServer.configuration.addConnectorConfiguration(name, tcpTransport(ConnectionDirection.OUTBOUND,
|
||||
hostAndPort.hostText, hostAndPort.port))
|
||||
mq.activeMQServer.deployBridge(BridgeConfiguration().apply {
|
||||
setName(name)
|
||||
setQueueName(name)
|
||||
setForwardingAddress(name)
|
||||
setStaticConnectors(listOf(name))
|
||||
setConfirmationWindowSize(100000) // a guess
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
private fun setConfigDirectories(config: Configuration, dir: Path) {
|
||||
config.apply {
|
||||
setBindingsDirectory(dir.resolve("bindings").toString())
|
||||
setJournalDirectory(dir.resolve("journal").toString())
|
||||
setLargeMessagesDirectory(dir.resolve("largemessages").toString())
|
||||
}
|
||||
}
|
||||
|
||||
private fun createArtemisConfig(directory: Path, hp: HostAndPort): Configuration {
|
||||
val config = ConfigurationImpl()
|
||||
setConfigDirectories(config, directory)
|
||||
// We will be talking to our server purely in memory.
|
||||
config.setAcceptorConfigurations(
|
||||
setOf(
|
||||
tcpTransport(ConnectionDirection.INBOUND, "localhost", hp.port),
|
||||
TransportConfiguration(InVMAcceptorFactory::class.java.name)
|
||||
)
|
||||
)
|
||||
return config
|
||||
}
|
||||
|
||||
private fun tcpTransport(direction: ConnectionDirection, host: String, port: Int) =
|
||||
TransportConfiguration(
|
||||
when (direction) {
|
||||
ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name
|
||||
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
|
||||
},
|
||||
mapOf(
|
||||
TransportConstants.HOST_PROP_NAME to host,
|
||||
TransportConstants.PORT_PROP_NAME to port.toInt()
|
||||
)
|
||||
)
|
||||
|
||||
}
|
48
src/main/kotlin/core/node/E2ETestKeyManagementService.kt
Normal file
48
src/main/kotlin/core/node/E2ETestKeyManagementService.kt
Normal file
@ -0,0 +1,48 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.node
|
||||
|
||||
import core.KeyManagementService
|
||||
import core.ThreadBox
|
||||
import java.security.KeyPair
|
||||
import java.security.KeyPairGenerator
|
||||
import java.security.PrivateKey
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* A simple in-memory KMS that doesn't bother saving keys to disk. A real implementation would:
|
||||
*
|
||||
* - Probably be accessed via the network layer as an internal node service i.e. via a message queue, so it can run
|
||||
* on a separate/firewalled service.
|
||||
* - Use the protocol framework so requests to fetch keys can be suspended whilst a human signs off on the request.
|
||||
* - Use deterministic key derivation.
|
||||
* - Possibly have some sort of TREZOR-like two-factor authentication ability
|
||||
*
|
||||
* etc
|
||||
*/
|
||||
@ThreadSafe
|
||||
class E2ETestKeyManagementService : KeyManagementService {
|
||||
private class InnerState {
|
||||
val keys = HashMap<PublicKey, PrivateKey>()
|
||||
}
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
// Accessing this map clones it.
|
||||
override val keys: Map<PublicKey, PrivateKey> get() = mutex.locked { HashMap(keys) }
|
||||
|
||||
override fun freshKey(): KeyPair {
|
||||
val keypair = KeyPairGenerator.getInstance("EC").genKeyPair()
|
||||
mutex.locked {
|
||||
keys[keypair.public] = keypair.private
|
||||
}
|
||||
return keypair
|
||||
}
|
||||
}
|
88
src/main/kotlin/core/node/E2ETestWalletService.kt
Normal file
88
src/main/kotlin/core/node/E2ETestWalletService.kt
Normal file
@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.node
|
||||
|
||||
import contracts.Cash
|
||||
import core.*
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* This class implements a simple, in memory wallet that tracks states that are owned by us, and also has a convenience
|
||||
* method to auto-generate some self-issued cash states that can be used for test trading. A real wallet would persist
|
||||
* states relevant to us into a database and once such a wallet is implemented, this scaffolding can be removed.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class E2ETestWalletService(private val services: ServiceHub) : WalletService {
|
||||
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
|
||||
// inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
|
||||
// to wallet somewhere.
|
||||
private class InnerState {
|
||||
var wallet: Wallet = Wallet(emptyList<StateAndRef<OwnableState>>())
|
||||
}
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
override val currentWallet: Wallet get() = mutex.locked { wallet }
|
||||
|
||||
/**
|
||||
* Creates a random set of between (by default) 3 and 10 cash states that add up to the given amount and adds them
|
||||
* to the wallet.
|
||||
*
|
||||
* The cash is self issued with the current nodes identity, as fetched from the storage service. Thus it
|
||||
* would not be trusted by any sensible market participant and is effectively an IOU. If it had been issued by
|
||||
* the central bank, well ... that'd be a different story altogether.
|
||||
*/
|
||||
fun fillWithSomeTestCash(howMuch: Amount, atLeastThisManyStates: Int = 3, atMostThisManyStates: Int = 10, rng: Random = Random()) {
|
||||
val amounts = calculateRandomlySizedAmounts(howMuch, atLeastThisManyStates, atMostThisManyStates, rng)
|
||||
|
||||
val myIdentity = services.storageService.myLegalIdentity
|
||||
val myKey = services.storageService.myLegalIdentityKey
|
||||
|
||||
// We will allocate one state to one transaction, for simplicities sake.
|
||||
val cash = Cash()
|
||||
val transactions = amounts.map { pennies ->
|
||||
// This line is what makes the cash self issued. We just use zero as our deposit reference: we don't need
|
||||
// this field as there's no other database or source of truth we need to sync with.
|
||||
val depositRef = myIdentity.ref(0)
|
||||
|
||||
val issuance = TransactionBuilder()
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
cash.craftIssue(issuance, Amount(pennies, howMuch.currency), depositRef, freshKey.public)
|
||||
issuance.signWith(myKey)
|
||||
|
||||
return@map issuance.toSignedTransaction(true)
|
||||
}
|
||||
|
||||
val statesAndRefs = transactions.map {
|
||||
StateAndRef(it.tx.outputStates[0] as OwnableState, ContractStateRef(it.id, 0))
|
||||
}
|
||||
|
||||
mutex.locked {
|
||||
wallet = wallet.copy(wallet.states + statesAndRefs)
|
||||
}
|
||||
}
|
||||
|
||||
private fun calculateRandomlySizedAmounts(howMuch: Amount, min: Int, max: Int, rng: Random): LongArray {
|
||||
val numStates = min + Math.floor(rng.nextDouble() * (max - min)).toInt()
|
||||
val amounts = LongArray(numStates)
|
||||
val baseSize = howMuch.pennies / numStates
|
||||
var filledSoFar = 0L
|
||||
for (i in 0..numStates - 1) {
|
||||
if (i < numStates - 1) {
|
||||
// Adjust the amount a bit up or down, to give more realistic amounts (not all identical).
|
||||
amounts[i] = baseSize + (baseSize / 2 * (rng.nextDouble() - 0.5)).toLong()
|
||||
filledSoFar += baseSize
|
||||
} else {
|
||||
// Handle inexact rounding.
|
||||
amounts[i] = howMuch.pennies - filledSoFar
|
||||
}
|
||||
}
|
||||
return amounts
|
||||
}
|
||||
}
|
21
src/main/kotlin/core/node/FixedIdentityService.kt
Normal file
21
src/main/kotlin/core/node/FixedIdentityService.kt
Normal file
@ -0,0 +1,21 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.node
|
||||
|
||||
import core.IdentityService
|
||||
import core.Party
|
||||
import java.security.PublicKey
|
||||
|
||||
/**
|
||||
* Scaffolding: a dummy identity service that just expects to have identities loaded off disk or found elsewhere.
|
||||
*/
|
||||
class FixedIdentityService(private val identities: List<Party>) : IdentityService {
|
||||
private val keyToParties = identities.toMapBy { it.owningKey }
|
||||
override fun partyFromKey(key: PublicKey): Party? = keyToParties[key]
|
||||
}
|
159
src/main/kotlin/core/node/Node.kt
Normal file
159
src/main/kotlin/core/node/Node.kt
Normal file
@ -0,0 +1,159 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.node
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import core.*
|
||||
import core.messaging.*
|
||||
import core.serialization.deserialize
|
||||
import core.serialization.serialize
|
||||
import core.utilities.loggerFor
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.security.KeyPairGenerator
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
val DEFAULT_PORT = 31337
|
||||
|
||||
class ConfigurationException(message: String) : Exception(message)
|
||||
|
||||
// TODO: Split this into a regression testing environment
|
||||
|
||||
/**
|
||||
* A simple wrapper around a plain old Java .properties file. The keys have the same name as in the source code.
|
||||
*
|
||||
* TODO: Replace Java properties file with a better config file format (maybe yaml).
|
||||
* We want to be able to configure via a GUI too, so an ability to round-trip whitespace, comments etc when machine
|
||||
* editing the file is a must-have.
|
||||
*/
|
||||
class NodeConfiguration(private val properties: Properties) {
|
||||
val myLegalName: String by properties
|
||||
}
|
||||
|
||||
/**
|
||||
* A Node manages a standalone server that takes part in the P2P network. It creates the services found in [ServiceHub],
|
||||
* loads important data off disk and starts listening for connections.
|
||||
*
|
||||
* @param dir A [Path] to a location on disk where working files can be found or stored.
|
||||
* @param myNetAddr The host and port that this server will use. It can't find out its own external hostname, so you
|
||||
* have to specify that yourself.
|
||||
* @param configuration This is typically loaded from a .properties file
|
||||
* @param timestamperAddress If null, this node will become a timestamping node, otherwise, it will use that one.
|
||||
*/
|
||||
class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeConfiguration,
|
||||
timestamperAddress: LegallyIdentifiableNode?) {
|
||||
private val log = loggerFor<Node>()
|
||||
|
||||
// We will run as much stuff in this thread as possible to keep the risk of thread safety bugs low during the
|
||||
// low-performance prototyping period.
|
||||
val serverThread = Executors.newSingleThreadExecutor()
|
||||
|
||||
val services = object : ServiceHub {
|
||||
override val networkService: MessagingService get() = net
|
||||
override val networkMapService: NetworkMap = MockNetworkMap()
|
||||
override val storageService: StorageService get() = storage
|
||||
override val walletService: WalletService get() = wallet
|
||||
override val keyManagementService: KeyManagementService get() = keyManagement
|
||||
override val identityService: IdentityService get() = identity
|
||||
}
|
||||
|
||||
// TODO: Implement mutual exclusion so we can't start the node twice by accident.
|
||||
|
||||
val storage = makeStorageService(dir)
|
||||
val smm = StateMachineManager(services, serverThread)
|
||||
val net = ArtemisMessagingService(dir, myNetAddr)
|
||||
val wallet: WalletService = E2ETestWalletService(services)
|
||||
val keyManagement = E2ETestKeyManagementService()
|
||||
val inNodeTimestampingService: TimestamperNodeService?
|
||||
val identity: IdentityService
|
||||
|
||||
init {
|
||||
// Insert a network map entry for the timestamper: this is all temp scaffolding and will go away. If we are
|
||||
// given the details, the timestamping node is somewhere else. Otherwise, we do our own timestamping.
|
||||
val tsid = if (timestamperAddress != null) {
|
||||
inNodeTimestampingService = null
|
||||
timestamperAddress
|
||||
} else {
|
||||
inNodeTimestampingService = TimestamperNodeService(net, storage.myLegalIdentity, storage.myLegalIdentityKey)
|
||||
LegallyIdentifiableNode(net.myAddress, storage.myLegalIdentity)
|
||||
}
|
||||
(services.networkMapService as MockNetworkMap).timestampingNodes.add(tsid)
|
||||
|
||||
// We don't have any identity infrastructure right now, so we just throw together the only two identities we
|
||||
// know about: our own, and the identity of the remote timestamper node (if any).
|
||||
val knownIdentities = if (timestamperAddress != null)
|
||||
listOf(storage.myLegalIdentity, timestamperAddress.identity)
|
||||
else
|
||||
listOf(storage.myLegalIdentity)
|
||||
identity = FixedIdentityService(knownIdentities)
|
||||
|
||||
net.start()
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
net.stop()
|
||||
serverThread.shutdownNow()
|
||||
}
|
||||
|
||||
fun makeStorageService(dir: Path): StorageService {
|
||||
// Load the private identity key, creating it if necessary. The identity key is a long term well known key that
|
||||
// is distributed to other peers and we use it (or a key signed by it) when we need to do something
|
||||
// "permissioned". The identity file is what gets distributed and contains the node's legal name along with
|
||||
// the public key. Obviously in a real system this would need to be a certificate chain of some kind to ensure
|
||||
// the legal name is actually validated in some way.
|
||||
val privKeyFile = dir.resolve(PRIVATE_KEY_FILE_NAME)
|
||||
val pubIdentityFile = dir.resolve(PUBLIC_IDENTITY_FILE_NAME)
|
||||
|
||||
val (identity, keypair) = if (!Files.exists(privKeyFile)) {
|
||||
log.info("Identity key not found, generating fresh key!")
|
||||
val keypair: KeyPair = KeyPairGenerator.getInstance("EC").genKeyPair()
|
||||
keypair.serialize().writeToFile(privKeyFile)
|
||||
val myIdentity = Party(configuration.myLegalName, keypair.public)
|
||||
// We include the Party class with the file here to help catch mixups when admins provide files of the
|
||||
// wrong type by mistake.
|
||||
myIdentity.serialize(includeClassName = true).writeToFile(pubIdentityFile)
|
||||
Pair(myIdentity, keypair)
|
||||
} else {
|
||||
// Check that the identity in the config file matches the identity file we have stored to disk.
|
||||
// This is just a sanity check. It shouldn't fail unless the admin has fiddled with the files and messed
|
||||
// things up for us.
|
||||
val myIdentity = Files.readAllBytes(pubIdentityFile).deserialize<Party>(includeClassName = true)
|
||||
if (myIdentity.name != configuration.myLegalName)
|
||||
throw ConfigurationException("The legal name in the config file doesn't match the stored identity file:" +
|
||||
"${configuration.myLegalName} vs ${myIdentity.name}")
|
||||
// Load the private key.
|
||||
val keypair = Files.readAllBytes(privKeyFile).deserialize<KeyPair>()
|
||||
Pair(myIdentity, keypair)
|
||||
}
|
||||
|
||||
log.info("Node owned by ${identity.name} starting up ...")
|
||||
|
||||
return object : StorageService {
|
||||
private val tables = HashMap<String, MutableMap<Any, Any>>()
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
|
||||
// TODO: This should become a database.
|
||||
synchronized(tables) {
|
||||
return tables.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
|
||||
}
|
||||
}
|
||||
|
||||
override val myLegalIdentity = identity
|
||||
override val myLegalIdentityKey = keypair
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
|
||||
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
|
||||
}
|
||||
}
|
216
src/main/kotlin/core/node/TraderDemo.kt
Normal file
216
src/main/kotlin/core/node/TraderDemo.kt
Normal file
@ -0,0 +1,216 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.node
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import contracts.CommercialPaper
|
||||
import contracts.protocols.TwoPartyTradeProtocol
|
||||
import core.*
|
||||
import core.messaging.LegallyIdentifiableNode
|
||||
import core.messaging.SingleMessageRecipient
|
||||
import core.messaging.runOnNextMessage
|
||||
import core.messaging.send
|
||||
import core.serialization.deserialize
|
||||
import core.utilities.BriefLogFormatter
|
||||
import joptsimple.OptionParser
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.security.KeyPairGenerator
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
// TRADING DEMO
|
||||
//
|
||||
// This demo app can be run in one of two modes. In the listening mode it will buy commercial paper from a selling node
|
||||
// that connects to it, using IOU cash it issued to itself. It also runs a timestamping service in this mode. In the
|
||||
// non-listening mode, it will connect to the specified listening node and sell some commercial paper in return for
|
||||
// cash. There's currently no UI so all you can see is log messages.
|
||||
//
|
||||
// Please note that the software currently assumes every node has a unique DNS name. Thus you cannot name both nodes
|
||||
// "localhost". This might get fixed in future, but for now to run the listening node, alias "alpha" to "localhost"
|
||||
// in your /etc/hosts file and then try a command line like this:
|
||||
//
|
||||
// --dir=alpha --service-fake-trades --network-address=alpha
|
||||
//
|
||||
// To run the node that initiates a trade, alias "beta" to "localhost" in your /etc/hosts file and then try a command
|
||||
// line like this:
|
||||
//
|
||||
// --dir=beta --fake-trade-with=alpha --network-address=beta:31338
|
||||
// --timestamper-identity-file=alpha/identity-public --timestamper-address=alpha
|
||||
//
|
||||
// Alternatively,
|
||||
|
||||
|
||||
fun main(args: Array<String>) {
|
||||
val parser = OptionParser()
|
||||
val networkAddressArg = parser.accepts("network-address").withRequiredArg().required()
|
||||
val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("nodedata")
|
||||
|
||||
// Some dummy functionality that won't last long ...
|
||||
|
||||
// Mode flags for the first demo.
|
||||
val serviceFakeTradesArg = parser.accepts("service-fake-trades")
|
||||
val fakeTradeWithArg = parser.accepts("fake-trade-with").requiredUnless(serviceFakeTradesArg).withRequiredArg()
|
||||
|
||||
// Temporary flags until network map and service discovery is fleshed out. The identity file does NOT contain the
|
||||
// network address because all this stuff is meant to come from a dynamic discovery service anyway, and the identity
|
||||
// is meant to be long-term stable. It could contain a domain name, but we may end up not routing messages directly
|
||||
// to DNS-identified endpoints anyway (e.g. consider onion routing as a possibility).
|
||||
val timestamperIdentityFile = parser.accepts("timestamper-identity-file").requiredIf(fakeTradeWithArg).withRequiredArg()
|
||||
val timestamperNetAddr = parser.accepts("timestamper-address").requiredIf(timestamperIdentityFile).withRequiredArg()
|
||||
|
||||
val options = try {
|
||||
parser.parse(*args)
|
||||
} catch (e: Exception) {
|
||||
println(e.message)
|
||||
printHelp()
|
||||
System.exit(1)
|
||||
throw Exception() // TODO: Remove when upgrading to Kotlin 1.0 RC
|
||||
}
|
||||
|
||||
BriefLogFormatter.initVerbose("platform.trade")
|
||||
|
||||
val dir = Paths.get(options.valueOf(dirArg))
|
||||
val configFile = dir.resolve("config")
|
||||
|
||||
if (!Files.exists(dir)) {
|
||||
Files.createDirectory(dir)
|
||||
}
|
||||
|
||||
val config = loadConfigFile(configFile)
|
||||
|
||||
val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg)).withDefaultPort(DEFAULT_PORT)
|
||||
val listening = options.has(serviceFakeTradesArg)
|
||||
|
||||
val timestamperId = if (options.has(timestamperIdentityFile)) {
|
||||
val addr = HostAndPort.fromString(options.valueOf(timestamperNetAddr)).withDefaultPort(DEFAULT_PORT)
|
||||
val path = Paths.get(options.valueOf(timestamperIdentityFile))
|
||||
val party = Files.readAllBytes(path).deserialize<Party>(includeClassName = true)
|
||||
LegallyIdentifiableNode(ArtemisMessagingService.makeRecipient(addr), party)
|
||||
} else null
|
||||
|
||||
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId) }
|
||||
|
||||
// Now do some fake nonsense just to give us some activity.
|
||||
|
||||
(node.services.walletService as E2ETestWalletService).fillWithSomeTestCash(1000.DOLLARS)
|
||||
|
||||
val timestampingAuthority = node.services.networkMapService.timestampingNodes.first()
|
||||
if (listening) {
|
||||
// Wait around until a node asks to start a trade with us. In a real system, this part would happen out of band
|
||||
// via some other system like an exchange or maybe even a manual messaging system like Bloomberg. But for the
|
||||
// next stage in our building site, we will just auto-generate fake trades to give our nodes something to do.
|
||||
//
|
||||
// Note that currently, the two-party trade protocol doesn't actually resolve dependencies of transactions!
|
||||
// Thus, we can make up whatever junk we like and trade non-existent cash/assets: the other side won't notice.
|
||||
// Obviously, fixing that is the next step.
|
||||
//
|
||||
// As the seller initiates the DVP/two-party trade protocol, here, we will be the buyer.
|
||||
node.net.addMessageHandler("test.junktrade") { msg, handlerRegistration ->
|
||||
val replyTo = msg.data.deserialize<SingleMessageRecipient>(includeClassName = true)
|
||||
val buyerSessionID = random63BitValue()
|
||||
println("Got a new junk trade request, sending back session ID and starting buy protocol")
|
||||
val future = TwoPartyTradeProtocol.runBuyer(node.smm, timestampingAuthority, replyTo, 100.DOLLARS,
|
||||
CommercialPaper.State::class.java, buyerSessionID)
|
||||
|
||||
future.whenComplete {
|
||||
println("Purchase complete - we are a happy customer!")
|
||||
node.stop()
|
||||
}
|
||||
|
||||
node.net.send("test.junktrade.initiate", replyTo, buyerSessionID)
|
||||
}
|
||||
} else {
|
||||
// Grab a session ID for the fake trade from the other side, then kick off the seller and sell them some junk.
|
||||
if (!options.has(fakeTradeWithArg)) {
|
||||
println("Need the --fake-trade-with command line argument")
|
||||
System.exit(1)
|
||||
}
|
||||
val peerAddr = HostAndPort.fromString(options.valuesOf(fakeTradeWithArg).single()).withDefaultPort(DEFAULT_PORT)
|
||||
val otherSide = ArtemisMessagingService.makeRecipient(peerAddr)
|
||||
node.net.runOnNextMessage("test.junktrade.initiate") { msg ->
|
||||
val sessionID = msg.data.deserialize<Long>()
|
||||
|
||||
println("Got session ID back, now starting the sell protocol")
|
||||
|
||||
val cpOwnerKey = node.keyManagement.freshKey()
|
||||
val commercialPaper = makeFakeCommercialPaper(cpOwnerKey.public)
|
||||
|
||||
val future = TwoPartyTradeProtocol.runSeller(node.smm, timestampingAuthority,
|
||||
otherSide, commercialPaper, 100.DOLLARS, cpOwnerKey, sessionID)
|
||||
|
||||
future.whenComplete {
|
||||
println("Sale completed - we have a happy customer!")
|
||||
node.stop()
|
||||
}
|
||||
}
|
||||
println("Sending a request to get a session ID to the other side")
|
||||
node.net.send("test.junktrade", otherSide, node.net.myAddress, includeClassName = true)
|
||||
}
|
||||
}
|
||||
|
||||
fun makeFakeCommercialPaper(ownedBy: PublicKey): StateAndRef<CommercialPaper.State> {
|
||||
// Make a fake company that's issued its own paper.
|
||||
val party = Party("MegaCorp, Inc", KeyPairGenerator.getInstance("EC").genKeyPair().public)
|
||||
// ownedBy here is the random key that gives us control over it.
|
||||
val paper = CommercialPaper.State(party.ref(1,2,3), ownedBy, 1100.DOLLARS, Instant.now() + 10.days)
|
||||
val randomRef = ContractStateRef(SecureHash.randomSHA256(), 0)
|
||||
return StateAndRef(paper, randomRef)
|
||||
}
|
||||
|
||||
private fun loadConfigFile(configFile: Path): NodeConfiguration {
|
||||
fun askAdminToEditConfig(configFile: Path?) {
|
||||
println("This is the first run, so you should edit the config file in $configFile and then start the node again.")
|
||||
System.exit(1)
|
||||
}
|
||||
|
||||
val defaultLegalName = "Global MegaCorp, Ltd."
|
||||
|
||||
if (!Files.exists(configFile)) {
|
||||
createDefaultConfigFile(configFile, defaultLegalName)
|
||||
askAdminToEditConfig(configFile)
|
||||
}
|
||||
|
||||
val configProps = configFile.toFile().reader().use {
|
||||
Properties().apply { load(it) }
|
||||
}
|
||||
|
||||
val config = NodeConfiguration(configProps)
|
||||
|
||||
// Make sure admin did actually edit at least the legal name.
|
||||
if (config.myLegalName == defaultLegalName)
|
||||
askAdminToEditConfig(configFile)
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
private fun createDefaultConfigFile(configFile: Path?, defaultLegalName: String) {
|
||||
Files.write(configFile,
|
||||
"""
|
||||
# Node configuration: adjust below as needed, then delete this comment.
|
||||
myLegalName = $defaultLegalName
|
||||
""".trimIndent().toByteArray())
|
||||
}
|
||||
|
||||
private fun printHelp() {
|
||||
println("""
|
||||
|
||||
To run the listening node, alias "alpha" to "localhost" in your
|
||||
/etc/hosts file and then try a command line like this:
|
||||
|
||||
--dir=alpha --service-fake-trades --network-address=alpha
|
||||
|
||||
To run the node that initiates a trade, alias "beta" to "localhost"
|
||||
in your /etc/hosts file and then try a command line like this:
|
||||
|
||||
--dir=beta --fake-trade-with=alpha --network-address=beta:31338 --timestamper-identity-file=alpha/identity-public --timestamper-address=alpha
|
||||
""".trimIndent())
|
||||
}
|
@ -23,6 +23,8 @@ import de.javakaffee.kryoserializers.ArraysAsListSerializer
|
||||
import org.objenesis.strategy.StdInstantiatorStrategy
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPairGenerator
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
@ -66,14 +68,19 @@ val THREAD_LOCAL_KRYO = ThreadLocal.withInitial { createKryo() }
|
||||
*/
|
||||
class SerializedBytes<T : Any>(bits: ByteArray) : OpaqueBytes(bits) {
|
||||
val hash: SecureHash by lazy { bits.sha256() }
|
||||
|
||||
fun writeToFile(path: Path) = Files.write(path, bits)
|
||||
}
|
||||
|
||||
// Some extension functions that make deserialisation convenient and provide auto-casting of the result.
|
||||
inline fun <reified T : Any> ByteArray.deserialize(kryo: Kryo = THREAD_LOCAL_KRYO.get()): T {
|
||||
return kryo.readObject(Input(this), T::class.java)
|
||||
inline fun <reified T : Any> ByteArray.deserialize(kryo: Kryo = THREAD_LOCAL_KRYO.get(), includeClassName: Boolean = false): T {
|
||||
if (includeClassName)
|
||||
return kryo.readClassAndObject(Input(this)) as T
|
||||
else
|
||||
return kryo.readObject(Input(this), T::class.java)
|
||||
}
|
||||
inline fun <reified T : Any> OpaqueBytes.deserialize(kryo: Kryo = THREAD_LOCAL_KRYO.get()): T {
|
||||
return kryo.readObject(Input(this.bits), T::class.java)
|
||||
inline fun <reified T : Any> OpaqueBytes.deserialize(kryo: Kryo = THREAD_LOCAL_KRYO.get(), includeClassName: Boolean = false): T {
|
||||
return this.bits.deserialize(kryo, includeClassName)
|
||||
}
|
||||
inline fun <reified T : Any> SerializedBytes<T>.deserialize(): T = bits.deserialize()
|
||||
|
||||
@ -81,10 +88,13 @@ inline fun <reified T : Any> SerializedBytes<T>.deserialize(): T = bits.deserial
|
||||
* Can be called on any object to convert it to a byte array (wrapped by [SerializedBytes]), regardless of whether
|
||||
* the type is marked as serializable or was designed for it (so be careful!)
|
||||
*/
|
||||
fun <T : Any> T.serialize(kryo: Kryo = THREAD_LOCAL_KRYO.get()): SerializedBytes<T> {
|
||||
fun <T : Any> T.serialize(kryo: Kryo = THREAD_LOCAL_KRYO.get(), includeClassName: Boolean = false): SerializedBytes<T> {
|
||||
val stream = ByteArrayOutputStream()
|
||||
Output(stream).use {
|
||||
kryo.writeObject(it, this)
|
||||
if (includeClassName)
|
||||
kryo.writeClassAndObject(it, this)
|
||||
else
|
||||
kryo.writeObject(it, this)
|
||||
}
|
||||
return SerializedBytes(stream.toByteArray())
|
||||
}
|
||||
|
@ -9,6 +9,8 @@
|
||||
package core
|
||||
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.MockNetworkMap
|
||||
import core.messaging.NetworkMap
|
||||
import core.node.TimestampingError
|
||||
import core.serialization.SerializedBytes
|
||||
import core.serialization.deserialize
|
||||
@ -63,11 +65,13 @@ class MockStorageService : StorageService {
|
||||
override val myLegalIdentityKey: KeyPair = KeyPairGenerator.getInstance("EC").genKeyPair()
|
||||
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)
|
||||
|
||||
private val mapOfMaps = HashMap<String, MutableMap<Any, Any>>()
|
||||
private val tables = HashMap<String, MutableMap<Any, Any>>()
|
||||
|
||||
@Synchronized
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
|
||||
return mapOfMaps.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
|
||||
synchronized(tables) {
|
||||
return tables.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,7 +80,8 @@ class MockServices(
|
||||
val keyManagement: KeyManagementService? = null,
|
||||
val net: MessagingService? = null,
|
||||
val identity: IdentityService? = MockIdentityService,
|
||||
val storage: StorageService? = MockStorageService()
|
||||
val storage: StorageService? = MockStorageService(),
|
||||
val networkMap: NetworkMap? = MockNetworkMap()
|
||||
) : ServiceHub {
|
||||
override val walletService: WalletService
|
||||
get() = wallet ?: throw UnsupportedOperationException()
|
||||
@ -86,6 +91,8 @@ class MockServices(
|
||||
get() = identity ?: throw UnsupportedOperationException()
|
||||
override val networkService: MessagingService
|
||||
get() = net ?: throw UnsupportedOperationException()
|
||||
override val networkMapService: NetworkMap
|
||||
get() = networkMap ?: throw UnsupportedOperationException()
|
||||
override val storageService: StorageService
|
||||
get() = storage ?: throw UnsupportedOperationException()
|
||||
}
|
||||
|
45
src/test/kotlin/core/node/E2ETestWalletServiceTest.kt
Normal file
45
src/test/kotlin/core/node/E2ETestWalletServiceTest.kt
Normal file
@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.node
|
||||
|
||||
import contracts.Cash
|
||||
import core.DOLLARS
|
||||
import core.MockKeyManagementService
|
||||
import core.MockServices
|
||||
import core.ServiceHub
|
||||
import core.testutils.ALICE
|
||||
import core.testutils.ALICE_KEY
|
||||
import org.junit.Test
|
||||
import java.security.KeyPair
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class E2ETestWalletServiceTest {
|
||||
val services: ServiceHub = MockServices(
|
||||
keyManagement = MockKeyManagementService(emptyMap(), arrayListOf<KeyPair>(ALICE_KEY, ALICE_KEY, ALICE_KEY))
|
||||
)
|
||||
|
||||
@Test fun splits() {
|
||||
val wallet = E2ETestWalletService(services)
|
||||
// Fix the PRNG so that we get the same splits every time.
|
||||
wallet.fillWithSomeTestCash(100.DOLLARS, 3, 3, Random(0L))
|
||||
|
||||
val w = wallet.currentWallet
|
||||
assertEquals(3, w.states.size)
|
||||
|
||||
val state = w.states[0].state as Cash.State
|
||||
assertEquals(services.storageService.myLegalIdentity, state.deposit.party)
|
||||
assertEquals(services.storageService.myLegalIdentityKey.public, state.deposit.party.owningKey)
|
||||
assertEquals(29.01.DOLLARS, state.amount)
|
||||
assertEquals(ALICE, state.owner)
|
||||
|
||||
assertEquals(33.34.DOLLARS, (w.states[2].state as Cash.State).amount)
|
||||
assertEquals(35.61.DOLLARS, (w.states[1].state as Cash.State).amount)
|
||||
}
|
||||
}
|
@ -48,7 +48,10 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
myNode = makeNode()
|
||||
serviceNode = makeNode()
|
||||
mockServices = MockServices(net = serviceNode.second, storage = MockStorageService())
|
||||
serverKey = network.setupTimestampingNode(true).first.identity.owningKey
|
||||
|
||||
val timestampingNodeID = network.setupTimestampingNode(true).first
|
||||
(mockServices.networkMapService as MockNetworkMap).timestampingNodes.add(timestampingNodeID)
|
||||
serverKey = timestampingNodeID.identity.owningKey
|
||||
|
||||
// And a separate one to be tested directly, to make the unit tests a bit faster.
|
||||
service = TimestamperNodeService(serviceNode.second, Party("Unit test suite", ALICE), ALICE_KEY)
|
||||
@ -76,7 +79,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
val psm = runNetwork {
|
||||
val smm = StateMachineManager(MockServices(net = myNode.second), RunOnCallerThread)
|
||||
val logName = TimestamperNodeService.TIMESTAMPING_PROTOCOL_TOPIC
|
||||
val psm = TestPSM(myNode.second.networkMap.timestampingNodes[0], clock.instant())
|
||||
val psm = TestPSM(mockServices.networkMapService.timestampingNodes[0], clock.instant())
|
||||
smm.add(logName, psm)
|
||||
psm
|
||||
}
|
||||
@ -128,4 +131,4 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
ptx.checkAndAddSignature(sig)
|
||||
ptx.toSignedTransaction(false).verifySignatures()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user