Removing calls to the internal API - CordaFutureImpl (#1395)

* Removing calls to the internal API - CordaFutureImpl

* Addressing review comments

* Addressing review comments
This commit is contained in:
mkit 2017-09-05 07:47:22 +01:00 committed by GitHub
parent 0df93e6e50
commit 7c9825eab1
13 changed files with 98 additions and 107 deletions

View File

@ -1,17 +1,16 @@
package net.corda.attachmentdemo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.FlowPermissions.Companion.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.User
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_BANK_B
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.driver.driver
import net.corda.node.services.FlowPermissions.Companion.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.User
import org.junit.Test
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletableFuture.supplyAsync
class AttachmentDemoTest {
// run with a 10,000,000 bytes in-memory zip file. In practice, a slightly bigger file will be used (~10,002,000 bytes).
@ -19,19 +18,18 @@ class AttachmentDemoTest {
val numOfExpectedBytes = 10_000_000
driver(dsl = {
val demoUser = listOf(User("demo", "demo", setOf(startFlowPermission<AttachmentDemoFlow>())))
val (nodeA, nodeB) = listOf(
startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser),
startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser),
startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
).transpose().getOrThrow()
val notaryFuture = startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
val nodeAFuture = startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser)
val nodeBFuture = startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser)
val (nodeA, nodeB) = listOf(nodeAFuture, nodeBFuture, notaryFuture).map { it.getOrThrow() }
val senderThread = CompletableFuture.supplyAsync {
val senderThread = supplyAsync {
nodeA.rpcClientToNode().start(demoUser[0].username, demoUser[0].password).use {
sender(it.proxy, numOfExpectedBytes)
}
}.exceptionally { it.printStackTrace() }
val recipientThread = CompletableFuture.supplyAsync {
val recipientThread = supplyAsync {
nodeB.rpcClientToNode().start(demoUser[0].username, demoUser[0].password).use {
recipient(it.proxy)
}

View File

@ -3,11 +3,10 @@ package net.corda.bank
import net.corda.bank.api.BankOfCordaClientApi
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.core.node.services.ServiceInfo
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.testing.driver.driver
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.BOC
import net.corda.testing.driver.driver
import org.junit.Test
import kotlin.test.assertTrue
@ -15,10 +14,9 @@ class BankOfCordaHttpAPITest {
@Test
fun `issuer flow via Http`() {
driver(dsl = {
val (nodeBankOfCorda) = listOf(
startNode(providedName = BOC.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))),
startNode(providedName = BIGCORP_LEGAL_NAME)
).transpose().getOrThrow()
val bigCorpNodeFuture = startNode(providedName = BIGCORP_LEGAL_NAME)
val nodeBankOfCordaFuture = startNode(providedName = BOC.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
val (nodeBankOfCorda) = listOf(nodeBankOfCordaFuture, bigCorpNodeFuture).map { it.getOrThrow() }
val anonymous = false
val nodeBankOfCordaApiAddr = startWebserver(nodeBankOfCorda).getOrThrow().listenAddress
assertTrue(BankOfCordaClientApi(nodeBankOfCordaApiAddr).requestWebIssue(IssueRequestParams(1000, "USD", BIGCORP_LEGAL_NAME, "1", BOC.name, BOC.name, anonymous)))

View File

@ -1,6 +1,5 @@
package net.corda.bank
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
@ -23,12 +22,9 @@ class BankOfCordaRPCClientTest {
val bocManager = User("bocManager", "password1", permissions = setOf(
startFlowPermission<CashIssueAndPaymentFlow>()))
val bigCorpCFO = User("bigCorpCFO", "password2", permissions = emptySet())
val (nodeBankOfCorda, nodeBigCorporation) = listOf(
startNode(providedName = BOC.name,
advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)),
rpcUsers = listOf(bocManager)),
startNode(providedName = BIGCORP_LEGAL_NAME, rpcUsers = listOf(bigCorpCFO))
).transpose().getOrThrow()
val nodeBankOfCordaFuture = startNode(providedName = BOC.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)), rpcUsers = listOf(bocManager))
val nodeBigCorporationFuture = startNode(providedName = BIGCORP_LEGAL_NAME, rpcUsers = listOf(bigCorpCFO))
val (nodeBankOfCorda, nodeBigCorporation) = listOf(nodeBankOfCordaFuture, nodeBigCorporationFuture).map { it.getOrThrow() }
// Bank of Corda RPC Client
val bocClient = nodeBankOfCorda.rpcClientToNode()

View File

@ -1,7 +1,6 @@
package net.corda.irs
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.vaultTrackBy
import net.corda.core.node.services.ServiceInfo
import net.corda.core.toFuture
@ -44,19 +43,20 @@ class IRSDemoTest : IntegrationTestCategory {
@Test
fun `runs IRS demo`() {
driver(useTestClock = true, isDebug = true) {
val (controller, nodeA, nodeB) = listOf(
startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type), ServiceInfo(NodeInterestRates.Oracle.type))),
startNode(providedName = DUMMY_BANK_A.name, rpcUsers = listOf(rpcUser)),
startNode(providedName = DUMMY_BANK_B.name)
).transpose().getOrThrow()
val controllerFuture = startNode(
providedName = DUMMY_NOTARY.name,
advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type), ServiceInfo(NodeInterestRates.Oracle.type)))
val nodeAFuture = startNode(providedName = DUMMY_BANK_A.name, rpcUsers = listOf(rpcUser))
val nodeBFuture = startNode(providedName = DUMMY_BANK_B.name)
val (controller, nodeA, nodeB) = listOf(controllerFuture, nodeAFuture, nodeBFuture).map { it.getOrThrow() }
log.info("All nodes started")
val (controllerAddr, nodeAAddr, nodeBAddr) = listOf(
startWebserver(controller),
startWebserver(nodeA),
startWebserver(nodeB)
).transpose().getOrThrow().map { it.listenAddress }
val controllerAddrFuture = startWebserver(controller)
val nodeAAddrFuture = startWebserver(nodeA)
val nodeBAddrFuture = startWebserver(nodeB)
val (controllerAddr, nodeAAddr, nodeBAddr) =
listOf(controllerAddrFuture, nodeAAddrFuture, nodeBAddrFuture).map { it.getOrThrow().listenAddress }
log.info("All webservers started")

View File

@ -1,6 +1,5 @@
package net.corda.irs
import net.corda.core.internal.concurrent.transpose
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_A
@ -16,11 +15,12 @@ import net.corda.testing.driver.driver
*/
fun main(args: Array<String>) {
driver(dsl = {
val (controller, nodeA, nodeB) = listOf(
startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type), ServiceInfo(NodeInterestRates.Oracle.type))),
startNode(providedName = DUMMY_BANK_A.name),
startNode(providedName = DUMMY_BANK_B.name)
).transpose().getOrThrow()
val controllerFuture = startNode(
providedName = DUMMY_NOTARY.name,
advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type), ServiceInfo(NodeInterestRates.Oracle.type)))
val nodeAFuture = startNode(providedName = DUMMY_BANK_A.name)
val nodeBFuture = startNode(providedName = DUMMY_BANK_B.name)
val (controller, nodeA, nodeB) = listOf(controllerFuture, nodeAFuture, nodeBFuture).map { it.getOrThrow() }
startWebserver(controller)
startWebserver(nodeA)

View File

@ -171,7 +171,7 @@ class NetworkMapVisualiser : Application() {
onNextInvoked()
}
}
viewModel.simulation.networkInitialisationFinished.then {
viewModel.simulation.networkInitialisationFinished.thenAccept {
view.simulateInitialisationCheckbox.isVisible = false
}
view.runPauseButton.setOnAction {

View File

@ -3,14 +3,12 @@ package net.corda.netmap.simulation
import co.paralleluniverse.fibers.Suspendable
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateAndRef
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.*
import net.corda.core.node.services.queryBy
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
@ -28,6 +26,8 @@ import rx.Observable
import java.security.PublicKey
import java.time.LocalDate
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletableFuture.allOf
/**
@ -42,46 +42,58 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>())
override fun startMainSimulation(): CordaFuture<Unit> {
override fun startMainSimulation(): CompletableFuture<Unit> {
// TODO: Determine why this isn't happening via the network map
mockNet.nodes.map { it.services.identityService }.forEach { service ->
mockNet.nodes.forEach { node -> service.registerIdentity(node.info.legalIdentityAndCert) }
mockNet.nodes.forEach { node -> service.verifyAndRegisterIdentity(node.info.legalIdentityAndCert) }
}
val future = openFuture<Unit>()
om = JacksonSupport.createInMemoryMapper(InMemoryIdentityService((banks + regulators + networkMap).map { it.info.legalIdentityAndCert }, trustRoot = DUMMY_CA.certificate))
registerFinanceJSONMappers(om)
startIRSDealBetween(0, 1).thenMatch({
return startIRSDealBetween(0, 1).thenCompose {
val future = CompletableFuture<Unit>()
// Next iteration is a pause.
executeOnNextIteration.add {}
executeOnNextIteration.add {
// Keep fixing until there's no more left to do.
val initialFixFuture = doNextFixing(0, 1)
fun onFailure(t: Throwable) {
future.setException(t) // Propagate the error.
future.completeExceptionally(t)
}
fun onSuccess(result: Unit?) {
fun onSuccess() {
// Pause for an iteration.
executeOnNextIteration.add {}
executeOnNextIteration.add {
val f = doNextFixing(0, 1)
if (f != null) {
f.thenMatch(::onSuccess, ::onFailure)
f.handle { _, throwable ->
if (throwable == null) {
onSuccess()
} else {
onFailure(throwable)
}
}
} else {
// All done!
future.set(Unit)
future.complete(Unit)
}
}
}
initialFixFuture!!.thenMatch(::onSuccess, ::onFailure)
initialFixFuture!!.handle { _, throwable ->
if (throwable == null) {
onSuccess()
} else {
onFailure(throwable)
}
}
}
}, {})
return future
future
}
}
private fun doNextFixing(i: Int, j: Int): CordaFuture<Unit>? {
private fun doNextFixing(i: Int, j: Int): CompletableFuture<Void>? {
println("Doing a fixing between $i and $j")
val node1: SimulatedNode = banks[i]
val node2: SimulatedNode = banks[j]
@ -107,10 +119,10 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
if (nextFixingDate > currentDateAndTime.toLocalDate())
currentDateAndTime = nextFixingDate.atTime(15, 0)
return listOf(futA, futB).transpose().map { Unit }
return allOf(futA.toCompletableFuture(), futB.toCompletableFuture())
}
private fun startIRSDealBetween(i: Int, j: Int): CordaFuture<SignedTransaction> {
private fun startIRSDealBetween(i: Int, j: Int): CompletableFuture<SignedTransaction> {
val node1: SimulatedNode = banks[i]
val node2: SimulatedNode = banks[j]
@ -141,8 +153,8 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val acceptDealFlows: Observable<AcceptDealFlow> = node2.registerInitiatedFlow(AcceptDealFlow::class.java)
@Suppress("UNCHECKED_CAST")
val acceptorTxFuture = acceptDealFlows.toFuture().flatMap {
(it.stateMachine as FlowStateMachine<SignedTransaction>).resultFuture
val acceptorTxFuture = acceptDealFlows.toFuture().toCompletableFuture().thenCompose {
(it.stateMachine as FlowStateMachine<SignedTransaction>).resultFuture.toCompletableFuture()
}
showProgressFor(listOf(node1, node2))
@ -154,7 +166,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
node1.services.legalIdentityKey)
val instigatorTxFuture = node1.services.startFlow(instigator).resultFuture
return listOf(instigatorTxFuture, acceptorTxFuture).transpose().flatMap { instigatorTxFuture }
return allOf(instigatorTxFuture.toCompletableFuture(), acceptorTxFuture).thenCompose { instigatorTxFuture.toCompletableFuture() }
}
override fun iterate(): InMemoryMessagingNetwork.MessageTransfer? {

View File

@ -1,6 +1,5 @@
package net.corda.netmap.simulation
import net.corda.core.concurrent.CordaFuture
import net.corda.core.utilities.locationOrNull
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.SingleMessageRecipient
@ -8,9 +7,6 @@ import net.corda.core.node.CityDatabase
import net.corda.core.node.WorldMapLocation
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.containsType
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.transpose
import net.corda.testing.DUMMY_MAP
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.DUMMY_REGULATOR
@ -34,6 +30,9 @@ import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletableFuture.allOf
import java.util.concurrent.Future
/**
* Base class for network simulations that are based on the unit test / mock environment.
@ -261,21 +260,19 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
}
}
val networkInitialisationFinished = mockNet.nodes.map { it.nodeReadyFuture }.transpose()
val networkInitialisationFinished = allOf(*mockNet.nodes.map { it.nodeReadyFuture.toCompletableFuture() }.toTypedArray())
fun start(): CordaFuture<Unit> {
fun start(): Future<Unit> {
mockNet.startNodes()
// Wait for all the nodes to have finished registering with the network map service.
return networkInitialisationFinished.flatMap { startMainSimulation() }
return networkInitialisationFinished.thenCompose { startMainSimulation() }
}
/**
* Sub-classes should override this to trigger whatever they want to simulate. This method will be invoked once the
* network bringup has been simulated.
*/
protected open fun startMainSimulation(): CordaFuture<Unit> {
return doneFuture(Unit)
}
protected abstract fun startMainSimulation(): CompletableFuture<Unit>
fun stop() {
mockNet.stopNodes()

View File

@ -1,19 +1,16 @@
package net.corda.notarydemo
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.notUsed
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.toStringShort
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.notarydemo.flows.DummyIssueAndMove
import net.corda.notarydemo.flows.RPCStartableNotaryFlowClient
import net.corda.testing.BOB
import java.util.concurrent.Future
import kotlin.streams.asSequence
fun main(args: Array<String>) {
@ -53,9 +50,10 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
* as it consumes the original asset and creates a copy with the new owner as its output.
*/
private fun buildTransactions(count: Int): List<SignedTransaction> {
return (1..count).map {
val flowFutures = (1..count).map {
rpc.startFlow(::DummyIssueAndMove, notary, counterpartyNode.legalIdentity, it).returnValue
}.transpose().getOrThrow()
}
return flowFutures.map { it.getOrThrow() }
}
/**
@ -64,9 +62,9 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
*
* @return a list of encoded signer public keys - one for every transaction
*/
private fun notariseTransactions(transactions: List<SignedTransaction>): List<CordaFuture<List<String>>> {
private fun notariseTransactions(transactions: List<SignedTransaction>): List<Future<List<String>>> {
return transactions.map {
rpc.startFlow(::RPCStartableNotaryFlowClient, it).returnValue.map { it.map { it.by.toStringShort() } }
rpc.startFlow(::RPCStartableNotaryFlowClient, it).returnValue.toCompletableFuture().thenApply { it.map { it.by.toStringShort() } }
}
}
}

View File

@ -2,7 +2,6 @@ package net.corda.vega
import com.opengamma.strata.product.common.BuySell
import net.corda.core.node.services.ServiceInfo
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_BANK_B
@ -34,10 +33,13 @@ class SimmValuationTest : IntegrationTestCategory {
fun `runs SIMM valuation demo`() {
driver(isDebug = true) {
startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))).getOrThrow()
val (nodeA, nodeB) = listOf(startNode(providedName = nodeALegalName), startNode(providedName = nodeBLegalName)).transpose().getOrThrow()
val (nodeAApi, nodeBApi) = listOf(startWebserver(nodeA), startWebserver(nodeB)).transpose()
.getOrThrow()
.map { HttpApi.fromHostAndPort(it.listenAddress, "api/simmvaluationdemo") }
val nodeAFuture = startNode(providedName = nodeALegalName)
val nodeBFuture = startNode(providedName = nodeBLegalName)
val (nodeA, nodeB) = listOf(nodeAFuture, nodeBFuture).map { it.getOrThrow() }
val nodeAWebServerFuture = startWebserver(nodeA)
val nodeBWebServerFuture = startWebserver(nodeB)
val nodeAApi = HttpApi.fromHostAndPort(nodeAWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo")
val nodeBApi = HttpApi.fromHostAndPort(nodeBWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo")
val nodeBParty = getPartyWithName(nodeAApi, nodeBLegalName)
val nodeAParty = getPartyWithName(nodeBApi, nodeALegalName)
@ -55,7 +57,7 @@ class SimmValuationTest : IntegrationTestCategory {
}
private fun getAvailablePartiesFor(partyApi: HttpApi): PortfolioApi.AvailableParties {
return partyApi.getJson<PortfolioApi.AvailableParties>("whoami")
return partyApi.getJson("whoami")
}
private fun createTradeBetween(partyApi: HttpApi, counterparty: PortfolioApi.ApiParty, tradeId: String): Boolean {

View File

@ -1,13 +1,7 @@
package net.corda.vega.analytics
fun compareIMTriples(a: InitialMarginTriple, b: InitialMarginTriple): Boolean {
if (a.first is Double && a.second is Double && a.third is Double &&
b.first is Double && b.second is Double && b.third is Double) {
if (withinTolerance(a.first, b.first) && withinTolerance(a.second, b.second) && withinTolerance(a.third, b.third)) {
return true
}
}
return false
return withinTolerance(a.first, b.first) && withinTolerance(a.second, b.second) && withinTolerance(a.third, b.third)
}
// TODO: Do this correctly

View File

@ -1,6 +1,5 @@
package net.corda.vega
import net.corda.core.internal.concurrent.transpose
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_A
@ -17,12 +16,11 @@ import net.corda.testing.driver.driver
*/
fun main(args: Array<String>) {
driver(dsl = {
startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
val (nodeA, nodeB, nodeC) = listOf(
startNode(providedName = DUMMY_BANK_A.name),
startNode(providedName = DUMMY_BANK_B.name),
startNode(providedName = DUMMY_BANK_C.name)
).transpose().getOrThrow()
val notaryFuture = startNode(providedName = DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
val nodeAFuture = startNode(providedName = DUMMY_BANK_A.name)
val nodeBFuture = startNode(providedName = DUMMY_BANK_B.name)
val nodeCFuture = startNode(providedName = DUMMY_BANK_C.name)
val (nodeA, nodeB, nodeC) = listOf(nodeAFuture, nodeBFuture, nodeCFuture, notaryFuture).map { it.getOrThrow() }
startWebserver(nodeA)
startWebserver(nodeB)

View File

@ -1,7 +1,6 @@
package net.corda.traderdemo
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.internal.concurrent.transpose
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
@ -32,12 +31,11 @@ class TraderDemoTest : NodeBasedTest() {
startFlowPermission<CashIssueFlow>(),
startFlowPermission<CashPaymentFlow>(),
startFlowPermission<CommercialPaperIssueFlow>()))
val (nodeA, nodeB, bankNode) = listOf(
startNode(DUMMY_BANK_A.name, rpcUsers = listOf(demoUser)),
startNode(DUMMY_BANK_B.name, rpcUsers = listOf(demoUser)),
startNode(BOC.name, rpcUsers = listOf(bankUser)),
startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
).transpose().getOrThrow()
val notaryFuture = startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
val nodeAFuture = startNode(DUMMY_BANK_A.name, rpcUsers = listOf(demoUser))
val nodeBFuture = startNode(DUMMY_BANK_B.name, rpcUsers = listOf(demoUser))
val bankNodeFuture = startNode(BOC.name, rpcUsers = listOf(bankUser))
val (nodeA, nodeB, bankNode) = listOf(nodeAFuture, nodeBFuture, bankNodeFuture, notaryFuture).map { it.getOrThrow() }
nodeA.registerInitiatedFlow(BuyerFlow::class.java)