Replace unused variables with _ after kotlin 1.1.1 upgrade (#456)
* Replace unused variables with _ after kotlin 1.1.1 upgrade
@ -5,12 +5,10 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.google.common.collect.HashMultimap
import com.google.common.collect.Multimap
import com.google.common.collect.MultimapBuilder
import net.corda.jackson.StringToMethodCallParser.ParsedMethodCall
import org.slf4j.LoggerFactory
import java.lang.reflect.Constructor
import java.lang.reflect.Method
import java.util.*
import java.util.concurrent.Callable
import javax.annotation.concurrent.ThreadSafe
import kotlin.reflect.KClass
@ -75,14 +73,14 @@ import kotlin.reflect.jvm.kotlinFunction
open class StringToMethodCallParser<in T : Any> @JvmOverloads constructor(
targetType: Class<out T>,
private val om: ObjectMapper = JacksonSupport.createNonRpcMapper(YAMLFactory()))
private val om: ObjectMapper = JacksonSupport.createNonRpcMapper(YAMLFactory())) {
/** Same as the regular constructor but takes a Kotlin reflection [KClass] instead of a Java [Class]. */
constructor(targetType: KClass<out T>) : this(targetType.java)
companion object {
private val ignoredNames = Object::class.java.methods.map { it.name }
private fun methodsFromType(clazz: Class<*>): Multimap<String, Method> {
val result = HashMultimap.create<String, Method>()
for ((key, value) in clazz.methods.filterNot { it.isSynthetic && it.name !in ignoredNames }.map { it.name to it }) {
@ -90,6 +88,7 @@ open class StringToMethodCallParser<in T : Any> @JvmOverloads constructor(
return result
private val log = LoggerFactory.getLogger(StringToMethodCallParser::class.java)!!
@ -126,7 +125,7 @@ open class StringToMethodCallParser<in T : Any> @JvmOverloads constructor(
return method.parameters.mapIndexed { index, param ->
when {
param.isNamePresent -> param.name
// index + 1 because the first Kotlin reflection param is 'this', but that doesn't match Java reflection.
// index + 1 because the first Kotlin reflection param is 'this', but that doesn't match Java reflection.
kf != null -> kf.parameters[index + 1].name ?: throw UnparseableCallException.ReflectionDataMissing(method.name, index)
else -> throw UnparseableCallException.ReflectionDataMissing(method.name, index)
@ -193,7 +192,7 @@ open class StringToMethodCallParser<in T : Any> @JvmOverloads constructor(
val parameterString = "{ $args }"
val tree: JsonNode = om.readTree(parameterString) ?: throw UnparseableCallException(args)
if (tree.size() > parameters.size) throw UnparseableCallException.TooManyParameters(methodNameHint, args)
val inOrderParams: List<Any?> = parameters.mapIndexed { index, param ->
val inOrderParams: List<Any?> = parameters.mapIndexed { _, param ->
val (argName, argType) = param
val entry = tree[argName] ?: throw UnparseableCallException.MissingParameter(methodNameHint, argName, args)
om.readValue(entry.traverse(om), argType)
@ -31,7 +31,7 @@ class ChosenList<E>(
init {
chosenListObservable.addListener { observable: Observable -> rechoose() }
chosenListObservable.addListener { _: Observable -> rechoose() }
nextAdd(0, currentList.size)
@ -38,7 +38,7 @@ class FlattenedList<A>(val sourceList: ObservableList<out ObservableValue<out A>
private fun createListener(wrapped: WrappedObservableValue<out A>): ChangeListener<A> {
val listener = ChangeListener<A> { _observableValue, oldValue, newValue ->
val listener = ChangeListener<A> { _, oldValue, _ ->
val currentIndex = indexMap[wrapped]!!.first
nextReplace(currentIndex, currentIndex + 1, listOf(oldValue))
@ -55,7 +55,7 @@ class FlattenedList<A>(val sourceList: ObservableList<out ObservableValue<out A>
val from = c.from
val to = c.to
val permutation = IntArray(to, { c.getPermutation(it) })
indexMap.replaceAll { _observableValue, pair -> Pair(permutation[pair.first], pair.second) }
indexMap.replaceAll { _, pair -> Pair(permutation[pair.first], pair.second) }
nextPermutation(from, to, permutation)
} else if (c.wasUpdated()) {
throw UnsupportedOperationException("FlattenedList doesn't support Update changes")
@ -67,7 +67,7 @@ fun <A> Observable<A>.recordInSequence(): ObservableList<A> {
* @param toKey Function retrieving the key to associate with.
* @param merge The function to be called if there is an existing element at the key.
fun <A, K> Observable<A>.recordAsAssociation(toKey: (A) -> K, merge: (K, oldValue: A, newValue: A) -> A = { _key, _oldValue, newValue -> newValue }): ObservableMap<K, A> {
fun <A, K> Observable<A>.recordAsAssociation(toKey: (A) -> K, merge: (K, oldValue: A, newValue: A) -> A = { _, _, newValue -> newValue }): ObservableMap<K, A> {
return fold(FXCollections.observableHashMap<K, A>()) { map, item ->
val key = toKey(item)
map[key] = map[key]?.let { merge(key, it, item) } ?: item
@ -158,7 +158,7 @@ fun <K, A, B> ObservableList<out A>.associateBy(toKey: (A) -> K, assemble: (K, A
* val nameToPerson: ObservableMap<String, Person> = people.associateBy(Person::name)
fun <K, A> ObservableList<out A>.associateBy(toKey: (A) -> K): ObservableMap<K, A> {
return associateBy(toKey) { key, value -> value }
return associateBy(toKey) { _, value -> value }
@ -176,7 +176,7 @@ fun <K : Any, A : Any, B> ObservableList<out A>.associateByAggregation(toKey: (A
* val heightToPeople: ObservableMap<Long, ObservableList<Person>> = people.associateByAggregation(Person::height)
fun <K : Any, A : Any> ObservableList<out A>.associateByAggregation(toKey: (A) -> K): ObservableMap<K, ObservableList<A>> {
return associateByAggregation(toKey) { key, value -> value }
return associateByAggregation(toKey) { _, value -> value }
@ -260,7 +260,7 @@ fun <A : Any, B : Any, K : Any> ObservableList<A>.leftOuterJoin(
val leftTableMap = associateByAggregation(leftToJoinKey)
val rightTableMap = rightTable.associateByAggregation(rightToJoinKey)
val joinedMap: ObservableMap<K, Pair<ObservableList<A>, ObservableList<B>>> =
LeftOuterJoinedMap(leftTableMap, rightTableMap) { _key, left, rightValue ->
LeftOuterJoinedMap(leftTableMap, rightTableMap) { _, left, rightValue ->
Pair(left, ChosenList(rightValue.map { it ?: FXCollections.emptyObservableList() }))
return joinedMap
@ -285,7 +285,7 @@ fun <A> ObservableList<A>.last(): ObservableValue<A?> {
fun <T : Any> ObservableList<T>.unique(): ObservableList<T> {
return AggregatedList(this, { it }, { key, _list -> key })
return AggregatedList(this, { it }, { key, _ -> key })
fun ObservableValue<*>.isNotNull(): BooleanBinding {
@ -16,7 +16,7 @@ class AssociatedListTest {
fun setup() {
sourceList = FXCollections.observableArrayList(0)
associatedList = AssociatedList(sourceList, { it % 3 }) { mod3, number -> number }
associatedList = AssociatedList(sourceList, { it % 3 }) { _, number -> number }
replayedMap = ReplayedMap(associatedList)
@ -81,7 +81,7 @@ class EventGenerator(
val exitCashGenerator =
amountToIssueGenerator.combine(partyGenerator, issueRefGenerator) { amount, to, issueRef ->
amountToIssueGenerator.combine(partyGenerator, issueRefGenerator) { amount, _, issueRef ->
@ -75,7 +75,7 @@ fun <T> future(block: () -> T): ListenableFuture<T> = CompletableToListenable(Co
private class CompletableToListenable<T>(private val base: CompletableFuture<T>) : Future<T> by base, ListenableFuture<T> {
override fun addListener(listener: Runnable, executor: Executor) {
base.whenCompleteAsync(BiConsumer { result, exception -> listener.run() }, executor)
base.whenCompleteAsync(BiConsumer { _, _ -> listener.run() }, executor)
@ -104,6 +104,7 @@ infix fun <T> ListenableFuture<T>.success(body: (T) -> Unit): ListenableFuture<T
infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): ListenableFuture<T> = apply { failure(RunOnCallerThread, body) }
@Suppress("UNCHECKED_CAST") // We need the awkward cast because otherwise F cannot be nullable, even though it's safe.
infix fun <F, T> ListenableFuture<F>.map(mapper: (F) -> T): ListenableFuture<T> = Futures.transform(this, Function { (mapper as (F?) -> T)(it) })
infix fun <F, T> ListenableFuture<F>.flatMap(mapper: (F) -> ListenableFuture<T>): ListenableFuture<T> = Futures.transformAsync(this) { mapper(it!!) }
/** Executes the given block and sets the future to either the result, or any exception that was thrown. */
inline fun <T> SettableFuture<T>.catch(block: () -> T) {
@ -107,11 +107,11 @@ object Crypto {
* Do not forget to add the DEFAULT_SIGNATURE_SCHEME as well.
private val supportedSignatureSchemes = mapOf(
RSA_SHA256.schemeCodeName to RSA_SHA256,
ECDSA_SECP256K1_SHA256.schemeCodeName to ECDSA_SECP256K1_SHA256,
ECDSA_SECP256R1_SHA256.schemeCodeName to ECDSA_SECP256R1_SHA256,
EDDSA_ED25519_SHA512.schemeCodeName to EDDSA_ED25519_SHA512,
SPHINCS256_SHA256.schemeCodeName to SPHINCS256_SHA256
RSA_SHA256.schemeCodeName to RSA_SHA256,
ECDSA_SECP256K1_SHA256.schemeCodeName to ECDSA_SECP256K1_SHA256,
ECDSA_SECP256R1_SHA256.schemeCodeName to ECDSA_SECP256R1_SHA256,
EDDSA_ED25519_SHA512.schemeCodeName to EDDSA_ED25519_SHA512,
SPHINCS256_SHA256.schemeCodeName to SPHINCS256_SHA256
@ -181,9 +181,9 @@ object Crypto {
fun decodePrivateKey(encodedKey: ByteArray): PrivateKey {
for (sig in supportedSignatureSchemes.values) {
for ((_, _, _, _, keyFactory) in supportedSignatureSchemes.values) {
try {
return sig.keyFactory.generatePrivate(PKCS8EncodedKeySpec(encodedKey))
return keyFactory.generatePrivate(PKCS8EncodedKeySpec(encodedKey))
} catch (ikse: InvalidKeySpecException) {
// ignore it - only used to bypass the scheme that causes an exception.
@ -218,9 +218,9 @@ object Crypto {
fun decodePublicKey(encodedKey: ByteArray): PublicKey {
for (sig in supportedSignatureSchemes.values) {
for ((_, _, _, _, keyFactory) in supportedSignatureSchemes.values) {
try {
return sig.keyFactory.generatePublic(X509EncodedKeySpec(encodedKey))
return keyFactory.generatePublic(X509EncodedKeySpec(encodedKey))
} catch (ikse: InvalidKeySpecException) {
// ignore it - only used to bypass the scheme that causes an exception.
@ -66,7 +66,7 @@ object DefaultKryoCustomizer {
register(CompositeKey.Leaf::class.java, CompositeKeyLeafSerializer)
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(Array<StackTraceElement>::class, read = { kryo, input -> emptyArray() }, write = { kryo, output, obj -> })
register(Array<StackTraceElement>::class, read = { _, _ -> emptyArray() }, write = { _, _, _ -> })
// This ensures a NonEmptySetSerializer is constructed with an initial value.
register(NonEmptySet::class.java, NonEmptySetSerializer)
@ -6,7 +6,6 @@ import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow
import net.corda.core.node.recordTransactions
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.SignedTransaction
@ -43,9 +42,9 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
// Construct txhash -> dependent-txs map
val forwardGraph = HashMap<SecureHash, HashSet<SignedTransaction>>()
transactions.forEach { stx ->
stx.tx.inputs.forEach { input ->
stx.tx.inputs.forEach { (txhash) ->
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is)
forwardGraph.getOrPut(input.txhash) { LinkedHashSet() }.add(stx)
forwardGraph.getOrPut(txhash) { LinkedHashSet() }.add(stx)
@ -17,7 +17,7 @@ class DummyContractV2Tests {
val v1State = TransactionState(DummyContract.SingleOwnerState(0, ALICE_PUBKEY), DUMMY_NOTARY)
val v1Ref = StateRef(SecureHash.randomSHA256(), 0)
val v1StateAndRef = StateAndRef(v1State, v1Ref)
val (tx, signers) = DummyContractV2().generateUpgradeFromV1(v1StateAndRef)
val (tx, _) = DummyContractV2().generateUpgradeFromV1(v1StateAndRef)
assertEquals(v1Ref, tx.inputs.single())
@ -531,7 +531,7 @@ class Obligation<P> : Contract {
// Produce a new set of states
val groups = statesAndRefs.groupBy { it.state.data.amount.token }
for ((aggregateState, stateAndRefs) in groups) {
for ((_, stateAndRefs) in groups) {
val partiesUsed = ArrayList<CompositeKey>()
stateAndRefs.forEach { stateAndRef ->
val outState = stateAndRef.state.data.copy(lifecycle = lifecycle)
@ -14,7 +14,10 @@ import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.testing.*
import net.corda.testing.BOC
import net.corda.testing.MEGA_CORP
import net.corda.testing.initiateSingleShotFlow
import net.corda.testing.ledger
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import org.junit.Test
@ -86,7 +89,7 @@ class IssuerFlowTest {
runIssuerAndIssueRequester(bankOfCordaNode, bankClientNode, Amount(pennies, amount.token), issueToPartyAndRef)
handles.forEach {
require (it.issueRequestResult.get() is SignedTransaction)
require(it.issueRequestResult.get() is SignedTransaction)
@ -95,10 +98,10 @@ class IssuerFlowTest {
private fun runIssuerAndIssueRequester(issuerNode: MockNode, issueToNode: MockNode,
amount: Amount<Currency>, issueToPartyAndRef: PartyAndReference) : RunResult {
amount: Amount<Currency>, issueToPartyAndRef: PartyAndReference): RunResult {
val resolvedIssuerParty = issuerNode.services.identityService.partyFromAnonymous(issueToPartyAndRef) ?: throw IllegalStateException()
val issuerFuture = issuerNode.initiateSingleShotFlow(IssuerFlow.IssuanceRequester::class) {
otherParty -> IssuerFlow.Issuer(resolvedIssuerParty)
val issuerFuture = issuerNode.initiateSingleShotFlow(IssuerFlow.IssuanceRequester::class) { _ ->
}.map { it.stateMachine }
val issueRequest = IssuanceRequester(amount, resolvedIssuerParty, issueToPartyAndRef.reference, issuerNode.info.legalIdentity)
@ -3,7 +3,6 @@ package net.corda.node.services
import net.corda.core.bufferUntilSubscribed
import net.corda.core.contracts.Amount
import net.corda.core.contracts.POUNDS
import net.corda.core.contracts.issuedBy
import net.corda.core.crypto.Party
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
@ -89,7 +88,7 @@ class DistributedServiceTests : DriverBasedTest() {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
notarisationsPerNotary.compute(notary.legalIdentity) { _, number -> number?.plus(1) ?: 1 }
@ -128,7 +127,7 @@ class DistributedServiceTests : DriverBasedTest() {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
notarisationsPerNotary.compute(notary.legalIdentity) { _, number -> number?.plus(1) ?: 1 }
@ -107,7 +107,7 @@ class P2PMessagingTest : NodeBasedTest() {
private fun Node.respondWith(message: Any) {
net.addMessageHandler(javaClass.name, DEFAULT_SESSION_ID) { netMessage, reg ->
net.addMessageHandler(javaClass.name, DEFAULT_SESSION_ID) { netMessage, _ ->
val request = netMessage.data.deserialize<TestRequest>()
val response = net.createMessage(javaClass.name, request.sessionID, message.serialize().bytes)
net.send(response, request.replyTo)
@ -243,7 +243,7 @@ class Node(override val configuration: FullNodeConfiguration,
createsObjectNamesWith { type, domain, name ->
createsObjectNamesWith { _, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
@ -31,7 +31,7 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton
addMessageHandler(topic: String,
crossinline handler: (Q) -> R,
crossinline exceptionConsumer: (Message, Exception) -> Unit): MessageHandlerRegistration {
return net.addMessageHandler(topic, DEFAULT_SESSION_ID) { message, r ->
return net.addMessageHandler(topic, DEFAULT_SESSION_ID) { message, _ ->
try {
val request = message.data.deserialize<Q>()
val response = handler(request)
@ -57,7 +57,7 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton
protected inline fun <reified Q : ServiceRequestMessage, reified R : Any>
addMessageHandler(topic: String,
crossinline handler: (Q) -> R): MessageHandlerRegistration {
return addMessageHandler(topic, handler, { message: Message, exception: Exception -> throw exception })
return addMessageHandler(topic, handler, { _: Message, exception: Exception -> throw exception })
@ -2,7 +2,6 @@ package net.corda.node.services.events
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.SettableFuture
import kotlinx.support.jdk8.collections.compute
import net.corda.core.ThreadBox
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledActivity
@ -200,7 +199,7 @@ class NodeSchedulerService(private val database: Database,
var scheduledLogic: FlowLogic<*>? = null
scheduler.mutex.locked {
// need to remove us from those scheduled, but only if we are still next
scheduledStates.compute(scheduledState.ref) { ref, value ->
scheduledStates.compute(scheduledState.ref) { _, value ->
if (value === scheduledState) {
if (scheduledActivity == null) {
logger.info("Scheduled state $scheduledState has rescheduled to never.")
@ -81,7 +81,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
net.addMessageHandler(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID) { message, r ->
net.addMessageHandler(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID) { message, _ ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_TOPIC, DEFAULT_SESSION_ID,
@ -99,9 +99,9 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress)
val future = net.sendRequest<FetchMapResponse>(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { resp ->
val future = net.sendRequest<FetchMapResponse>(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { (nodes) ->
// We may not receive any nodes back, if the map hasn't changed since the version specified
resp.nodes?.forEach { processRegistration(it) }
nodes?.forEach { processRegistration(it) }
@ -1,8 +1,6 @@
package net.corda.node.services.network
import com.google.common.annotations.VisibleForTesting
import kotlinx.support.jdk8.collections.compute
import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.Party
@ -108,6 +106,7 @@ interface NetworkMapService {
data class Update(val wireReg: WireNodeRegistration, val mapVersion: Int, val replyTo: MessageRecipients)
data class UpdateAcknowledge(val mapVersion: Int, val replyTo: MessageRecipients)
@ -162,7 +161,7 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal) : Network
handlers += addMessageHandler(QUERY_TOPIC) { req: QueryIdentityRequest -> processQueryRequest(req) }
handlers += addMessageHandler(REGISTER_TOPIC) { req: RegistrationRequest -> processRegistrationRequest(req) }
handlers += addMessageHandler(SUBSCRIPTION_TOPIC) { req: SubscribeRequest -> processSubscriptionRequest(req) }
handlers += net.addMessageHandler(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, r ->
handlers += net.addMessageHandler(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID) { message, _ ->
val req = message.data.deserialize<UpdateAcknowledge>()
@ -237,7 +236,7 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal) : Network
// in on different threads, there is no risk of a race condition while checking
// sequence numbers.
val registrationInfo = try {
nodeRegistrations.compute(node.legalIdentity) { mapKey: Party, existing: NodeRegistrationInfo? ->
nodeRegistrations.compute(node.legalIdentity) { _: Party, existing: NodeRegistrationInfo? ->
require(!((existing == null || existing.reg.type == REMOVE) && change.type == REMOVE)) {
"Attempting to de-register unknown node"
@ -62,8 +62,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
StrandLocalTransactionManager.database = db
else Strand.sleep(millis)
} else Strand.sleep(millis)
@ -358,7 +357,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
waitingForResponse = ioRequest
var exceptionDuringSuspend: Throwable? = null
parkAndSerialize { f, s ->
parkAndSerialize { _, _ ->
logger.trace { "Suspended on $ioRequest" }
// restore the Tx onto the ThreadLocal so that we can commit the ensuing checkpoint to the DB
try {
@ -232,7 +232,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
started = true
stateMachines.keys.forEach { resumeRestoredFiber(it) }
serviceHub.networkService.addMessageHandler(sessionTopic) { message, reg ->
serviceHub.networkService.addMessageHandler(sessionTopic) { message, _ ->
@ -102,7 +102,7 @@ object BFTSMaRt {
/** An extractor to build the final response message for the client application from all received replica replies. */
private fun buildExtractor(): Extractor {
return Extractor { replies, sameContent, lastReceived ->
return Extractor { replies, _, lastReceived ->
val responses = replies.mapNotNull { it?.content?.deserialize<ReplicaResponse>() }
val accepted = responses.filterIsInstance<ReplicaResponse.Signature>()
val rejected = responses.filterIsInstance<ReplicaResponse.Error>()
@ -156,7 +156,7 @@ object BFTSMaRt {
override fun appExecuteBatch(command: Array<ByteArray>, mcs: Array<MessageContext>): Array<ByteArray?> {
val replies = command.zip(mcs) { c, m ->
val replies = command.zip(mcs) { c, _ ->
return replies.toTypedArray()
@ -13,7 +13,7 @@ import java.util.*
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal, val database: Database) {
init {
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
serviceHubInternal.vaultService.updates.subscribe { update ->
serviceHubInternal.vaultService.updates.subscribe { _ ->
@ -120,7 +120,7 @@ private fun <T : Any> makeStrandFriendlySettableFuture(future: Future<T>): Setta
} else if (future is CompletableFuture) {
val settable = SettableFuture<Boolean>()
future.whenComplete(BiConsumer { value, throwable -> settable.set(true) })
future.whenComplete(BiConsumer { _, _ -> settable.set(true) })
} else {
throw IllegalArgumentException("Cannot make future $future Fiber friendly.")
@ -26,7 +26,7 @@ import kotlin.system.measureTimeMillis
* TODO: make this value configurable
* TODO: tune this value, as it's currently mostly a guess
val DEFAULT_MAX_BUCKETS = (256 * (1 + Math.max(0, (Runtime.getRuntime().maxMemory()/1000000 - 128) / 64))).toInt()
val DEFAULT_MAX_BUCKETS = (256 * (1 + Math.max(0, (Runtime.getRuntime().maxMemory() / 1000000 - 128) / 64))).toInt()
* A convenient JDBC table backed hash map with iteration order based on insertion order.
@ -255,7 +255,7 @@ abstract class AbstractJDBCHashMap<K : Any, V : Any, out T : JDBCHashedTable>(va
override fun remove(key: K): V? {
val bucket = getBucket(key)
var removed: V? = null
buckets.computeIfPresent(key.hashCode()) { hashCode, value ->
buckets.computeIfPresent(key.hashCode()) { _, value ->
for (entry in value) {
if (entry.key == key) {
removed = entry.value
@ -369,7 +369,7 @@ abstract class AbstractJDBCHashMap<K : Any, V : Any, out T : JDBCHashedTable>(va
var oldValue: V? = null
var oldSeqNo: Int? = null
buckets.compute(key.hashCode()) { hashCode, list ->
buckets.compute(key.hashCode()) { _, list ->
val newList = list ?: newBucket()
val iterator = newList.listIterator()
while (iterator.hasNext()) {
@ -424,7 +424,7 @@ abstract class AbstractJDBCHashMap<K : Any, V : Any, out T : JDBCHashedTable>(va
private fun getBucket(key: Any): MutableList<NotReallyMutableEntry<K, V>> {
return buckets.computeIfAbsent(key.hashCode()) { hashCode ->
return buckets.computeIfAbsent(key.hashCode()) { _ ->
if (!loadOnInit) {
} else {
@ -41,13 +41,13 @@ class InMemoryMessagingTests {
var finalDelivery: Message? = null
with(node2) {
node2.net.addMessageHandler { msg, registration ->
node2.net.addMessageHandler { msg, _ ->
node2.net.send(msg, node3.info.address)
with(node3) {
node2.net.addMessageHandler { msg, registration ->
node2.net.addMessageHandler { msg, _ ->
finalDelivery = msg
@ -69,7 +69,7 @@ class InMemoryMessagingTests {
val bits = "test-content".toByteArray()
var counter = 0
listOf(node1, node2, node3).forEach { it.net.addMessageHandler { msg, registration -> counter++ } }
listOf(node1, node2, node3).forEach { it.net.addMessageHandler { _, _ -> counter++ } }
node1.net.send(node2.net.createMessage("test.topic", DEFAULT_SESSION_ID, bits), network.messagingNetwork.everyoneOnline)
network.runNetwork(rounds = 1)
assertEquals(3, counter)
@ -85,7 +85,7 @@ class InMemoryMessagingTests {
val node2 = network.createNode(networkMapAddress = node1.info.address)
var received: Int = 0
node1.net.addMessageHandler("valid_message") { msg, reg ->
node1.net.addMessageHandler("valid_message") { _, _ ->
@ -219,7 +219,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
private fun MockNode.subscribe(): List<Update> {
val request = SubscribeRequest(true, info.address)
val updates = BlockingArrayQueue<Update>()
services.networkService.addMessageHandler(PUSH_TOPIC, DEFAULT_SESSION_ID) { message, r ->
services.networkService.addMessageHandler(PUSH_TOPIC, DEFAULT_SESSION_ID) { message, _ ->
updates += message.data.deserialize<Update>()
val response = services.networkService.sendRequest<SubscribeResponse>(SUBSCRIPTION_TOPIC, request, mapServiceNode.info.address)
@ -260,6 +260,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
class Added(node: NodeInfo) : Changed(node) {
constructor(node: MockNode) : this(node.info)
class Removed(node: NodeInfo) : Changed(node) {
constructor(node: MockNode) : this(node.info)
@ -208,10 +208,10 @@ class ArtemisMessagingTests {
val messagingClient = createMessagingClient()
messagingClient.addMessageHandler(topic) { message, r ->
messagingClient.addMessageHandler(topic) { message, _ ->
messagingClient.addMessageHandler(NetworkMapService.FETCH_TOPIC) { message, r ->
messagingClient.addMessageHandler(NetworkMapService.FETCH_TOPIC) { message, _ ->
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
@ -75,7 +75,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
fun createAll(): List<SimulatedNode> {
return bankLocations.mapIndexed { i, location ->
return bankLocations.mapIndexed { i, _ ->
// Use deterministic seeds so the simulation is stable. Needed so that party owning keys are stable.
network.createNode(networkMap.info.address, start = false, nodeFactory = this, entropyRoot = BigInteger.valueOf(i.toLong())) as SimulatedNode
@ -258,7 +258,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
private fun linkConsensus(nodes: Collection<SimulatedNode>, flow: FlowLogic<*>) {
flow.progressTracker?.changes?.subscribe { change: ProgressTracker.Change ->
flow.progressTracker?.changes?.subscribe { _: ProgressTracker.Change ->
// Runs on node thread.
if (flow.progressTracker!!.currentStep == ProgressTracker.DONE) {
@ -157,7 +157,7 @@ class NetworkMapVisualiser : Application() {
stage.focusedProperty().addListener { value, old, new ->
stage.focusedProperty().addListener { _, _, new ->
if (new) {
@ -209,7 +209,7 @@ class NetworkMapVisualiser : Application() {
viewModel.runningPausedState = newRunningPausedState
.addListener { ov, value, newValue -> viewModel.displayStyle = newValue }
.addListener { _, _, newValue -> viewModel.displayStyle = newValue }
viewModel.simulation.dateChanges.observeOn(uiThread).subscribe { view.dateLabel.text = it.format(DateTimeFormatter.ofLocalizedDate(FormatStyle.MEDIUM)) }
@ -239,7 +239,7 @@ class NetworkMapVisualiser : Application() {
val extraLabel = viewModel.simulation.extraNodeLabels[node]
val label = if (extraLabel != null) "${node.info.legalIdentity.name}: $extraLabel" else node.info.legalIdentity.name
val widget = view.buildProgressTrackerWidget(label, tracker.topLevelTracker)
println("Added: ${tracker}, ${widget}")
println("Added: $tracker, $widget")
viewModel.trackerBoxes[tracker] = widget
view.sidebar.children += widget.vbox
} else {
@ -256,7 +256,7 @@ class NetworkMapVisualiser : Application() {
val pane = viewModel.trackerBoxes[tracker]!!.vbox
// Slide the other tracker widgets up and over this one.
val slideProp = SimpleDoubleProperty(0.0)
slideProp.addListener { obv -> pane.padding = Insets(0.0, 0.0, slideProp.value, 0.0) }
slideProp.addListener { _ -> pane.padding = Insets(0.0, 0.0, slideProp.value, 0.0) }
val timeline = Timeline(
KeyValue(pane.opacityProperty(), 0.0),
@ -62,14 +62,14 @@ class VisualiserViewModel {
fun repositionNodes() {
for ((index, bank) in simulation.banks.withIndex()) {
nodesToWidgets[bank]!!.position(index, when (displayStyle) {
Style.MAP -> { node, index -> nodeMapCoords(node) }
Style.CIRCLE -> { node, index -> nodeCircleCoords(NetworkMapVisualiser.NodeType.BANK, index) }
Style.MAP -> { node, _ -> nodeMapCoords(node) }
Style.CIRCLE -> { _, index -> nodeCircleCoords(NetworkMapVisualiser.NodeType.BANK, index) }
for ((index, serviceProvider) in (simulation.serviceProviders + simulation.regulators).withIndex()) {
nodesToWidgets[serviceProvider]!!.position(index, when (displayStyle) {
Style.MAP -> { node, index -> nodeMapCoords(node) }
Style.CIRCLE -> { node, index -> nodeCircleCoords(NetworkMapVisualiser.NodeType.SERVICE, index) }
Style.MAP -> { node, _ -> nodeMapCoords(node) }
Style.CIRCLE -> { _, index -> nodeCircleCoords(NetworkMapVisualiser.NodeType.SERVICE, index) }
@ -170,8 +170,8 @@ class VisualiserViewModel {
val widget = NodeWidget(forNode, innerDot, outerDot, longPulseOuterDot, pulseAnim, longPulseAnim, nameLabel, statusLabel)
when (displayStyle) {
Style.CIRCLE -> widget.position(index, { node, index -> nodeCircleCoords(nodeType, index) })
Style.MAP -> widget.position(index, { node, index -> nodeMapCoords(node) })
Style.CIRCLE -> widget.position(index, { _, index -> nodeCircleCoords(nodeType, index) })
Style.MAP -> widget.position(index, { node, _ -> nodeMapCoords(node) })
return widget
@ -3,7 +3,6 @@ package net.corda.testing
import net.corda.core.contracts.*
import net.corda.core.crypto.*
import net.corda.core.node.ServiceHub
import net.corda.core.node.recordTransactions
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
@ -75,7 +74,7 @@ fun LedgerDSLInterpreter<TransactionDSLInterpreter>.ledger(
* the triggered diagnostic.
sealed class EnforceVerifyOrFail {
internal object Token: EnforceVerifyOrFail()
internal object Token : EnforceVerifyOrFail()
class DuplicateOutputLabel(label: String) : Exception("Output label '$label' already used")
@ -154,7 +153,7 @@ data class TestTransactionDSLInterpreter private constructor(
) = dsl(TransactionDSL(copy()))
data class TestLedgerDSLInterpreter private constructor (
data class TestLedgerDSLInterpreter private constructor(
val services: ServiceHub,
internal val labelToOutputStateAndRefs: HashMap<String, StateAndRef<ContractState>> = HashMap(),
private val transactionWithLocations: HashMap<SecureHash, WireTransactionWithLocation> = LinkedHashMap(),
@ -233,7 +232,7 @@ data class TestLedgerDSLInterpreter private constructor (
fun outputToLabel(state: ContractState): String? =
labelToOutputStateAndRefs.filter { it.value.state.data == state }.keys.firstOrNull()
labelToOutputStateAndRefs.filter { it.value.state.data == state }.keys.firstOrNull()
private fun <R> recordTransactionWithTransactionMap(
transactionLabel: String?,
@ -283,7 +282,7 @@ data class TestLedgerDSLInterpreter private constructor (
try {
val usedInputs = mutableSetOf<StateRef>()
services.recordTransactions(transactionsUnverified.map { SignedTransaction(it.serialized, listOf(NullSignature)) })
for ((key, value) in transactionWithLocations) {
for ((_, value) in transactionWithLocations) {
val wtx = value.transaction
val ltx = wtx.toLedgerTransaction(services)
@ -66,7 +66,7 @@ class InMemoryMessagingNetwork(
private val messageSendQueue = LinkedBlockingQueue<MessageTransfer>()
private val _sentMessages = PublishSubject.create<MessageTransfer>()
@Suppress("unused") // Used by the visualiser tool.
/** A stream of (sender, message, recipients) triples */
/** A stream of (sender, message, recipients) triples */
val sentMessages: Observable<MessageTransfer>
get() = _sentMessages
@ -82,7 +82,7 @@ class InMemoryMessagingNetwork(
private val serviceToPeersMapping = HashMap<ServiceHandle, LinkedHashSet<PeerHandle>>()
@Suppress("unused") // Used by the visualiser tool.
/** A stream of (sender, message, recipients) triples */
/** A stream of (sender, message, recipients) triples */
val receivedMessages: Observable<MessageTransfer>
get() = _receivedMessages
@ -217,10 +217,11 @@ class InMemoryMessagingNetwork(
return pickFrom[random.nextInt(pickFrom.size)]
class RoundRobin : ServicePeerAllocationStrategy() {
val previousPicks = HashMap<ServiceHandle, Int>()
override fun <A> pickNext(service: ServiceHandle, pickFrom: List<A>): A {
val nextIndex = previousPicks.compute(service) { _key, previous ->
val nextIndex = previousPicks.compute(service) { _, previous ->
(previous?.plus(1) ?: 0) % pickFrom.size
return pickFrom[nextIndex]
@ -35,9 +35,7 @@ import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.User
import org.apache.commons.lang.SystemUtils
import org.controlsfx.dialog.ExceptionDialog
import tornadofx.App
import tornadofx.addStageIcon
import tornadofx.find
import tornadofx.*
import java.util.*
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutionException
@ -99,7 +97,7 @@ class Main : App(MainView::class) {
init {
// Shows any uncaught exception in exception dialog.
Thread.setDefaultUncaughtExceptionHandler { thread, throwable ->
Thread.setDefaultUncaughtExceptionHandler { _, throwable ->
// Show exceptions in exception dialog. Ensure this runs in application thread.
runInFxApplicationThread {
@ -156,19 +154,19 @@ fun main(args: Array<String>) {
// TODO : Supported flow should be exposed somehow from the node instead of set of ServiceInfo.
val notary = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)),
customOverrides = mapOf("nearestCity" to "Zurich"))
customOverrides = mapOf("nearestCity" to "Zurich"))
val alice = startNode("Alice", rpcUsers = arrayListOf(user),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("cash"))),
customOverrides = mapOf("nearestCity" to "Milan"))
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("cash"))),
customOverrides = mapOf("nearestCity" to "Milan"))
val bob = startNode("Bob", rpcUsers = arrayListOf(user),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("cash"))),
customOverrides = mapOf("nearestCity" to "Madrid"))
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("cash"))),
customOverrides = mapOf("nearestCity" to "Madrid"))
val issuerGBP = startNode("UK Bank Plc", rpcUsers = arrayListOf(manager),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.GBP"))),
customOverrides = mapOf("nearestCity" to "London"))
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.GBP"))),
customOverrides = mapOf("nearestCity" to "London"))
val issuerUSD = startNode("USA Bank Corp", rpcUsers = arrayListOf(manager),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.USD"))),
customOverrides = mapOf("nearestCity" to "New York"))
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.USD"))),
customOverrides = mapOf("nearestCity" to "New York"))
val notaryNode = notary.get()
val aliceNode = alice.get()
@ -220,12 +218,12 @@ fun main(args: Array<String>) {
val maxIterations = 100000
val flowHandles = mapOf(
"GBPIssuer" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations+1),
"USDIssuer" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations+1),
"Alice" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations+1),
"Bob" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations+1),
"GBPExit" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations+1),
"USDExit" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations+1)
"GBPIssuer" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1),
"USDIssuer" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1),
"Alice" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1),
"Bob" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1),
"GBPExit" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1),
"USDExit" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1)
flowHandles.forEach {
@ -252,14 +250,14 @@ fun main(args: Array<String>) {
println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}")
val cmd = command.startFlow(issuerRPCGBP)
cmd.progress.subscribe({}, {})?.unsubscribe()
issuerUSDEventGenerator.bankOfCordaIssueGenerator.map { command ->
println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}")
val cmd = command.startFlow(issuerRPCUSD)
cmd.progress.subscribe({}, {})?.unsubscribe()
@ -270,14 +268,14 @@ fun main(args: Array<String>) {
println("[$i] EXITING ${command.amount} with ref ${command.issueRef}")
val cmd = command.startFlow(issuerRPCGBP)
cmd.progress.subscribe({}, {})?.unsubscribe()
issuerUSDEventGenerator.bankOfCordaExitGenerator.map { command ->
println("[$i] EXITING ${command.amount} with ref ${command.issueRef}")
val cmd = command.startFlow(issuerRPCUSD)
cmd.progress.subscribe({}, {})?.unsubscribe()
@ -289,7 +287,7 @@ fun main(args: Array<String>) {
println("[$i] SENDING ${command.amount} from ${aliceRPC.nodeIdentity().legalIdentity} to ${command.recipient}")
val cmd = command.startFlow(aliceRPC)
cmd.progress.subscribe({}, {})?.unsubscribe()
@ -298,7 +296,7 @@ fun main(args: Array<String>) {
println("[$i] SENDING ${command.amount} from ${bobRPC.nodeIdentity().legalIdentity} to ${command.recipient}")
val cmd = command.startFlow(bobRPC)
cmd.progress.subscribe({}, {})?.unsubscribe()
@ -29,7 +29,7 @@ class Dashboard : CordaView() {
init {
Bindings.bindContent(tilePane.children, widgetPanes)
// Dynamically change column count and width according to the window size.
tilePane.widthProperty().addListener { e ->
tilePane.widthProperty().addListener { _ ->
val prefWidth = 350
val columns: Int = ((tilePane.width - 10) / prefWidth).toInt()
tilePane.children.forEach { (it as? TitledPane)?.prefWidth = (tilePane.width - 10) / columns }
@ -118,7 +118,7 @@ class Network : CordaView() {
Bindings.bindContent(mapPane.children, mapLabels)
// Run once when the screen is ready.
// TODO : Find a better way to do this.
mapPane.heightProperty().addListener { _o, old, _new ->
mapPane.heightProperty().addListener { _, old, _ ->
if (old == 0.0) myMapLabel.value?.let { mapScrollPane.centerLabel(it) }
// Listen on zooming gesture, if device has gesture support.
@ -128,7 +128,7 @@ class Network : CordaView() {
zoomInButton.setOnAction { zoom(1.2) }
zoomOutButton.setOnAction { zoom(0.8) }
lastTransactions.addListener { observableValue, old, new ->
lastTransactions.addListener { _, _, new ->
new?.forEach {
it.first.value?.let { a ->
it.second.value?.let { b ->
@ -17,8 +17,8 @@ import javafx.scene.layout.BorderPane
import javafx.scene.layout.HBox
import javafx.scene.layout.Priority
import javafx.scene.layout.VBox
import net.corda.client.jfx.utils.*
import net.corda.client.jfx.model.*
import net.corda.client.jfx.utils.*
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount
import net.corda.core.contracts.StateAndRef
@ -220,7 +220,7 @@ class CashViewer : CordaView("Cash") {
root.isExpanded = true
isShowRoot = false
// TODO use smart resize
setColumnPrefWidthPolicy { tableWidthWithoutPaddingAndBorder, column ->
setColumnPrefWidthPolicy { tableWidthWithoutPaddingAndBorder, _ ->
Math.floor(tableWidthWithoutPaddingAndBorder.toDouble() / columns.size).toInt()
@ -229,7 +229,7 @@ class CashViewer : CordaView("Cash") {
cashViewerTableIssuerCurrency.setCellValueFactory {
val node = it.value.value
when (node) {
// TODO: Anonymous should probably be italicised or similar
// TODO: Anonymous should probably be italicised or similar
is ViewerNode.IssuerNode -> SimpleStringProperty(node.issuer.nameOrNull() ?: "Anonymous")
is ViewerNode.CurrencyNode -> node.amount.map { it.token.toString() }
@ -308,7 +308,7 @@ class CashViewer : CordaView("Cash") {
linechart(null, xAxis, yAxis) {
series("USD") {
sumAmount.addListener { observableValue, old, new ->
sumAmount.addListener { _, _, _ ->
val lastTimeStamp = data.last().value?.xValue
if (lastTimeStamp == null || System.currentTimeMillis() - lastTimeStamp.toLong() > 1.seconds.toMillis()) {
data(System.currentTimeMillis(), sumAmount.value.quantity)
@ -52,23 +52,23 @@ fun hang(hangIntervalRange: LongRange) = Disruption("Hang randomly") { node, ran
node.doWhileSigStopped { Thread.sleep(hangIntervalMs) }
val restart = Disruption("Restart randomly") { node, random ->
node.connection.runShellCommandGetOutput("sudo systemctl restart ${node.configuration.remoteSystemdServiceName}").getResultOrThrow()
val restart = Disruption("Restart randomly") { (configuration, connection), _ ->
connection.runShellCommandGetOutput("sudo systemctl restart ${configuration.remoteSystemdServiceName}").getResultOrThrow()
val kill = Disruption("Kill randomly") { node, random ->
val kill = Disruption("Kill randomly") { node, _ ->
val pid = node.getNodePid()
node.connection.runShellCommandGetOutput("sudo kill $pid")
val deleteDb = Disruption("Delete persistence database without restart") { node, random ->
node.connection.runShellCommandGetOutput("sudo rm ${node.configuration.remoteNodeDirectory}/persistence.mv.db").getResultOrThrow()
val deleteDb = Disruption("Delete persistence database without restart") { (configuration, connection), _ ->
connection.runShellCommandGetOutput("sudo rm ${configuration.remoteNodeDirectory}/persistence.mv.db").getResultOrThrow()
fun strainCpu(parallelism: Int, durationSeconds: Int) = Disruption("Put strain on cpu") { node, random ->
fun strainCpu(parallelism: Int, durationSeconds: Int) = Disruption("Put strain on cpu") { (_, connection), _ ->
val shell = "for c in {1..$parallelism} ; do openssl enc -aes-128-cbc -in /dev/urandom -pass pass: -e > /dev/null & done && JOBS=\$(jobs -p) && (sleep $durationSeconds && kill \$JOBS) & wait"
@ -7,12 +7,8 @@ import net.corda.client.mock.replicatePoisson
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.USD
import net.corda.core.crypto.AbstractParty
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.toFuture
import net.corda.flows.CashException
import net.corda.flows.CashFlowCommand
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeHandle
@ -39,7 +35,7 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
"Self issuing cash randomly",
generate = { state, parallelism ->
generate = { _, parallelism ->
val generateIssue = Generator.pickOne(simpleNodes).bind { node: NodeHandle ->
generateIssue(1000, USD, notary.info.notaryIdentity, listOf(node.info.legalIdentity)).map {
SelfIssueCommand(it, node)
@ -73,13 +69,13 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
gatherRemoteState = { previousState ->
val selfIssueVaults = HashMap<AbstractParty, Long>()
simpleNodes.forEach { node ->
val vault = node.connection.proxy.vaultAndUpdates().first
simpleNodes.forEach { (_, connection, info) ->
val vault = connection.proxy.vaultAndUpdates().first
vault.forEach {
val state = it.state.data
if (state is Cash.State) {
val issuer = state.amount.token.issuer.party
if (issuer == node.info.legalIdentity as AbstractParty) {
if (issuer == info.legalIdentity as AbstractParty) {
selfIssueVaults.put(issuer, (selfIssueVaults[issuer] ?: 0L) + state.amount.quantity)
@ -91,7 +87,7 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
if (!diff.isUntouched) {
var diffString = ""
diff.visit { node, visit ->
diff.visit { node, _ ->
if (node.isChanged && node.children.all { !it.isChanged }) {
diffString += "${node.propertyPath}: simulated[${node.canonicalGet(previousState.vaultsSelfIssued)}], actual[${node.canonicalGet(selfIssueVaults)}]\n"
