First pass at implementing dependency resolution and checking in the two-party trade protocol.

This commit is incomplete: only the seller side currently checks. The code will be refactored out into subprotocols in further commits.

Note that timeouts are currently unhandled.
This commit is contained in:
Mike Hearn 2016-02-09 19:48:15 +01:00
parent f92455442d
commit df4d926bca
18 changed files with 509 additions and 79 deletions

View File

@ -104,7 +104,7 @@ class Cash : Contract {
// literally anyone with access to the network can issue cash claims of arbitrary amounts! It is up
// to the recipient to decide if the backing party is trustworthy or not, via some
// as-yet-unwritten identity service. See ADP-22 for discussion.
val outputsInstitution = outputs.map { it.deposit.party }.singleOrNull()
val outputsInstitution = outputs.map { it.deposit.party }.distinct().singleOrNull()
if (outputsInstitution != null) {
requireThat {
"the issue command has a nonce" by (issueCommand.value.nonce != 0L)

View File

@ -45,9 +45,7 @@ class Requirements {
}
}
val R = Requirements()
inline fun requireThat(body: Requirements.() -> Unit) {
R.body()
}
inline fun <R> requireThat(body: Requirements.() -> R) = R.body()
//// Amounts //////////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -12,6 +12,7 @@ import co.paralleluniverse.fibers.Suspendable
import core.crypto.DigitalSignature
import core.crypto.SecureHash
import core.crypto.signWithECDSA
import core.crypto.toStringShort
import core.node.TimestampingError
import core.serialization.SerializedBytes
import core.serialization.deserialize
@ -83,6 +84,9 @@ data class WireTransaction(val inputs: List<StateRef>,
return SignedTransaction(serialized, withSigs)
}
@Suppress("UNCHECKED_CAST")
fun <T : ContractState> outRef(index: Int) = StateAndRef(outputs[index] as T, StateRef(id, index))
override fun toString(): String {
val buf = StringBuilder()
buf.appendln("Transaction:")
@ -128,7 +132,7 @@ data class SignedTransaction(val txBits: SerializedBytes<WireTransaction>, val s
val cmdKeys = tx.commands.flatMap { it.pubkeys }.toSet()
val sigKeys = sigs.map { it.by }.toSet()
if (!sigKeys.containsAll(cmdKeys))
throw SignatureException("Missing signatures on the transaction for: ${cmdKeys - sigKeys}")
throw SignatureException("Missing signatures on transaction ${id.prefixChars()} for: ${(cmdKeys - sigKeys).map { it.toStringShort() }}")
}
/**

View File

@ -14,16 +14,16 @@ import contracts.Cash
import contracts.sumCashBy
import core.*
import core.crypto.DigitalSignature
import core.crypto.SecureHash
import core.crypto.signWithECDSA
import core.messaging.LegallyIdentifiableNode
import core.messaging.ProtocolStateMachine
import core.messaging.SingleMessageRecipient
import core.messaging.StateMachineManager
import core.messaging.*
import core.node.DataVendingService
import core.node.TimestamperClient
import core.utilities.trace
import java.security.KeyPair
import java.security.PublicKey
import java.time.Instant
import java.util.*
/**
* This asset trading protocol implements a "delivery vs payment" type swap. It has two parties (B and S for buyer
@ -67,6 +67,12 @@ object TwoPartyTradeProtocol {
return buyer.resultFuture
}
class UnacceptablePriceException(val givenPrice: Amount) : Exception()
class AssetMismatchException(val expectedTypeName: String, val typeName: String) : Exception() {
override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName"
}
class ExcessivelyLargeTransactionGraphException() : Exception()
// This object is serialised to the network and is the first protocol message the seller sends to the buyer.
class SellerTradeInfo(
val assetForSale: StateAndRef<OwnableState>,
@ -103,11 +109,14 @@ object TwoPartyTradeProtocol {
// Make the first message we'll send to kick off the protocol.
val hello = SellerTradeInfo(assetToSell, price, myKeyPair.public, sessionID)
val maybePartialTX = sendAndReceive(TRADE_TOPIC, otherSide, buyerSessionID, sessionID, hello, SignedTransaction::class.java)
val partialTX = maybePartialTX.validate {
val maybeSTX = sendAndReceive<SignedTransaction>(TRADE_TOPIC, otherSide, buyerSessionID, sessionID, hello)
maybeSTX.validate {
it.verifySignatures()
logger.trace { "Received partially signed transaction" }
val wtx: WireTransaction = it.tx
logger.trace { "Received partially signed transaction: ${it.id}" }
checkDependencies(it)
requireThat {
"transaction sends us the right amount of cash" by (wtx.outputs.sumCashBy(myKeyPair.public) == price)
@ -123,8 +132,121 @@ object TwoPartyTradeProtocol {
// but the goal of this code is not to be fully secure, but rather, just to find good ways to
// express protocol state machines on top of the messaging layer.
}
return it
}
}
@Suspendable
open fun checkDependencies(txToCheck: SignedTransaction) {
val toVerify = HashSet<LedgerTransaction>()
val alreadyVerified = HashSet<LedgerTransaction>()
val downloadedSignedTxns = ArrayList<SignedTransaction>()
fetchDependenciesAndCheckSignatures(txToCheck.tx.inputs, toVerify, alreadyVerified, downloadedSignedTxns)
TransactionGroup(toVerify, alreadyVerified).verify(serviceHub.storageService.contractPrograms)
// Now write all the transactions we just validated back to the database for next time, including
// signatures so we can serve up these transactions to other peers when we, in turn, send one that
// depends on them onto another peer.
//
// It may seem tempting to write transactions to the database as we receive them, instead of all at once
// here at the end. Doing it this way avoids cases where a transaction is in the database but its
// dependencies aren't, or an unvalidated and possibly broken tx is there.
serviceHub.storageService.validatedTransactions.putAll(downloadedSignedTxns.associateBy { it.id })
}
@Suspendable
private fun fetchDependenciesAndCheckSignatures(depsToCheck: List<StateRef>,
toVerify: HashSet<LedgerTransaction>,
alreadyVerified: HashSet<LedgerTransaction>,
downloadedSignedTxns: ArrayList<SignedTransaction>) {
// A1. Create a work queue of transaction hashes waiting for resolution. Create a TransactionGroup.
//
// B1. Pop a hash. Look it up in the database. If it's not there, put the hash into a list for sending to
// the other peer. If it is there, load it and put its outputs into the TransactionGroup as unverified
// roots, because it's already been validated before.
// B2. If the queue is not empty, GOTO B1
// B3. If the request list is empty, GOTO D1
//
// C1. Send the request for hashes to the peer and wait for the response. Clear the request list.
// C2. For each transaction returned, verify that it does indeed hash to the requested transaction.
// C3. Add each transaction to the TransactionGroup.
// C4. Add each input state in each transaction to the work queue.
//
// D1. Verify the transaction group.
// D2. Write all the transactions in the group to the database.
// END
//
// Note: This protocol leaks private data. If you download a transaction and then do NOT request a
// dependency, it means you already have it, which in turn means you must have been involved with it before
// somehow, either in the tx itself or in any following spend of it. If there were no following spends, then
// your peer knows for sure that you were involved ... this is bad! The only obvious ways to fix this are
// something like onion routing of requests, secure hardware, or both.
//
// TODO: This needs to be factored out into a general subprotocol and subprotocol handling improved.
val workQ = ArrayList<StateRef>()
workQ.addAll(depsToCheck)
val nextRequests = ArrayList<SecureHash>()
val db = serviceHub.storageService.validatedTransactions
var limitCounter = 0
while (true) {
for (ref in workQ) {
val stx: SignedTransaction? = db[ref.txhash]
if (stx == null) {
// Transaction wasn't found in our local database, so we have to ask for it.
nextRequests.add(ref.txhash)
} else {
alreadyVerified.add(stx.verifyToLedgerTransaction(serviceHub.identityService))
}
}
workQ.clear()
if (nextRequests.isEmpty())
break
val sid = random63BitValue()
val fetchReq = DataVendingService.Request(nextRequests, serviceHub.networkService.myAddress, sid)
logger.info("Requesting ${nextRequests.size} dependency(s) for verification")
val maybeTxns: UntrustworthyData<ArrayList<SignedTransaction?>> =
sendAndReceive("platform.fetch.tx", otherSide, 0, sid, fetchReq)
// Check for a buggy/malicious peer answering with something that we didn't ask for, and then
// verify the signatures on the transactions and look up the identities to get LedgerTransactions.
// Note that this doesn't run contracts: just checks the signatures match the data.
val stxns: List<SignedTransaction> = validateTXFetchResponse(maybeTxns, nextRequests)
nextRequests.clear()
val ltxns = stxns.map { it.verifyToLedgerTransaction(serviceHub.identityService) }
// Add to the TransactionGroup, pending verification.
toVerify.addAll(ltxns)
downloadedSignedTxns.addAll(stxns)
// And now add all the input states to the work queue for database or remote resolution.
workQ.addAll(ltxns.flatMap { it.inputs })
// And loop around ...
limitCounter += workQ.size
if (limitCounter > 5000)
throw ExcessivelyLargeTransactionGraphException()
}
}
private fun validateTXFetchResponse(maybeTxns: UntrustworthyData<ArrayList<SignedTransaction?>>,
nextRequests: ArrayList<SecureHash>): List<SignedTransaction> {
return maybeTxns.validate { response ->
require(response.size == nextRequests.size)
val answers = response.requireNoNulls()
// Check transactions actually hash to what we requested, if this fails the remote node
// is a malicious protocol violator or buggy.
for ((index, stx) in answers.withIndex())
require(stx.id == nextRequests[index])
answers
}
return partialTX
}
open fun signWithOurKey(partialTX: SignedTransaction) = myKeyPair.signWithECDSA(partialTX.txBits)
@ -139,8 +261,6 @@ object TwoPartyTradeProtocol {
tsaSig: DigitalSignature.LegallyIdentifiable): SignedTransaction {
val fullySigned = partialTX + tsaSig + ourSignature
// TODO: We should run it through our full TransactionGroup of all transactions here.
logger.trace { "Built finished transaction, sending back to secondary!" }
send(TRADE_TOPIC, otherSide, buyerSessionID, SignaturesFromSeller(tsaSig, ourSignature))
@ -148,11 +268,6 @@ object TwoPartyTradeProtocol {
}
}
class UnacceptablePriceException(val givenPrice: Amount) : Exception()
class AssetMismatchException(val expectedTypeName: String, val typeName: String) : Exception() {
override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName"
}
open class Buyer(val otherSide: SingleMessageRecipient,
val timestampingAuthority: Party,
val acceptablePrice: Amount,
@ -189,6 +304,8 @@ object TwoPartyTradeProtocol {
throw UnacceptablePriceException(it.price)
if (!typeToBuy.isInstance(it.assetForSale.state))
throw AssetMismatchException(typeToBuy.name, assetTypeName)
return@validate it
}
// TODO: Either look up the stateref here in our local db, or accept a long chain of states and
@ -203,7 +320,7 @@ object TwoPartyTradeProtocol {
// TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx.
return sendAndReceive(TRADE_TOPIC, otherSide, theirSessionID, sessionID, stx, SignaturesFromSeller::class.java).validate {}
return sendAndReceive(TRADE_TOPIC, otherSide, theirSessionID, sessionID, stx, SignaturesFromSeller::class.java).validate { it }
}
open fun signWithOurKeys(cashSigningPubKeys: List<PublicKey>, ptx: TransactionBuilder): SignedTransaction {

View File

@ -10,6 +10,7 @@ package core
import co.paralleluniverse.fibers.Suspendable
import core.crypto.DigitalSignature
import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.messaging.MessagingService
import core.messaging.NetworkMap
@ -80,6 +81,18 @@ object DummyTimestampingAuthority {
interface StorageService {
fun <K,V> getMap(tableName: String): MutableMap<K, V>
/**
* A map of hash->tx where tx has been signature/contract validated and the states are known to be correct.
* The signatures aren't technically needed after that point, but we keep them around so that we can relay
* the transaction data to other nodes that need it.
*/
val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
/**
* A map of program hash->contract class type, used for verification.
*/
val contractPrograms: ContractFactory
/**
* Returns the legal identity that this node is configured with. Assumed to be initialised when the node is
* first installed.

View File

@ -69,6 +69,8 @@ data class TransactionForVerification(val inStates: List<ContractState>,
override fun equals(other: Any?) = other is TransactionForVerification && other.origHash == origHash
/**
* Runs the smart contracts governing this transaction.
*
* @throws TransactionVerificationException if a contract throws an exception, the original is in the cause field
* @throws IllegalStateException if a state refers to an unknown contract.
*/

View File

@ -49,6 +49,9 @@ import javax.annotation.concurrent.ThreadSafe
* TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised
* continuation is always unique?
* TODO: Think about how to bring the system to a clean stop so it can be upgraded without any serialised stacks on disk
* TODO: Timeouts
* TODO: Surfacing of exceptions via an API and/or management UI
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
*/
@ThreadSafe
class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) {
@ -237,12 +240,6 @@ abstract class ProtocolStateMachine<R> : Fiber<R>("protocol", SameThreadFiberSch
@Transient protected lateinit var logger: Logger
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
init {
setDefaultUncaughtExceptionHandler { strand, throwable ->
logger.error("Caught error whilst running protocol state machine ${this.javaClass.name}", throwable)
}
}
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R> get() {
return _resultFuture ?: run {
@ -258,6 +255,10 @@ abstract class ProtocolStateMachine<R> : Fiber<R>("protocol", SameThreadFiberSch
this.logger = logger
this.resumeWithObject = withObject
this.serviceHub = serviceHub
setUncaughtExceptionHandler { strand, throwable ->
logger.error("Caught error whilst running protocol state machine ${strand.javaClass.name}", throwable)
}
}
// This line may look useless, but it's needed to convince the Quasar bytecode rewriter to do the right thing.
@ -306,6 +307,15 @@ abstract class ProtocolStateMachine<R> : Fiber<R>("protocol", SameThreadFiberSch
val result = FiberRequest.NotExpectingResponse(topic, destination, sessionID, obj)
Fiber.parkAndSerialize { fiber, writer -> suspendFunc!!(result, writer.write(fiber)) }
}
// Kotlin helpers that allow the use of generic types.
inline fun <reified T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long,
sessionIDForReceive: Long, obj: Any): UntrustworthyData<T> {
return sendAndReceive(topic, destination, sessionIDForSend, sessionIDForReceive, obj, T::class.java)
}
inline fun <reified T : Any> receive(topic: String, sessionIDForReceive: Long): UntrustworthyData<T> {
return receive(topic, sessionIDForReceive, T::class.java)
}
}
/**
@ -325,10 +335,7 @@ class UntrustworthyData<T>(private val fromUntrustedWorld: T) {
get() = fromUntrustedWorld
@Suppress("DEPRECATION")
inline fun validate(validator: (T) -> Unit): T {
validator(data)
return data
}
inline fun <R> validate(validator: (T) -> R) = validator(data)
}
// TODO: Clean this up

View File

@ -0,0 +1,65 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.node
import core.StorageService
import core.crypto.SecureHash
import core.messaging.Message
import core.messaging.MessagingService
import core.messaging.SingleMessageRecipient
import core.messaging.send
import core.serialization.deserialize
import core.utilities.loggerFor
import javax.annotation.concurrent.ThreadSafe
/**
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
* glue that sits between the network layer and the database layer.
*
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
* such the hash of a piece of data can be seen as a type of password allowing access to it.
*
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
*/
@ThreadSafe
class DataVendingService(private val net: MessagingService, private val storage: StorageService) {
companion object {
val TX_FETCH_TOPIC = "platform.fetch.tx"
val CONTRACT_FETCH_TOPIC = "platform.fetch.contract"
val logger = loggerFor<DataVendingService>()
}
init {
net.addMessageHandler("$TX_FETCH_TOPIC.0") { msg, registration -> handleTXRequest(msg) }
net.addMessageHandler("$CONTRACT_FETCH_TOPIC.0") { msg, registration -> handleContractRequest(msg) }
}
// TODO: Give all messages a respond-to address+session ID automatically.
data class Request(val hashes: List<SecureHash>, val responseTo: SingleMessageRecipient, val sessionID: Long)
private fun handleTXRequest(msg: Message) {
val req = msg.data.deserialize<Request>()
require(req.hashes.isNotEmpty())
val answers = req.hashes.map {
val tx = storage.validatedTransactions[it]
if (tx == null)
logger.info("Got request for unknown tx $it")
tx
}
net.send("$TX_FETCH_TOPIC.${req.sessionID}", req.responseTo, answers)
}
private fun handleContractRequest(msg: Message) {
TODO("PLT-12: Basic module/sandbox system for contracts")
}
}

View File

@ -59,6 +59,9 @@ class E2ETestWalletService(private val services: ServiceHub) : WalletService {
return@map issuance.toSignedTransaction(true)
}
// TODO: Centralise the process of transaction acceptance and filtering into the wallet, then move this out.
services.storageService.validatedTransactions.putAll(transactions.associateBy { it.id })
val statesAndRefs = transactions.map {
StateAndRef(it.tx.outputs[0] as OwnableState, StateRef(it.id, 0))
}

View File

@ -9,7 +9,9 @@
package core.node
import com.google.common.net.HostAndPort
import contracts.*
import core.*
import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.messaging.*
import core.serialization.deserialize
@ -67,6 +69,22 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
override val identityService: IdentityService get() = identity
}
// TODO: This will be obsoleted by "PLT-12: Basic module/sandbox system for contracts"
private val contractFactory = object : ContractFactory {
private val contracts = mapOf(
CASH_PROGRAM_ID to Cash::class.java,
CP_PROGRAM_ID to CommercialPaper::class.java,
CROWDFUND_PROGRAM_ID to CrowdFund::class.java,
DUMMY_PROGRAM_ID to DummyContract::class.java
)
override fun <T : Contract> get(hash: SecureHash): T {
val c = contracts[hash] ?: throw UnknownContractException()
@Suppress("UNCHECKED_CAST")
return c.newInstance() as T
}
}
val storage: StorageService
val smm: StateMachineManager
val net: ArtemisMessagingService
@ -107,6 +125,10 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
listOf(storage.myLegalIdentity)
identity = FixedIdentityService(knownIdentities)
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.
DataVendingService(net, storage)
net.start()
}
@ -149,7 +171,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
log.info("Node owned by ${identity.name} starting up ...")
return object : StorageService {
val ss = object : StorageService {
private val tables = HashMap<String, MutableMap<Any, Any>>()
@Suppress("UNCHECKED_CAST")
@ -160,9 +182,15 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
}
}
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
get() = getMap("validated-transactions")
override val contractPrograms = contractFactory
override val myLegalIdentity = identity
override val myLegalIdentityKey = keypair
}
return ss
}
private fun alreadyRunningNodeCheck() {

View File

@ -109,9 +109,10 @@ class TimestamperClient(private val psm: ProtocolStateMachine<*>, private val no
sessionID, req, DigitalSignature.LegallyIdentifiable::class.java)
// Check that the timestamping authority gave us back a valid signature and didn't break somehow
val signature = maybeSignature.validate { it.verifyWithECDSA(wtxBytes) }
return signature
maybeSignature.validate { sig ->
sig.verifyWithECDSA(wtxBytes)
return sig
}
}
}

View File

@ -12,7 +12,6 @@ import com.google.common.net.HostAndPort
import contracts.CommercialPaper
import contracts.protocols.TwoPartyTradeProtocol
import core.*
import core.crypto.SecureHash
import core.crypto.generateKeyPair
import core.messaging.LegallyIdentifiableNode
import core.messaging.SingleMessageRecipient
@ -86,7 +85,7 @@ fun main(args: Array<String>) {
// Now do some fake nonsense just to give us some activity.
(node.services.walletService as E2ETestWalletService).fillWithSomeTestCash(1000.DOLLARS)
(node.services.walletService as E2ETestWalletService).fillWithSomeTestCash(1500.DOLLARS)
val timestampingAuthority = node.services.networkMapService.timestampingNodes.first()
if (listening) {
@ -103,7 +102,7 @@ fun main(args: Array<String>) {
val replyTo = msg.data.deserialize<SingleMessageRecipient>(includeClassName = true)
val buyerSessionID = random63BitValue()
println("Got a new junk trade request, sending back session ID and starting buy protocol")
val future = TwoPartyTradeProtocol.runBuyer(node.smm, timestampingAuthority, replyTo, 100.DOLLARS,
val future = TwoPartyTradeProtocol.runBuyer(node.smm, timestampingAuthority, replyTo, 1000.DOLLARS,
CommercialPaper.State::class.java, buyerSessionID)
future success {
@ -138,10 +137,10 @@ fun main(args: Array<String>) {
println("Got session ID back, now starting the sell protocol")
val cpOwnerKey = node.keyManagement.freshKey()
val commercialPaper = makeFakeCommercialPaper(cpOwnerKey.public)
val commercialPaper = makeFakeCommercialPaper(node.storage, cpOwnerKey.public)
val future = TwoPartyTradeProtocol.runSeller(node.smm, timestampingAuthority,
otherSide, commercialPaper, 100.DOLLARS, cpOwnerKey, sessionID)
otherSide, commercialPaper, 1000.DOLLARS, cpOwnerKey, sessionID)
future success {
println()
@ -165,13 +164,28 @@ fun main(args: Array<String>) {
}
}
fun makeFakeCommercialPaper(ownedBy: PublicKey): StateAndRef<CommercialPaper.State> {
fun makeFakeCommercialPaper(storageService: StorageService, ownedBy: PublicKey): StateAndRef<CommercialPaper.State> {
// Make a fake company that's issued its own paper.
val party = Party("MegaCorp, Inc", generateKeyPair().public)
// ownedBy here is the random key that gives us control over it.
val paper = CommercialPaper.State(party.ref(1,2,3), ownedBy, 1100.DOLLARS, Instant.now() + 10.days)
val randomRef = StateRef(SecureHash.randomSHA256(), 0)
return StateAndRef(paper, randomRef)
val keyPair = generateKeyPair()
val party = Party("MegaCorp, Inc", keyPair.public)
val issuance = run {
val tx = CommercialPaper().generateIssue(party.ref(1,2,3), 1100.DOLLARS, Instant.now() + 10.days)
tx.signWith(keyPair)
tx.toSignedTransaction(true)
}
val move = run {
val tx = TransactionBuilder()
CommercialPaper().generateMove(tx, issuance.tx.outRef(0), ownedBy)
tx.signWith(keyPair)
tx.toSignedTransaction(true)
}
storageService.validatedTransactions[issuance.id] = issuance
storageService.validatedTransactions[move.id] = move
return move.tx.outRef(0)
}
private fun loadConfigFile(configFile: Path): NodeConfiguration {

View File

@ -15,9 +15,11 @@ import core.crypto.signWithECDSA
import core.messaging.MessagingService
import core.messaging.MockNetworkMap
import core.messaging.NetworkMap
import core.node.DataVendingService
import core.node.TimestampingError
import core.serialization.SerializedBytes
import core.serialization.deserialize
import core.testutils.RecordingMap
import core.testutils.TEST_KEYS_TO_CORP_MAP
import core.testutils.TEST_PROGRAM_MAP
import core.testutils.TEST_TX_TIME
@ -65,16 +67,27 @@ class MockWalletService(val states: List<StateAndRef<OwnableState>>) : WalletSer
}
@ThreadSafe
class MockStorageService : StorageService {
class MockStorageService(val isRecording: Boolean = false) : StorageService {
override val myLegalIdentityKey: KeyPair = generateKeyPair()
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)
private val tables = HashMap<String, MutableMap<Any, Any>>()
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
get() = getMap("validated-transactions")
override val contractPrograms = MockContractFactory
@Suppress("UNCHECKED_CAST")
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
synchronized(tables) {
return tables.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
return tables.getOrPut(tableName) {
val map = Collections.synchronizedMap(HashMap<Any, Any>())
if (isRecording)
RecordingMap(map)
else
map
} as MutableMap<K, V>
}
}
}
@ -107,4 +120,12 @@ class MockServices(
get() = networkMap ?: throw UnsupportedOperationException()
override val storageService: StorageService
get() = storage ?: throw UnsupportedOperationException()
init {
if (net != null && storage != null) {
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
// on the networking service, so that will keep it from being collected.
DataVendingService(net, storage)
}
}
}

View File

@ -16,6 +16,7 @@ import core.ThreadBox
import core.crypto.sha256
import core.node.TimestamperNodeService
import core.utilities.loggerFor
import java.security.KeyPair
import java.time.Instant
import java.util.*
import java.util.concurrent.Executor
@ -221,10 +222,7 @@ class InMemoryNetwork {
private fun pumpInternal(block: Boolean): Boolean {
val q = getQueueForHandle(handle)
val message = if (block) q.take() else q.poll()
if (message == null)
return false
val message = (if (block) q.take() else q.poll()) ?: return false
val deliverTo = state.locked {
val h = handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic }

View File

@ -44,16 +44,9 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
}
@Test
fun cashForCP() {
fun `trade cash for commercial paper`() {
transactionGroupFor<ContractState> {
// Bob (Buyer) has some cash, Alice (Seller) has some commercial paper she wants to sell to Bob.
roots {
transaction(CommercialPaper.State(MEGA_CORP.ref(1, 2, 3), ALICE, 1200.DOLLARS, TEST_TX_TIME + 7.days) label "alice's paper")
transaction(800.DOLLARS.CASH `owned by` BOB label "bob cash1")
transaction(300.DOLLARS.CASH `owned by` BOB label "bob cash2")
}
val bobsWallet = listOf<StateAndRef<Cash.State>>(lookup("bob cash1"), lookup("bob cash2"))
val bobsWallet = fillUp().first
val (alicesAddress, alicesNode) = makeNode(inBackground = true)
val (bobsAddress, bobsNode) = makeNode(inBackground = true)
@ -61,10 +54,12 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
val alicesServices = MockServices(net = alicesNode)
val bobsServices = MockServices(
wallet = MockWalletService(bobsWallet),
wallet = MockWalletService(bobsWallet.states),
keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)),
net = bobsNode
net = bobsNode,
storage = MockStorageService()
)
loadFakeTxnsIntoStorage(bobsServices.storageService)
val buyerSessionID = random63BitValue()
@ -94,16 +89,9 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
}
@Test
fun serializeAndRestore() {
fun `shut down and restore`() {
transactionGroupFor<ContractState> {
// Buyer Bob has some cash, Seller Alice has some commercial paper she wants to sell to Bob.
roots {
transaction(CommercialPaper.State(MEGA_CORP.ref(1, 2, 3), ALICE, 1200.DOLLARS, TEST_TX_TIME + 7.days) label "alice's paper")
transaction(800.DOLLARS.CASH `owned by` BOB label "bob cash1")
transaction(300.DOLLARS.CASH `owned by` BOB label "bob cash2")
}
val bobsWallet = listOf<StateAndRef<Cash.State>>(lookup("bob cash1"), lookup("bob cash2"))
val wallet = fillUp().first
val (alicesAddress, alicesNode) = makeNode(inBackground = false)
var (bobsAddress, bobsNode) = makeNode(inBackground = false)
@ -113,11 +101,12 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
val alicesServices = MockServices(wallet = null, keyManagement = null, net = alicesNode)
var bobsServices = MockServices(
wallet = MockWalletService(bobsWallet),
wallet = MockWalletService(wallet.states),
keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)),
net = bobsNode,
storage = bobsStorage
)
loadFakeTxnsIntoStorage(bobsStorage)
val smmBuyer = StateMachineManager(bobsServices, MoreExecutors.directExecutor())
@ -149,6 +138,12 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
// Seller Alice already sent a message to Buyer Bob. Pump once:
bobsNode.pump(false)
// Bob sends a couple of queries for the dependencies back to Alice. Alice reponds.
alicesNode.pump(false)
bobsNode.pump(false)
alicesNode.pump(false)
bobsNode.pump(false)
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
// Save the state machine to "disk" (i.e. a variable, here)
assertEquals(1, bobsStorage.getMap<Any, Any>("state machines").size)
@ -185,4 +180,104 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
assertTrue(smm.stateMachines.isEmpty())
}
}
@Test
fun `check dependencies of the sale asset are resolved`() {
transactionGroupFor<ContractState> {
val (bobsWallet, fakeTxns) = fillUp()
val (alicesAddress, alicesNode) = makeNode(inBackground = true)
val (bobsAddress, bobsNode) = makeNode(inBackground = true)
val timestamper = network.setupTimestampingNode(false).first
val alicesServices = MockServices(net = alicesNode)
val bobsServices = MockServices(
wallet = MockWalletService(bobsWallet.states),
keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)),
net = bobsNode,
storage = MockStorageService(isRecording = true)
)
loadFakeTxnsIntoStorage(bobsServices.storageService)
val buyerSessionID = random63BitValue()
val aliceResult = TwoPartyTradeProtocol.runSeller(
StateMachineManager(alicesServices, backgroundThread),
timestamper,
bobsAddress,
lookup("alice's paper"),
1000.DOLLARS,
ALICE_KEY,
buyerSessionID
)
val bobResult = TwoPartyTradeProtocol.runBuyer(
StateMachineManager(bobsServices, backgroundThread),
timestamper,
alicesAddress,
1000.DOLLARS,
CommercialPaper.State::class.java,
buyerSessionID
)
// This line forces the protocol to run to completion.
assertEquals(aliceResult.get(), bobResult.get())
val records = (bobsServices.storageService.validatedTransactions as RecordingMap).records
val expected = listOf(
RecordingMap.Get(fakeTxns[1].id),
RecordingMap.Get(fakeTxns[2].id),
RecordingMap.Get(fakeTxns[3].id),
RecordingMap.Get(fakeTxns[0].id),
RecordingMap.Get(fakeTxns[0].id)
)
assertEquals(expected, records)
}
}
private fun TransactionGroupDSL<ContractState>.loadFakeTxnsIntoStorage(ss: StorageService) {
val txStorage = ss.validatedTransactions
val map = signAll().associateBy { it.id }
if (txStorage is RecordingMap) {
txStorage.putAllUnrecorded(map)
} else
txStorage.putAll(map)
}
private fun TransactionGroupDSL<ContractState>.fillUp(): Pair<Wallet, List<WireTransaction>> {
// Bob (Buyer) has some cash he got from the Bank of Elbonia, Alice (Seller) has some commercial paper she
// wants to sell to Bob.
val eb1 = transaction {
// Issued money to itself.
output("elbonian money 1") { 800.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY }
output("elbonian money 2") { 1000.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY }
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Issue() }
timestamp(TEST_TX_TIME)
}
// Bob gets some cash onto the ledger from BoE
val bc1 = transaction {
input("elbonian money 1")
output("bob cash 1") { 800.DOLLARS.CASH `issued by` MEGA_CORP `owned by` BOB }
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Move() }
}
val bc2 = transaction {
input("elbonian money 2")
output("bob cash 2") { 300.DOLLARS.CASH `issued by` MEGA_CORP `owned by` BOB }
output { 700.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY } // Change output.
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Move() }
}
val ap = transaction {
output("alice's paper") {
CommercialPaper.State(MEGA_CORP.ref(1, 2, 3), ALICE, 1200.DOLLARS, TEST_TX_TIME + 7.days)
}
arg(MEGA_CORP_PUBKEY) { CommercialPaper.Commands.Issue() }
timestamp(TEST_TX_TIME)
}
val wallet = Wallet(listOf<StateAndRef<Cash.State>>(lookup("bob cash 1"), lookup("bob cash 2")))
return Pair(wallet, listOf(eb1, bc1, bc2, ap))
}
}

View File

@ -22,7 +22,7 @@ import kotlin.test.assertEquals
class E2ETestWalletServiceTest {
val services: ServiceHub = MockServices(
keyManagement = MockKeyManagementService(emptyMap(), arrayListOf<KeyPair>(ALICE_KEY, ALICE_KEY, ALICE_KEY))
keyManagement = MockKeyManagementService(emptyMap(), arrayListOf<KeyPair>(ALICE_KEY, ALICE_KEY, ALICE_KEY))
)
@Test fun splits() {

View File

@ -0,0 +1,61 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.testutils
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/**
* A RecordingMap wraps a regular Map<K, V> and records the sequence of gets and puts to it. This is useful in
* white box unit tests to ensure that code is accessing a data store as much as you expect.
*
* Note: although this class itself thread safe, if the underlying map is not, then this class loses its thread safety.
*/
@ThreadSafe
class RecordingMap<K, V>(private val wrappedMap: MutableMap<K, V>) : MutableMap<K, V> by wrappedMap {
// If/when Kotlin supports data classes inside sealed classes, that would be preferable to this.
interface Record
data class Get<K>(val key: K) : Record
data class Put<K, V>(val key: K, val value: V) : Record
private val _records = Collections.synchronizedList(ArrayList<Record>())
/** Returns a snapshot of the set of records */
val records: List<Record> get() = _records.toList()
fun clearRecords() = _records.clear()
override fun get(key: K): V? {
_records.add(Get(key))
return wrappedMap[key]
}
override fun put(key: K, value: V): V? {
_records.add(Put(key, value))
return wrappedMap.put(key, value)
}
override fun putAll(from: Map<out K, V>) {
for ((k, v) in from) {
put(k, v)
}
}
fun putAllUnrecorded(from: Map<out K, V>) {
wrappedMap.putAll(from)
}
}

View File

@ -15,6 +15,7 @@ import core.*
import core.crypto.*
import core.serialization.serialize
import core.visualiser.GraphVisualiser
import java.security.KeyPair
import java.security.PublicKey
import java.time.Instant
import java.util.*
@ -41,7 +42,7 @@ val BOB = BOB_KEY.public
val MEGA_CORP = Party("MegaCorp", MEGA_CORP_PUBKEY)
val MINI_CORP = Party("MiniCorp", MINI_CORP_PUBKEY)
val ALL_TEST_KEYS = listOf(MEGA_CORP_KEY, MINI_CORP_KEY, ALICE_KEY, BOB_KEY)
val ALL_TEST_KEYS = listOf(MEGA_CORP_KEY, MINI_CORP_KEY, ALICE_KEY, BOB_KEY, DummyTimestampingAuthority.key)
val TEST_KEYS_TO_CORP_MAP: Map<PublicKey, Party> = mapOf(
MEGA_CORP_PUBKEY to MEGA_CORP,
@ -315,15 +316,17 @@ class TransactionGroupDSL<T : ContractState>(private val stateType: Class<T>) {
GraphVisualiser(this as TransactionGroupDSL<ContractState>).display()
}
fun signAll(): List<SignedTransaction> {
fun signAll(vararg extraKeys: KeyPair): List<SignedTransaction> {
return txns.map { wtx ->
val allPubKeys = wtx.commands.flatMap { it.pubkeys }.toSet()
val allPubKeys = wtx.commands.flatMap { it.pubkeys }.toMutableSet()
val bits = wtx.serialize()
require(bits == wtx.serialized)
val sigs = ArrayList<DigitalSignature.WithKey>()
for (key in ALL_TEST_KEYS) {
if (allPubKeys.contains(key.public))
for (key in ALL_TEST_KEYS + extraKeys) {
if (allPubKeys.contains(key.public)) {
sigs += key.signWithECDSA(bits)
allPubKeys -= key.public
}
}
wtx.toSignedTransaction(sigs)
}