Basic clean-up for demos, load-test and examples (#590)

* Fix RPC usage by marking unused Observables.
* Update Client RPC tutorial documentation to mention the importance of "notUsed" observables.
This commit is contained in:
Chris Rankin 2017-04-26 11:01:09 +01:00
parent f894730693
commit e8fdb3510f
11 changed files with 58 additions and 23 deletions

View File

@ -1,9 +1,9 @@
package net.corda.docs
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount
import net.corda.core.contracts.USD
import net.corda.core.crypto.X509Utilities
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.node.CordaPluginRegistry
@ -106,11 +106,15 @@ fun main(args: Array<String>) {
// START 6
fun generateTransactions(proxy: CordaRPCOps) {
var ownedQuantity = proxy.vaultAndUpdates().first.fold(0L) { sum, state ->
val (vault, vaultUpdates) = proxy.vaultAndUpdates()
vaultUpdates.notUsed()
var ownedQuantity = vault.fold(0L) { sum, state ->
sum + (state.state.data as Cash.State).amount.quantity
}
val issueRef = OpaqueBytes.of(0)
val notary = proxy.networkMapUpdates().first.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
val (parties, partyUpdates) = proxy.networkMapUpdates()
partyUpdates.notUsed()
val notary = parties.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
val me = proxy.nodeIdentity().legalIdentity
while (true) {
Thread.sleep(1000)

View File

@ -61,7 +61,7 @@ Now we just need to create the transactions themselves!
:start-after: START 6
:end-before: END 6
We utilise several RPC functions here to query things like the notaries in the node cluster or our own vault.
We utilise several RPC functions here to query things like the notaries in the node cluster or our own vault. These RPC functions also return ``Observable`` objects so that the node can send us updated values. However, we don't need updates here and so we mark these observables as ``notUsed``. (As a rule, you should always either subscribe to an ``Observable`` or mark it as not used. Failing to do this will leak resources in the node.)
Then in a loop we generate randomly either an Issue, a Pay or an Exit transaction.

View File

@ -1,5 +1,6 @@
package net.corda.irs.api
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.filterStatesOfType
import net.corda.core.crypto.Party
import net.corda.core.getOrThrow
@ -42,7 +43,9 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) {
private fun generateDealLink(deal: InterestRateSwap.State<*>) = "/api/irs/deals/" + deal.common.tradeID
private fun getDealByRef(ref: String): InterestRateSwap.State<*>? {
val states = rpc.vaultAndUpdates().first.filterStatesOfType<InterestRateSwap.State<*>>().filter { it.state.data.ref == ref }
val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
val states = vault.filterStatesOfType<InterestRateSwap.State<*>>().filter { it.state.data.ref == ref }
return if (states.isEmpty()) null else {
val deals = states.map { it.state.data }
return if (deals.isEmpty()) null else deals[0]
@ -50,7 +53,9 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) {
}
private fun getAllDeals(): Array<InterestRateSwap.State<*>> {
val states = rpc.vaultAndUpdates().first.filterStatesOfType<InterestRateSwap.State<*>>()
val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
val states = vault.filterStatesOfType<InterestRateSwap.State<*>>()
val swaps = states.map { it.state.data }.toTypedArray()
return swaps
}

View File

@ -26,7 +26,7 @@ class IRSDemoClientApi(private val hostAndPort: HostAndPort) {
fun runUploadRates() {
val fileContents = IOUtils.toString(Thread.currentThread().contextClassLoader.getResourceAsStream("example.rates.txt"), Charsets.UTF_8.name())
val url = URL("http://$hostAndPort/upload/interest-rates")
assert(uploadFile(url, fileContents))
check(uploadFile(url, fileContents))
}
private companion object {

View File

@ -4,13 +4,13 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import joptsimple.OptionParser
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.notUsed
import net.corda.core.crypto.toStringShort
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.BOB
import net.corda.flows.NotaryFlow
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.notarydemo.flows.DummyIssueAndMove
@ -30,11 +30,15 @@ fun main(args: Array<String>) {
/** Interface for using the notary demo API from a client. */
private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
private val notary by lazy {
rpc.networkMapUpdates().first.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
val (parties, partyUpdates) = rpc.networkMapUpdates()
partyUpdates.notUsed()
parties.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
}
private val counterpartyNode by lazy {
rpc.networkMapUpdates().first.first { it.legalIdentity.name == "CN=Counterparty,O=R3,OU=corda,L=London,C=UK" }
val (parties, partyUpdates) = rpc.networkMapUpdates()
partyUpdates.notUsed()
parties.first { it.legalIdentity.name == "CN=Counterparty,O=R3,OU=corda,L=London,C=UK" }
}
private companion object {

View File

@ -1,6 +1,7 @@
package net.corda.vega.api
import com.opengamma.strata.basics.currency.MultiCurrencyAmount
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.DealState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.filterStatesOfType
@ -35,7 +36,9 @@ class PortfolioApi(val rpc: CordaRPCOps) {
private val portfolioUtils = PortfolioApiUtils(ownParty)
private inline fun <reified T : DealState> dealsWith(party: AbstractParty): List<StateAndRef<T>> {
return rpc.vaultAndUpdates().first.filterStatesOfType<T>().filter { it.state.data.parties.any { it == party } }
val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
return vault.filterStatesOfType<T>().filter { it.state.data.parties.any { it == party } }
}
/**
@ -130,8 +133,8 @@ class PortfolioApi(val rpc: CordaRPCOps) {
Response.ok().entity(swaps.map {
it.toView(ownParty,
latestPortfolioStateData?.portfolio?.toStateAndRef<IRSState>(rpc)?.toPortfolio(),
PVs?.get(it.id.second.toString()) ?: MultiCurrencyAmount.empty(),
IMs?.get(it.id.second.toString()) ?: InitialMarginTriple.zero()
PVs?.get(it.id.second) ?: MultiCurrencyAmount.empty(),
IMs?.get(it.id.second) ?: InitialMarginTriple.zero()
)
}).build()
}
@ -247,7 +250,9 @@ class PortfolioApi(val rpc: CordaRPCOps) {
@Path("whoami")
@Produces(MediaType.APPLICATION_JSON)
fun getWhoAmI(): AvailableParties {
val counterParties = rpc.networkMapUpdates().first.filter {
val (parties, partyUpdates) = rpc.networkMapUpdates()
partyUpdates.notUsed()
val counterParties = parties.filter {
it.legalIdentity.name != DUMMY_MAP.name
&& it.legalIdentity.name != DUMMY_NOTARY.name
&& it.legalIdentity.name != ownParty.name

View File

@ -1,5 +1,6 @@
package net.corda.vega.portfolio
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.*
import net.corda.core.crypto.Party
import net.corda.core.messaging.CordaRPCOps
@ -33,7 +34,9 @@ fun List<StateAndRef<IRSState>>.toPortfolio(): Portfolio {
}
inline fun <reified T : ContractState> List<StateRef>.toStateAndRef(rpc: CordaRPCOps): List<StateAndRef<T>> {
val stateRefs = rpc.vaultAndUpdates().first.associateBy { it.ref }
val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
val stateRefs = vault.associateBy { it.ref }
return mapNotNull { stateRefs[it] }.filterStatesOfType<T>()
}

View File

@ -1,6 +1,7 @@
package net.corda.traderdemo
import com.google.common.util.concurrent.Futures
import net.corda.client.rpc.notUsed
import net.corda.contracts.CommercialPaper
import net.corda.contracts.asset.Cash
import net.corda.contracts.testing.calculateRandomlySizedAmounts
@ -18,7 +19,6 @@ import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.testing.BOC
import net.corda.traderdemo.flow.SellerFlow
import java.util.*
import kotlin.test.assertEquals
/**
* Interface for communicating with nodes running the trader demo.
@ -28,11 +28,19 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) {
val logger = loggerFor<TraderDemoClientApi>()
}
val cashCount: Int get() = rpc.vaultAndUpdates().first.filterStatesOfType<Cash.State>().size
val cashCount: Int get() {
val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
return vault.filterStatesOfType<Cash.State>().size
}
val dollarCashBalance: Amount<Currency> get() = rpc.getCashBalances()[USD]!!
val commercialPaperCount: Int get() = rpc.vaultAndUpdates().first.filterStatesOfType<CommercialPaper.State>().size
val commercialPaperCount: Int get() {
val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
return vault.filterStatesOfType<CommercialPaper.State>().size
}
fun runBuyer(amount: Amount<Currency> = 30000.DOLLARS) {
val bankOfCordaParty = rpc.partyFromName(BOC.name)
@ -58,7 +66,7 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) {
if (!rpc.attachmentExists(SellerFlow.PROSPECTUS_HASH)) {
javaClass.classLoader.getResourceAsStream("bank-of-london-cp.jar").use {
val id = rpc.uploadAttachment(it)
assertEquals(SellerFlow.PROSPECTUS_HASH, id)
check(SellerFlow.PROSPECTUS_HASH == id)
}
}

View File

@ -1,6 +1,7 @@
package net.corda.loadtest
import net.corda.client.mock.Generator
import net.corda.client.rpc.notUsed
import net.corda.core.crypto.toBase58String
import net.corda.node.driver.PortAllocation
import net.corda.node.services.network.NetworkMapService
@ -86,7 +87,7 @@ data class LoadTest<T, S>(
log.info("$count remaining commands, state:\n$state")
// Generate commands
val commands = nodes.generate(state, parameters.parallelism).generate(random).getOrThrow()
require(commands.size > 0)
require(commands.isNotEmpty())
log.info("Generated command batch of size ${commands.size}: $commands")
// Interpret commands
val newState = commands.fold(state, interpret)
@ -170,7 +171,8 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest
log.info("Getting node info of ${connection.hostName}")
val nodeInfo = connection.proxy.nodeIdentity()
log.info("Got node info of ${connection.hostName}: $nodeInfo!")
val otherNodeInfos = connection.proxy.networkMapUpdates().first
val (otherNodeInfos, nodeInfoUpdates) = connection.proxy.networkMapUpdates()
nodeInfoUpdates.notUsed()
val pubkeysString = otherNodeInfos.map {
" ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}"
}.joinToString("\n")

View File

@ -2,6 +2,7 @@ package net.corda.loadtest.tests
import net.corda.client.mock.Generator
import net.corda.client.mock.pickN
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
@ -218,7 +219,8 @@ val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
val currentNodeVaults = HashMap<AbstractParty, HashMap<AbstractParty, Long>>()
simpleNodes.forEach {
val quantities = HashMap<AbstractParty, Long>()
val vault = it.connection.proxy.vaultAndUpdates().first
val (vault, vaultUpdates) = it.connection.proxy.vaultAndUpdates()
vaultUpdates.notUsed()
vault.forEach {
val state = it.state.data
if (state is Cash.State) {

View File

@ -4,6 +4,7 @@ import de.danielbechler.diff.ObjectDifferFactory
import net.corda.client.mock.Generator
import net.corda.client.mock.pickOne
import net.corda.client.mock.replicatePoisson
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.USD
import net.corda.core.crypto.AbstractParty
@ -70,7 +71,8 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
gatherRemoteState = { previousState ->
val selfIssueVaults = HashMap<AbstractParty, Long>()
simpleNodes.forEach { (_, connection, info) ->
val vault = connection.proxy.vaultAndUpdates().first
val (vault, vaultUpdates) = connection.proxy.vaultAndUpdates()
vaultUpdates.notUsed()
vault.forEach {
val state = it.state.data
if (state is Cash.State) {