mirror of
synced 2024-12-18 20:47:57 +00:00
This commit is contained in:
@ -98,26 +98,6 @@ class ResolveTransactionsFlowTest {
fun `denial of service check`() {
// Chain lots of txns together.
val stx2 = makeTransactions().second
val count = 50
var cursor = stx2
repeat(count) {
val builder = DummyContract.move(cursor.tx.outRef(0), miniCorp)
val stx = megaCorpNode.services.signInitialTransaction(builder)
megaCorpNode.transaction {
cursor = stx
val p = TestFlow(setOf(cursor.id), megaCorp, 40)
val future = miniCorpNode.startFlow(p)
assertFailsWith<ResolveTransactionsFlow.ExcessivelyLargeTransactionGraph> { future.getOrThrow() }
fun `triangle of transactions resolves fine`() {
fun `triangle of transactions resolves fine`() {
val stx1 = makeTransactions().first
val stx1 = makeTransactions().first
@ -233,21 +213,21 @@ class ResolveTransactionsFlowTest {
open class TestFlow(val otherSide: Party, private val resolveTransactionsFlowFactory: (FlowSession) -> ResolveTransactionsFlow, private val txCountLimit: Int? = null) : FlowLogic<Unit>() {
open class TestFlow(private val otherSide: Party, private val resolveTransactionsFlowFactory: (FlowSession) -> ResolveTransactionsFlow) : FlowLogic<Unit>() {
constructor(txHashes: Set<SecureHash>, otherSide: Party, txCountLimit: Int? = null) : this(otherSide, { ResolveTransactionsFlow(txHashes, it) }, txCountLimit = txCountLimit)
constructor(txHashes: Set<SecureHash>, otherSide: Party) : this(otherSide, { ResolveTransactionsFlow(txHashes, it) })
constructor(stx: SignedTransaction, otherSide: Party) : this(otherSide, { ResolveTransactionsFlow(stx, it) })
constructor(stx: SignedTransaction, otherSide: Party) : this(otherSide, { ResolveTransactionsFlow(stx, it) })
override fun call() {
override fun call() {
val session = initiateFlow(otherSide)
val session = initiateFlow(otherSide)
val resolveTransactionsFlow = resolveTransactionsFlowFactory(session)
val resolveTransactionsFlow = resolveTransactionsFlowFactory(session)
txCountLimit?.let { resolveTransactionsFlow.transactionCountLimit = it }
class TestResponseFlow(val otherSideSession: FlowSession) : FlowLogic<Void?>() {
class TestResponseFlow(private val otherSideSession: FlowSession) : FlowLogic<Void?>() {
override fun call() = subFlow(TestNoSecurityDataVendingFlow(otherSideSession))
override fun call() = subFlow(TestNoSecurityDataVendingFlow(otherSideSession))
@ -1,109 +0,0 @@
package net.corda.coretests.internal
import net.corda.client.mock.Generator
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.sign
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.internal.topologicalSort
import net.corda.core.serialization.serialize
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import org.junit.Rule
import org.junit.Test
import rx.Observable
import java.util.*
class TopologicalSortTest {
class DummyTransaction constructor(
override val id: SecureHash,
override val inputs: List<StateRef>,
@Suppress("CanBeParameter") val numberOfOutputs: Int,
override val notary: Party,
override val references: List<StateRef> = emptyList()
) : CoreTransaction() {
override val outputs: List<TransactionState<ContractState>> = (1..numberOfOutputs).map {
TransactionState(DummyState(), Contract::class.java.name, notary)
override val networkParametersHash: SecureHash? = testNetworkParameters().serialize().hash
class DummyState : ContractState {
override val participants: List<AbstractParty> = emptyList()
val testSerialization = SerializationEnvironmentRule()
fun topologicalObservableSort() {
val testIdentity = TestIdentity.fresh("asd")
val N = 10
// generate random tx DAG
val ids = (1..N).map { SecureHash.sha256("$it") }
val forwardsGenerators = (0 until ids.size).map { i ->
Generator.sampleBernoulli(ids.subList(i + 1, ids.size), 0.8).map { outputs -> ids[i] to outputs }
val transactions = Generator.sequence(forwardsGenerators).map { forwardGraph ->
val backGraph = forwardGraph.flatMap { it.second.map { output -> it.first to output } }.fold(HashMap<SecureHash, HashSet<SecureHash>>()) { backGraph, edge ->
backGraph.getOrPut(edge.second) { HashSet() }.add(edge.first)
val outrefCounts = HashMap<SecureHash, Int>()
val transactions = ArrayList<SignedTransaction>()
for ((id, outputs) in forwardGraph) {
val inputs = (backGraph[id]?.toList() ?: emptyList()).map { inputTxId ->
val ref = outrefCounts.compute(inputTxId) { _, count ->
if (count == null) {
} else {
count + 1
StateRef(inputTxId, ref)
val tx = DummyTransaction(id, inputs, outputs.size, testIdentity.party)
val bits = tx.serialize().bytes
val sig = TransactionSignature(testIdentity.keyPair.private.sign(bits).bytes, testIdentity.publicKey, SignatureMetadata(0, 0))
val stx = SignedTransaction(tx, listOf(sig))
// Swap two random items
transactions.combine(Generator.intRange(0, N - 1), Generator.intRange(0, N - 2)) { txs, i, _ ->
val k = 0 // if (i == j) i + 1 else j
val tmp = txs[i]
txs[i] = txs[k]
txs[k] = tmp
val random = SplittableRandom()
for (i in 1..100) {
val txs = transactions.generateOrFail(random)
val ordered = Observable.from(txs).topologicalSort(emptyList()).toList().toBlocking().first()
private fun checkTopologicallyOrdered(txs: List<SignedTransaction>) {
val outputs = HashSet<StateRef>()
for (tx in txs) {
if (!outputs.containsAll(tx.inputs)) {
throw IllegalStateException("Transaction $tx's inputs ${tx.inputs} are not satisfied by $outputs")
outputs.addAll(tx.coreTransaction.outputs.mapIndexed { i, _ -> StateRef(tx.id, i) })
@ -110,12 +110,13 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
private fun loadWhatWeHave(): Pair<List<T>, List<SecureHash>> {
private fun loadWhatWeHave(): Pair<List<T>, List<SecureHash>> {
val fromDisk = ArrayList<T>()
val fromDisk = ArrayList<T>()
val toFetch = ArrayList<SecureHash>()
val toFetch = ArrayList<SecureHash>()
for (txid in requests) {
for (id in requests) {
val stx = load(txid)
val item = load(id)
if (stx == null)
if (item == null) {
toFetch += txid
toFetch += id
} else {
fromDisk += stx
fromDisk += item
return Pair(fromDisk, toFetch)
return Pair(fromDisk, toFetch)
@ -2,37 +2,29 @@ package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.DeleteForDJVM
import net.corda.core.DeleteForDJVM
import net.corda.core.contracts.TransactionResolutionException
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.FlowSession
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import net.corda.core.node.StatesToRecord
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.ContractUpgradeWireTransaction
import net.corda.core.transactions.ContractUpgradeWireTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.exactAdd
import net.corda.core.utilities.debug
import java.util.*
import kotlin.collections.ArrayList
import kotlin.math.min
// TODO: This code is currently unit tested by TwoPartyTradeFlowTests, it should have its own tests.
* Resolves transactions for the specified [txHashes] along with their full history (dependency graph) from [otherSide].
* Resolves transactions for the specified [txHashes] along with their full history (dependency graph) from [otherSide].
* Each retrieved transaction is validated and inserted into the local transaction storage.
* Each retrieved transaction is validated and inserted into the local transaction storage.
class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
class ResolveTransactionsFlow private constructor(
private val otherSide: FlowSession,
private val initialTx: SignedTransaction?,
private val statesToRecord: StatesToRecord = StatesToRecord.NONE) : FlowLogic<Unit>() {
private val txHashes: Set<SecureHash>,
private val otherSide: FlowSession,
private val statesToRecord: StatesToRecord
) : FlowLogic<Unit>() {
// Need it ordered in terms of iteration. Needs to be a variable for the check-pointing logic to work.
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
private val txHashes = txHashesArg.toList()
: this(null, txHashes, otherSide, statesToRecord)
/** Transaction to fetch attachments for. */
private var signedTransaction: SignedTransaction? = null
* Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does
* Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does
@ -40,88 +32,74 @@ class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
* @return a list of verified [SignedTransaction] objects, in a depth-first order.
* @return a list of verified [SignedTransaction] objects, in a depth-first order.
constructor(signedTransaction: SignedTransaction, otherSide: FlowSession) : this(dependencyIDs(signedTransaction), otherSide) {
constructor(transaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
this.signedTransaction = signedTransaction
: this(transaction, transaction.dependencies, otherSide, statesToRecord)
private companion object {
private val MAX_CHECKPOINT_RESOLUTION = Integer.getInteger("${ResolveTransactionsFlow::class.java.name}.max-checkpoint-resolution", 0)
private val SignedTransaction.dependencies: Set<SecureHash>
get() = (inputs.asSequence() + references.asSequence()).map { it.txhash }.toSet()
constructor(signedTransaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord) : this(dependencyIDs(signedTransaction), otherSide, statesToRecord) {
private var fetchNetParamsFromCounterpart = false
this.signedTransaction = signedTransaction
companion object {
private fun dependencyIDs(stx: SignedTransaction) = stx.inputs.map { it.txhash }.toSet() + stx.references.map { it.txhash }.toSet()
private const val RESOLUTION_PAGE_SIZE = 100
/** Topologically sorts the given transactions such that dependencies are listed before dependers. */
fun topologicalSort(transactions: Collection<SignedTransaction>): List<SignedTransaction> {
val sort = TopologicalSort()
for (tx in transactions) {
return sort.complete()
class ExcessivelyLargeTransactionGraph : FlowException()
// TODO: Figure out a more appropriate DOS limit here, 5000 is simply a very bad guess.
/** The maximum number of transactions this flow will try to download before bailing out. */
var transactionCountLimit = 5000
set(value) {
require(value > 0) { "$value is not a valid count limit" }
field = value
@Throws(FetchDataFlow.HashNotFound::class, FetchDataFlow.IllegalTransactionRequest::class)
override fun call() {
override fun call() {
val counterpartyPlatformVersion = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSide.counterparty)?.platformVersion
// TODO This error should actually case the flow to be sent to the flow hospital to be retried
?: throw FlowException("Couldn't retrieve party's ${otherSide.counterparty} platform version from NetworkMapCache")
val counterpartyPlatformVersion = checkNotNull(serviceHub.networkMapCache.getNodeByLegalIdentity(otherSide.counterparty)?.platformVersion) {
val newTxns = ArrayList<SignedTransaction>(txHashes.size)
"Couldn't retrieve party's ${otherSide.counterparty} platform version from NetworkMapCache"
// Start fetching data.
for (pageNumber in 0..(txHashes.size - 1) / RESOLUTION_PAGE_SIZE) {
val page = page(pageNumber, RESOLUTION_PAGE_SIZE)
newTxns += downloadDependencies(page)
val txsWithMissingAttachments = if (pageNumber == 0) signedTransaction?.let { newTxns + it }
?: newTxns else newTxns
// Fetch missing parameters flow was added in version 4. This check is needed so we don't end up with node V4 sending parameters
// request to node V3 that doesn't know about this protocol.
if (counterpartyPlatformVersion >= 4) {
// Fetch missing parameters flow was added in version 4. This check is needed so we don't end up with node V4 sending parameters
// request to node V3 that doesn't know about this protocol.
fetchNetParamsFromCounterpart = counterpartyPlatformVersion >= 4
if (initialTx != null) {
val (newTxIdsSorted, newTxs) = downloadDependencies()
// Finish fetching data.
// Finish fetching data.
val result = topologicalSort(newTxns)
logger.debug { "Downloaded transaction dependencies of ${newTxIdsSorted.size} transactions" }
// If transaction resolution is performed for a transaction where some states are relevant, then those should be
// If transaction resolution is performed for a transaction where some states are relevant, then those should be
// recorded if this has not already occurred.
// recorded if this has not already occurred.
val usedStatesToRecord = if (statesToRecord == StatesToRecord.NONE) StatesToRecord.ONLY_RELEVANT else statesToRecord
val usedStatesToRecord = if (statesToRecord == StatesToRecord.NONE) StatesToRecord.ONLY_RELEVANT else statesToRecord
result.forEach {
if (newTxs != null) {
// For each transaction, verify it and insert it into the database. As we are iterating over them in a
for (txId in newTxIdsSorted) {
// depth-first order, we should not encounter any verification failures due to missing data. If we fail
val tx = newTxs.getValue(txId)
// half way through, it's no big deal, although it might result in us attempting to re-download data
// For each transaction, verify it and insert it into the database. As we are iterating over them in a
// redundantly next time we attempt verification.
// depth-first order, we should not encounter any verification failures due to missing data. If we fail
// half way through, it's no big deal, although it might result in us attempting to re-download data
serviceHub.recordTransactions(usedStatesToRecord, listOf(it))
// redundantly next time we attempt verification.
serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
} else {
val transactionStorage = serviceHub.validatedTransactions as WritableTransactionStorage
for (txId in newTxIdsSorted) {
// Retrieve and delete the transaction from the unverified store.
val (tx, isVerified) = checkNotNull(transactionStorage.getTransactionInternal(txId)) {
"Somehow the unverified transaction ($txId) that we stored previously is no longer there."
if (!isVerified) {
serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
} else {
logger.debug { "No need to record $txId as it's already been verified" }
private fun page(pageNumber: Int, pageSize: Int): Set<SecureHash> {
val offset = pageNumber * pageSize
val limit = min(offset + pageSize, txHashes.size)
// call toSet() is needed because sub-lists are not checkpoint-friendly.
return txHashes.subList(offset, limit).toSet()
// TODO use paging here (we literally get the entire dependencies graph in memory)
private fun downloadDependencies(): Pair<List<SecureHash>, Map<SecureHash, SignedTransaction>?> {
private fun downloadDependencies(depsToCheck: Set<SecureHash>): List<SignedTransaction> {
val transactionStorage = serviceHub.validatedTransactions as WritableTransactionStorage
// Maintain a work queue of all hashes to load/download, initialised with our starting set. Then do a breadth
// Maintain a work queue of all hashes to load/download, initialised with our starting set. Then do a breadth
// first traversal across the dependency graph.
// first traversal across the dependency graph.
@ -138,38 +116,50 @@ class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
// the db contain the identities that were resolved when the transaction was first checked, or should we
// the db contain the identities that were resolved when the transaction was first checked, or should we
// accept this kind of change is possible? Most likely solution is for identity data to be an attachment.
// accept this kind of change is possible? Most likely solution is for identity data to be an attachment.
val nextRequests = LinkedHashSet<SecureHash>() // Keep things unique but ordered, for unit test stability.
val nextRequests = LinkedHashSet<SecureHash>(txHashes) // Keep things unique but ordered, for unit test stability.
val topologicalSort = TopologicalSort()
val resultQ = LinkedHashMap<SecureHash, SignedTransaction>()
var downloadedTxs: MutableMap<SecureHash, SignedTransaction>? = HashMap()
val limit = transactionCountLimit
var limitCounter = 0
while (nextRequests.isNotEmpty()) {
while (nextRequests.isNotEmpty()) {
// Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the
// Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the
// graph we're traversing.
// graph we're traversing.
val notAlreadyFetched: Set<SecureHash> = nextRequests - resultQ.keys
if (nextRequests.isEmpty()) {
// Done early.
if (notAlreadyFetched.isEmpty()) // Done early.
// Request the standalone transaction data (which may refer to things we don't yet have).
// Request the standalone transaction data (which may refer to things we don't yet have).
// TODO use paging here
val freshDownloads = subFlow(FetchTransactionsFlow(nextRequests, otherSide)).downloaded
val downloads: List<SignedTransaction> = subFlow(FetchTransactionsFlow(notAlreadyFetched, otherSide)).downloaded
for (stx in downloads)
for (downloaded in freshDownloads) {
check(resultQ.putIfAbsent(stx.id, stx) == null) // Assert checks the filter at the start.
val dependencies = downloaded.dependencies
topologicalSort.add(downloaded.id, dependencies)
// Add all input states and reference input states to the work queue.
// Add all input states and reference input states to the work queue.
if (downloadedTxs != null) {
val inputHashes = downloads.flatMap { it.inputs + it.references }.map { it.txhash }
if (downloadedTxs.size < MAX_CHECKPOINT_RESOLUTION) {
downloadedTxs[downloaded.id] = downloaded
} else {
logger.info("Resolving transaction dependencies has reached a checkpoint limit of $MAX_CHECKPOINT_RESOLUTION " +
"transactions. Switching to the node database for storing the unverified transactions.")
// This acts as both a flag that we've switched over to storing the backchain into the db, and to remove what's been
// built up in the checkpoint
downloadedTxs = null
} else {
limitCounter = limitCounter exactAdd nextRequests.size
if (limitCounter > limit)
throw ExcessivelyLargeTransactionGraph()
return resultQ.values.toList()
return Pair(topologicalSort.complete(), downloadedTxs)
@ -178,25 +168,23 @@ class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
// TODO: This could be done in parallel with other fetches for extra speed.
// TODO: This could be done in parallel with other fetches for extra speed.
private fun fetchMissingAttachments(downloads: List<SignedTransaction>) {
private fun fetchMissingAttachments(transaction: SignedTransaction) {
val attachments = downloads.map(SignedTransaction::coreTransaction).flatMap { tx ->
val tx = transaction.coreTransaction
when (tx) {
val attachmentIds = when (tx) {
is WireTransaction -> tx.attachments
is WireTransaction -> tx.attachments.toSet()
is ContractUpgradeWireTransaction -> listOf(tx.legacyContractAttachmentId, tx.upgradedContractAttachmentId)
is ContractUpgradeWireTransaction -> setOf(tx.legacyContractAttachmentId, tx.upgradedContractAttachmentId)
else -> emptyList()
else -> return
val missingAttachments = attachments.filter { serviceHub.attachments.openAttachment(it) == null }
subFlow(FetchAttachmentsFlow(attachmentIds, otherSide))
if (missingAttachments.isNotEmpty())
subFlow(FetchAttachmentsFlow(missingAttachments.toSet(), otherSide))
// TODO This can also be done in parallel. See comment to [fetchMissingAttachments] above.
// TODO This can also be done in parallel. See comment to [fetchMissingAttachments] above.
private fun fetchMissingParameters(downloads: List<SignedTransaction>) {
private fun fetchMissingNetworkParameters(transaction: SignedTransaction) {
val parameters = downloads.mapNotNull { it.networkParametersHash }
if (fetchNetParamsFromCounterpart) {
val missingParameters = parameters.filter { !(serviceHub.networkParametersService as NetworkParametersStorage).hasParameters(it) }
transaction.networkParametersHash?.let {
if (missingParameters.isNotEmpty())
subFlow(FetchNetworkParametersFlow(setOf(it), otherSide))
subFlow(FetchNetworkParametersFlow(missingParameters.toSet(), otherSide))
@ -1,118 +1,48 @@
package net.corda.core.internal
package net.corda.core.internal
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SecureHash
import net.corda.core.transactions.SignedTransaction
import java.util.*
import rx.Observable
import java.util.Collections.reverse
* Provides a way to topologically sort SignedTransactions. This means that given any two transactions T1 and T2 in the
* Provides a way to topologically sort SignedTransactions represented just their [SecureHash] IDs. This means that given any two transactions
* list returned by [complete] if T1 is a dependency of T2 then T1 will occur earlier than T2.
* T1 and T2 in the list returned by [complete] if T1 is a dependency of T2 then T1 will occur earlier than T2.
class TopologicalSort {
class TopologicalSort {
private val forwardGraph = HashMap<SecureHash, LinkedHashSet<SignedTransaction>>()
private val forwardGraph = HashMap<SecureHash, MutableSet<SecureHash>>()
private val transactions = ArrayList<SignedTransaction>()
private val transactionIds = LinkedHashSet<SecureHash>()
val seenTransactionIds: Set<SecureHash> get() = Collections.unmodifiableSet(transactionIds)
* Add a transaction to the to-be-sorted set of transactions.
* Add a transaction to the to-be-sorted set of transactions.
* @param txId The ID of the transaction.
* @param dependentIds the IDs of all the transactions [txId] depends on.
fun add(stx: SignedTransaction) {
fun add(txId: SecureHash, dependentIds: Set<SecureHash>) {
val stateRefs = stx.references + stx.inputs
require(transactionIds.add(txId)) { "Transaction ID $txId already seen" }
stateRefs.forEach { (txhash) ->
dependentIds.forEach {
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is).
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is).
forwardGraph.getOrPut(txhash) { LinkedHashSet() }.add(stx)
forwardGraph.computeIfAbsent(it) { LinkedHashSet() }.add(txId)
* Return the sorted list of signed transactions.
* Return the sorted list of transaction IDs.
fun complete(): List<SignedTransaction> {
fun complete(): List<SecureHash> {
val visited = HashSet<SecureHash>(transactions.size)
val visited = HashSet<SecureHash>(transactionIds.size)
val result = ArrayList<SignedTransaction>(transactions.size)
val result = ArrayList<SecureHash>(transactionIds.size)
fun visit(transaction: SignedTransaction) {
fun visit(txId: SecureHash) {
if (transaction.id !in visited) {
if (visited.add(txId)) {
result += txId
return result.reversed()
return result.apply(::reverse)
private fun getOutputStateRefs(stx: SignedTransaction): List<StateRef> {
return stx.coreTransaction.outputs.mapIndexed { i, _ -> StateRef(stx.id, i) }
* Topologically sort a SignedTransaction Observable on the fly by buffering transactions until all dependencies are met.
* @param initialUnspentRefs the list of unspent references that may be spent by transactions in the observable. This is
* the initial set of references the sort uses to decide whether to buffer transactions or not. For example if this
* is empty then the Observable should start with issue transactions that don't have inputs.
fun Observable<SignedTransaction>.topologicalSort(initialUnspentRefs: Collection<StateRef>): Observable<SignedTransaction> {
data class State(
val unspentRefs: HashSet<StateRef>,
val bufferedTopologicalSort: TopologicalSort,
val bufferedInputs: HashSet<StateRef>,
val bufferedOutputs: HashSet<StateRef>
var state = State(
unspentRefs = HashSet(initialUnspentRefs),
bufferedTopologicalSort = TopologicalSort(),
bufferedInputs = HashSet(),
bufferedOutputs = HashSet()
return concatMapIterable { stx ->
val results = ArrayList<SignedTransaction>()
if (state.unspentRefs.containsAll(stx.inputs)) {
// Dependencies are satisfied
} else {
// Dependencies are not satisfied, buffer
for (outputRef in getOutputStateRefs(stx)) {
if (!state.bufferedInputs.remove(outputRef)) {
for (inputRef in stx.inputs) {
if (!state.bufferedOutputs.remove(inputRef)) {
if (state.unspentRefs.containsAll(state.bufferedInputs)) {
// Buffer satisfied
state = State(
unspentRefs = state.unspentRefs,
bufferedTopologicalSort = TopologicalSort(),
bufferedInputs = HashSet(),
bufferedOutputs = HashSet()
} else {
// Buffer not satisfied
state = State(
unspentRefs = state.unspentRefs,
bufferedTopologicalSort = state.bufferedTopologicalSort,
bufferedInputs = state.bufferedInputs,
bufferedOutputs = state.bufferedOutputs
@ -0,0 +1,29 @@
package net.corda.core.internal
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.TransactionStorage
import net.corda.core.transactions.SignedTransaction
* Thread-safe storage of transactions.
interface WritableTransactionStorage : TransactionStorage {
* Add a new *verified* transaction to the store, or convert the existing unverified transaction into a verified one.
* @param transaction The transaction to be recorded.
* @return true if the transaction was recorded as a *new verified* transcation, false if the transaction already exists.
// TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry.
fun addTransaction(transaction: SignedTransaction): Boolean
* Add a new *unverified* transaction to the store.
fun addUnverifiedTransaction(transaction: SignedTransaction)
* Return the transaction with the given ID from the store, and a flag of whether it's verified. Returns null if no transaction with the
* ID exists.
fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>?
@ -0,0 +1,58 @@
package net.corda.core.internal
import net.corda.core.crypto.SecureHash
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
class TopologicalSortTest {
private val topologicalSort = TopologicalSort()
private val t1 = SecureHash.randomSHA256()
private val t2 = SecureHash.randomSHA256()
private val t3 = SecureHash.randomSHA256()
private val t4 = SecureHash.randomSHA256()
fun issuance() {
topologicalSort.add(t1, emptySet())
fun `T1 to T2`() {
topologicalSort.add(t2, setOf(t1))
topologicalSort.add(t1, emptySet())
assertThat(topologicalSort.complete()).containsExactly(t1, t2)
fun `T1 to T2, T1 to T3`() {
topologicalSort.add(t3, setOf(t1))
topologicalSort.add(t2, setOf(t1))
topologicalSort.add(t1, emptySet())
val sorted = topologicalSort.complete()
assertThat(listOf(t1, t2).map(sorted::indexOf)).isSorted
assertThat(listOf(t1, t3).map(sorted::indexOf)).isSorted
fun `T1 to T2 to T4, T1 to T3 to T4`() {
topologicalSort.add(t4, setOf(t2, t3))
topologicalSort.add(t3, setOf(t1))
topologicalSort.add(t2, setOf(t1))
topologicalSort.add(t1, emptySet())
val sorted = topologicalSort.complete()
assertThat(listOf(t1, t2, t4).map(sorted::indexOf)).isSorted
assertThat(listOf(t1, t3, t4).map(sorted::indexOf)).isSorted
fun `T1 to T2 to T3 to T4, T1 to T4`() {
topologicalSort.add(t4, setOf(t2, t1))
topologicalSort.add(t3, setOf(t2))
topologicalSort.add(t2, setOf(t1))
topologicalSort.add(t1, emptySet())
val sorted = topologicalSort.complete()
assertThat(listOf(t1, t2, t3, t4).map(sorted::indexOf)).isSorted
assertThat(listOf(t1, t4).map(sorted::indexOf)).isSorted
@ -7,6 +7,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.WritableTransactionStorage
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineTransactionMapping
@ -15,7 +16,6 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCacheBase
import net.corda.core.node.services.NetworkMapCacheBase
import net.corda.core.node.services.TransactionStorage
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.internal.InitiatedFlowFactory
@ -51,7 +51,8 @@ interface ServiceHubInternal : ServiceHub {
companion object {
companion object {
private val log = contextLogger()
private val log = contextLogger()
fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>,
fun recordTransactions(statesToRecord: StatesToRecord,
txs: Iterable<SignedTransaction>,
validatedTransactions: WritableTransactionStorage,
validatedTransactions: WritableTransactionStorage,
stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
vaultService: VaultServiceInternal,
vaultService: VaultServiceInternal,
@ -175,19 +176,6 @@ interface FlowStarter {
interface StartedNodeServices : ServiceHubInternal, FlowStarter
interface StartedNodeServices : ServiceHubInternal, FlowStarter
* Thread-safe storage of transactions.
interface WritableTransactionStorage : TransactionStorage {
* Add a new transaction to the store. If the store already has a transaction with the same id it will be
* overwritten.
* @param transaction The transaction to be recorded.
* @return true if the transaction was recorded successfully, false if it was already recorded.
// TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry.
fun addTransaction(transaction: SignedTransaction): Boolean
* This is the interface to storage storing state machine -> recorded tx mappings. Any time a transaction is recorded
* This is the interface to storage storing state machine -> recorded tx mappings. Any time a transaction is recorded
@ -3,10 +3,7 @@ package net.corda.node.services.persistence
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.TransactionSignature
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.*
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.serialization.*
@ -14,26 +11,17 @@ import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.toFuture
import net.corda.core.toFuture
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.AppendOnlyPersistentMapBase
import net.corda.node.utilities.AppendOnlyPersistentMapBase
import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.*
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY
import rx.Observable
import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.PublishSubject
import javax.persistence.*
import javax.persistence.*
import kotlin.streams.toList
import kotlin.streams.toList
// cache value type to just store the immutable bits of a signed transaction plus conversion helpers
typealias TxCacheValue = Pair<SerializedBytes<CoreTransaction>, List<TransactionSignature>>
fun TxCacheValue.toSignedTx() = SignedTransaction(this.first, this.second)
fun SignedTransaction.toTxCacheValue() = TxCacheValue(this.txBits, this.sigs)
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory) : WritableTransactionStorage, SingletonSerializeAsToken() {
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory) : WritableTransactionStorage, SingletonSerializeAsToken() {
@ -41,17 +29,27 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
class DBTransaction(
class DBTransaction(
@Column(name = "tx_id", length = 64, nullable = false)
@Column(name = "tx_id", length = 64, nullable = false)
var txId: String = "",
val txId: String,
@Column(name = "state_machine_run_id", length = 36, nullable = true)
@Column(name = "state_machine_run_id", length = 36, nullable = true)
var stateMachineRunId: String? = "",
val stateMachineRunId: String?,
@Column(name = "transaction_value", nullable = false)
@Column(name = "transaction_value", nullable = false)
var transaction: ByteArray = EMPTY_BYTE_ARRAY
val transaction: ByteArray,
@Column(name = "is_verified", nullable = false)
val isVerified: Boolean
private companion object {
private companion object {
// Rough estimate for the average of a public key and the transaction metadata - hard to get exact figures here,
// as public keys can vary in size a lot, and if someone else is holding a reference to the key, it won't add
// to the memory pressure at all here.
private const val transactionSignatureOverheadEstimate = 1024
private val logger = contextLogger()
private fun contextToUse(): SerializationContext {
private fun contextToUse(): SerializationContext {
return if (effectiveSerializationEnv.serializationFactory.currentContext?.useCase == SerializationContext.UseCase.Storage) {
return if (effectiveSerializationEnv.serializationFactory.currentContext?.useCase == SerializationContext.UseCase.Storage) {
@ -65,46 +63,89 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
cacheFactory = cacheFactory,
cacheFactory = cacheFactory,
name = "DBTransactionStorage_transactions",
name = "DBTransactionStorage_transactions",
toPersistentEntityKey = { it.toString() },
toPersistentEntityKey = SecureHash::toString,
fromPersistentEntity = {
fromPersistentEntity = {
SecureHash.parse(it.txId) to TxCacheValue(it.transaction.deserialize(context = contextToUse()), it.isVerified)
it.transaction.deserialize<SignedTransaction>(context = contextToUse())
toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
DBTransaction().apply {
txId = key.toString()
txId = key.toString(),
stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString()
stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString(),
transaction = value.toSignedTx().serialize(context = contextToUse()).bytes
transaction = value.toSignedTx().serialize(context = contextToUse()).bytes,
isVerified = value.isVerified
persistentEntityClass = DBTransaction::class.java,
persistentEntityClass = DBTransaction::class.java,
weighingFunc = { hash, tx -> hash.size + weighTx(tx) }
weighingFunc = { hash, tx -> hash.size + weighTx(tx) }
// Rough estimate for the average of a public key and the transaction metadata - hard to get exact figures here,
// as public keys can vary in size a lot, and if someone else is holding a reference to the key, it won't add
// to the memory pressure at all here.
private const val transactionSignatureOverheadEstimate = 1024
private fun weighTx(tx: AppendOnlyPersistentMapBase.Transactional<TxCacheValue>): Int {
private fun weighTx(tx: AppendOnlyPersistentMapBase.Transactional<TxCacheValue>): Int {
val actTx = tx.peekableValue ?: return 0
val actTx = tx.peekableValue ?: return 0
return actTx.second.sumBy { it.size + transactionSignatureOverheadEstimate } + actTx.first.size
return actTx.sigs.sumBy { it.size + transactionSignatureOverheadEstimate } + actTx.txBits.size
private val txStorage = ThreadBox(createTransactionsMap(cacheFactory))
private val txStorage = ThreadBox(createTransactionsMap(cacheFactory))
override fun addTransaction(transaction: SignedTransaction): Boolean = database.transaction {
override fun addTransaction(transaction: SignedTransaction): Boolean {
txStorage.locked {
return database.transaction {
addWithDuplicatesAllowed(transaction.id, transaction.toTxCacheValue()).apply {
txStorage.locked {
val added = addWithDuplicatesAllowed(transaction.id, TxCacheValue(transaction, isVerified = true))
if (added) {
logger.debug { "Recorded transaction ${transaction.id}." }
} else {
// We need to check that what exists in the database is verified or not.
if (get(transaction.id)!!.isVerified) {
logger.debug { "Transaction ${transaction.id} already exists so no need to record." }
// Transaction is already verified so there's nothing to do
} else {
// If it isn't verified then we can simply flip the switch and then report the transaction as "added" as per the
// contract for this method.
.createQuery("UPDATE ${DBTransaction::class.java.name} T SET T.isVerified = true WHERE T.txId = :txId")
.setParameter("txId", transaction.id.toString())
logger.debug { "Previously unverified transaction ${transaction.id} has been recorded as verified." }
override fun getTransaction(id: SecureHash): SignedTransaction? = database.transaction { txStorage.content[id]?.toSignedTx() }
private fun onNewTx(transaction: SignedTransaction): Boolean {
return true
override fun getTransaction(id: SecureHash): SignedTransaction? {
return database.transaction {
txStorage.content[id]?.let { if (it.isVerified) it.toSignedTx() else null }
override fun addUnverifiedTransaction(transaction: SignedTransaction) {
database.transaction {
txStorage.locked {
val added = addWithDuplicatesAllowed(transaction.id, TxCacheValue(transaction, isVerified = false))
if (added) {
logger.debug { "Transaction ${transaction.id} recorded as unverified." }
} else {
logger.info("Transaction ${transaction.id} already exists so no need to record.")
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>? {
return database.transaction {
txStorage.content[id]?.let { it.toSignedTx() to it.isVerified }
private val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
private val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction()
override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction()
@ -120,18 +161,32 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
override fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> {
override fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> {
return database.transaction {
return database.transaction {
txStorage.locked {
txStorage.locked {
val existingTransaction = get(id)
val existingTransaction = getTransaction(id)
if (existingTransaction == null) {
if (existingTransaction == null) {
updates.filter { it.id == id }.toFuture()
updates.filter { it.id == id }.toFuture()
} else {
} else {
// Cache value type to just store the immutable bits of a signed transaction plus conversion helpers
private data class TxCacheValue(
val txBits: SerializedBytes<CoreTransaction>,
val sigs: List<TransactionSignature>,
val isVerified: Boolean
) {
constructor(stx: SignedTransaction, isVerified: Boolean) : this(stx.txBits, stx.sigs, isVerified)
fun toSignedTx() = SignedTransaction(txBits, sigs)
val transactions: List<SignedTransaction> get() = database.transaction { snapshot() }
val transactions: List<SignedTransaction> get() = database.transaction { snapshot() }
private fun snapshot() = txStorage.content.allPersisted.use { it.map { it.second.toSignedTx() }.toList() }
private fun snapshot(): List<SignedTransaction> {
return txStorage.content.allPersisted.use {
it.filter { it.second.isVerified }.map { it.second.toSignedTx() }.toList()
@ -803,7 +803,7 @@ class SingleThreadedStateMachineManager(
val interceptors = ArrayList<TransitionInterceptor>()
val interceptors = ArrayList<TransitionInterceptor>()
interceptors.add { HospitalisingInterceptor(flowHospital, it) }
interceptors.add { HospitalisingInterceptor(flowHospital, it) }
if (serviceHub.configuration.devMode) {
if (serviceHub.configuration.devMode) {
interceptors.add { DumpHistoryOnErrorInterceptor(it) }
// interceptors.add { DumpHistoryOnErrorInterceptor(it) }
if (serviceHub.configuration.shouldCheckCheckpoints()) {
if (serviceHub.configuration.shouldCheckCheckpoints()) {
interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) }
interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) }
@ -137,6 +137,10 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
fun invalidate(key: Any) {
private fun loadValue(key: K): V? {
private fun loadValue(key: K): V? {
val session = currentDBSession()
val session = currentDBSession()
val flushing = contextTransaction.flushing
val flushing = contextTransaction.flushing
@ -32,7 +32,7 @@ import net.corda.finance.contracts.asset.CASH
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.TwoPartyTradeFlow.Buyer
import net.corda.finance.flows.TwoPartyTradeFlow.Buyer
import net.corda.finance.flows.TwoPartyTradeFlow.Seller
import net.corda.finance.flows.TwoPartyTradeFlow.Seller
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.internal.WritableTransactionStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.persistence.checkpoints
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -30,7 +30,7 @@ import net.corda.finance.test.SampleCashSchemaV2
import net.corda.finance.test.SampleCashSchemaV3
import net.corda.finance.test.SampleCashSchemaV3
import net.corda.finance.contracts.utils.sumCash
import net.corda.finance.contracts.utils.sumCash
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.internal.WritableTransactionStorage
import net.corda.node.services.schema.ContractStateAndRef
import net.corda.node.services.schema.ContractStateAndRef
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.schema.PersistentStateService
import net.corda.node.services.schema.PersistentStateService
@ -27,7 +27,7 @@ import net.corda.finance.schemas.CashSchemaV1
import net.corda.finance.workflows.asset.CashUtils
import net.corda.finance.workflows.asset.CashUtils
import net.corda.finance.workflows.getCashBalance
import net.corda.finance.workflows.getCashBalance
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.internal.WritableTransactionStorage
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyContract
@ -0,0 +1,56 @@
package net.corda.traderdemo
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.*
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import java.time.Instant
fun main(args: Array<String>) {
val chainLength = args[0].toInt()
driver(DriverParameters(inMemoryDB = false, notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME, false)))) {
val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
.map { startNode(NodeParameters(providedName = it, additionalCordapps = FINANCE_CORDAPPS)) }
.map { it.getOrThrow() }
alice.rpc.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(1), defaultNotaryIdentity).returnValue.getOrThrow()
repeat(chainLength / 2) {
alice.rpc.startFlow(::CashPaymentFlow, 1.DOLLARS, bob.nodeInfo.singleIdentity(), false).returnValue.getOrThrow()
bob.rpc.startFlow(::CashPaymentFlow, 1.DOLLARS, alice.nodeInfo.singleIdentity(), false).returnValue.getOrThrow()
val current = (it + 1) * 2
if (current % 100 == 0) {
println("${Instant.now()} $current")
private fun DriverDSL.asds(nodeA: NodeHandle, nodeB: NodeHandle, nodeC: NodeHandle, backchainLength: Int) {
timeBackchainResolution(nodeA, nodeB, nodeC, backchainLength) // warm up
var time = 0L
repeat(3) {
val duration = timeBackchainResolution(nodeA, nodeB, nodeC, backchainLength)
println("Backchain length $backchainLength took $duration ms")
time += duration
println("Average ${time / 3} ms")
private fun DriverDSL.timeBackchainResolution(nodeA: NodeHandle, nodeB: NodeHandle, nodeC: NodeHandle, backchainLength: Int): Long {
nodeA.rpc.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(1), defaultNotaryIdentity).returnValue.getOrThrow()
repeat(backchainLength / 2) {
nodeA.rpc.startFlow(::CashPaymentFlow, 1.DOLLARS, nodeB.nodeInfo.singleIdentity(), false).returnValue.getOrThrow()
nodeB.rpc.startFlow(::CashPaymentFlow, 1.DOLLARS, nodeA.nodeInfo.singleIdentity(), false).returnValue.getOrThrow()
val start = System.currentTimeMillis()
nodeA.rpc.startFlow(::CashPaymentFlow, 1.DOLLARS, nodeC.nodeInfo.singleIdentity(), false).returnValue.getOrThrow()
return System.currentTimeMillis() - start
@ -2,9 +2,15 @@ package net.corda.traderdemo
import joptsimple.OptionParser
import joptsimple.OptionParser
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.internal.logElapsedTime
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import kotlin.system.exitProcess
import kotlin.system.exitProcess
@ -13,7 +19,30 @@ import kotlin.system.exitProcess
* This entry point allows for command line running of the trader demo functions on nodes started by Main.kt.
* This entry point allows for command line running of the trader demo functions on nodes started by Main.kt.
fun main(args: Array<String>) {
fun main(args: Array<String>) {
// TraderDemo().main(args)
val bocProxy = CordaRPCClient(NetworkHostAndPort("localhost", 10006)).start("bankUser", "test").proxy
val bigProxy = CordaRPCClient(NetworkHostAndPort("localhost", 10009)).start("bigCorpUser", "test").proxy
val notaryProxy = CordaRPCClient(NetworkHostAndPort("localhost", 10003)).start("bankUser", "test").proxy
val boc = bocProxy.nodeInfo().legalIdentities.single()
val big = bigProxy.nodeInfo().legalIdentities.single()
val notary = notaryProxy.nodeInfo().legalIdentities.single()
for (index in 1..1000 step 10) {
bocProxy.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(1), notary).returnValue.getOrThrow()
logElapsedTime("$index: Backchain creation") {
repeat(index) {
bocProxy.startFlow(::CashPaymentFlow, 1.DOLLARS, notary, false).returnValue.getOrThrow()
notaryProxy.startFlow(::CashPaymentFlow, 1.DOLLARS, boc, false).returnValue.getOrThrow()
logElapsedTime("$index: BoC -> Big Corp") {
bocProxy.startFlow(::CashPaymentFlow, 1.DOLLARS, big, false).returnValue.getOrThrow()
private class TraderDemo {
private class TraderDemo {
@ -11,6 +11,7 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.WritableTransactionStorage
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandle
@ -7,7 +7,7 @@ import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.toFuture
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.internal.WritableTransactionStorage
import rx.Observable
import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.PublishSubject
import java.util.*
import java.util.*
Reference in New Issue
Block a user