CORDA-499: Dokka 'Kt' class cleanup (#1433)
* Remove unneeded identity registrations from tests, which sometimes cause duplicated entries in the database * Add JvmName annotations to ObservableFold and ObservableUtilities * Move createMapChange() into ReadOnlyBackedObservableMapBase as the only place it's used, to improve Java interop. * Clean up Generator by moving extension functions into the class/companion class. * Add documentation to the actual functions rather than being only on the top level class. * Add JvmName to Generators * Move extension functions into the classes they're used in to try supressing generation of empty RPCClientProxyHandlerKt class. * Add JvmName to ConcurrencyUtils * Move Iterable<ServiceInfo>.containsType() into Simulation, as it's only useful when verifying setup of a simulated case. Resolves ServiceInfoKt class being generated. * Move FlowHandle.notUsed() into the one place it's actually called, so Kotlin stops creating a FlowHandleKt class.
package net.corda.client.jfx.utils
import javafx.application.Platform
package net.corda.client.jfx.utils
import javafx.application.Platform
throw UnsupportedOperationException("remove() can't be called on ReadOnlyObservableMapBase")
fun <A, K> ObservableMap<K, A>.createMapChange(key: K, removedValue: A?, addedValue: A?): MapChangeListener.Change<K, A> {
return object : MapChangeListener.Change<K, A>(this) {
override fun getKey() = key
override fun wasRemoved() = removedValue != null
override fun wasAdded() = addedValue != null
override fun getValueRemoved() = removedValue
override fun getValueAdded() = addedValue
* Construct an object modelling the given change to an observed map.
fun createMapChange(key: K, removedValue: A?, addedValue: A?): MapChangeListener.Change<K, A> {
return object : MapChangeListener.Change<K, A>(this) {
override fun getKey() = key
override fun wasRemoved() = removedValue != null
override fun wasAdded() = addedValue != null
override fun getValueRemoved() = removedValue
override fun getValueAdded() = addedValue
* [Generator.choice] picks a generator from the specified list and runs that.
* [Generator.frequency] is similar to [choice] but the probability may be specified for each generator (it is normalised before picking).
* [Generator.combine] combines two generators of A and B with a function (A, B) -> C. Variants exist for other arities.
* [Generator.flatMap] sequences two generators using an arbitrary A->Generator<B> function. Keep the usage of this
* [Generator.flatMap] sequences two generators using an arbitrary A->Generator<B> function. Keep the usage of this
* function minimal as it may explode the stack, especially when using recursion.
* There are other utilities as well, the type of which are usually descriptive.
* The above will generate a random list of animals.
class Generator<out A>(val generate: (SplittableRandom) -> Try<A>) {
// Functor
fun <B> map(function: (A) -> B): Generator<B> =
Generator { generate(it).map(function) }
return Generator { random -> generate(random).flatMap { function(it).generate(random) } }
fun generateOrFail(random: SplittableRandom, numberOfTries: Int = 1): A {
var error: Throwable? = null
for (i in 0..numberOfTries - 1) {
val result = generate(random)
error = when (result) {
is Try.Success -> return result.value
is Try.Failure -> result.exception
if (error == null) {
throw IllegalArgumentException("numberOfTries cannot be <= 0")
} else {
throw Exception("Failed to generate", error)
companion object {
fun <A> pure(value: A) = Generator { Try.Success(value) }
fun <A> impure(valueClosure: () -> A) = Generator { Try.Success(valueClosure()) }
fun <A> fail(error: Exception) = Generator<A> { Try.Failure(error) }
// Alternative
* Pick a generator from the specified list and run it.
fun <A> choice(generators: List<Generator<A>>) = intRange(0, generators.size - 1).flatMap { generators[it] }
fun <A> success(generate: (SplittableRandom) -> A) = Generator { Try.Success(generate(it)) }
* Pick a generator from the specified list, with a probability assigned to each generator, then run the
* chosen generator.
* @param generators a list of probabilities of a generator being chosen, and generators. Probabilities must be
* non-negative.
fun <A> frequency(generators: List<Pair<Double, Generator<A>>>): Generator<A> {
require(generators.all { it.first >= 0.0 }) { "Probabilities must not be negative" }
val ranges = mutableListOf<Pair<Double, Double>>()
var current = 0.0
generators.forEach {
fun <A> frequency(vararg generators: Pair<Double, Generator<A>>) = frequency(generators.toList())
fun <A> sequence(generators: List<Generator<A>>) = Generator<List<A>> {
val result = mutableListOf<A>()
for (generator in generators) {
fun <A> Generator<A>.generateOrFail(random: SplittableRandom, numberOfTries: Int = 1): A {
var error: Throwable? = null
for (i in 0..numberOfTries - 1) {
val result = generate(random)
error = when (result) {
is Try.Success -> return result.value
is Try.Failure -> result.exception
fun int() = Generator.success(SplittableRandom::nextInt)
fun long() = Generator.success(SplittableRandom::nextLong)
fun bytes(size: Int): Generator<ByteArray> = Generator.success { random ->
ByteArray(size) { random.nextInt().toByte() }
if (error == null) {
throw IllegalArgumentException("numberOfTries cannot be <= 0")
} else {
throw Exception("Failed to generate", error)
fun Generator.Companion.int() = Generator.success(SplittableRandom::nextInt)
fun Generator.Companion.long() = Generator.success(SplittableRandom::nextLong)
fun Generator.Companion.bytes(size: Int): Generator<ByteArray> = Generator.success { random ->
ByteArray(size) { random.nextInt().toByte() }
fun intRange(range: IntRange) = intRange(range.first, range.last)
fun intRange(from: Int, to: Int): Generator<Int> = Generator.success {
(from + Math.abs(it.nextInt()) % (to - from + 1)).toInt()
fun Generator.Companion.intRange(range: IntRange) = intRange(range.first, range.last)
fun Generator.Companion.intRange(from: Int, to: Int): Generator<Int> = Generator.success {
(from + Math.abs(it.nextInt()) % (to - from + 1)).toInt()
fun longRange(range: LongRange) = longRange(range.first, range.last)
fun longRange(from: Long, to: Long): Generator<Long> = Generator.success {
(from + Math.abs(it.nextLong()) % (to - from + 1)).toLong()
fun Generator.Companion.longRange(range: LongRange) = longRange(range.first, range.last)
fun Generator.Companion.longRange(from: Long, to: Long): Generator<Long> = Generator.success {
(from + Math.abs(it.nextLong()) % (to - from + 1)).toLong()
fun double() = Generator.success { it.nextDouble() }
fun doubleRange(from: Double, to: Double): Generator<Double> = Generator.success {
from + it.nextDouble() * (to - from)
fun Generator.Companion.double() = Generator.success { it.nextDouble() }
fun Generator.Companion.doubleRange(from: Double, to: Double): Generator<Double> = Generator.success {
from + it.nextDouble() * (to - from)
fun Generator.Companion.char() = Generator {
val codePoint = Math.abs(it.nextInt()) % (17 * (1 shl 16))
if (Character.isValidCodePoint(codePoint)) {
return@Generator Try.Success(codePoint.toChar())
} else {
Try.Failure(IllegalStateException("Could not generate valid codepoint"))
fun Generator.Companion.string(meanSize: Double = 16.0) = replicatePoisson(meanSize, char()).map {
val builder = StringBuilder()
it.forEach {
fun <A> Generator.Companion.replicate(number: Int, generator: Generator<A>): Generator<List<A>> {
val generators = mutableListOf<Generator<A>>()
for (i in 1..number) {
return sequence(generators)
fun <A> Generator.Companion.replicatePoisson(meanSize: Double, generator: Generator<A>, atLeastOne: Boolean = false) = Generator<List<A>> {
val chance = (meanSize - 1) / meanSize
val result = mutableListOf<A>()
var finish = false
while (!finish) {
val res = Generator.doubleRange(0.0, 1.0).generate(it).flatMap { value ->
if (value < chance) {
generator.generate(it).map { result.add(it) }
fun char() = Generator {
val codePoint = Math.abs(it.nextInt()) % (17 * (1 shl 16))
if (Character.isValidCodePoint(codePoint)) {
return@Generator Try.Success(codePoint.toChar())
} else {
finish = true
if (result.isEmpty() && atLeastOne) {
generator.generate(it).map { result.add(it) }
} else Try.Success(Unit)
Try.Failure(IllegalStateException("Could not generate valid codepoint"))
if (res is Try.Failure) {
return@Generator res
fun string(meanSize: Double = 16.0) = replicatePoisson(meanSize, char()).map {
val builder = StringBuilder()
it.forEach {
fun <A> Generator.Companion.pickOne(list: List<A>) = Generator.intRange(0, list.size - 1).map { list[it] }
fun <A> Generator.Companion.pickN(number: Int, list: List<A>) = Generator<List<A>> {
val mask = BitSet(list.size)
val size = Math.min(list.size, number)
for (i in 0..size - 1) {
// mask[i] = 1 desugars into mask.set(i, 1), which sets a range instead of a bit
mask[i] = true
for (i in 0..list.size - 1) {
val bit = mask[i]
val swapIndex = i + it.nextInt(size - i)
mask[i] = mask[swapIndex]
mask[swapIndex] = bit
val resultList = ArrayList<A>()
list.forEachIndexed { index, a ->
if (mask[index]) {
fun <A> replicate(number: Int, generator: Generator<A>): Generator<List<A>> {
val generators = mutableListOf<Generator<A>>()
for (i in 1..number) {
return sequence(generators)
fun <A> Generator.Companion.sampleBernoulli(maxRatio: Double = 1.0, vararg collection: A) =
sampleBernoulli(listOf(collection), maxRatio)
fun <A> Generator.Companion.sampleBernoulli(collection: Collection<A>, meanRatio: Double = 1.0): Generator<List<A>> =
replicate(collection.size, Generator.doubleRange(0.0, 1.0)).map { chances ->
fun <A> replicatePoisson(meanSize: Double, generator: Generator<A>, atLeastOne: Boolean = false) = Generator<List<A>> {
val chance = (meanSize - 1) / meanSize
val result = mutableListOf<A>()
collection.forEachIndexed { index, element ->
if (chances[index] < meanRatio) {
var finish = false
while (!finish) {
val res = Generator.doubleRange(0.0, 1.0).generate(it).flatMap { value ->
if (value < chance) {
generator.generate(it).map { result.add(it) }
} else {
finish = true
if (result.isEmpty() && atLeastOne) {
generator.generate(it).map { result.add(it) }
} else Try.Success(Unit)
if (res is Try.Failure) {
return@Generator res
fun <A> pickOne(list: List<A>) = Generator.intRange(0, list.size - 1).map { list[it] }
fun <A> pickN(number: Int, list: List<A>) = Generator<List<A>> {
val mask = BitSet(list.size)
val size = Math.min(list.size, number)
for (i in 0..size - 1) {
// mask[i] = 1 desugars into mask.set(i, 1), which sets a range instead of a bit
mask[i] = true
for (i in 0..list.size - 1) {
val bit = mask[i]
val swapIndex = i + it.nextInt(size - i)
mask[i] = mask[swapIndex]
mask[swapIndex] = bit
val resultList = ArrayList<A>()
list.forEachIndexed { index, a ->
if (mask[index]) {
fun <A> sampleBernoulli(maxRatio: Double = 1.0, vararg collection: A) =
sampleBernoulli(listOf(collection), maxRatio)
fun <A> sampleBernoulli(collection: Collection<A>, meanRatio: Double = 1.0): Generator<List<A>> {
return replicate(collection.size, Generator.doubleRange(0.0, 1.0)).map { chances ->
val result = mutableListOf<A>()
collection.forEachIndexed { index, element ->
if (chances[index] < meanRatio) {
package net.corda.client.mock
import net.corda.core.contracts.Amount
@ -82,6 +82,19 @@ class RPCClientProxyHandler(
val log = loggerFor<RPCClientProxyHandler>()
// To check whether toString() is being invoked
val toStringMethod: Method = Object::toString.javaMethod!!
private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: Throwable) {
var currentThrowable = throwable
while (true) {
val cause = currentThrowable.cause
if (cause == null) {
} else {
currentThrowable = cause
// Used for reaping
@ -393,6 +406,19 @@ object RpcClientObservableSerializer : Serializer<Observable<*>>() {
return serializationContext.withProperty(RpcObservableContextKey, observableContext)
private fun <T> pinInSubscriptions(observable: Observable<T>, hardReferenceStore: MutableSet<Observable<*>>): Observable<T> {
val refCount = AtomicInteger(0)
return observable.doOnSubscribe {
if (refCount.getAndIncrement() == 0) {
require(hardReferenceStore.add(observable)) { "Reference store already contained reference $this on add" }
}.doOnUnsubscribe {
if (refCount.decrementAndGet() == 0) {
require(hardReferenceStore.remove(observable)) { "Reference store did not contain reference $this on remove" }
override fun read(kryo: Kryo, input: Input, type: Class<Observable<*>>): Observable<Any> {
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
val observableId = RPCApi.ObservableId(input.readLong(true))
@ -405,7 +431,7 @@ object RpcClientObservableSerializer : Serializer<Observable<*>>() {
observableContext.callSiteMap?.put(observableId.toLong, rpcCallSite)
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
// don't need to store a reference to the Observables themselves.
return observable.pinInSubscriptions(observableContext.hardReferenceStore).doOnUnsubscribe {
return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe {
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
// The unsubscribe is due to [ObservableToFuture]'s use of first().
@ -421,30 +447,4 @@ object RpcClientObservableSerializer : Serializer<Observable<*>>() {
val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as Long
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: Throwable) {
var currentThrowable = throwable
while (true) {
val cause = currentThrowable.cause
if (cause == null) {
} else {
currentThrowable = cause
private fun <T> Observable<T>.pinInSubscriptions(hardReferenceStore: MutableSet<Observable<*>>): Observable<T> {
val refCount = AtomicInteger(0)
return this.doOnSubscribe {
if (refCount.getAndIncrement() == 0) {
require(hardReferenceStore.add(this)) { "Reference store already contained reference $this on add" }
}.doOnUnsubscribe {
if (refCount.decrementAndGet() == 0) {
require(hardReferenceStore.remove(this)) { "Reference store did not contain reference $this on remove" }
package net.corda.core.concurrent
import net.corda.core.internal.concurrent.openFuture
@ -57,16 +57,11 @@ data class FlowProgressHandleImpl<A>(
// Remember to add @Throws to FlowProgressHandle.close() if this throws an exception.
override fun close() {
try {
progress.subscribe({}, {}).unsubscribe()
} catch (e: Exception) {
// Swallow any other exceptions as well.
// Private copy of the version in client:rpc.
private fun <T> Observable<T>.notUsed() {
try {
this.subscribe({}, {}).unsubscribe()
} catch (e: Exception) {
// Swallow any other exceptions as well.
override fun toString() = if (name != null) "$type|$name" else type.toString()
fun Iterable<ServiceInfo>.containsType(type: ServiceType) = any { it.type == type }
val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
val alice: Party = aliceNode.services.myInfo.legalIdentity
val bob: Party = bobNode.services.myInfo.legalIdentity
aliceNode.database.transaction {
bobNode.database.transaction {
// Alice issues then pays some cash to a new confidential identity that Bob doesn't know about
@ -26,14 +26,6 @@ class TransactionKeyFlowTests {
val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
val alice: Party = aliceNode.services.myInfo.legalIdentity
val bob: Party = bobNode.services.myInfo.legalIdentity
aliceNode.database.transaction {
bobNode.database.transaction {
// Run the flows
val requesterFlow = aliceNode.services.startFlow(TransactionKeyFlow(bob))
import net.corda.core.node.CityDatabase
import net.corda.core.node.WorldMapLocation
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.containsType
import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getX500Name
import net.corda.core.utilities.locality
* Helper function for verifying that a service info contains the given type of advertised service. For non-simulation cases
* this is a configuration matter rather than implementation.
fun Iterable<ServiceInfo>.containsType(type: ServiceType) = any { it.type == type }
package net.corda.testing
import net.corda.client.mock.Generator
import net.corda.client.mock.generateOrFail
import net.corda.client.mock.int
import net.corda.client.mock.string
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.client.mock.ErrorFlowsEventGenerator
import net.corda.client.mock.EventGenerator
import net.corda.client.mock.Generator
import net.corda.client.mock.pickOne
import net.corda.client.rpc.CordaRPCConnection
import net.corda.core.contracts.Amount
import net.corda.core.identity.Party
package net.corda.loadtest.tests
import net.corda.client.mock.Generator
import net.corda.client.mock.pickN
import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
import net.corda.core.identity.AbstractParty
import net.corda.client.mock.Generator
import net.corda.client.mock.generateAmount
import net.corda.client.mock.pickOne
import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.withoutIssuer
package net.corda.loadtest.tests
import net.corda.client.mock.Generator
import net.corda.client.mock.int
import net.corda.client.mock.pickOne
import net.corda.client.mock.replicate
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.internal.concurrent.thenMatch
import de.danielbechler.diff.ObjectDifferFactory
import net.corda.client.mock.Generator
import net.corda.client.mock.pickOne
import net.corda.client.mock.replicatePoisson
import net.corda.finance.contracts.asset.Cash
import net.corda.core.flows.FlowException
import net.corda.core.identity.AbstractParty
package net.corda.verifier
import net.corda.client.mock.generateOrFail
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow
