Basic clean-up for demos, load-test and examples (#590)

* Fix RPC usage by marking unused Observables.
* Update Client RPC tutorial documentation to mention the importance of "notUsed" observables.
This commit is contained in:
Chris Rankin 2017-04-26 11:01:09 +01:00 committed by GitHub
parent c5a9312e07
commit af36e0f731
11 changed files with 58 additions and 23 deletions

View File

@ -1,9 +1,9 @@
package net.corda.docs package net.corda.docs
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount import net.corda.core.contracts.Amount
import net.corda.core.contracts.USD import net.corda.core.contracts.USD
import net.corda.core.crypto.X509Utilities
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.CordaPluginRegistry
@ -106,11 +106,15 @@ fun main(args: Array<String>) {
// START 6 // START 6
fun generateTransactions(proxy: CordaRPCOps) { fun generateTransactions(proxy: CordaRPCOps) {
var ownedQuantity = proxy.vaultAndUpdates().first.fold(0L) { sum, state -> val (vault, vaultUpdates) = proxy.vaultAndUpdates()
vaultUpdates.notUsed()
var ownedQuantity = vault.fold(0L) { sum, state ->
sum + (state.state.data as Cash.State).amount.quantity sum + (state.state.data as Cash.State).amount.quantity
} }
val issueRef = OpaqueBytes.of(0) val issueRef = OpaqueBytes.of(0)
val notary = proxy.networkMapUpdates().first.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity val (parties, partyUpdates) = proxy.networkMapUpdates()
partyUpdates.notUsed()
val notary = parties.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
val me = proxy.nodeIdentity().legalIdentity val me = proxy.nodeIdentity().legalIdentity
while (true) { while (true) {
Thread.sleep(1000) Thread.sleep(1000)

View File

@ -61,7 +61,7 @@ Now we just need to create the transactions themselves!
:start-after: START 6 :start-after: START 6
:end-before: END 6 :end-before: END 6
We utilise several RPC functions here to query things like the notaries in the node cluster or our own vault. We utilise several RPC functions here to query things like the notaries in the node cluster or our own vault. These RPC functions also return ``Observable`` objects so that the node can send us updated values. However, we don't need updates here and so we mark these observables as ``notUsed``. (As a rule, you should always either subscribe to an ``Observable`` or mark it as not used. Failing to do this will leak resources in the node.)
Then in a loop we generate randomly either an Issue, a Pay or an Exit transaction. Then in a loop we generate randomly either an Issue, a Pay or an Exit transaction.

View File

@ -1,5 +1,6 @@
package net.corda.irs.api package net.corda.irs.api
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.filterStatesOfType import net.corda.core.contracts.filterStatesOfType
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.getOrThrow import net.corda.core.getOrThrow
@ -42,7 +43,9 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) {
private fun generateDealLink(deal: InterestRateSwap.State<*>) = "/api/irs/deals/" + deal.common.tradeID private fun generateDealLink(deal: InterestRateSwap.State<*>) = "/api/irs/deals/" + deal.common.tradeID
private fun getDealByRef(ref: String): InterestRateSwap.State<*>? { private fun getDealByRef(ref: String): InterestRateSwap.State<*>? {
val states = rpc.vaultAndUpdates().first.filterStatesOfType<InterestRateSwap.State<*>>().filter { it.state.data.ref == ref } val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
val states = vault.filterStatesOfType<InterestRateSwap.State<*>>().filter { it.state.data.ref == ref }
return if (states.isEmpty()) null else { return if (states.isEmpty()) null else {
val deals = states.map { it.state.data } val deals = states.map { it.state.data }
return if (deals.isEmpty()) null else deals[0] return if (deals.isEmpty()) null else deals[0]
@ -50,7 +53,9 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) {
} }
private fun getAllDeals(): Array<InterestRateSwap.State<*>> { private fun getAllDeals(): Array<InterestRateSwap.State<*>> {
val states = rpc.vaultAndUpdates().first.filterStatesOfType<InterestRateSwap.State<*>>() val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
val states = vault.filterStatesOfType<InterestRateSwap.State<*>>()
val swaps = states.map { it.state.data }.toTypedArray() val swaps = states.map { it.state.data }.toTypedArray()
return swaps return swaps
} }

View File

@ -26,7 +26,7 @@ class IRSDemoClientApi(private val hostAndPort: HostAndPort) {
fun runUploadRates() { fun runUploadRates() {
val fileContents = IOUtils.toString(Thread.currentThread().contextClassLoader.getResourceAsStream("example.rates.txt"), Charsets.UTF_8.name()) val fileContents = IOUtils.toString(Thread.currentThread().contextClassLoader.getResourceAsStream("example.rates.txt"), Charsets.UTF_8.name())
val url = URL("http://$hostAndPort/upload/interest-rates") val url = URL("http://$hostAndPort/upload/interest-rates")
assert(uploadFile(url, fileContents)) check(uploadFile(url, fileContents))
} }
private companion object { private companion object {

View File

@ -4,13 +4,13 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.Futures
import joptsimple.OptionParser import joptsimple.OptionParser
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.notUsed
import net.corda.core.crypto.toStringShort import net.corda.core.crypto.toStringShort
import net.corda.core.div import net.corda.core.div
import net.corda.core.getOrThrow import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.BOB
import net.corda.flows.NotaryFlow import net.corda.flows.NotaryFlow
import net.corda.nodeapi.config.SSLConfiguration import net.corda.nodeapi.config.SSLConfiguration
import net.corda.notarydemo.flows.DummyIssueAndMove import net.corda.notarydemo.flows.DummyIssueAndMove
@ -30,11 +30,15 @@ fun main(args: Array<String>) {
/** Interface for using the notary demo API from a client. */ /** Interface for using the notary demo API from a client. */
private class NotaryDemoClientApi(val rpc: CordaRPCOps) { private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
private val notary by lazy { private val notary by lazy {
rpc.networkMapUpdates().first.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity val (parties, partyUpdates) = rpc.networkMapUpdates()
partyUpdates.notUsed()
parties.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
} }
private val counterpartyNode by lazy { private val counterpartyNode by lazy {
rpc.networkMapUpdates().first.first { it.legalIdentity.name == "CN=Counterparty,O=R3,OU=corda,L=London,C=UK" } val (parties, partyUpdates) = rpc.networkMapUpdates()
partyUpdates.notUsed()
parties.first { it.legalIdentity.name == "CN=Counterparty,O=R3,OU=corda,L=London,C=UK" }
} }
private companion object { private companion object {

View File

@ -1,6 +1,7 @@
package net.corda.vega.api package net.corda.vega.api
import com.opengamma.strata.basics.currency.MultiCurrencyAmount import com.opengamma.strata.basics.currency.MultiCurrencyAmount
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.DealState import net.corda.core.contracts.DealState
import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.filterStatesOfType import net.corda.core.contracts.filterStatesOfType
@ -35,7 +36,9 @@ class PortfolioApi(val rpc: CordaRPCOps) {
private val portfolioUtils = PortfolioApiUtils(ownParty) private val portfolioUtils = PortfolioApiUtils(ownParty)
private inline fun <reified T : DealState> dealsWith(party: AbstractParty): List<StateAndRef<T>> { private inline fun <reified T : DealState> dealsWith(party: AbstractParty): List<StateAndRef<T>> {
return rpc.vaultAndUpdates().first.filterStatesOfType<T>().filter { it.state.data.parties.any { it == party } } val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
return vault.filterStatesOfType<T>().filter { it.state.data.parties.any { it == party } }
} }
/** /**
@ -130,8 +133,8 @@ class PortfolioApi(val rpc: CordaRPCOps) {
Response.ok().entity(swaps.map { Response.ok().entity(swaps.map {
it.toView(ownParty, it.toView(ownParty,
latestPortfolioStateData?.portfolio?.toStateAndRef<IRSState>(rpc)?.toPortfolio(), latestPortfolioStateData?.portfolio?.toStateAndRef<IRSState>(rpc)?.toPortfolio(),
PVs?.get(it.id.second.toString()) ?: MultiCurrencyAmount.empty(), PVs?.get(it.id.second) ?: MultiCurrencyAmount.empty(),
IMs?.get(it.id.second.toString()) ?: InitialMarginTriple.zero() IMs?.get(it.id.second) ?: InitialMarginTriple.zero()
) )
}).build() }).build()
} }
@ -247,7 +250,9 @@ class PortfolioApi(val rpc: CordaRPCOps) {
@Path("whoami") @Path("whoami")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
fun getWhoAmI(): AvailableParties { fun getWhoAmI(): AvailableParties {
val counterParties = rpc.networkMapUpdates().first.filter { val (parties, partyUpdates) = rpc.networkMapUpdates()
partyUpdates.notUsed()
val counterParties = parties.filter {
it.legalIdentity.name != DUMMY_MAP.name it.legalIdentity.name != DUMMY_MAP.name
&& it.legalIdentity.name != DUMMY_NOTARY.name && it.legalIdentity.name != DUMMY_NOTARY.name
&& it.legalIdentity.name != ownParty.name && it.legalIdentity.name != ownParty.name

View File

@ -1,5 +1,6 @@
package net.corda.vega.portfolio package net.corda.vega.portfolio
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.* import net.corda.core.contracts.*
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
@ -33,7 +34,9 @@ fun List<StateAndRef<IRSState>>.toPortfolio(): Portfolio {
} }
inline fun <reified T : ContractState> List<StateRef>.toStateAndRef(rpc: CordaRPCOps): List<StateAndRef<T>> { inline fun <reified T : ContractState> List<StateRef>.toStateAndRef(rpc: CordaRPCOps): List<StateAndRef<T>> {
val stateRefs = rpc.vaultAndUpdates().first.associateBy { it.ref } val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
val stateRefs = vault.associateBy { it.ref }
return mapNotNull { stateRefs[it] }.filterStatesOfType<T>() return mapNotNull { stateRefs[it] }.filterStatesOfType<T>()
} }

View File

@ -1,6 +1,7 @@
package net.corda.traderdemo package net.corda.traderdemo
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.Futures
import net.corda.client.rpc.notUsed
import net.corda.contracts.CommercialPaper import net.corda.contracts.CommercialPaper
import net.corda.contracts.asset.Cash import net.corda.contracts.asset.Cash
import net.corda.contracts.testing.calculateRandomlySizedAmounts import net.corda.contracts.testing.calculateRandomlySizedAmounts
@ -18,7 +19,6 @@ import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.testing.BOC import net.corda.testing.BOC
import net.corda.traderdemo.flow.SellerFlow import net.corda.traderdemo.flow.SellerFlow
import java.util.* import java.util.*
import kotlin.test.assertEquals
/** /**
* Interface for communicating with nodes running the trader demo. * Interface for communicating with nodes running the trader demo.
@ -28,11 +28,19 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) {
val logger = loggerFor<TraderDemoClientApi>() val logger = loggerFor<TraderDemoClientApi>()
} }
val cashCount: Int get() = rpc.vaultAndUpdates().first.filterStatesOfType<Cash.State>().size val cashCount: Int get() {
val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
return vault.filterStatesOfType<Cash.State>().size
}
val dollarCashBalance: Amount<Currency> get() = rpc.getCashBalances()[USD]!! val dollarCashBalance: Amount<Currency> get() = rpc.getCashBalances()[USD]!!
val commercialPaperCount: Int get() = rpc.vaultAndUpdates().first.filterStatesOfType<CommercialPaper.State>().size val commercialPaperCount: Int get() {
val (vault, vaultUpdates) = rpc.vaultAndUpdates()
vaultUpdates.notUsed()
return vault.filterStatesOfType<CommercialPaper.State>().size
}
fun runBuyer(amount: Amount<Currency> = 30000.DOLLARS) { fun runBuyer(amount: Amount<Currency> = 30000.DOLLARS) {
val bankOfCordaParty = rpc.partyFromName(BOC.name) val bankOfCordaParty = rpc.partyFromName(BOC.name)
@ -58,7 +66,7 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) {
if (!rpc.attachmentExists(SellerFlow.PROSPECTUS_HASH)) { if (!rpc.attachmentExists(SellerFlow.PROSPECTUS_HASH)) {
javaClass.classLoader.getResourceAsStream("bank-of-london-cp.jar").use { javaClass.classLoader.getResourceAsStream("bank-of-london-cp.jar").use {
val id = rpc.uploadAttachment(it) val id = rpc.uploadAttachment(it)
assertEquals(SellerFlow.PROSPECTUS_HASH, id) check(SellerFlow.PROSPECTUS_HASH == id)
} }
} }

View File

@ -1,6 +1,7 @@
package net.corda.loadtest package net.corda.loadtest
import net.corda.client.mock.Generator import net.corda.client.mock.Generator
import net.corda.client.rpc.notUsed
import net.corda.core.crypto.toBase58String import net.corda.core.crypto.toBase58String
import net.corda.node.driver.PortAllocation import net.corda.node.driver.PortAllocation
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
@ -86,7 +87,7 @@ data class LoadTest<T, S>(
log.info("$count remaining commands, state:\n$state") log.info("$count remaining commands, state:\n$state")
// Generate commands // Generate commands
val commands = nodes.generate(state, parameters.parallelism).generate(random).getOrThrow() val commands = nodes.generate(state, parameters.parallelism).generate(random).getOrThrow()
require(commands.size > 0) require(commands.isNotEmpty())
log.info("Generated command batch of size ${commands.size}: $commands") log.info("Generated command batch of size ${commands.size}: $commands")
// Interpret commands // Interpret commands
val newState = commands.fold(state, interpret) val newState = commands.fold(state, interpret)
@ -170,7 +171,8 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest
log.info("Getting node info of ${connection.hostName}") log.info("Getting node info of ${connection.hostName}")
val nodeInfo = connection.proxy.nodeIdentity() val nodeInfo = connection.proxy.nodeIdentity()
log.info("Got node info of ${connection.hostName}: $nodeInfo!") log.info("Got node info of ${connection.hostName}: $nodeInfo!")
val otherNodeInfos = connection.proxy.networkMapUpdates().first val (otherNodeInfos, nodeInfoUpdates) = connection.proxy.networkMapUpdates()
nodeInfoUpdates.notUsed()
val pubkeysString = otherNodeInfos.map { val pubkeysString = otherNodeInfos.map {
" ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}" " ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}"
}.joinToString("\n") }.joinToString("\n")

View File

@ -2,6 +2,7 @@ package net.corda.loadtest.tests
import net.corda.client.mock.Generator import net.corda.client.mock.Generator
import net.corda.client.mock.pickN import net.corda.client.mock.pickN
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Issued import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.PartyAndReference
@ -218,7 +219,8 @@ val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
val currentNodeVaults = HashMap<AbstractParty, HashMap<AbstractParty, Long>>() val currentNodeVaults = HashMap<AbstractParty, HashMap<AbstractParty, Long>>()
simpleNodes.forEach { simpleNodes.forEach {
val quantities = HashMap<AbstractParty, Long>() val quantities = HashMap<AbstractParty, Long>()
val vault = it.connection.proxy.vaultAndUpdates().first val (vault, vaultUpdates) = it.connection.proxy.vaultAndUpdates()
vaultUpdates.notUsed()
vault.forEach { vault.forEach {
val state = it.state.data val state = it.state.data
if (state is Cash.State) { if (state is Cash.State) {

View File

@ -4,6 +4,7 @@ import de.danielbechler.diff.ObjectDifferFactory
import net.corda.client.mock.Generator import net.corda.client.mock.Generator
import net.corda.client.mock.pickOne import net.corda.client.mock.pickOne
import net.corda.client.mock.replicatePoisson import net.corda.client.mock.replicatePoisson
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash import net.corda.contracts.asset.Cash
import net.corda.core.contracts.USD import net.corda.core.contracts.USD
import net.corda.core.crypto.AbstractParty import net.corda.core.crypto.AbstractParty
@ -70,7 +71,8 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
gatherRemoteState = { previousState -> gatherRemoteState = { previousState ->
val selfIssueVaults = HashMap<AbstractParty, Long>() val selfIssueVaults = HashMap<AbstractParty, Long>()
simpleNodes.forEach { (_, connection, info) -> simpleNodes.forEach { (_, connection, info) ->
val vault = connection.proxy.vaultAndUpdates().first val (vault, vaultUpdates) = connection.proxy.vaultAndUpdates()
vaultUpdates.notUsed()
vault.forEach { vault.forEach {
val state = it.state.data val state = it.state.data
if (state is Cash.State) { if (state is Cash.State) {