mirror of
https://github.com/corda/corda.git
synced 2025-04-15 15:07:03 +00:00
Merged in PLT-61-resolve-deps (pull request #21)
Resolve and verify transaction dependencies, many other improvements
This commit is contained in:
commit
7832fd0f93
2
.idea/runConfigurations/Node__buyer.xml
generated
2
.idea/runConfigurations/Node__buyer.xml
generated
@ -2,7 +2,7 @@
|
||||
<configuration default="false" name="Node: buyer" type="JetRunConfigurationType" factoryName="Kotlin">
|
||||
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
|
||||
<option name="MAIN_CLASS_NAME" value="core.node.TraderDemoKt" />
|
||||
<option name="VM_PARAMETERS" value="-ea -javaagent:lib/quasar.jar" />
|
||||
<option name="VM_PARAMETERS" value="-ea -javaagent:lib/quasar.jar -Dco.paralleluniverse.fibers.verifyInstrumentation" />
|
||||
<option name="PROGRAM_PARAMETERS" value="--dir=buyer --service-fake-trades --network-address=localhost" />
|
||||
<option name="WORKING_DIRECTORY" value="" />
|
||||
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
|
||||
|
2
.idea/runConfigurations/Node__seller.xml
generated
2
.idea/runConfigurations/Node__seller.xml
generated
@ -2,7 +2,7 @@
|
||||
<configuration default="false" name="Node: seller" type="JetRunConfigurationType" factoryName="Kotlin">
|
||||
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
|
||||
<option name="MAIN_CLASS_NAME" value="core.node.TraderDemoKt" />
|
||||
<option name="VM_PARAMETERS" value="-ea -javaagent:lib/quasar.jar" />
|
||||
<option name="VM_PARAMETERS" value="-ea -javaagent:lib/quasar.jar -Dco.paralleluniverse.fibers.verifyInstrumentation" />
|
||||
<option name="PROGRAM_PARAMETERS" value="--dir=seller --fake-trade-with=localhost --network-address=localhost:31338 --timestamper-identity-file=buyer/identity-public --timestamper-address=localhost" />
|
||||
<option name="WORKING_DIRECTORY" value="" />
|
||||
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
|
||||
|
@ -104,7 +104,7 @@ class Cash : Contract {
|
||||
// literally anyone with access to the network can issue cash claims of arbitrary amounts! It is up
|
||||
// to the recipient to decide if the backing party is trustworthy or not, via some
|
||||
// as-yet-unwritten identity service. See ADP-22 for discussion.
|
||||
val outputsInstitution = outputs.map { it.deposit.party }.singleOrNull()
|
||||
val outputsInstitution = outputs.map { it.deposit.party }.distinct().singleOrNull()
|
||||
if (outputsInstitution != null) {
|
||||
requireThat {
|
||||
"the issue command has a nonce" by (issueCommand.value.nonce != 0L)
|
||||
|
@ -79,7 +79,11 @@ class CommercialPaper : Contract {
|
||||
// There are two possible things that can be done with this CP. The first is trading it. The second is redeeming
|
||||
// it for cash on or after the maturity date.
|
||||
val command = tx.commands.requireSingleCommand<CommercialPaper.Commands>()
|
||||
val timestamp: TimestampCommand? = tx.getTimestampBy(DummyTimestampingAuthority.identity)
|
||||
|
||||
// Here, we match acceptable timestamp authorities by name. The list of acceptable TSAs (oracles) must be
|
||||
// hard coded into the contract because otherwise we could fail to gain consensus, if nodes disagree about
|
||||
// who or what is a trusted authority.
|
||||
val timestamp: TimestampCommand? = tx.commands.getTimestampByName("The dummy timestamper", "Bank of Zurich")
|
||||
|
||||
for (group in groups) {
|
||||
when (command.value) {
|
||||
|
@ -45,9 +45,7 @@ class Requirements {
|
||||
}
|
||||
}
|
||||
val R = Requirements()
|
||||
inline fun requireThat(body: Requirements.() -> Unit) {
|
||||
R.body()
|
||||
}
|
||||
inline fun <R> requireThat(body: Requirements.() -> R) = R.body()
|
||||
|
||||
//// Amounts //////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@ -132,3 +130,19 @@ fun List<AuthenticatedObject<CommandData>>.getTimestampBy(timestampingAuthority:
|
||||
return timestampCmds.singleOrNull()?.value as? TimestampCommand
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a timestamp that was signed by any of the the named authorities, or returns null if missing.
|
||||
* Note that matching here is done by (verified, legal) name, not by public key. Any signature by any
|
||||
* party with a name that matches (case insensitively) any of the given names will yield a match.
|
||||
*/
|
||||
fun List<AuthenticatedObject<CommandData>>.getTimestampByName(vararg names: String): TimestampCommand? {
|
||||
val timestampCmd = filter { it.value is TimestampCommand }.singleOrNull() ?: return null
|
||||
val tsaNames = timestampCmd.signingParties.map { it.name.toLowerCase() }
|
||||
val acceptableNames = names.map { it.toLowerCase() }
|
||||
val acceptableNameFound = tsaNames.intersect(acceptableNames).isNotEmpty()
|
||||
if (acceptableNameFound)
|
||||
return timestampCmd.value as TimestampCommand
|
||||
else
|
||||
return null
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,6 @@ class TransactionConflictException(val conflictRef: StateRef, val tx1: LedgerTra
|
||||
* [nonVerifiedRoots] set. Transactions in the non-verified set are ignored other than for looking up input states.
|
||||
*/
|
||||
class TransactionGroup(val transactions: Set<LedgerTransaction>, val nonVerifiedRoots: Set<LedgerTransaction>) {
|
||||
|
||||
/**
|
||||
* Verifies the group and returns the set of resolved transactions.
|
||||
*/
|
||||
|
@ -12,6 +12,7 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import core.crypto.DigitalSignature
|
||||
import core.crypto.SecureHash
|
||||
import core.crypto.signWithECDSA
|
||||
import core.crypto.toStringShort
|
||||
import core.node.TimestampingError
|
||||
import core.serialization.SerializedBytes
|
||||
import core.serialization.deserialize
|
||||
@ -83,6 +84,9 @@ data class WireTransaction(val inputs: List<StateRef>,
|
||||
return SignedTransaction(serialized, withSigs)
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
fun <T : ContractState> outRef(index: Int) = StateAndRef(outputs[index] as T, StateRef(id, index))
|
||||
|
||||
override fun toString(): String {
|
||||
val buf = StringBuilder()
|
||||
buf.appendln("Transaction:")
|
||||
@ -117,18 +121,25 @@ data class SignedTransaction(val txBits: SerializedBytes<WireTransaction>, val s
|
||||
|
||||
/**
|
||||
* Verify the signatures, deserialise the wire transaction and then check that the set of signatures found matches
|
||||
* the set of pubkeys in the commands.
|
||||
* the set of pubkeys in the commands. If any signatures are missing, either throws an exception (by default) or
|
||||
* returns the list of keys that have missing signatures, depending on the parameter.
|
||||
*
|
||||
* @throws SignatureException if the signature is invalid or does not match.
|
||||
* @throws SignatureException if a signature is invalid, does not match or if any signature is missing.
|
||||
*/
|
||||
fun verify() {
|
||||
fun verify(throwIfSignaturesAreMissing: Boolean = true): Set<PublicKey> {
|
||||
verifySignatures()
|
||||
// Verify that every command key was in the set that we just verified: there should be no commands that were
|
||||
// unverified.
|
||||
val cmdKeys = tx.commands.flatMap { it.pubkeys }.toSet()
|
||||
val sigKeys = sigs.map { it.by }.toSet()
|
||||
if (!sigKeys.containsAll(cmdKeys))
|
||||
throw SignatureException("Missing signatures on the transaction for: ${cmdKeys - sigKeys}")
|
||||
if (sigKeys == cmdKeys)
|
||||
return emptySet()
|
||||
|
||||
val missing = cmdKeys - sigKeys
|
||||
if (throwIfSignaturesAreMissing)
|
||||
throw SignatureException("Missing signatures on transaction ${id.prefixChars()} for: ${missing.map { it.toStringShort() }}")
|
||||
else
|
||||
return missing
|
||||
}
|
||||
|
||||
/**
|
||||
@ -157,7 +168,7 @@ class TransactionBuilder(private val inputs: MutableList<StateRef> = arrayListOf
|
||||
/**
|
||||
* Places a [TimestampCommand] in this transaction, removing any existing command if there is one.
|
||||
* To get the right signature from the timestamping service, use the [timestamp] method after building is
|
||||
* finished.
|
||||
* finished, or run use the [TimestampingProtocol] yourself.
|
||||
*
|
||||
* The window of time in which the final timestamp may lie is defined as [time] +/- [timeTolerance].
|
||||
* If you want a non-symmetrical time window you must add the command via [addCommand] yourself. The tolerance
|
||||
@ -204,8 +215,7 @@ class TransactionBuilder(private val inputs: MutableList<StateRef> = arrayListOf
|
||||
*/
|
||||
fun checkAndAddSignature(sig: DigitalSignature.WithKey) {
|
||||
require(commands.count { it.pubkeys.contains(sig.by) } > 0) { "Signature key doesn't match any command" }
|
||||
val data = toWireTransaction().serialize()
|
||||
sig.verifyWithECDSA(data.bits)
|
||||
sig.verifyWithECDSA(toWireTransaction().serialize())
|
||||
currentSigs.add(sig)
|
||||
}
|
||||
|
||||
|
@ -25,3 +25,7 @@
|
||||
.codeset > .highlight-java {
|
||||
display: none;
|
||||
}
|
||||
|
||||
.wy-nav-content {
|
||||
max-width: 1000px;
|
||||
}
|
@ -54,9 +54,9 @@ author = u'R3 CEV'
|
||||
# built documents.
|
||||
#
|
||||
# The short X.Y version.
|
||||
version = '0.1'
|
||||
version = 'latest'
|
||||
# The full version, including alpha/beta/rc tags.
|
||||
release = '0.1'
|
||||
release = 'latest'
|
||||
|
||||
# The language for content autogenerated by Sphinx. Refer to documentation
|
||||
# for a list of supported languages.
|
||||
|
@ -65,7 +65,7 @@ We use continuations for the following reasons:
|
||||
|
||||
* It allows us to write code that is free of callbacks, that looks like ordinary sequential code.
|
||||
* A suspended continuation takes far less memory than a suspended thread. It can be as low as a few hundred bytes.
|
||||
In contrast a suspended Java stack can easily be 1mb in size.
|
||||
In contrast a suspended Java thread stack can easily be 1mb in size.
|
||||
* It frees the developer from thinking (much) about persistence and serialisation.
|
||||
|
||||
A *state machine* is a piece of code that moves through various *states*. These are not the same as states in the data
|
||||
@ -140,7 +140,7 @@ each side.
|
||||
val assetToSell: StateAndRef<OwnableState>,
|
||||
val price: Amount,
|
||||
val myKeyPair: KeyPair,
|
||||
val buyerSessionID: Long) : ProtocolStateMachine<SignedTransaction>() {
|
||||
val buyerSessionID: Long) : ProtocolLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
TODO()
|
||||
@ -156,7 +156,7 @@ each side.
|
||||
val timestampingAuthority: Party,
|
||||
val acceptablePrice: Amount,
|
||||
val typeToBuy: Class<out OwnableState>,
|
||||
val sessionID: Long) : ProtocolStateMachine<SignedTransaction>() {
|
||||
val sessionID: Long) : ProtocolLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
TODO()
|
||||
@ -249,7 +249,7 @@ Let's implement the ``Seller.call`` method. This will be invoked by the platform
|
||||
|
||||
// These two steps could be done in parallel, in theory. Our framework doesn't support that yet though.
|
||||
val ourSignature = signWithOurKey(partialTX)
|
||||
val tsaSig = timestamp(partialTX)
|
||||
val tsaSig = subProtocol(TimestampingProtocol(timestampingAuthority, partialTX.txBits))
|
||||
|
||||
val stx: SignedTransaction = sendSignatures(partialTX, ourSignature, tsaSig)
|
||||
|
||||
@ -260,6 +260,12 @@ valid. Then we sign with our own key, request a timestamping authority to assert
|
||||
timestamp in the transaction (if any) is valid, and finally we send back both our signature and the TSA's signature.
|
||||
Finally, we hand back to the code that invoked the protocol the finished transaction in a couple of different forms.
|
||||
|
||||
.. note:: ``ProtocolLogic`` classes can be composed together. Here, we see the use of the ``subProtocol`` method, which
|
||||
is given an instance of ``TimestampingProtocol``. This protocol will run to completion and yield a result, almost
|
||||
as if it's a regular method call. In fact, under the hood, all the ``subProtocol`` method does is pass the current
|
||||
fiber object into the newly created object and then run ``call()`` on it ... so it basically _is_ just a method call.
|
||||
This is where we can see the benefits of using continuations/fibers as a programming model.
|
||||
|
||||
Let's fill out the ``receiveAndCheckProposedTransaction()`` method.
|
||||
|
||||
.. container:: codeset
|
||||
@ -273,28 +279,37 @@ Let's fill out the ``receiveAndCheckProposedTransaction()`` method.
|
||||
// Make the first message we'll send to kick off the protocol.
|
||||
val hello = SellerTradeInfo(assetToSell, price, myKeyPair.public, sessionID)
|
||||
|
||||
val maybePartialTX = sendAndReceive(TRADE_TOPIC, buyerSessionID, sessionID, hello, SignedTransaction::class.java)
|
||||
val partialTX = maybePartialTX.validate {
|
||||
it.verifySignatures()
|
||||
logger.trace { "Received partially signed transaction" }
|
||||
val wtx: WireTransaction = it.tx
|
||||
val maybeSTX = sendAndReceive<SignedTransaction>(TRADE_TOPIC, otherSide, buyerSessionID, sessionID, hello)
|
||||
|
||||
requireThat {
|
||||
"transaction sends us the right amount of cash" by (wtx.outputs.sumCashBy(myKeyPair.public) == price)
|
||||
// There are all sorts of funny games a malicious secondary might play here, we should fix them:
|
||||
//
|
||||
// - This tx may attempt to send some assets we aren't intending to sell to the secondary, if
|
||||
// we're reusing keys! So don't reuse keys!
|
||||
// - This tx may not be valid according to the contracts of the input states, so we must resolve
|
||||
// and fully audit the transaction chains to convince ourselves that it is actually valid.
|
||||
// - This tx may include output states that impose odd conditions on the movement of the cash,
|
||||
// once we implement state pairing.
|
||||
//
|
||||
// but the goal of this code is not to be fully secure, but rather, just to find good ways to
|
||||
// express protocol state machines on top of the messaging layer.
|
||||
}
|
||||
maybeSTX.validate {
|
||||
// Check that the tx proposed by the buyer is valid.
|
||||
val missingSigs = it.verify(throwIfSignaturesAreMissing = false)
|
||||
if (missingSigs != setOf(myKeyPair.public, timestampingAuthority.identity.owningKey))
|
||||
throw SignatureException("The set of missing signatures is not as expected: $missingSigs")
|
||||
|
||||
val wtx: WireTransaction = it.tx
|
||||
logger.trace { "Received partially signed transaction: ${it.id}" }
|
||||
|
||||
checkDependencies(it)
|
||||
|
||||
// This verifies that the transaction is contract-valid, even though it is missing signatures.
|
||||
serviceHub.verifyTransaction(wtx.toLedgerTransaction(serviceHub.identityService))
|
||||
|
||||
if (wtx.outputs.sumCashBy(myKeyPair.public) != price)
|
||||
throw IllegalArgumentException("Transaction is not sending us the right amounnt of cash")
|
||||
|
||||
// There are all sorts of funny games a malicious secondary might play here, we should fix them:
|
||||
//
|
||||
// - This tx may attempt to send some assets we aren't intending to sell to the secondary, if
|
||||
// we're reusing keys! So don't reuse keys!
|
||||
// - This tx may include output states that impose odd conditions on the movement of the cash,
|
||||
// once we implement state pairing.
|
||||
//
|
||||
// but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to
|
||||
// express protocol state machines on top of the messaging layer.
|
||||
|
||||
return it
|
||||
}
|
||||
return partialTX
|
||||
}
|
||||
|
||||
That's pretty straightforward. We generate a session ID to identify what's happening on the seller side, fill out
|
||||
@ -305,12 +320,7 @@ the initial protocol message, and then call ``sendAndReceive``. This function ta
|
||||
- The thing to send. It'll be serialised and sent automatically.
|
||||
- Finally a type argument, which is the kind of object we're expecting to receive from the other side.
|
||||
|
||||
It returns a simple wrapper class, ``UntrustworthyData<SignedTransaction>``, which is just a marker class that reminds
|
||||
us that the data came from a potentially malicious external source and may have been tampered with or be unexpected in
|
||||
other ways. It doesn't add any functionality, but acts as a reminder to "scrub" the data before use. Here, our scrubbing
|
||||
simply involves checking the signatures on it. Then we could go ahead and do some more involved checks.
|
||||
|
||||
Once sendAndReceive is called, the call method will be suspended into a continuation. When it gets back we'll do a log
|
||||
Once ``sendAndReceive`` is called, the call method will be suspended into a continuation. When it gets back we'll do a log
|
||||
message. The buyer is supposed to send us a transaction with all the right inputs/outputs/commands in return, with their
|
||||
cash put into the transaction and their signature on it authorising the movement of the cash.
|
||||
|
||||
@ -323,6 +333,39 @@ cash put into the transaction and their signature on it authorising the movement
|
||||
doing things like creating threads from inside these calls would be a bad idea. They should only contain business
|
||||
logic.
|
||||
|
||||
You get back a simple wrapper class, ``UntrustworthyData<SignedTransaction>``, which is just a marker class that reminds
|
||||
us that the data came from a potentially malicious external source and may have been tampered with or be unexpected in
|
||||
other ways. It doesn't add any functionality, but acts as a reminder to "scrub" the data before use. Here, our scrubbing
|
||||
simply involves checking the signatures on it. Then we go ahead and check all the dependencies of this partial
|
||||
transaction for validity. Here's the code to do that:
|
||||
|
||||
.. container:: codeset
|
||||
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
@Suspendable
|
||||
private fun checkDependencies(stx: SignedTransaction) {
|
||||
// Download and check all the transactions that this transaction depends on, but do not check this
|
||||
// transaction itself.
|
||||
val dependencyTxIDs = stx.tx.inputs.map { it.txhash }.toSet()
|
||||
subProtocol(ResolveTransactionsProtocol(dependencyTxIDs, otherSide))
|
||||
}
|
||||
|
||||
This is simple enough: we mark the method as ``@Suspendable`` because we're going to invoke a sub-protocol, extract the
|
||||
IDs of the transactions the proposed transaction depends on, and then uses a protocol provided by the system to download
|
||||
and check them all. This protocol does a breadth-first search over the dependency graph, bottoming out at issuance
|
||||
transactions that don't have any inputs themselves. Once the node has audited the transaction history, all the dependencies
|
||||
are committed to the node's local database so they won't be checked again next time.
|
||||
|
||||
.. note:: Transaction dependency resolution assumes that the peer you got the transaction from has all of the
|
||||
dependencies itself. It must do, otherwise it could not have convinced itself that the dependencies were themselves
|
||||
valid. It's important to realise that requesting only the transactions we require is a privacy leak, because if
|
||||
we don't download a transaction from the peer, they know we must have already seen it before. Fixing this privacy
|
||||
leak will come later.
|
||||
|
||||
After the dependencies, we check the proposed trading transaction for validity by running the contracts for that as
|
||||
well (but having handled the fact that some signatures are missing ourselves).
|
||||
|
||||
Here's the rest of the code:
|
||||
|
||||
.. container:: codeset
|
||||
@ -331,18 +374,10 @@ Here's the rest of the code:
|
||||
|
||||
open fun signWithOurKey(partialTX: SignedTransaction) = myKeyPair.signWithECDSA(partialTX.txBits)
|
||||
|
||||
@Suspendable
|
||||
open fun timestamp(partialTX: SignedTransaction): DigitalSignature.LegallyIdentifiable {
|
||||
return TimestamperClient(this, timestampingAuthority).timestamp(partialTX.txBits)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
open fun sendSignatures(partialTX: SignedTransaction, ourSignature: DigitalSignature.WithKey,
|
||||
tsaSig: DigitalSignature.LegallyIdentifiable): SignedTransaction {
|
||||
val fullySigned = partialTX + tsaSig + ourSignature
|
||||
fullySigned.verify()
|
||||
|
||||
// TODO: We should run it through our full TransactionGroup of all transactions here.
|
||||
|
||||
logger.trace { "Built finished transaction, sending back to secondary!" }
|
||||
|
||||
@ -352,8 +387,8 @@ Here's the rest of the code:
|
||||
|
||||
It's should be all pretty straightforward: here, ``txBits`` is the raw byte array representing the transaction.
|
||||
|
||||
In ``sendSignatures``, we take the two signatures we calculated, then add them to the partial transaction we were sent
|
||||
and verify that the signatures all make sense. This should never fail: it's just a sanity check. Finally, we wrap the
|
||||
In ``sendSignatures``, we take the two signatures we calculated, then add them to the partial transaction we were sent.
|
||||
We provide an overload for the + operator so signatures can be added to a SignedTransaction easily. Finally, we wrap the
|
||||
two signatures in a simple wrapper message class and send it back. The send won't block waiting for an acknowledgement,
|
||||
but the underlying message queue software will retry delivery if the other side has gone away temporarily.
|
||||
|
||||
@ -389,25 +424,27 @@ OK, let's do the same for the buyer side:
|
||||
@Suspendable
|
||||
open fun receiveAndValidateTradeRequest(): SellerTradeInfo {
|
||||
// Wait for a trade request to come in on our pre-provided session ID.
|
||||
val maybeTradeRequest = receive(TRADE_TOPIC, sessionID, SellerTradeInfo::class.java)
|
||||
val maybeTradeRequest = receive<SellerTradeInfo>(TRADE_TOPIC, sessionID)
|
||||
|
||||
val tradeRequest = maybeTradeRequest.validate {
|
||||
maybeTradeRequest.validate {
|
||||
// What is the seller trying to sell us?
|
||||
val assetTypeName = it.assetForSale.state.javaClass.name
|
||||
logger.trace { "Got trade request for a $assetTypeName" }
|
||||
val asset = it.assetForSale.state
|
||||
val assetTypeName = asset.javaClass.name
|
||||
logger.trace { "Got trade request for a $assetTypeName: ${it.assetForSale}" }
|
||||
|
||||
// Check the start message for acceptability.
|
||||
check(it.sessionID > 0)
|
||||
if (it.price > acceptablePrice)
|
||||
throw UnacceptablePriceException(it.price)
|
||||
if (!typeToBuy.isInstance(it.assetForSale.state))
|
||||
if (!typeToBuy.isInstance(asset))
|
||||
throw AssetMismatchException(typeToBuy.name, assetTypeName)
|
||||
}
|
||||
|
||||
// TODO: Either look up the stateref here in our local db, or accept a long chain of states and
|
||||
// validate them to audit the other side and ensure it actually owns the state we are being offered!
|
||||
// For now, just assume validity!
|
||||
return tradeRequest
|
||||
// Check the transaction that contains the state which is being resolved.
|
||||
// We only have a hash here, so if we don't know it already, we have to ask for it.
|
||||
subProtocol(ResolveTransactionsProtocol(setOf(it.assetForSale.ref.txhash), otherSide))
|
||||
|
||||
return it
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -416,7 +453,7 @@ OK, let's do the same for the buyer side:
|
||||
|
||||
// TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx.
|
||||
|
||||
return sendAndReceive(TRADE_TOPIC, otherSide, theirSessionID, sessionID, stx, SignaturesFromSeller::class.java).validate {}
|
||||
return sendAndReceive(TRADE_TOPIC, otherSide, theirSessionID, sessionID, stx, SignaturesFromSeller::class.java).validate { it }
|
||||
}
|
||||
|
||||
open fun signWithOurKeys(cashSigningPubKeys: List<PublicKey>, ptx: TransactionBuilder): SignedTransaction {
|
||||
@ -426,12 +463,7 @@ OK, let's do the same for the buyer side:
|
||||
ptx.signWith(KeyPair(k, priv))
|
||||
}
|
||||
|
||||
val stx = ptx.toSignedTransaction(checkSufficientSignatures = false)
|
||||
stx.verifySignatures() // Verifies that we generated a signed transaction correctly.
|
||||
|
||||
// TODO: Could run verify() here to make sure the only signature missing is the sellers.
|
||||
|
||||
return stx
|
||||
return ptx.toSignedTransaction(checkSufficientSignatures = false)
|
||||
}
|
||||
|
||||
open fun assembleSharedTX(tradeRequest: SellerTradeInfo): Pair<TransactionBuilder, List<PublicKey>> {
|
||||
|
@ -637,7 +637,7 @@ the exact message).
|
||||
|
||||
|
||||
Adding a generation API to your contract
|
||||
--------------------------------------
|
||||
----------------------------------------
|
||||
|
||||
Contract classes **must** provide a verify function, but they may optionally also provide helper functions to simplify
|
||||
their usage. A simple class of functions most contracts provide are *generation functions*, which either create or
|
||||
|
@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package contracts.protocols
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import core.SignedTransaction
|
||||
import core.crypto.SecureHash
|
||||
import core.messaging.ProtocolLogic
|
||||
import core.messaging.SingleMessageRecipient
|
||||
import core.messaging.UntrustworthyData
|
||||
import core.node.DataVendingService
|
||||
import core.random63BitValue
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* Given a set of tx hashes (IDs), either loads them from local disk or asks the remote peer to provide them.
|
||||
* A malicious response in which the data provided by the remote peer does not hash to the requested hash results in
|
||||
* [DownloadedVsRequestedDataMismatch] being thrown. If the remote peer doesn't have an entry, it results in a
|
||||
* HashNotFound. Note that returned transactions are not inserted into the database, because it's up to the caller
|
||||
* to actually verify the transactions are valid.
|
||||
*/
|
||||
class FetchTransactionsProtocol(private val requests: Set<SecureHash>,
|
||||
private val otherSide: SingleMessageRecipient) : ProtocolLogic<FetchTransactionsProtocol.Result>() {
|
||||
|
||||
data class Result(val fromDisk: List<SignedTransaction>, val downloaded: List<SignedTransaction>)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Result {
|
||||
val sid = random63BitValue()
|
||||
|
||||
// Load the transactions we have from disk and figure out which we're missing.
|
||||
val (fromDisk, toFetch) = loadWhatWeHave()
|
||||
|
||||
return if (toFetch.isEmpty()) {
|
||||
Result(fromDisk, emptyList())
|
||||
} else {
|
||||
logger.trace("Requesting ${toFetch.size} dependency(s) for verification")
|
||||
|
||||
val fetchReq = DataVendingService.Request(toFetch, serviceHub.networkService.myAddress, sid)
|
||||
val maybeTxns = sendAndReceive<ArrayList<SignedTransaction?>>("platform.fetch.tx", otherSide, 0, sid, fetchReq)
|
||||
// Check for a buggy/malicious peer answering with something that we didn't ask for.
|
||||
// Note that we strip the UntrustworthyData marker here, but of course the returned transactions may be
|
||||
// invalid in other ways! Perhaps we should keep it.
|
||||
Result(fromDisk, validateTXFetchResponse(maybeTxns, toFetch))
|
||||
}
|
||||
}
|
||||
|
||||
private fun loadWhatWeHave(): Pair<List<SignedTransaction>, List<SecureHash>> {
|
||||
val fromDisk = ArrayList<SignedTransaction>()
|
||||
val toFetch = ArrayList<SecureHash>()
|
||||
for (txid in requests) {
|
||||
val stx = serviceHub.storageService.validatedTransactions[txid]
|
||||
if (stx == null)
|
||||
toFetch += txid
|
||||
else
|
||||
fromDisk += stx
|
||||
}
|
||||
return Pair(fromDisk, toFetch)
|
||||
}
|
||||
|
||||
private fun validateTXFetchResponse(maybeTxns: UntrustworthyData<ArrayList<SignedTransaction?>>,
|
||||
requests: List<SecureHash>): List<SignedTransaction> =
|
||||
maybeTxns.validate { response ->
|
||||
if (response.size != requests.size)
|
||||
throw BadAnswer()
|
||||
for ((index, resp) in response.withIndex()) {
|
||||
if (resp == null) throw HashNotFound(requests[index])
|
||||
}
|
||||
val answers = response.requireNoNulls()
|
||||
// Check transactions actually hash to what we requested, if this fails the remote node
|
||||
// is a malicious protocol violator or buggy.
|
||||
for ((index, stx) in answers.withIndex())
|
||||
if (stx.id != requests[index])
|
||||
throw DownloadedVsRequestedDataMismatch(requests[index], stx.id)
|
||||
|
||||
answers
|
||||
}
|
||||
|
||||
open class BadAnswer : Exception()
|
||||
class HashNotFound(val requested: SecureHash) : BadAnswer()
|
||||
class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer()
|
||||
}
|
||||
|
@ -0,0 +1,134 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package contracts.protocols
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import core.LedgerTransaction
|
||||
import core.SignedTransaction
|
||||
import core.TransactionGroup
|
||||
import core.WireTransaction
|
||||
import core.crypto.SecureHash
|
||||
import core.messaging.ProtocolLogic
|
||||
import core.messaging.SingleMessageRecipient
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* This protocol fetches each transaction identified by the given hashes from either disk or network, along with all
|
||||
* their dependencies, and verifies them together using a single [TransactionGroup]. If no exception is thrown, then
|
||||
* all the transactions have been successfully verified and inserted into the local database.
|
||||
*
|
||||
* A couple of constructors are provided that accept a single transaction. When these are used, the dependencies of that
|
||||
* transaction are resolved and then the transaction itself is verified. Again, if successful, the results are inserted
|
||||
* into the database as long as a [SignedTransaction] was provided. If only the [WireTransaction] form was provided
|
||||
* then this isn't enough to put into the local database, so only the dependencies are inserted. This way to use the
|
||||
* protocol is helpful when resolving and verifying a finished but partially signed transaction.
|
||||
*/
|
||||
class ResolveTransactionsProtocol(private val txHashes: Set<SecureHash>,
|
||||
private val otherSide: SingleMessageRecipient) : ProtocolLogic<Unit>() {
|
||||
|
||||
companion object {
|
||||
private fun dependencyIDs(wtx: WireTransaction) = wtx.inputs.map { it.txhash }.toSet()
|
||||
}
|
||||
|
||||
class ExcessivelyLargeTransactionGraph() : Exception()
|
||||
|
||||
// Transactions to verify after the dependencies.
|
||||
private var stx: SignedTransaction? = null
|
||||
private var wtx: WireTransaction? = null
|
||||
|
||||
constructor(stx: SignedTransaction, otherSide: SingleMessageRecipient) : this(stx.tx, otherSide) {
|
||||
this.stx = stx
|
||||
}
|
||||
|
||||
constructor(wtx: WireTransaction, otherSide: SingleMessageRecipient) : this(dependencyIDs(wtx), otherSide) {
|
||||
this.wtx = wtx
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Unit {
|
||||
val toVerify = HashSet<LedgerTransaction>()
|
||||
val alreadyVerified = HashSet<LedgerTransaction>()
|
||||
val downloadedSignedTxns = ArrayList<SignedTransaction>()
|
||||
|
||||
// This fills out toVerify, alreadyVerified (roots) and downloadedSignedTxns.
|
||||
fetchDependenciesAndCheckSignatures(txHashes, toVerify, alreadyVerified, downloadedSignedTxns)
|
||||
|
||||
if (stx != null) {
|
||||
// Check the signatures on the stx first.
|
||||
toVerify += stx!!.verifyToLedgerTransaction(serviceHub.identityService)
|
||||
} else if (wtx != null) {
|
||||
wtx!!.toLedgerTransaction(serviceHub.identityService)
|
||||
}
|
||||
|
||||
// Run all the contracts and throw an exception if any of them reject.
|
||||
TransactionGroup(toVerify, alreadyVerified).verify(serviceHub.storageService.contractPrograms)
|
||||
|
||||
// Now write all the transactions we just validated back to the database for next time, including
|
||||
// signatures so we can serve up these transactions to other peers when we, in turn, send one that
|
||||
// depends on them onto another peer.
|
||||
//
|
||||
// It may seem tempting to write transactions to the database as we receive them, instead of all at once
|
||||
// here at the end. Doing it this way avoids cases where a transaction is in the database but its
|
||||
// dependencies aren't, or an unvalidated and possibly broken tx is there.
|
||||
serviceHub.storageService.validatedTransactions.putAll(downloadedSignedTxns.associateBy { it.id })
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun fetchDependenciesAndCheckSignatures(depsToCheck: Set<SecureHash>,
|
||||
toVerify: HashSet<LedgerTransaction>,
|
||||
alreadyVerified: HashSet<LedgerTransaction>,
|
||||
downloadedSignedTxns: ArrayList<SignedTransaction>) {
|
||||
// Maintain a work queue of all hashes to load/download, initialised with our starting set.
|
||||
// Then either fetch them from the database or request them from the other side. Look up the
|
||||
// signatures against our identity database, filtering the transactions into 'already checked'
|
||||
// and 'need to check' sets.
|
||||
//
|
||||
// TODO: This approach has two problems. Analyze and resolve them:
|
||||
//
|
||||
// (1) This protocol leaks private data. If you download a transaction and then do NOT request a
|
||||
// dependency, it means you already have it, which in turn means you must have been involved with it before
|
||||
// somehow, either in the tx itself or in any following spend of it. If there were no following spends, then
|
||||
// your peer knows for sure that you were involved ... this is bad! The only obvious ways to fix this are
|
||||
// something like onion routing of requests, secure hardware, or both.
|
||||
//
|
||||
// (2) If the identity service changes the assumed identity of one of the public keys, it's possible
|
||||
// that the "tx in db is valid" invariant is violated if one of the contracts checks the identity! Should
|
||||
// the db contain the identities that were resolved when the transaction was first checked, or should we
|
||||
// accept this kind of change is possible?
|
||||
|
||||
val nextRequests = LinkedHashSet<SecureHash>() // Keep things unique but ordered, for unit test stability.
|
||||
nextRequests.addAll(depsToCheck)
|
||||
|
||||
var limitCounter = 0
|
||||
while (nextRequests.isNotEmpty()) {
|
||||
val (fromDisk, downloads) = subProtocol(FetchTransactionsProtocol(nextRequests, otherSide))
|
||||
nextRequests.clear()
|
||||
|
||||
// Resolve any legal identities from known public keys in the signatures.
|
||||
val downloadedTxns = downloads.map { it.verifyToLedgerTransaction(serviceHub.identityService) }
|
||||
|
||||
// Do the same for transactions loaded from disk (i.e. we checked them previously).
|
||||
val loadedTxns = fromDisk.map { it.verifyToLedgerTransaction(serviceHub.identityService) }
|
||||
|
||||
toVerify.addAll(downloadedTxns)
|
||||
alreadyVerified.addAll(loadedTxns)
|
||||
downloadedSignedTxns.addAll(downloads)
|
||||
|
||||
// And now add all the input states to the work queue for database or remote resolution.
|
||||
nextRequests.addAll(downloadedTxns.flatMap { it.inputs }.map { it.txhash })
|
||||
|
||||
// And loop around ...
|
||||
// TODO: Figure out a more appropriate DOS limit here, 5000 is simply a guess.
|
||||
// TODO: Unit test the DoS limit.
|
||||
limitCounter += nextRequests.size
|
||||
if (limitCounter > 5000)
|
||||
throw ExcessivelyLargeTransactionGraph()
|
||||
}
|
||||
}
|
||||
}
|
@ -16,13 +16,14 @@ import core.*
|
||||
import core.crypto.DigitalSignature
|
||||
import core.crypto.signWithECDSA
|
||||
import core.messaging.LegallyIdentifiableNode
|
||||
import core.messaging.ProtocolStateMachine
|
||||
import core.messaging.ProtocolLogic
|
||||
import core.messaging.SingleMessageRecipient
|
||||
import core.messaging.StateMachineManager
|
||||
import core.node.TimestamperClient
|
||||
import core.node.TimestampingProtocol
|
||||
import core.utilities.trace
|
||||
import java.security.KeyPair
|
||||
import java.security.PublicKey
|
||||
import java.security.SignatureException
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
@ -55,16 +56,19 @@ object TwoPartyTradeProtocol {
|
||||
otherSide: SingleMessageRecipient, assetToSell: StateAndRef<OwnableState>, price: Amount,
|
||||
myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture<SignedTransaction> {
|
||||
val seller = Seller(otherSide, timestampingAuthority, assetToSell, price, myKeyPair, buyerSessionID)
|
||||
smm.add("$TRADE_TOPIC.seller", seller)
|
||||
return seller.resultFuture
|
||||
return smm.add("$TRADE_TOPIC.seller", seller)
|
||||
}
|
||||
|
||||
fun runBuyer(smm: StateMachineManager, timestampingAuthority: LegallyIdentifiableNode,
|
||||
otherSide: SingleMessageRecipient, acceptablePrice: Amount, typeToBuy: Class<out OwnableState>,
|
||||
sessionID: Long): ListenableFuture<SignedTransaction> {
|
||||
val buyer = Buyer(otherSide, timestampingAuthority.identity, acceptablePrice, typeToBuy, sessionID)
|
||||
smm.add("$TRADE_TOPIC.buyer", buyer)
|
||||
return buyer.resultFuture
|
||||
return smm.add("$TRADE_TOPIC.buyer", buyer)
|
||||
}
|
||||
|
||||
class UnacceptablePriceException(val givenPrice: Amount) : Exception()
|
||||
class AssetMismatchException(val expectedTypeName: String, val typeName: String) : Exception() {
|
||||
override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName"
|
||||
}
|
||||
|
||||
// This object is serialised to the network and is the first protocol message the seller sends to the buyer.
|
||||
@ -82,14 +86,14 @@ object TwoPartyTradeProtocol {
|
||||
val assetToSell: StateAndRef<OwnableState>,
|
||||
val price: Amount,
|
||||
val myKeyPair: KeyPair,
|
||||
val buyerSessionID: Long) : ProtocolStateMachine<SignedTransaction>() {
|
||||
val buyerSessionID: Long) : ProtocolLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val partialTX: SignedTransaction = receiveAndCheckProposedTransaction()
|
||||
|
||||
// These two steps could be done in parallel, in theory. Our framework doesn't support that yet though.
|
||||
val ourSignature = signWithOurKey(partialTX)
|
||||
val tsaSig = timestamp(partialTX)
|
||||
val tsaSig = subProtocol(TimestampingProtocol(timestampingAuthority, partialTX.txBits))
|
||||
|
||||
val signedTransaction = sendSignatures(partialTX, ourSignature, tsaSig)
|
||||
|
||||
@ -97,50 +101,60 @@ object TwoPartyTradeProtocol {
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
open fun receiveAndCheckProposedTransaction(): SignedTransaction {
|
||||
private fun receiveAndCheckProposedTransaction(): SignedTransaction {
|
||||
val sessionID = random63BitValue()
|
||||
|
||||
// Make the first message we'll send to kick off the protocol.
|
||||
val hello = SellerTradeInfo(assetToSell, price, myKeyPair.public, sessionID)
|
||||
|
||||
val maybePartialTX = sendAndReceive(TRADE_TOPIC, otherSide, buyerSessionID, sessionID, hello, SignedTransaction::class.java)
|
||||
val partialTX = maybePartialTX.validate {
|
||||
it.verifySignatures()
|
||||
logger.trace { "Received partially signed transaction" }
|
||||
val maybeSTX = sendAndReceive<SignedTransaction>(TRADE_TOPIC, otherSide, buyerSessionID, sessionID, hello)
|
||||
|
||||
maybeSTX.validate {
|
||||
// Check that the tx proposed by the buyer is valid.
|
||||
val missingSigs = it.verify(throwIfSignaturesAreMissing = false)
|
||||
if (missingSigs != setOf(myKeyPair.public, timestampingAuthority.identity.owningKey))
|
||||
throw SignatureException("The set of missing signatures is not as expected: $missingSigs")
|
||||
|
||||
val wtx: WireTransaction = it.tx
|
||||
logger.trace { "Received partially signed transaction: ${it.id}" }
|
||||
|
||||
requireThat {
|
||||
"transaction sends us the right amount of cash" by (wtx.outputs.sumCashBy(myKeyPair.public) == price)
|
||||
// There are all sorts of funny games a malicious secondary might play here, we should fix them:
|
||||
//
|
||||
// - This tx may attempt to send some assets we aren't intending to sell to the secondary, if
|
||||
// we're reusing keys! So don't reuse keys!
|
||||
// - This tx may not be valid according to the contracts of the input states, so we must resolve
|
||||
// and fully audit the transaction chains to convince ourselves that it is actually valid.
|
||||
// - This tx may include output states that impose odd conditions on the movement of the cash,
|
||||
// once we implement state pairing.
|
||||
//
|
||||
// but the goal of this code is not to be fully secure, but rather, just to find good ways to
|
||||
// express protocol state machines on top of the messaging layer.
|
||||
}
|
||||
checkDependencies(it)
|
||||
|
||||
// This verifies that the transaction is contract-valid, even though it is missing signatures.
|
||||
serviceHub.verifyTransaction(wtx.toLedgerTransaction(serviceHub.identityService))
|
||||
|
||||
if (wtx.outputs.sumCashBy(myKeyPair.public) != price)
|
||||
throw IllegalArgumentException("Transaction is not sending us the right amounnt of cash")
|
||||
|
||||
// There are all sorts of funny games a malicious secondary might play here, we should fix them:
|
||||
//
|
||||
// - This tx may attempt to send some assets we aren't intending to sell to the secondary, if
|
||||
// we're reusing keys! So don't reuse keys!
|
||||
// - This tx may include output states that impose odd conditions on the movement of the cash,
|
||||
// once we implement state pairing.
|
||||
//
|
||||
// but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to
|
||||
// express protocol state machines on top of the messaging layer.
|
||||
|
||||
return it
|
||||
}
|
||||
return partialTX
|
||||
}
|
||||
|
||||
open fun signWithOurKey(partialTX: SignedTransaction) = myKeyPair.signWithECDSA(partialTX.txBits)
|
||||
|
||||
@Suspendable
|
||||
open fun timestamp(partialTX: SignedTransaction): DigitalSignature.LegallyIdentifiable {
|
||||
return TimestamperClient(this, timestampingAuthority).timestamp(partialTX.txBits)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
open fun sendSignatures(partialTX: SignedTransaction, ourSignature: DigitalSignature.WithKey,
|
||||
private fun checkDependencies(stx: SignedTransaction) {
|
||||
// Download and check all the transactions that this transaction depends on, but do not check this
|
||||
// transaction itself.
|
||||
val dependencyTxIDs = stx.tx.inputs.map { it.txhash }.toSet()
|
||||
subProtocol(ResolveTransactionsProtocol(dependencyTxIDs, otherSide))
|
||||
}
|
||||
|
||||
private fun signWithOurKey(partialTX: SignedTransaction) = myKeyPair.signWithECDSA(partialTX.txBits)
|
||||
|
||||
@Suspendable
|
||||
private fun sendSignatures(partialTX: SignedTransaction, ourSignature: DigitalSignature.WithKey,
|
||||
tsaSig: DigitalSignature.LegallyIdentifiable): SignedTransaction {
|
||||
val fullySigned = partialTX + tsaSig + ourSignature
|
||||
|
||||
// TODO: We should run it through our full TransactionGroup of all transactions here.
|
||||
|
||||
logger.trace { "Built finished transaction, sending back to secondary!" }
|
||||
|
||||
send(TRADE_TOPIC, otherSide, buyerSessionID, SignaturesFromSeller(tsaSig, ourSignature))
|
||||
@ -148,16 +162,12 @@ object TwoPartyTradeProtocol {
|
||||
}
|
||||
}
|
||||
|
||||
class UnacceptablePriceException(val givenPrice: Amount) : Exception()
|
||||
class AssetMismatchException(val expectedTypeName: String, val typeName: String) : Exception() {
|
||||
override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName"
|
||||
}
|
||||
|
||||
open class Buyer(val otherSide: SingleMessageRecipient,
|
||||
val timestampingAuthority: Party,
|
||||
val acceptablePrice: Amount,
|
||||
val typeToBuy: Class<out OwnableState>,
|
||||
val sessionID: Long) : ProtocolStateMachine<SignedTransaction>() {
|
||||
val sessionID: Long) : ProtocolLogic<SignedTransaction>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val tradeRequest = receiveAndValidateTradeRequest()
|
||||
@ -174,54 +184,51 @@ object TwoPartyTradeProtocol {
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
open fun receiveAndValidateTradeRequest(): SellerTradeInfo {
|
||||
private fun receiveAndValidateTradeRequest(): SellerTradeInfo {
|
||||
// Wait for a trade request to come in on our pre-provided session ID.
|
||||
val maybeTradeRequest = receive(TRADE_TOPIC, sessionID, SellerTradeInfo::class.java)
|
||||
val maybeTradeRequest = receive<SellerTradeInfo>(TRADE_TOPIC, sessionID)
|
||||
|
||||
val tradeRequest = maybeTradeRequest.validate {
|
||||
maybeTradeRequest.validate {
|
||||
// What is the seller trying to sell us?
|
||||
val assetTypeName = it.assetForSale.state.javaClass.name
|
||||
logger.trace { "Got trade request for a $assetTypeName" }
|
||||
val asset = it.assetForSale.state
|
||||
val assetTypeName = asset.javaClass.name
|
||||
logger.trace { "Got trade request for a $assetTypeName: ${it.assetForSale}" }
|
||||
|
||||
// Check the start message for acceptability.
|
||||
check(it.sessionID > 0)
|
||||
if (it.price > acceptablePrice)
|
||||
throw UnacceptablePriceException(it.price)
|
||||
if (!typeToBuy.isInstance(it.assetForSale.state))
|
||||
if (!typeToBuy.isInstance(asset))
|
||||
throw AssetMismatchException(typeToBuy.name, assetTypeName)
|
||||
}
|
||||
|
||||
// TODO: Either look up the stateref here in our local db, or accept a long chain of states and
|
||||
// validate them to audit the other side and ensure it actually owns the state we are being offered!
|
||||
// For now, just assume validity!
|
||||
return tradeRequest
|
||||
// Check the transaction that contains the state which is being resolved.
|
||||
// We only have a hash here, so if we don't know it already, we have to ask for it.
|
||||
subProtocol(ResolveTransactionsProtocol(setOf(it.assetForSale.ref.txhash), otherSide))
|
||||
|
||||
return it
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
open fun swapSignaturesWithSeller(stx: SignedTransaction, theirSessionID: Long): SignaturesFromSeller {
|
||||
private fun swapSignaturesWithSeller(stx: SignedTransaction, theirSessionID: Long): SignaturesFromSeller {
|
||||
logger.trace { "Sending partially signed transaction to seller" }
|
||||
|
||||
// TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx.
|
||||
|
||||
return sendAndReceive(TRADE_TOPIC, otherSide, theirSessionID, sessionID, stx, SignaturesFromSeller::class.java).validate {}
|
||||
return sendAndReceive<SignaturesFromSeller>(TRADE_TOPIC, otherSide, theirSessionID, sessionID, stx).validate { it }
|
||||
}
|
||||
|
||||
open fun signWithOurKeys(cashSigningPubKeys: List<PublicKey>, ptx: TransactionBuilder): SignedTransaction {
|
||||
private fun signWithOurKeys(cashSigningPubKeys: List<PublicKey>, ptx: TransactionBuilder): SignedTransaction {
|
||||
// Now sign the transaction with whatever keys we need to move the cash.
|
||||
for (k in cashSigningPubKeys) {
|
||||
val priv = serviceHub.keyManagementService.toPrivate(k)
|
||||
ptx.signWith(KeyPair(k, priv))
|
||||
}
|
||||
|
||||
val stx = ptx.toSignedTransaction(checkSufficientSignatures = false)
|
||||
stx.verifySignatures() // Verifies that we generated a signed transaction correctly.
|
||||
|
||||
// TODO: Could run verify() here to make sure the only signature missing is the sellers.
|
||||
|
||||
return stx
|
||||
return ptx.toSignedTransaction(checkSufficientSignatures = false)
|
||||
}
|
||||
|
||||
open fun assembleSharedTX(tradeRequest: SellerTradeInfo): Pair<TransactionBuilder, List<PublicKey>> {
|
||||
private fun assembleSharedTX(tradeRequest: SellerTradeInfo): Pair<TransactionBuilder, List<PublicKey>> {
|
||||
val ptx = TransactionBuilder()
|
||||
// Add input and output states for the movement of cash, by using the Cash contract to generate the states.
|
||||
val wallet = serviceHub.walletService.currentWallet
|
||||
|
@ -8,12 +8,10 @@
|
||||
|
||||
package core
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import core.crypto.DigitalSignature
|
||||
import core.crypto.SecureHash
|
||||
import core.crypto.generateKeyPair
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.NetworkMap
|
||||
import core.serialization.SerializedBytes
|
||||
import java.security.KeyPair
|
||||
import java.security.PrivateKey
|
||||
import java.security.PublicKey
|
||||
@ -80,6 +78,18 @@ object DummyTimestampingAuthority {
|
||||
interface StorageService {
|
||||
fun <K,V> getMap(tableName: String): MutableMap<K, V>
|
||||
|
||||
/**
|
||||
* A map of hash->tx where tx has been signature/contract validated and the states are known to be correct.
|
||||
* The signatures aren't technically needed after that point, but we keep them around so that we can relay
|
||||
* the transaction data to other nodes that need it.
|
||||
*/
|
||||
val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
|
||||
|
||||
/**
|
||||
* A map of program hash->contract class type, used for verification.
|
||||
*/
|
||||
val contractPrograms: ContractFactory
|
||||
|
||||
/**
|
||||
* Returns the legal identity that this node is configured with. Assumed to be initialised when the node is
|
||||
* first installed.
|
||||
@ -100,4 +110,17 @@ interface ServiceHub {
|
||||
val storageService: StorageService
|
||||
val networkService: MessagingService
|
||||
val networkMapService: NetworkMap
|
||||
|
||||
/**
|
||||
* Given a [LedgerTransaction], looks up all its dependencies in the local database, uses the identity service to map
|
||||
* the [SignedTransaction]s the DB gives back into [LedgerTransaction]s, and then runs the smart contracts for the
|
||||
* transaction. If no exception is thrown, the transaction is valid.
|
||||
*/
|
||||
fun verifyTransaction(ltx: LedgerTransaction) {
|
||||
val dependencies = ltx.inputs.map {
|
||||
storageService.validatedTransactions[it.txhash] ?: throw TransactionResolutionException(it.txhash)
|
||||
}
|
||||
val ltxns = dependencies.map { it.verifyToLedgerTransaction(identityService) }
|
||||
TransactionGroup(setOf(ltx), ltxns.toSet()).verify(storageService.contractPrograms)
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,7 @@
|
||||
package core
|
||||
|
||||
import core.crypto.SecureHash
|
||||
import core.utilities.loggerFor
|
||||
import java.util.*
|
||||
|
||||
class TransactionResolutionException(val hash: SecureHash) : Exception()
|
||||
@ -23,6 +24,9 @@ class TransactionConflictException(val conflictRef: StateRef, val tx1: LedgerTra
|
||||
* [nonVerifiedRoots] set. Transactions in the non-verified set are ignored other than for looking up input states.
|
||||
*/
|
||||
class TransactionGroup(val transactions: Set<LedgerTransaction>, val nonVerifiedRoots: Set<LedgerTransaction>) {
|
||||
companion object {
|
||||
val logger = loggerFor<TransactionGroup>()
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies the group and returns the set of resolved transactions.
|
||||
@ -55,6 +59,9 @@ class TransactionGroup(val transactions: Set<LedgerTransaction>, val nonVerified
|
||||
|
||||
for (tx in resolved)
|
||||
tx.verify(programMap)
|
||||
|
||||
logger.trace("Successfully run the contracts for ${resolved.size} transaction(s)")
|
||||
|
||||
return resolved
|
||||
}
|
||||
|
||||
@ -69,6 +76,8 @@ data class TransactionForVerification(val inStates: List<ContractState>,
|
||||
override fun equals(other: Any?) = other is TransactionForVerification && other.origHash == origHash
|
||||
|
||||
/**
|
||||
* Runs the smart contracts governing this transaction.
|
||||
*
|
||||
* @throws TransactionVerificationException if a contract throws an exception, the original is in the cause field
|
||||
* @throws IllegalStateException if a state refers to an unknown contract.
|
||||
*/
|
||||
|
@ -14,6 +14,7 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.google.common.base.Throwables
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
@ -28,8 +29,9 @@ import core.utilities.trace
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.PrintWriter
|
||||
import java.io.StringWriter
|
||||
import java.util.*
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.Executor
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
@ -44,11 +46,13 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* A "state machine" is a class with a single call method. The call method and any others it invokes are rewritten by
|
||||
* a bytecode rewriting engine called Quasar, to ensure the code can be suspended and resumed at any point.
|
||||
*
|
||||
* TODO: The framework should propagate exceptions and handle error handling automatically.
|
||||
* TODO: Session IDs should be set up and propagated automatically, on demand.
|
||||
* TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised
|
||||
* continuation is always unique?
|
||||
* TODO: Think about how to bring the system to a clean stop so it can be upgraded without any serialised stacks on disk
|
||||
* TODO: Timeouts
|
||||
* TODO: Surfacing of exceptions via an API and/or management UI
|
||||
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
|
||||
*/
|
||||
@ThreadSafe
|
||||
class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) {
|
||||
@ -57,7 +61,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
private val checkpointsMap = serviceHub.storageService.getMap<SecureHash, ByteArray>("state machines")
|
||||
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
|
||||
// property.
|
||||
private val _stateMachines = Collections.synchronizedList(ArrayList<ProtocolStateMachine<*>>())
|
||||
private val _stateMachines = Collections.synchronizedList(ArrayList<ProtocolLogic<*>>())
|
||||
|
||||
// This is a workaround for something Gradle does to us during unit tests. It replaces stderr with its own
|
||||
// class that inserts itself into a ThreadLocal. That then gets caught in fiber serialisation, which we don't
|
||||
@ -68,10 +72,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
// ever recover.
|
||||
val checkpointing: Boolean get() = !System.err.javaClass.name.contains("LinePerThreadBufferingOutputStream")
|
||||
|
||||
/** Returns a snapshot of the currently registered state machines. */
|
||||
val stateMachines: List<ProtocolStateMachine<*>> get() {
|
||||
/** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */
|
||||
fun <T> findStateMachines(klass: Class<out ProtocolLogic<T>>): List<Pair<ProtocolLogic<T>, ListenableFuture<T>>> {
|
||||
synchronized(_stateMachines) {
|
||||
return ArrayList(_stateMachines)
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return _stateMachines.filterIsInstance(klass).map { it to (it.psm as ProtocolStateMachine<T>).resultFuture }
|
||||
}
|
||||
}
|
||||
|
||||
@ -91,6 +96,10 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
)
|
||||
|
||||
init {
|
||||
// Blank out the default uncaught exception handler because we always catch things ourselves, and the default
|
||||
// just redundantly prints stack traces to the logs.
|
||||
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> }
|
||||
|
||||
if (checkpointing)
|
||||
restoreCheckpoints()
|
||||
}
|
||||
@ -104,7 +113,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
// Grab the Kryo engine configured by Quasar for its own stuff, and then do our own configuration on top
|
||||
// so we can deserialised the nested stream that holds the fiber.
|
||||
val psm = deserializeFiber(checkpoint.serialisedFiber)
|
||||
_stateMachines.add(psm)
|
||||
_stateMachines.add(psm.logic)
|
||||
val logger = LoggerFactory.getLogger(checkpoint.loggerName)
|
||||
val awaitingObjectOfType = Class.forName(checkpoint.awaitingObjectOfType)
|
||||
val topic = checkpoint.awaitingTopic
|
||||
@ -114,12 +123,26 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
val obj: Any = THREAD_LOCAL_KRYO.get().readObject(Input(netMsg.data), awaitingObjectOfType)
|
||||
logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
|
||||
iterateStateMachine(psm, serviceHub.networkService, logger, obj, checkpointKey) {
|
||||
Fiber.unparkDeserialized(it, SameThreadFiberScheduler)
|
||||
try {
|
||||
Fiber.unparkDeserialized(it, SameThreadFiberScheduler)
|
||||
} catch(e: Throwable) {
|
||||
logError(e, logger, obj, topic, it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun logError(e: Throwable, logger: Logger, obj: Any, topic: String, psm: ProtocolStateMachine<*>) {
|
||||
logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
|
||||
"when handling a message of type ${obj.javaClass.name} on topic $topic")
|
||||
if (logger.isTraceEnabled) {
|
||||
val s = StringWriter()
|
||||
Throwables.getRootCause(e).printStackTrace(PrintWriter(s))
|
||||
logger.trace("Stack trace of protocol error is: $s")
|
||||
}
|
||||
}
|
||||
|
||||
private fun deserializeFiber(bits: ByteArray): ProtocolStateMachine<*> {
|
||||
val deserializer = Fiber.getFiberSerializer() as KryoSerializer
|
||||
val kryo = createKryo(deserializer.kryo)
|
||||
@ -132,12 +155,13 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
* The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is
|
||||
* restarted with checkpointed state machines in the storage service.
|
||||
*/
|
||||
fun <T : ProtocolStateMachine<*>> add(loggerName: String, fiber: T): T {
|
||||
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
|
||||
val logger = LoggerFactory.getLogger(loggerName)
|
||||
val fiber = ProtocolStateMachine(logic)
|
||||
iterateStateMachine(fiber, serviceHub.networkService, logger, null, null) {
|
||||
it.start()
|
||||
}
|
||||
return fiber
|
||||
return fiber.resultFuture
|
||||
}
|
||||
|
||||
private fun persistCheckpoint(prevCheckpointKey: SecureHash?, new: ByteArray): SecureHash {
|
||||
@ -168,7 +192,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
if (request is FiberRequest.NotExpectingResponse) {
|
||||
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
||||
iterateStateMachine(psm, net, logger, null, prevCheckpointKey) {
|
||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
||||
try {
|
||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
||||
} catch(e: Throwable) {
|
||||
logError(e, logger, request.obj!!, request.topic, it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -179,7 +207,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
|
||||
// We're back! Check if the fiber is finished and if so, clean up.
|
||||
if (psm.isTerminated) {
|
||||
_stateMachines.remove(psm)
|
||||
_stateMachines.remove(psm.logic)
|
||||
checkpointsMap.remove(prevCheckpointKey)
|
||||
}
|
||||
}
|
||||
@ -196,7 +224,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
val obj: Any = THREAD_LOCAL_KRYO.get().readObject(Input(netMsg.data), responseType)
|
||||
logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
|
||||
iterateStateMachine(psm, net, logger, obj, newCheckpointKey) {
|
||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
||||
try {
|
||||
Fiber.unpark(it, QUASAR_UNBLOCKER)
|
||||
} catch(e: Throwable) {
|
||||
logError(e, logger, obj, topic, it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -205,51 +237,73 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
object SameThreadFiberScheduler : FiberExecutorScheduler("Same thread scheduler", MoreExecutors.directExecutor())
|
||||
|
||||
/**
|
||||
* The base class that should be used by any object that wishes to act as a protocol state machine. A PSM is
|
||||
* a kind of "fiber", and a fiber in turn is a bit like a thread, but a thread that can be suspended to the heap,
|
||||
* serialised to disk, and resumed on demand.
|
||||
* A sub-class of [ProtocolLogic<T>] implements a protocol flow using direct, straight line blocking code. Thus you
|
||||
* can write complex protocol logic in an ordinary fashion, without having to think about callbacks, restarting after
|
||||
* a node crash, how many instances of your protocol there are running and so on.
|
||||
*
|
||||
* Sub-classes should override the [call] method and return whatever the final result of the protocol is. Inside the
|
||||
* call method, the rules of normal object oriented programming are a little different:
|
||||
* Invoking the network will cause the call stack to be suspended onto the heap and then serialized to a database using
|
||||
* the Quasar fibers framework. Because of this, if you need access to data that might change over time, you should
|
||||
* request it just-in-time via the [serviceHub] property which is provided. Don't try and keep data you got from a
|
||||
* service across calls to send/receive/sendAndReceive because the world might change in arbitrary ways out from
|
||||
* underneath you, for instance, if the node is restarted or reconfigured!
|
||||
*
|
||||
* - You can call send/receive/sendAndReceive in order to suspend the state machine and request network interaction.
|
||||
* This does not block a thread and when a state machine is suspended like this, it will be serialised and written
|
||||
* to stable storage. That means all objects on the stack and referenced from fields must be serialisable as well
|
||||
* (with Kryo, so they don't have to implement the Java Serializable interface). The state machine may be resumed
|
||||
* at some arbitrary later point.
|
||||
* - Because of this, if you need access to data that might change over time, you should request it just-in-time
|
||||
* via the [serviceHub] property which is provided. Don't try and keep data you got from a service across calls to
|
||||
* send/receive/sendAndReceive because the world might change in arbitrary ways out from underneath you, for instance,
|
||||
* if the node is restarted or reconfigured!
|
||||
* Additionally, be aware of what data you pin either via the stack or in your [ProtocolLogic] implementation. Very large
|
||||
* objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.
|
||||
*
|
||||
* Note that the result of the [call] method can be obtained in a couple of different ways. One is to call the get
|
||||
* method, as the PSM is a [Future]. But that will block the calling thread until the result is ready, which may not
|
||||
* be what you want (unless you know it's finished already). So you can also use the [resultFuture] property, which is
|
||||
* a [ListenableFuture] and will let you register a callback.
|
||||
*
|
||||
* Once created, a PSM should be passed to a [StateMachineManager] which will start it and manage its execution.
|
||||
* If you'd like to use another ProtocolLogic class as a component of your own, construct it on the fly and then pass
|
||||
* it to the [subProtocol] method. It will return the result of that protocol when it completes.
|
||||
*/
|
||||
abstract class ProtocolStateMachine<R> : Fiber<R>("protocol", SameThreadFiberScheduler), Callable<R> {
|
||||
abstract class ProtocolLogic<T> {
|
||||
/** Reference to the [Fiber] instance that is the top level controller for the entire flow. */
|
||||
lateinit var psm: ProtocolStateMachine<*>
|
||||
|
||||
/** This is where you should log things to. */
|
||||
val logger: Logger get() = psm.logger
|
||||
/** Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts */
|
||||
val serviceHub: ServiceHub get() = psm.serviceHub
|
||||
|
||||
// Kotlin helpers that allow the use of generic types.
|
||||
inline fun <reified T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long,
|
||||
sessionIDForReceive: Long, obj: Any): UntrustworthyData<T> {
|
||||
return psm.sendAndReceive(topic, destination, sessionIDForSend, sessionIDForReceive, obj, T::class.java)
|
||||
}
|
||||
inline fun <reified T : Any> receive(topic: String, sessionIDForReceive: Long): UntrustworthyData<T> {
|
||||
return psm.receive(topic, sessionIDForReceive, T::class.java)
|
||||
}
|
||||
@Suspendable fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) {
|
||||
psm.send(topic, destination, sessionID, obj)
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes the given subprotocol by simply passing through this [ProtocolLogic]s reference to the
|
||||
* [ProtocolStateMachine] and then calling the [call] method.
|
||||
*/
|
||||
@Suspendable fun <R> subProtocol(subLogic: ProtocolLogic<R>): R {
|
||||
subLogic.psm = psm
|
||||
return subLogic.call()
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
abstract fun call(): T
|
||||
}
|
||||
|
||||
/**
|
||||
* A ProtocolStateMachine instance is a suspendable fiber that delegates all actual logic to a [ProtocolLogic] instance.
|
||||
* For any given flow there is only one PSM, even if that protocol invokes subprotocols.
|
||||
*
|
||||
* These classes are created by the [StateMachineManager] when a new protocol is started at the topmost level. If
|
||||
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
|
||||
* logic element gets to return the value that the entire state machine resolves to.
|
||||
*/
|
||||
class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>) : Fiber<R>("protocol", SameThreadFiberScheduler) {
|
||||
// These fields shouldn't be serialised, so they are marked @Transient.
|
||||
@Transient private var suspendFunc: ((result: FiberRequest, serFiber: ByteArray) -> Unit)? = null
|
||||
@Transient private var resumeWithObject: Any? = null
|
||||
@Transient lateinit var serviceHub: ServiceHub
|
||||
@Transient protected lateinit var logger: Logger
|
||||
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
|
||||
@Transient lateinit var logger: Logger
|
||||
|
||||
init {
|
||||
setDefaultUncaughtExceptionHandler { strand, throwable ->
|
||||
logger.error("Caught error whilst running protocol state machine ${this.javaClass.name}", throwable)
|
||||
}
|
||||
}
|
||||
|
||||
/** This future will complete when the call method returns. */
|
||||
val resultFuture: ListenableFuture<R> get() {
|
||||
return _resultFuture ?: run {
|
||||
val f = SettableFuture.create<R>()
|
||||
_resultFuture = f
|
||||
return f
|
||||
}
|
||||
logic.psm = this
|
||||
}
|
||||
|
||||
fun prepareForResumeWith(serviceHub: ServiceHub, withObject: Any?, logger: Logger,
|
||||
@ -260,15 +314,28 @@ abstract class ProtocolStateMachine<R> : Fiber<R>("protocol", SameThreadFiberSch
|
||||
this.serviceHub = serviceHub
|
||||
}
|
||||
|
||||
// This line may look useless, but it's needed to convince the Quasar bytecode rewriter to do the right thing.
|
||||
@Suspendable override abstract fun call(): R
|
||||
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
|
||||
|
||||
/** This future will complete when the call method returns. */
|
||||
val resultFuture: ListenableFuture<R> get() {
|
||||
return _resultFuture ?: run {
|
||||
val f = SettableFuture.create<R>()
|
||||
_resultFuture = f
|
||||
return f
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||
override fun run(): R {
|
||||
val result = call()
|
||||
if (result != null)
|
||||
(resultFuture as SettableFuture<R>).set(result)
|
||||
return result
|
||||
try {
|
||||
val result = logic.call()
|
||||
if (result != null)
|
||||
_resultFuture?.set(result)
|
||||
return result
|
||||
} catch (e: Throwable) {
|
||||
_resultFuture?.setException(e)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||
@ -325,10 +392,7 @@ class UntrustworthyData<T>(private val fromUntrustedWorld: T) {
|
||||
get() = fromUntrustedWorld
|
||||
|
||||
@Suppress("DEPRECATION")
|
||||
inline fun validate(validator: (T) -> Unit): T {
|
||||
validator(data)
|
||||
return data
|
||||
}
|
||||
inline fun <R> validate(validator: (T) -> R) = validator(data)
|
||||
}
|
||||
|
||||
// TODO: Clean this up
|
||||
|
65
src/main/kotlin/core/node/DataVendingService.kt
Normal file
65
src/main/kotlin/core/node/DataVendingService.kt
Normal file
@ -0,0 +1,65 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.node
|
||||
|
||||
import core.StorageService
|
||||
import core.crypto.SecureHash
|
||||
import core.messaging.Message
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.SingleMessageRecipient
|
||||
import core.messaging.send
|
||||
import core.serialization.deserialize
|
||||
import core.utilities.loggerFor
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
|
||||
* glue that sits between the network layer and the database layer.
|
||||
*
|
||||
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
|
||||
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
|
||||
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
|
||||
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
|
||||
* such the hash of a piece of data can be seen as a type of password allowing access to it.
|
||||
*
|
||||
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class DataVendingService(private val net: MessagingService, private val storage: StorageService) {
|
||||
companion object {
|
||||
val TX_FETCH_TOPIC = "platform.fetch.tx"
|
||||
val CONTRACT_FETCH_TOPIC = "platform.fetch.contract"
|
||||
|
||||
val logger = loggerFor<DataVendingService>()
|
||||
}
|
||||
|
||||
init {
|
||||
net.addMessageHandler("$TX_FETCH_TOPIC.0") { msg, registration -> handleTXRequest(msg) }
|
||||
net.addMessageHandler("$CONTRACT_FETCH_TOPIC.0") { msg, registration -> handleContractRequest(msg) }
|
||||
}
|
||||
|
||||
// TODO: Give all messages a respond-to address+session ID automatically.
|
||||
data class Request(val hashes: List<SecureHash>, val responseTo: SingleMessageRecipient, val sessionID: Long)
|
||||
|
||||
private fun handleTXRequest(msg: Message) {
|
||||
val req = msg.data.deserialize<Request>()
|
||||
require(req.hashes.isNotEmpty())
|
||||
val answers = req.hashes.map {
|
||||
val tx = storage.validatedTransactions[it]
|
||||
if (tx == null)
|
||||
logger.info("Got request for unknown tx $it")
|
||||
tx
|
||||
}
|
||||
net.send("$TX_FETCH_TOPIC.${req.sessionID}", req.responseTo, answers)
|
||||
}
|
||||
|
||||
private fun handleContractRequest(msg: Message) {
|
||||
TODO("PLT-12: Basic module/sandbox system for contracts: $msg")
|
||||
}
|
||||
}
|
@ -59,6 +59,9 @@ class E2ETestWalletService(private val services: ServiceHub) : WalletService {
|
||||
return@map issuance.toSignedTransaction(true)
|
||||
}
|
||||
|
||||
// TODO: Centralise the process of transaction acceptance and filtering into the wallet, then move this out.
|
||||
services.storageService.validatedTransactions.putAll(transactions.associateBy { it.id })
|
||||
|
||||
val statesAndRefs = transactions.map {
|
||||
StateAndRef(it.tx.outputs[0] as OwnableState, StateRef(it.id, 0))
|
||||
}
|
||||
|
@ -9,7 +9,9 @@
|
||||
package core.node
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import contracts.*
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
import core.crypto.generateKeyPair
|
||||
import core.messaging.*
|
||||
import core.serialization.deserialize
|
||||
@ -67,6 +69,22 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
|
||||
override val identityService: IdentityService get() = identity
|
||||
}
|
||||
|
||||
// TODO: This will be obsoleted by "PLT-12: Basic module/sandbox system for contracts"
|
||||
private val contractFactory = object : ContractFactory {
|
||||
private val contracts = mapOf(
|
||||
CASH_PROGRAM_ID to Cash::class.java,
|
||||
CP_PROGRAM_ID to CommercialPaper::class.java,
|
||||
CROWDFUND_PROGRAM_ID to CrowdFund::class.java,
|
||||
DUMMY_PROGRAM_ID to DummyContract::class.java
|
||||
)
|
||||
|
||||
override fun <T : Contract> get(hash: SecureHash): T {
|
||||
val c = contracts[hash] ?: throw UnknownContractException()
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return c.newInstance() as T
|
||||
}
|
||||
}
|
||||
|
||||
val storage: StorageService
|
||||
val smm: StateMachineManager
|
||||
val net: ArtemisMessagingService
|
||||
@ -107,6 +125,10 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
|
||||
listOf(storage.myLegalIdentity)
|
||||
identity = FixedIdentityService(knownIdentities)
|
||||
|
||||
// This object doesn't need to be referenced from this class because it registers handlers on the network
|
||||
// service and so that keeps it from being collected.
|
||||
DataVendingService(net, storage)
|
||||
|
||||
net.start()
|
||||
}
|
||||
|
||||
@ -149,7 +171,7 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
|
||||
|
||||
log.info("Node owned by ${identity.name} starting up ...")
|
||||
|
||||
return object : StorageService {
|
||||
val ss = object : StorageService {
|
||||
private val tables = HashMap<String, MutableMap<Any, Any>>()
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
@ -160,9 +182,15 @@ class Node(val dir: Path, val myNetAddr: HostAndPort, val configuration: NodeCon
|
||||
}
|
||||
}
|
||||
|
||||
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
|
||||
get() = getMap("validated-transactions")
|
||||
|
||||
override val contractPrograms = contractFactory
|
||||
override val myLegalIdentity = identity
|
||||
override val myLegalIdentityKey = keypair
|
||||
}
|
||||
|
||||
return ss
|
||||
}
|
||||
|
||||
private fun alreadyRunningNodeCheck() {
|
||||
|
@ -13,10 +13,7 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import core.*
|
||||
import core.crypto.DigitalSignature
|
||||
import core.crypto.signWithECDSA
|
||||
import core.messaging.LegallyIdentifiableNode
|
||||
import core.messaging.MessageRecipients
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.ProtocolStateMachine
|
||||
import core.messaging.*
|
||||
import core.serialization.SerializedBytes
|
||||
import core.serialization.deserialize
|
||||
import core.serialization.serialize
|
||||
@ -39,9 +36,9 @@ class TimestampingMessages {
|
||||
*/
|
||||
@ThreadSafe
|
||||
class TimestamperNodeService(private val net: MessagingService,
|
||||
private val identity: Party,
|
||||
private val signingKey: KeyPair,
|
||||
private val clock: Clock = Clock.systemDefaultZone(),
|
||||
val identity: Party,
|
||||
val signingKey: KeyPair,
|
||||
val clock: Clock = Clock.systemDefaultZone(),
|
||||
val tolerance: Duration = 30.seconds) {
|
||||
companion object {
|
||||
val TIMESTAMPING_PROTOCOL_TOPIC = "platform.timestamping.request"
|
||||
@ -95,23 +92,41 @@ class TimestamperNodeService(private val net: MessagingService,
|
||||
}
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
class TimestamperClient(private val psm: ProtocolStateMachine<*>, private val node: LegallyIdentifiableNode) : TimestamperService {
|
||||
override val identity: Party = node.identity
|
||||
/**
|
||||
* The TimestampingProtocol class is the client code that talks to a [TimestamperNodeService] on some remote node. It is a
|
||||
* [ProtocolLogic], meaning it can either be a sub-protocol of some other protocol, or be driven independently.
|
||||
*
|
||||
* If you are not yourself authoring a protocol and want to timestamp something, the [TimestampingProtocol.Client] class
|
||||
* implements the [TimestamperService] interface, meaning it can be passed to [TransactionBuilder.timestamp] to timestamp
|
||||
* the built transaction. Please be aware that this will block, meaning it should not be used on a thread that is handling
|
||||
* a network message: use it only from spare application threads that don't have to respond to anything.
|
||||
*/
|
||||
class TimestampingProtocol(private val node: LegallyIdentifiableNode,
|
||||
private val wtxBytes: SerializedBytes<WireTransaction>) : ProtocolLogic<DigitalSignature.LegallyIdentifiable>() {
|
||||
|
||||
class Client(private val stateMachineManager: StateMachineManager, private val node: LegallyIdentifiableNode) : TimestamperService {
|
||||
override val identity: Party = node.identity
|
||||
|
||||
override fun timestamp(wtxBytes: SerializedBytes<WireTransaction>): DigitalSignature.LegallyIdentifiable {
|
||||
return stateMachineManager.add("platform.timestamping", TimestampingProtocol(node, wtxBytes)).get()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Suspendable
|
||||
override fun timestamp(wtxBytes: SerializedBytes<WireTransaction>): DigitalSignature.LegallyIdentifiable {
|
||||
override fun call(): DigitalSignature.LegallyIdentifiable {
|
||||
val sessionID = random63BitValue()
|
||||
val replyTopic = "${TimestamperNodeService.TIMESTAMPING_PROTOCOL_TOPIC}.$sessionID"
|
||||
val req = TimestampingMessages.Request(wtxBytes, psm.serviceHub.networkService.myAddress, replyTopic)
|
||||
val req = TimestampingMessages.Request(wtxBytes, serviceHub.networkService.myAddress, replyTopic)
|
||||
|
||||
val maybeSignature = psm.sendAndReceive(TimestamperNodeService.TIMESTAMPING_PROTOCOL_TOPIC, node.address, 0,
|
||||
sessionID, req, DigitalSignature.LegallyIdentifiable::class.java)
|
||||
val maybeSignature = sendAndReceive<DigitalSignature.LegallyIdentifiable>(
|
||||
TimestamperNodeService.TIMESTAMPING_PROTOCOL_TOPIC, node.address, 0, sessionID, req)
|
||||
|
||||
// Check that the timestamping authority gave us back a valid signature and didn't break somehow
|
||||
val signature = maybeSignature.validate { it.verifyWithECDSA(wtxBytes) }
|
||||
|
||||
return signature
|
||||
maybeSignature.validate { sig ->
|
||||
sig.verifyWithECDSA(wtxBytes)
|
||||
return sig
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8,16 +8,15 @@
|
||||
|
||||
package core.node
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.net.HostAndPort
|
||||
import contracts.CommercialPaper
|
||||
import contracts.protocols.TwoPartyTradeProtocol
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
import core.crypto.generateKeyPair
|
||||
import core.messaging.LegallyIdentifiableNode
|
||||
import core.messaging.ProtocolLogic
|
||||
import core.messaging.SingleMessageRecipient
|
||||
import core.messaging.runOnNextMessage
|
||||
import core.messaging.send
|
||||
import core.serialization.deserialize
|
||||
import core.utilities.BriefLogFormatter
|
||||
import core.utilities.Emoji
|
||||
@ -75,6 +74,12 @@ fun main(args: Array<String>) {
|
||||
val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg)).withDefaultPort(Node.DEFAULT_PORT)
|
||||
val listening = options.has(serviceFakeTradesArg)
|
||||
|
||||
if (listening && config.myLegalName != "Bank of Zurich") {
|
||||
println("The buyer node must have a legal name of 'Bank of Zurich'. Please edit the config file.")
|
||||
exitProcess(1)
|
||||
}
|
||||
|
||||
// The timestamping node runs in the same process as the buyer protocol is run.
|
||||
val timestamperId = if (options.has(timestamperIdentityFile)) {
|
||||
val addr = HostAndPort.fromString(options.valueOf(timestamperNetAddr)).withDefaultPort(Node.DEFAULT_PORT)
|
||||
val path = Paths.get(options.valueOf(timestamperIdentityFile))
|
||||
@ -84,94 +89,132 @@ fun main(args: Array<String>) {
|
||||
|
||||
val node = logElapsedTime("Node startup") { Node(dir, myNetAddr, config, timestamperId) }
|
||||
|
||||
// Now do some fake nonsense just to give us some activity.
|
||||
|
||||
(node.services.walletService as E2ETestWalletService).fillWithSomeTestCash(1000.DOLLARS)
|
||||
|
||||
val timestampingAuthority = node.services.networkMapService.timestampingNodes.first()
|
||||
if (listening) {
|
||||
// Wait around until a node asks to start a trade with us. In a real system, this part would happen out of band
|
||||
// via some other system like an exchange or maybe even a manual messaging system like Bloomberg. But for the
|
||||
// next stage in our building site, we will just auto-generate fake trades to give our nodes something to do.
|
||||
//
|
||||
// Note that currently, the two-party trade protocol doesn't actually resolve dependencies of transactions!
|
||||
// Thus, we can make up whatever junk we like and trade non-existent cash/assets: the other side won't notice.
|
||||
// Obviously, fixing that is the next step.
|
||||
//
|
||||
// As the seller initiates the DVP/two-party trade protocol, here, we will be the buyer.
|
||||
node.net.addMessageHandler("test.junktrade") { msg, handlerRegistration ->
|
||||
val replyTo = msg.data.deserialize<SingleMessageRecipient>(includeClassName = true)
|
||||
val buyerSessionID = random63BitValue()
|
||||
println("Got a new junk trade request, sending back session ID and starting buy protocol")
|
||||
val future = TwoPartyTradeProtocol.runBuyer(node.smm, timestampingAuthority, replyTo, 100.DOLLARS,
|
||||
CommercialPaper.State::class.java, buyerSessionID)
|
||||
|
||||
future success {
|
||||
println()
|
||||
println("Purchase complete - we are a happy customer! Final transaction is:")
|
||||
println()
|
||||
println(Emoji.renderIfSupported(it.tx))
|
||||
println()
|
||||
println("Waiting for another seller to connect. Or press Ctrl-C to shut me down.")
|
||||
} failure {
|
||||
println()
|
||||
println("Something went wrong whilst trading!")
|
||||
println()
|
||||
}
|
||||
|
||||
node.net.send("test.junktrade.initiate", replyTo, buyerSessionID)
|
||||
}
|
||||
println()
|
||||
println("Waiting for a seller to connect to us (run the other node) ...")
|
||||
println()
|
||||
node.smm.add("demo.buyer", TraderDemoProtocolBuyer()).get() // This thread will halt forever here.
|
||||
} else {
|
||||
// Grab a session ID for the fake trade from the other side, then kick off the seller and sell them some junk.
|
||||
if (!options.has(fakeTradeWithArg)) {
|
||||
println("Need the --fake-trade-with command line argument")
|
||||
exitProcess(1)
|
||||
}
|
||||
val peerAddr = HostAndPort.fromString(options.valuesOf(fakeTradeWithArg).single()).withDefaultPort(Node.DEFAULT_PORT)
|
||||
val otherSide = ArtemisMessagingService.makeRecipient(peerAddr)
|
||||
node.net.runOnNextMessage("test.junktrade.initiate") { msg ->
|
||||
val sessionID = msg.data.deserialize<Long>()
|
||||
|
||||
println("Got session ID back, now starting the sell protocol")
|
||||
|
||||
val cpOwnerKey = node.keyManagement.freshKey()
|
||||
val commercialPaper = makeFakeCommercialPaper(cpOwnerKey.public)
|
||||
|
||||
val future = TwoPartyTradeProtocol.runSeller(node.smm, timestampingAuthority,
|
||||
otherSide, commercialPaper, 100.DOLLARS, cpOwnerKey, sessionID)
|
||||
|
||||
future success {
|
||||
println()
|
||||
println("Sale completed - we have a happy customer!")
|
||||
println()
|
||||
println("Final transaction is")
|
||||
println()
|
||||
println(Emoji.renderIfSupported(it.tx))
|
||||
println()
|
||||
node.stop()
|
||||
} failure {
|
||||
println()
|
||||
println("Something went wrong whilst trading!")
|
||||
println()
|
||||
}
|
||||
}
|
||||
println()
|
||||
println("Sending a message to the listening/buying node ...")
|
||||
println()
|
||||
node.net.send("test.junktrade", otherSide, node.net.myAddress, includeClassName = true)
|
||||
node.smm.add("demo.seller", TraderDemoProtocolSeller(myNetAddr, otherSide)).get()
|
||||
node.stop()
|
||||
}
|
||||
}
|
||||
|
||||
fun makeFakeCommercialPaper(ownedBy: PublicKey): StateAndRef<CommercialPaper.State> {
|
||||
// Make a fake company that's issued its own paper.
|
||||
val party = Party("MegaCorp, Inc", generateKeyPair().public)
|
||||
// ownedBy here is the random key that gives us control over it.
|
||||
val paper = CommercialPaper.State(party.ref(1,2,3), ownedBy, 1100.DOLLARS, Instant.now() + 10.days)
|
||||
val randomRef = StateRef(SecureHash.randomSHA256(), 0)
|
||||
return StateAndRef(paper, randomRef)
|
||||
// We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic.
|
||||
|
||||
class TraderDemoProtocolBuyer() : ProtocolLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// Give us some cash. Note that as nodes do not currently track forward pointers, we can spend the same cash over
|
||||
// and over again and the double spends will never be detected! Fixing that is the next step.
|
||||
(serviceHub.walletService as E2ETestWalletService).fillWithSomeTestCash(1500.DOLLARS)
|
||||
|
||||
while (true) {
|
||||
// Wait around until a node asks to start a trade with us. In a real system, this part would happen out of band
|
||||
// via some other system like an exchange or maybe even a manual messaging system like Bloomberg. But for the
|
||||
// next stage in our building site, we will just auto-generate fake trades to give our nodes something to do.
|
||||
//
|
||||
// As the seller initiates the DVP/two-party trade protocol, here, we will be the buyer.
|
||||
try {
|
||||
println()
|
||||
println("Waiting for a seller to connect to us!")
|
||||
|
||||
val hostname = receive<HostAndPort>("test.junktrade", 0).validate { it.withDefaultPort(Node.DEFAULT_PORT) }
|
||||
val newPartnerAddr = ArtemisMessagingService.makeRecipient(hostname)
|
||||
val sessionID = random63BitValue()
|
||||
println()
|
||||
println("Got a new junk trade request from $newPartnerAddr, sending back a fresh session ID and starting buy protocol")
|
||||
println()
|
||||
send("test.junktrade", newPartnerAddr, 0, sessionID)
|
||||
|
||||
val tsa = serviceHub.networkMapService.timestampingNodes[0]
|
||||
val buyer = TwoPartyTradeProtocol.Buyer(newPartnerAddr, tsa.identity, 1000.DOLLARS,
|
||||
CommercialPaper.State::class.java, sessionID)
|
||||
val tradeTX: SignedTransaction = subProtocol(buyer)
|
||||
|
||||
println()
|
||||
println("Purchase complete - we are a happy customer! Final transaction is:")
|
||||
println()
|
||||
println(Emoji.renderIfSupported(tradeTX.tx))
|
||||
println()
|
||||
println("Waiting for another seller to connect. Or press Ctrl-C to shut me down.")
|
||||
} catch(e: Exception) {
|
||||
println()
|
||||
println("Something went wrong whilst trading!")
|
||||
println()
|
||||
e.printStackTrace()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TraderDemoProtocolSeller(val myAddress: HostAndPort,
|
||||
val otherSide: SingleMessageRecipient) : ProtocolLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
println()
|
||||
println("Announcing ourselves to the buyer node!")
|
||||
println()
|
||||
|
||||
val sessionID = sendAndReceive<Long>("test.junktrade", otherSide, 0, 0, myAddress).validate { it }
|
||||
|
||||
println()
|
||||
println("Got session ID back, issuing and timestamping some commercial paper")
|
||||
|
||||
val tsa = serviceHub.networkMapService.timestampingNodes[0]
|
||||
val cpOwnerKey = serviceHub.keyManagementService.freshKey()
|
||||
val commercialPaper = makeFakeCommercialPaper(cpOwnerKey.public, tsa)
|
||||
|
||||
println()
|
||||
println("Timestamped my commercial paper issuance, starting the trade protocol.")
|
||||
|
||||
val seller = TwoPartyTradeProtocol.Seller(otherSide, tsa, commercialPaper, 1000.DOLLARS, cpOwnerKey, sessionID)
|
||||
val tradeTX: SignedTransaction = subProtocol(seller)
|
||||
|
||||
println()
|
||||
println("Sale completed - we have a happy customer!")
|
||||
println()
|
||||
println("Final transaction is")
|
||||
println()
|
||||
println(Emoji.renderIfSupported(tradeTX.tx))
|
||||
println()
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
fun makeFakeCommercialPaper(ownedBy: PublicKey, tsa: LegallyIdentifiableNode): StateAndRef<CommercialPaper.State> {
|
||||
// Make a fake company that's issued its own paper.
|
||||
val keyPair = generateKeyPair()
|
||||
val party = Party("MegaCorp, Inc", keyPair.public)
|
||||
|
||||
val issuance = run {
|
||||
val tx = CommercialPaper().generateIssue(party.ref(1,2,3), 1100.DOLLARS, Instant.now() + 10.days)
|
||||
|
||||
tx.setTime(Instant.now(), tsa.identity, 30.seconds)
|
||||
val tsaSig = subProtocol(TimestampingProtocol(tsa, tx.toWireTransaction().serialized))
|
||||
tx.checkAndAddSignature(tsaSig)
|
||||
|
||||
tx.signWith(keyPair)
|
||||
tx.toSignedTransaction(true)
|
||||
}
|
||||
|
||||
val move = run {
|
||||
val tx = TransactionBuilder()
|
||||
CommercialPaper().generateMove(tx, issuance.tx.outRef(0), ownedBy)
|
||||
tx.signWith(keyPair)
|
||||
tx.toSignedTransaction(true)
|
||||
}
|
||||
|
||||
with(serviceHub.storageService) {
|
||||
validatedTransactions[issuance.id] = issuance
|
||||
validatedTransactions[move.id] = move
|
||||
}
|
||||
|
||||
return move.tx.outRef(0)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private fun loadConfigFile(configFile: Path): NodeConfiguration {
|
||||
@ -205,22 +248,15 @@ private fun loadConfigFile(configFile: Path): NodeConfiguration {
|
||||
private fun createDefaultConfigFile(configFile: Path?, defaultLegalName: String) {
|
||||
Files.write(configFile,
|
||||
"""
|
||||
# Node configuration: adjust below as needed, then delete this comment.
|
||||
# Node configuration: give the buyer node the name 'Bank of Zurich' (no quotes)
|
||||
# The seller node can be named whatever you like.
|
||||
|
||||
myLegalName = $defaultLegalName
|
||||
""".trimIndent().toByteArray())
|
||||
}
|
||||
|
||||
private fun printHelp() {
|
||||
println("""
|
||||
|
||||
To run the listening node, alias "alpha" to "localhost" in your
|
||||
/etc/hosts file and then try a command line like this:
|
||||
|
||||
--dir=alpha --service-fake-trades --network-address=alpha
|
||||
|
||||
To run the node that initiates a trade, alias "beta" to "localhost"
|
||||
in your /etc/hosts file and then try a command line like this:
|
||||
|
||||
--dir=beta --fake-trade-with=alpha --network-address=beta:31338 --timestamper-identity-file=alpha/identity-public --timestamper-address=alpha
|
||||
Please refer to the documentation in docs/build/index.html to learn how to run the demo.
|
||||
""".trimIndent())
|
||||
}
|
||||
|
@ -71,12 +71,14 @@ class BriefLogFormatter : Formatter() {
|
||||
loggerRefs.add(logger)
|
||||
}
|
||||
|
||||
fun initVerbose(packageSpec: String = "") {
|
||||
fun initVerbose(vararg packages: String) {
|
||||
init()
|
||||
loggerRefs[0].handlers[0].level = Level.ALL
|
||||
val logger = Logger.getLogger(packageSpec)
|
||||
logger.level = Level.ALL
|
||||
loggerRefs.add(logger)
|
||||
for (spec in packages) {
|
||||
val logger = Logger.getLogger(spec)
|
||||
logger.level = Level.ALL
|
||||
loggerRefs.add(logger)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,12 +15,15 @@ import core.crypto.signWithECDSA
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.MockNetworkMap
|
||||
import core.messaging.NetworkMap
|
||||
import core.node.DataVendingService
|
||||
import core.node.TimestampingError
|
||||
import core.serialization.SerializedBytes
|
||||
import core.serialization.deserialize
|
||||
import core.testutils.RecordingMap
|
||||
import core.testutils.TEST_KEYS_TO_CORP_MAP
|
||||
import core.testutils.TEST_PROGRAM_MAP
|
||||
import core.testutils.TEST_TX_TIME
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.security.KeyPair
|
||||
import java.security.PrivateKey
|
||||
import java.security.PublicKey
|
||||
@ -65,16 +68,27 @@ class MockWalletService(val states: List<StateAndRef<OwnableState>>) : WalletSer
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
class MockStorageService : StorageService {
|
||||
class MockStorageService(val recordingAs: Map<String, String>? = null) : StorageService {
|
||||
override val myLegalIdentityKey: KeyPair = generateKeyPair()
|
||||
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public)
|
||||
|
||||
private val tables = HashMap<String, MutableMap<Any, Any>>()
|
||||
|
||||
override val validatedTransactions: MutableMap<SecureHash, SignedTransaction>
|
||||
get() = getMap("validated-transactions")
|
||||
|
||||
override val contractPrograms = MockContractFactory
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun <K, V> getMap(tableName: String): MutableMap<K, V> {
|
||||
synchronized(tables) {
|
||||
return tables.getOrPut(tableName) { Collections.synchronizedMap(HashMap<Any, Any>()) } as MutableMap<K, V>
|
||||
return tables.getOrPut(tableName) {
|
||||
val map = Collections.synchronizedMap(HashMap<Any, Any>())
|
||||
if (recordingAs != null && recordingAs[tableName] != null)
|
||||
RecordingMap(map, LoggerFactory.getLogger("recordingmap.${recordingAs[tableName]}"))
|
||||
else
|
||||
map
|
||||
} as MutableMap<K, V>
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -107,4 +121,12 @@ class MockServices(
|
||||
get() = networkMap ?: throw UnsupportedOperationException()
|
||||
override val storageService: StorageService
|
||||
get() = storage ?: throw UnsupportedOperationException()
|
||||
|
||||
init {
|
||||
if (net != null && storage != null) {
|
||||
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
|
||||
// on the networking service, so that will keep it from being collected.
|
||||
DataVendingService(net, storage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -48,9 +48,13 @@ open class TestWithInMemoryNetwork {
|
||||
// Keep calling "pump" in rounds until every node in the network reports that it had nothing to do
|
||||
fun <T> runNetwork(body: () -> T): T {
|
||||
val result = body()
|
||||
while (pumpAll(false).any { it }) {}
|
||||
runNetwork()
|
||||
return result
|
||||
}
|
||||
|
||||
fun runNetwork() {
|
||||
while (pumpAll(false).any { it }) {}
|
||||
}
|
||||
}
|
||||
|
||||
class InMemoryMessagingTests : TestWithInMemoryNetwork() {
|
||||
|
@ -187,11 +187,11 @@ class InMemoryNetwork {
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
running = false
|
||||
if (backgroundThread != null) {
|
||||
backgroundThread.interrupt()
|
||||
backgroundThread.join()
|
||||
}
|
||||
running = false
|
||||
netNodeHasShutdown(handle)
|
||||
}
|
||||
|
||||
@ -221,10 +221,7 @@ class InMemoryNetwork {
|
||||
|
||||
private fun pumpInternal(block: Boolean): Boolean {
|
||||
val q = getQueueForHandle(handle)
|
||||
val message = if (block) q.take() else q.poll()
|
||||
|
||||
if (message == null)
|
||||
return false
|
||||
val message = (if (block) q.take() else q.poll()) ?: return false
|
||||
|
||||
val deliverTo = state.locked {
|
||||
val h = handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic }
|
||||
|
@ -13,16 +13,19 @@ import contracts.Cash
|
||||
import contracts.CommercialPaper
|
||||
import contracts.protocols.TwoPartyTradeProtocol
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
import core.testutils.*
|
||||
import core.utilities.BriefLogFormatter
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.concurrent.Executors
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
// TODO: Refactor this test some more to cut down on messy setup duplication.
|
||||
|
||||
/**
|
||||
* In this example, Alice wishes to sell her commercial paper to Bob in return for $1,000,000 and they wish to do
|
||||
* it on the ledger atomically. Therefore they must work together to build a transaction.
|
||||
@ -30,30 +33,20 @@ import kotlin.test.assertTrue
|
||||
* We assume that Alice and Bob already found each other via some market, and have agreed the details already.
|
||||
*/
|
||||
class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
|
||||
lateinit var backgroundThread: ExecutorService
|
||||
|
||||
@Before
|
||||
fun before() {
|
||||
backgroundThread = Executors.newSingleThreadExecutor()
|
||||
BriefLogFormatter.initVerbose("platform.trade")
|
||||
}
|
||||
|
||||
@After
|
||||
fun after() {
|
||||
backgroundThread.shutdown()
|
||||
BriefLogFormatter.initVerbose("platform.trade", "core.TransactionGroup", "recordingmap")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun cashForCP() {
|
||||
fun `trade cash for commercial paper`() {
|
||||
// We run this in parallel threads to help catch any race conditions that may exist. The other tests
|
||||
// we run in the unit test thread exclusively to speed things up, ensure deterministic results and
|
||||
// allow interruption half way through.
|
||||
val backgroundThread = Executors.newSingleThreadExecutor()
|
||||
transactionGroupFor<ContractState> {
|
||||
// Bob (Buyer) has some cash, Alice (Seller) has some commercial paper she wants to sell to Bob.
|
||||
roots {
|
||||
transaction(CommercialPaper.State(MEGA_CORP.ref(1, 2, 3), ALICE, 1200.DOLLARS, TEST_TX_TIME + 7.days) label "alice's paper")
|
||||
transaction(800.DOLLARS.CASH `owned by` BOB label "bob cash1")
|
||||
transaction(300.DOLLARS.CASH `owned by` BOB label "bob cash2")
|
||||
}
|
||||
|
||||
val bobsWallet = listOf<StateAndRef<Cash.State>>(lookup("bob cash1"), lookup("bob cash2"))
|
||||
val (bobsWallet, bobsFakeCash) = fillUpForBuyer(false)
|
||||
val alicesFakePaper = fillUpForSeller(false).second
|
||||
|
||||
val (alicesAddress, alicesNode) = makeNode(inBackground = true)
|
||||
val (bobsAddress, bobsNode) = makeNode(inBackground = true)
|
||||
@ -61,10 +54,13 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
|
||||
|
||||
val alicesServices = MockServices(net = alicesNode)
|
||||
val bobsServices = MockServices(
|
||||
wallet = MockWalletService(bobsWallet),
|
||||
wallet = MockWalletService(bobsWallet.states),
|
||||
keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)),
|
||||
net = bobsNode
|
||||
net = bobsNode,
|
||||
storage = MockStorageService()
|
||||
)
|
||||
loadFakeTxnsIntoStorage(bobsFakeCash, bobsServices.storageService)
|
||||
loadFakeTxnsIntoStorage(alicesFakePaper, alicesServices.storageService)
|
||||
|
||||
val buyerSessionID = random63BitValue()
|
||||
|
||||
@ -91,33 +87,30 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
|
||||
txns.add(aliceResult.get().tx)
|
||||
verify()
|
||||
}
|
||||
backgroundThread.shutdown()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun serializeAndRestore() {
|
||||
fun `shut down and restore`() {
|
||||
transactionGroupFor<ContractState> {
|
||||
// Buyer Bob has some cash, Seller Alice has some commercial paper she wants to sell to Bob.
|
||||
roots {
|
||||
transaction(CommercialPaper.State(MEGA_CORP.ref(1, 2, 3), ALICE, 1200.DOLLARS, TEST_TX_TIME + 7.days) label "alice's paper")
|
||||
transaction(800.DOLLARS.CASH `owned by` BOB label "bob cash1")
|
||||
transaction(300.DOLLARS.CASH `owned by` BOB label "bob cash2")
|
||||
}
|
||||
val (wallet, bobsFakeCash) = fillUpForBuyer(false)
|
||||
val alicesFakePaper = fillUpForSeller(false).second
|
||||
|
||||
val bobsWallet = listOf<StateAndRef<Cash.State>>(lookup("bob cash1"), lookup("bob cash2"))
|
||||
|
||||
val (alicesAddress, alicesNode) = makeNode(inBackground = false)
|
||||
var (bobsAddress, bobsNode) = makeNode(inBackground = false)
|
||||
val (alicesAddress, alicesNode) = makeNode()
|
||||
var (bobsAddress, bobsNode) = makeNode()
|
||||
val timestamper = network.setupTimestampingNode(true)
|
||||
|
||||
val bobsStorage = MockStorageService()
|
||||
|
||||
val alicesServices = MockServices(wallet = null, keyManagement = null, net = alicesNode)
|
||||
var bobsServices = MockServices(
|
||||
wallet = MockWalletService(bobsWallet),
|
||||
wallet = MockWalletService(wallet.states),
|
||||
keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)),
|
||||
net = bobsNode,
|
||||
storage = bobsStorage
|
||||
)
|
||||
loadFakeTxnsIntoStorage(bobsFakeCash, bobsStorage)
|
||||
loadFakeTxnsIntoStorage(alicesFakePaper, alicesServices.storageService)
|
||||
|
||||
val smmBuyer = StateMachineManager(bobsServices, MoreExecutors.directExecutor())
|
||||
|
||||
@ -149,6 +142,12 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
|
||||
// Seller Alice already sent a message to Buyer Bob. Pump once:
|
||||
bobsNode.pump(false)
|
||||
|
||||
// Bob sends a couple of queries for the dependencies back to Alice. Alice reponds.
|
||||
alicesNode.pump(false)
|
||||
bobsNode.pump(false)
|
||||
alicesNode.pump(false)
|
||||
bobsNode.pump(false)
|
||||
|
||||
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
|
||||
// Save the state machine to "disk" (i.e. a variable, here)
|
||||
assertEquals(1, bobsStorage.getMap<Any, Any>("state machines").size)
|
||||
@ -156,10 +155,8 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
|
||||
// .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk.
|
||||
bobsNode.stop()
|
||||
|
||||
// Alice doesn't know that and carries on: first timestamping and then sending Bob the now finalised
|
||||
// transaction. Alice sends a message to a node that has gone offline.
|
||||
assertTrue(alicesNode.pump(false))
|
||||
assertTrue(timestamper.second.pump(false))
|
||||
// Alice doesn't know that and carries on: she wants to know about the cash transactions he's trying to use.
|
||||
// She will wait around until Bob comes back.
|
||||
assertTrue(alicesNode.pump(false))
|
||||
|
||||
// ... bring the node back up ... the act of constructing the SMM will re-register the message handlers
|
||||
@ -171,18 +168,264 @@ class TwoPartyTradeProtocolTests : TestWithInMemoryNetwork() {
|
||||
)
|
||||
|
||||
// Find the future representing the result of this state machine again.
|
||||
assertEquals(1, smm.stateMachines.size)
|
||||
var bobFuture = smm.stateMachines.filterIsInstance<TwoPartyTradeProtocol.Buyer>().first().resultFuture
|
||||
var bobFuture = smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java).single().second
|
||||
|
||||
// Let Bob process his mailbox.
|
||||
assertTrue(bobsNode.pump(false))
|
||||
// And off we go again.
|
||||
runNetwork()
|
||||
|
||||
// Bob is now finished and has the same transaction as Alice.
|
||||
val stx = bobFuture.get()
|
||||
txns.add(stx.tx)
|
||||
verify()
|
||||
|
||||
assertTrue(smm.stateMachines.isEmpty())
|
||||
assertTrue(smm.findStateMachines(TwoPartyTradeProtocol.Buyer::class.java).isEmpty())
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check dependencies of the sale asset are resolved`() {
|
||||
transactionGroupFor<ContractState> {
|
||||
val (bobsWallet, bobsFakeCash) = fillUpForBuyer(false)
|
||||
val alicesFakePaper = fillUpForSeller(false).second
|
||||
|
||||
val (alicesAddress, alicesNode) = makeNode()
|
||||
val (bobsAddress, bobsNode) = makeNode()
|
||||
val timestamper = network.setupTimestampingNode(true).first
|
||||
|
||||
val alicesServices = MockServices(
|
||||
net = alicesNode,
|
||||
storage = MockStorageService(mapOf("validated-transactions" to "alice"))
|
||||
)
|
||||
val bobsServices = MockServices(
|
||||
wallet = MockWalletService(bobsWallet.states),
|
||||
keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)),
|
||||
net = bobsNode,
|
||||
storage = MockStorageService(mapOf("validated-transactions" to "bob"))
|
||||
)
|
||||
val bobsSignedTxns = loadFakeTxnsIntoStorage(bobsFakeCash, bobsServices.storageService)
|
||||
val alicesSignedTxns = loadFakeTxnsIntoStorage(alicesFakePaper, alicesServices.storageService)
|
||||
|
||||
val buyerSessionID = random63BitValue()
|
||||
|
||||
TwoPartyTradeProtocol.runSeller(
|
||||
StateMachineManager(alicesServices, RunOnCallerThread),
|
||||
timestamper,
|
||||
bobsAddress,
|
||||
lookup("alice's paper"),
|
||||
1000.DOLLARS,
|
||||
ALICE_KEY,
|
||||
buyerSessionID
|
||||
)
|
||||
TwoPartyTradeProtocol.runBuyer(
|
||||
StateMachineManager(bobsServices, RunOnCallerThread),
|
||||
timestamper,
|
||||
alicesAddress,
|
||||
1000.DOLLARS,
|
||||
CommercialPaper.State::class.java,
|
||||
buyerSessionID
|
||||
)
|
||||
|
||||
runNetwork()
|
||||
|
||||
run {
|
||||
val records = (bobsServices.storageService.validatedTransactions as RecordingMap).records
|
||||
// Check Bobs's database accesses as Bob's cash transactions are downloaded by Alice.
|
||||
val expected = listOf(
|
||||
// Buyer Bob is told about Alice's commercial paper, but doesn't know it ..
|
||||
RecordingMap.Get(alicesFakePaper[0].id),
|
||||
// He asks and gets the tx, validates it, sees it's a self issue with no dependencies, stores.
|
||||
RecordingMap.Put(alicesFakePaper[0].id, alicesSignedTxns.values.first()),
|
||||
// Alice gets Bob's proposed transaction and doesn't know his two cash states. She asks, Bob answers.
|
||||
RecordingMap.Get(bobsFakeCash[1].id),
|
||||
RecordingMap.Get(bobsFakeCash[2].id),
|
||||
// Alice notices that Bob's cash txns depend on a third tx she also doesn't know. She asks, Bob answers.
|
||||
RecordingMap.Get(bobsFakeCash[0].id)
|
||||
)
|
||||
assertEquals(expected, records)
|
||||
}
|
||||
|
||||
// And from Alice's perspective ...
|
||||
run {
|
||||
val records = (alicesServices.storageService.validatedTransactions as RecordingMap).records
|
||||
val expected = listOf(
|
||||
// Seller Alice sends her seller info to Bob, who wants to check the asset for sale.
|
||||
// He requests, Alice looks up in her DB to send the tx to Bob
|
||||
RecordingMap.Get(alicesFakePaper[0].id),
|
||||
// Seller Alice gets a proposed tx which depends on Bob's two cash txns and her own tx.
|
||||
RecordingMap.Get(bobsFakeCash[1].id),
|
||||
RecordingMap.Get(bobsFakeCash[2].id),
|
||||
RecordingMap.Get(alicesFakePaper[0].id),
|
||||
// Alice notices that Bob's cash txns depend on a third tx she also doesn't know.
|
||||
RecordingMap.Get(bobsFakeCash[0].id),
|
||||
// Bob answers with the transactions that are now all verifiable, as Alice bottomed out.
|
||||
// Bob's transactions are valid, so she commits to the database
|
||||
RecordingMap.Put(bobsFakeCash[1].id, bobsSignedTxns[bobsFakeCash[1].id]),
|
||||
RecordingMap.Put(bobsFakeCash[2].id, bobsSignedTxns[bobsFakeCash[2].id]),
|
||||
RecordingMap.Put(bobsFakeCash[0].id, bobsSignedTxns[bobsFakeCash[0].id]),
|
||||
// Now she verifies the transaction is contract-valid (not signature valid) which means
|
||||
// looking up the states again.
|
||||
RecordingMap.Get(bobsFakeCash[1].id),
|
||||
RecordingMap.Get(bobsFakeCash[2].id),
|
||||
RecordingMap.Get(alicesFakePaper[0].id)
|
||||
)
|
||||
assertEquals(expected, records)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `dependency with error on buyer side`() {
|
||||
transactionGroupFor<ContractState> {
|
||||
val (bobsWallet, fakeBobCash) = fillUpForBuyer(withError = true)
|
||||
val fakeAlicePaper = fillUpForSeller(false).second
|
||||
|
||||
val (alicesAddress, alicesNode) = makeNode()
|
||||
val (bobsAddress, bobsNode) = makeNode()
|
||||
val timestamper = network.setupTimestampingNode(true).first
|
||||
|
||||
val alicesServices = MockServices(net = alicesNode)
|
||||
val bobsServices = MockServices(
|
||||
wallet = MockWalletService(bobsWallet.states),
|
||||
keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)),
|
||||
net = bobsNode,
|
||||
storage = MockStorageService(mapOf("validated-transactions" to "bob"))
|
||||
)
|
||||
loadFakeTxnsIntoStorage(fakeBobCash, bobsServices.storageService)
|
||||
loadFakeTxnsIntoStorage(fakeAlicePaper, alicesServices.storageService)
|
||||
|
||||
val buyerSessionID = random63BitValue()
|
||||
|
||||
val aliceResult = TwoPartyTradeProtocol.runSeller(
|
||||
StateMachineManager(alicesServices, RunOnCallerThread),
|
||||
timestamper,
|
||||
bobsAddress,
|
||||
lookup("alice's paper"),
|
||||
1000.DOLLARS,
|
||||
ALICE_KEY,
|
||||
buyerSessionID
|
||||
)
|
||||
TwoPartyTradeProtocol.runBuyer(
|
||||
StateMachineManager(bobsServices, RunOnCallerThread),
|
||||
timestamper,
|
||||
alicesAddress,
|
||||
1000.DOLLARS,
|
||||
CommercialPaper.State::class.java,
|
||||
buyerSessionID
|
||||
)
|
||||
|
||||
runNetwork()
|
||||
|
||||
val e = assertFailsWith<ExecutionException> {
|
||||
aliceResult.get()
|
||||
}
|
||||
assertTrue(e.cause is TransactionVerificationException)
|
||||
assertTrue(e.cause!!.cause!!.message!!.contains("at least one cash input"))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `dependency with error on seller side`() {
|
||||
transactionGroupFor<ContractState> {
|
||||
val (bobsWallet, fakeBobCash) = fillUpForBuyer(withError = false)
|
||||
val fakeAlicePaper = fillUpForSeller(withError = true).second
|
||||
|
||||
val (alicesAddress, alicesNode) = makeNode()
|
||||
val (bobsAddress, bobsNode) = makeNode()
|
||||
val timestamper = network.setupTimestampingNode(true).first
|
||||
|
||||
val alicesServices = MockServices(net = alicesNode)
|
||||
val bobsServices = MockServices(
|
||||
wallet = MockWalletService(bobsWallet.states),
|
||||
keyManagement = MockKeyManagementService(mapOf(BOB to BOB_KEY.private)),
|
||||
net = bobsNode,
|
||||
storage = MockStorageService(mapOf("validated-transactions" to "bob"))
|
||||
)
|
||||
loadFakeTxnsIntoStorage(fakeBobCash, bobsServices.storageService)
|
||||
loadFakeTxnsIntoStorage(fakeAlicePaper, alicesServices.storageService)
|
||||
|
||||
val buyerSessionID = random63BitValue()
|
||||
|
||||
TwoPartyTradeProtocol.runSeller(
|
||||
StateMachineManager(alicesServices, RunOnCallerThread),
|
||||
timestamper,
|
||||
bobsAddress,
|
||||
lookup("alice's paper"),
|
||||
1000.DOLLARS,
|
||||
ALICE_KEY,
|
||||
buyerSessionID
|
||||
)
|
||||
val bobResult = TwoPartyTradeProtocol.runBuyer(
|
||||
StateMachineManager(bobsServices, RunOnCallerThread),
|
||||
timestamper,
|
||||
alicesAddress,
|
||||
1000.DOLLARS,
|
||||
CommercialPaper.State::class.java,
|
||||
buyerSessionID
|
||||
)
|
||||
|
||||
runNetwork()
|
||||
|
||||
val e = assertFailsWith<ExecutionException> {
|
||||
bobResult.get()
|
||||
}
|
||||
assertTrue(e.cause is TransactionVerificationException)
|
||||
assertTrue(e.cause!!.cause!!.message!!.contains("must be timestamped"))
|
||||
}
|
||||
}
|
||||
|
||||
private fun TransactionGroupDSL<ContractState>.loadFakeTxnsIntoStorage(wtxToSign: List<WireTransaction>,
|
||||
ss: StorageService): Map<SecureHash, SignedTransaction> {
|
||||
val txStorage = ss.validatedTransactions
|
||||
val map = signAll(wtxToSign).associateBy { it.id }
|
||||
if (txStorage is RecordingMap) {
|
||||
txStorage.putAllUnrecorded(map)
|
||||
} else
|
||||
txStorage.putAll(map)
|
||||
return map
|
||||
}
|
||||
|
||||
private fun TransactionGroupDSL<ContractState>.fillUpForBuyer(withError: Boolean): Pair<Wallet, List<WireTransaction>> {
|
||||
// Bob (Buyer) has some cash he got from the Bank of Elbonia, Alice (Seller) has some commercial paper she
|
||||
// wants to sell to Bob.
|
||||
|
||||
val eb1 = transaction {
|
||||
// Issued money to itself.
|
||||
output("elbonian money 1") { 800.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY }
|
||||
output("elbonian money 2") { 1000.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY }
|
||||
if (!withError)
|
||||
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Issue() }
|
||||
timestamp(TEST_TX_TIME)
|
||||
}
|
||||
|
||||
// Bob gets some cash onto the ledger from BoE
|
||||
val bc1 = transaction {
|
||||
input("elbonian money 1")
|
||||
output("bob cash 1") { 800.DOLLARS.CASH `issued by` MEGA_CORP `owned by` BOB }
|
||||
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Move() }
|
||||
}
|
||||
|
||||
val bc2 = transaction {
|
||||
input("elbonian money 2")
|
||||
output("bob cash 2") { 300.DOLLARS.CASH `issued by` MEGA_CORP `owned by` BOB }
|
||||
output { 700.DOLLARS.CASH `issued by` MEGA_CORP `owned by` MEGA_CORP_PUBKEY } // Change output.
|
||||
arg(MEGA_CORP_PUBKEY) { Cash.Commands.Move() }
|
||||
}
|
||||
|
||||
val wallet = Wallet(listOf<StateAndRef<Cash.State>>(lookup("bob cash 1"), lookup("bob cash 2")))
|
||||
return Pair(wallet, listOf(eb1, bc1, bc2))
|
||||
}
|
||||
|
||||
private fun TransactionGroupDSL<ContractState>.fillUpForSeller(withError: Boolean): Pair<Wallet, List<WireTransaction>> {
|
||||
val ap = transaction {
|
||||
output("alice's paper") {
|
||||
CommercialPaper.State(MEGA_CORP.ref(1, 2, 3), ALICE, 1200.DOLLARS, TEST_TX_TIME + 7.days)
|
||||
}
|
||||
arg(MEGA_CORP_PUBKEY) { CommercialPaper.Commands.Issue() }
|
||||
if (!withError)
|
||||
timestamp(TEST_TX_TIME)
|
||||
}
|
||||
|
||||
val wallet = Wallet(listOf<StateAndRef<Cash.State>>(lookup("alice's paper")))
|
||||
return Pair(wallet, listOf(ap))
|
||||
}
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ import kotlin.test.assertEquals
|
||||
|
||||
class E2ETestWalletServiceTest {
|
||||
val services: ServiceHub = MockServices(
|
||||
keyManagement = MockKeyManagementService(emptyMap(), arrayListOf<KeyPair>(ALICE_KEY, ALICE_KEY, ALICE_KEY))
|
||||
keyManagement = MockKeyManagementService(emptyMap(), arrayListOf<KeyPair>(ALICE_KEY, ALICE_KEY, ALICE_KEY))
|
||||
)
|
||||
|
||||
@Test fun splits() {
|
||||
|
@ -58,10 +58,9 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
service = TimestamperNodeService(serviceNode.second, Party("Unit test suite", ALICE), ALICE_KEY)
|
||||
}
|
||||
|
||||
class TestPSM(val server: LegallyIdentifiableNode, val now: Instant) : ProtocolStateMachine<Boolean>() {
|
||||
class TestPSM(val server: LegallyIdentifiableNode, val now: Instant) : ProtocolLogic<Boolean>() {
|
||||
@Suspendable
|
||||
override fun call(): Boolean {
|
||||
val client = TimestamperClient(this, server)
|
||||
val ptx = TransactionBuilder().apply {
|
||||
addInputState(StateRef(SecureHash.randomSHA256(), 0))
|
||||
addOutputState(100.DOLLARS.CASH)
|
||||
@ -69,7 +68,7 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
ptx.addCommand(TimestampCommand(now - 20.seconds, now + 20.seconds), server.identity.owningKey)
|
||||
val wtx = ptx.toWireTransaction()
|
||||
// This line will invoke sendAndReceive to interact with the network.
|
||||
val sig = client.timestamp(wtx.serialize())
|
||||
val sig = subProtocol(TimestampingProtocol(server, wtx.serialized))
|
||||
ptx.checkAndAddSignature(sig)
|
||||
return true
|
||||
}
|
||||
@ -82,7 +81,6 @@ class TimestamperNodeServiceTest : TestWithInMemoryNetwork() {
|
||||
val logName = TimestamperNodeService.TIMESTAMPING_PROTOCOL_TOPIC
|
||||
val psm = TestPSM(mockServices.networkMapService.timestampingNodes[0], clock.instant())
|
||||
smm.add(logName, psm)
|
||||
psm
|
||||
}
|
||||
assertTrue(psm.isDone)
|
||||
}
|
||||
|
66
src/test/kotlin/core/testutils/RecordingMap.kt
Normal file
66
src/test/kotlin/core/testutils/RecordingMap.kt
Normal file
@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
|
||||
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
|
||||
* set forth therein.
|
||||
*
|
||||
* All other rights reserved.
|
||||
*/
|
||||
|
||||
package core.testutils
|
||||
|
||||
import core.utilities.loggerFor
|
||||
import org.slf4j.Logger
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* A RecordingMap wraps a regular Map<K, V> and records the sequence of gets and puts to it. This is useful in
|
||||
* white box unit tests to ensure that code is accessing a data store as much as you expect.
|
||||
*
|
||||
* Note: although this class itself thread safe, if the underlying map is not, then this class loses its thread safety.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class RecordingMap<K, V>(private val wrappedMap: MutableMap<K, V>,
|
||||
private val logger: Logger = loggerFor<RecordingMap<K, V>>()) : MutableMap<K, V> by wrappedMap {
|
||||
// If/when Kotlin supports data classes inside sealed classes, that would be preferable to this.
|
||||
interface Record
|
||||
data class Get<K>(val key: K) : Record
|
||||
data class Put<K, V>(val key: K, val value: V) : Record
|
||||
|
||||
private val _records = Collections.synchronizedList(ArrayList<Record>())
|
||||
|
||||
/** Returns a snapshot of the set of records */
|
||||
val records: List<Record> get() = _records.toList()
|
||||
|
||||
fun clearRecords() = _records.clear()
|
||||
|
||||
override fun get(key: K): V? {
|
||||
_records.add(Get(key))
|
||||
logger.trace("GET ${logger.name} : $key = ${wrappedMap[key]}")
|
||||
return wrappedMap[key]
|
||||
}
|
||||
|
||||
override fun put(key: K, value: V): V? {
|
||||
_records.add(Put(key, value))
|
||||
logger.trace("PUT ${logger.name} : $key = $value")
|
||||
return wrappedMap.put(key, value)
|
||||
}
|
||||
|
||||
override fun putAll(from: Map<out K, V>) {
|
||||
for ((k, v) in from) {
|
||||
put(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
fun putAllUnrecorded(from: Map<out K, V>) {
|
||||
wrappedMap.putAll(from)
|
||||
}
|
||||
}
|
@ -15,6 +15,7 @@ import core.*
|
||||
import core.crypto.*
|
||||
import core.serialization.serialize
|
||||
import core.visualiser.GraphVisualiser
|
||||
import java.security.KeyPair
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
@ -26,6 +27,8 @@ object TestUtils {
|
||||
val keypair = generateKeyPair()
|
||||
val keypair2 = generateKeyPair()
|
||||
}
|
||||
// A dummy time at which we will be pretending test transactions are created.
|
||||
val TEST_TX_TIME = Instant.parse("2015-04-17T12:00:00.00Z")
|
||||
|
||||
// A few dummy values for testing.
|
||||
val MEGA_CORP_KEY = TestUtils.keypair
|
||||
@ -41,16 +44,14 @@ val BOB = BOB_KEY.public
|
||||
val MEGA_CORP = Party("MegaCorp", MEGA_CORP_PUBKEY)
|
||||
val MINI_CORP = Party("MiniCorp", MINI_CORP_PUBKEY)
|
||||
|
||||
val ALL_TEST_KEYS = listOf(MEGA_CORP_KEY, MINI_CORP_KEY, ALICE_KEY, BOB_KEY)
|
||||
val ALL_TEST_KEYS = listOf(MEGA_CORP_KEY, MINI_CORP_KEY, ALICE_KEY, BOB_KEY, DummyTimestampingAuthority.key)
|
||||
|
||||
val TEST_KEYS_TO_CORP_MAP: Map<PublicKey, Party> = mapOf(
|
||||
MEGA_CORP_PUBKEY to MEGA_CORP,
|
||||
MINI_CORP_PUBKEY to MINI_CORP
|
||||
MINI_CORP_PUBKEY to MINI_CORP,
|
||||
DUMMY_TIMESTAMPER.identity.owningKey to DUMMY_TIMESTAMPER.identity
|
||||
)
|
||||
|
||||
// A dummy time at which we will be pretending test transactions are created.
|
||||
val TEST_TX_TIME = Instant.parse("2015-04-17T12:00:00.00Z")
|
||||
|
||||
// In a real system this would be a persistent map of hash to bytecode and we'd instantiate the object as needed inside
|
||||
// a sandbox. For unit tests we just have a hard-coded list.
|
||||
val TEST_PROGRAM_MAP: Map<SecureHash, Class<out Contract>> = mapOf(
|
||||
@ -315,15 +316,17 @@ class TransactionGroupDSL<T : ContractState>(private val stateType: Class<T>) {
|
||||
GraphVisualiser(this as TransactionGroupDSL<ContractState>).display()
|
||||
}
|
||||
|
||||
fun signAll(): List<SignedTransaction> {
|
||||
return txns.map { wtx ->
|
||||
val allPubKeys = wtx.commands.flatMap { it.pubkeys }.toSet()
|
||||
fun signAll(txnsToSign: List<WireTransaction> = txns, vararg extraKeys: KeyPair): List<SignedTransaction> {
|
||||
return txnsToSign.map { wtx ->
|
||||
val allPubKeys = wtx.commands.flatMap { it.pubkeys }.toMutableSet()
|
||||
val bits = wtx.serialize()
|
||||
require(bits == wtx.serialized)
|
||||
val sigs = ArrayList<DigitalSignature.WithKey>()
|
||||
for (key in ALL_TEST_KEYS) {
|
||||
if (allPubKeys.contains(key.public))
|
||||
for (key in ALL_TEST_KEYS + extraKeys) {
|
||||
if (allPubKeys.contains(key.public)) {
|
||||
sigs += key.signWithECDSA(bits)
|
||||
allPubKeys -= key.public
|
||||
}
|
||||
}
|
||||
wtx.toSignedTransaction(sigs)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user