Include fix for NPEs with RPC exceptions.

This commit is contained in:
Chris Rankin 2017-02-03 11:46:54 +00:00
commit 0c1a6aad6b
33 changed files with 305 additions and 165 deletions

View File

@ -1,6 +1,8 @@
package net.corda.client
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.issuedBy
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.ServiceInfo
@ -8,26 +10,27 @@ import net.corda.core.random63BitValue
import net.corda.core.serialization.OpaqueBytes
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.node.driver.DriverBasedTest
import net.corda.node.driver.NodeHandle
import net.corda.node.driver.driver
import net.corda.node.internal.Node
import net.corda.node.services.User
import net.corda.node.services.config.configureTestSSL
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.testing.node.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before
import org.junit.Test
class CordaRPCClientTest : DriverBasedTest() {
class CordaRPCClientTest : NodeBasedTest() {
private val rpcUser = User("user1", "test", permissions = setOf(startFlowPermission<CashFlow>()))
private lateinit var node: NodeHandle
private lateinit var node: Node
private lateinit var client: CordaRPCClient
override fun setup() = driver(isDebug = true) {
node = startNode(rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow()
client = node.rpcClientToNode()
runTest()
@Before
fun setUp() {
node = startNode("Alice", rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow()
client = CordaRPCClient(node.configuration.artemisAddress, configureTestSSL())
}
@Test
@ -50,7 +53,7 @@ class CordaRPCClientTest : DriverBasedTest() {
}
@Test
fun `indefinite block bug`() {
fun `close-send deadlock and premature shutdown on empty observable`() {
println("Starting client")
client.start(rpcUser.username, rpcUser.password)
println("Creating proxy")
@ -58,11 +61,25 @@ class CordaRPCClientTest : DriverBasedTest() {
println("Starting flow")
val flowHandle = proxy.startFlow(
::CashFlow,
CashCommand.IssueCash(20.DOLLARS, OpaqueBytes.of(0), node.nodeInfo.legalIdentity, node.nodeInfo.legalIdentity))
CashCommand.IssueCash(20.DOLLARS, OpaqueBytes.of(0), node.info.legalIdentity, node.info.legalIdentity))
println("Started flow, waiting on result")
flowHandle.progress.subscribe {
println("PROGRESS $it")
}
println("Result: ${flowHandle.returnValue.toBlocking().first()}")
println("Result: ${flowHandle.returnValue.getOrThrow()}")
}
@Test
fun `FlowException thrown by flow`() {
client.start(rpcUser.username, rpcUser.password)
val proxy = client.proxy()
val handle = proxy.startFlow(::CashFlow, CashCommand.PayCash(
amount = 100.DOLLARS.issuedBy(node.info.legalIdentity.ref(1)),
recipient = node.info.legalIdentity
))
// TODO Restrict this to CashException once RPC serialisation has been fixed
assertThatExceptionOfType(FlowException::class.java).isThrownBy {
handle.returnValue.getOrThrow()
}
}
}

View File

@ -124,7 +124,7 @@ class NodeMonitorModelTest : DriverBasedTest() {
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.legalIdentity,
notary = notaryNode.notaryIdentity
)).returnValue.toBlocking().first()
)).returnValue.getOrThrow()
rpc.startFlow(::CashFlow, CashCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),

View File

@ -6,10 +6,7 @@ package net.corda.core
import com.google.common.base.Function
import com.google.common.base.Throwables
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.*
import kotlinx.support.jdk7.use
import net.corda.core.crypto.newSecureRandom
import org.slf4j.Logger
@ -115,7 +112,7 @@ inline fun <T> SettableFuture<T>.catch(block: () -> T) {
}
}
fun <A> ListenableFuture<A>.toObservable(): Observable<A> {
fun <A> ListenableFuture<out A>.toObservable(): Observable<A> {
return Observable.create { subscriber ->
success {
subscriber.onNext(it)
@ -384,15 +381,24 @@ fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> {
/**
* Returns a [ListenableFuture] bound to the *first* item emitted by this Observable. The future will complete with a
* NoSuchElementException if no items are emitted or any other error thrown by the Observable.
* NoSuchElementException if no items are emitted or any other error thrown by the Observable. If it's cancelled then
* it will unsubscribe from the observable.
*/
fun <T> Observable<T>.toFuture(): ListenableFuture<T> {
val future = SettableFuture.create<T>()
first().subscribe(
{ future.set(it) },
{ future.setException(it) }
)
return future
fun <T> Observable<T>.toFuture(): ListenableFuture<T> = ObservableToFuture(this)
private class ObservableToFuture<T>(observable: Observable<T>) : AbstractFuture<T>(), Observer<T> {
private val subscription = observable.first().subscribe(this)
override fun onNext(value: T) {
set(value)
}
override fun onError(e: Throwable) {
setException(e)
}
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
subscription.unsubscribe()
return super.cancel(mayInterruptIfRunning)
}
override fun onCompleted() {}
}
/** Return the sum of an Iterable of [BigDecimal]s. */

View File

@ -1,5 +1,6 @@
package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.CompositeKey
@ -107,15 +108,16 @@ interface CordaRPCOps : RPCOps {
fun uploadFile(dataType: String, name: String?, file: InputStream): String
/**
* Returns the node-local current time.
* Returns the node's current time.
*/
fun currentNodeTime(): Instant
/**
* Returns an Observable emitting a single Unit once the node is registered with the network map.
* Returns a [ListenableFuture] which completes when the node has registered wih the network map service. It can also
* complete with an exception if it is unable to.
*/
@RPCReturnsObservables
fun waitUntilRegisteredWithNetworkMap(): Observable<Unit>
fun waitUntilRegisteredWithNetworkMap(): ListenableFuture<Unit>
// TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of
// the node's state locally and query that directly.
@ -179,13 +181,10 @@ inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startFlow
*
* @param id The started state machine's ID.
* @param progress The stream of progress tracker events.
* @param returnValue An Observable emitting a single event containing the flow's return value.
* To block on this value:
* val returnValue = rpc.startFlow(::MyFlow).returnValue.toBlocking().first()
* @param returnValue A [ListenableFuture] of the flow's return value.
*/
data class FlowHandle<A>(
val id: StateMachineRunId,
val progress: Observable<String>,
// TODO This should be ListenableFuture<A>
val returnValue: Observable<A>
val returnValue: ListenableFuture<A>
)

View File

@ -72,12 +72,7 @@ interface NetworkMapCache {
*/
/** Look up the node info for a specific peer key. */
fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? {
// Although we should never have more than one match, it is theoretically possible. Report an error if it happens.
val candidates = partyNodes.filter { it.legalIdentity.owningKey == compositeKey }
check(candidates.size <= 1) { "Found more than one match for key $compositeKey" }
return candidates.singleOrNull()
}
fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo?
/** Look up all nodes advertising the service owned by [compositeKey] */
fun getNodesByAdvertisedServiceIdentityKey(compositeKey: CompositeKey): List<NodeInfo> {
return partyNodes.filter { it.advertisedServices.any { it.identity.owningKey == compositeKey } }

View File

@ -5,6 +5,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
@ -66,7 +67,7 @@ import kotlin.reflect.jvm.javaType
*/
// A convenient instance of Kryo pre-configured with some useful things. Used as a default by various functions.
val THREAD_LOCAL_KRYO = ThreadLocal.withInitial { createKryo() }
val THREAD_LOCAL_KRYO: ThreadLocal<Kryo> = ThreadLocal.withInitial { createKryo() }
/**
* A type safe wrapper around a byte array that contains a serialised object. You can call [SerializedBytes.deserialize]
@ -76,7 +77,7 @@ class SerializedBytes<T : Any>(bytes: ByteArray) : OpaqueBytes(bytes) {
// It's OK to use lazy here because SerializedBytes is configured to use the ImmutableClassSerializer.
val hash: SecureHash by lazy { bytes.sha256() }
fun writeToFile(path: Path) = Files.write(path, bytes)
fun writeToFile(path: Path): Path = Files.write(path, bytes)
}
// Some extension functions that make deserialisation convenient and provide auto-casting of the result.
@ -385,8 +386,7 @@ object KotlinObjectSerializer : Serializer<DeserializeAsKotlinObjectDef>() {
return type.getField("INSTANCE").get(null) as DeserializeAsKotlinObjectDef
}
override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) {
}
override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) {}
}
fun createKryo(k: Kryo = Kryo()): Kryo {
@ -402,12 +402,10 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
// Because we like to stick a Kryo object in a ThreadLocal to speed things up a bit, we can end up trying to
// serialise the Kryo object itself when suspending a fiber. That's dumb, useless AND can cause crashes, so
// we avoid it here.
register(Kryo::class.java, object : Serializer<Kryo>() {
override fun read(kryo: Kryo, input: Input, type: Class<Kryo>): Kryo {
return createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo)
}
override fun write(kryo: Kryo, output: Output, obj: Kryo) {}
})
register(Kryo::class,
read = { kryo, input -> createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo) },
write = { kryo, output, obj -> }
)
register(EdDSAPublicKey::class.java, Ed25519PublicKeySerializer)
register(EdDSAPrivateKey::class.java, Ed25519PrivateKeySerializer)
@ -435,6 +433,8 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
// This ensures a NonEmptySetSerializer is constructed with an initial value.
register(NonEmptySet::class.java, NonEmptySetSerializer)
register(Array<StackTraceElement>::class, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> })
/** This ensures any kotlin objects that implement [DeserializeAsKotlinObjectDef] are read back in as singletons. */
addDefaultSerializer(DeserializeAsKotlinObjectDef::class.java, KotlinObjectSerializer)
@ -451,6 +451,19 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
}
}
inline fun <T : Any> Kryo.register(
type: KClass<T>,
crossinline read: (Kryo, Input) -> T,
crossinline write: (Kryo, Output, T) -> Unit): Registration {
return register(
type.java,
object : Serializer<T>() {
override fun read(kryo: Kryo, input: Input, type: Class<T>): T = read(kryo, input)
override fun write(kryo: Kryo, output: Output, obj: T) = write(kryo, output, obj)
}
)
}
/**
* Use this method to mark any types which can have the same instance within it more than once. This will make sure
* the serialised form is stable across multiple serialise-deserialise cycles. Using this on a type with internal cyclic
@ -517,7 +530,7 @@ var Kryo.attachmentStorage: AttachmentStorage?
//Used in Merkle tree calculation. It doesn't cover all the cases of unstable serialization format.
fun extendKryoHash(kryo: Kryo): Kryo {
return kryo.apply {
setReferences(false)
references = false
register(LinkedHashMap::class.java, MapSerializer())
register(HashMap::class.java, OrderedSerializer)
}

View File

@ -4,6 +4,7 @@ import org.assertj.core.api.Assertions.*
import org.junit.Test
import rx.subjects.PublishSubject
import java.util.*
import java.util.concurrent.CancellationException
class UtilsTest {
@Test
@ -44,4 +45,16 @@ class UtilsTest {
future.getOrThrow()
}.isSameAs(exception)
}
@Test
fun `toFuture - cancel`() {
val subject = PublishSubject.create<String>()
val future = subject.toFuture()
future.cancel(false)
assertThat(subject.hasObservers()).isFalse()
subject.onNext("Hello")
assertThatExceptionOfType(CancellationException::class.java).isThrownBy {
future.get()
}
}
}

View File

@ -46,6 +46,14 @@ through to the server where the corresponding server-side observables are also u
a warning printed to the logs and the proxy will be closed for you. But don't rely on this, as garbage
collection is non-deterministic.
Futures
-------
A method can also return a ``ListenableFuture`` in its object graph and it will be treated in a similar manner to
observables, including needing to mark the RPC with the ``@RPCReturnsObservables`` annotation. Unlike for an observable,
once the single value (or an exception) has been received all server-side resources will be released automatically. Calling
the ``cancel`` method on the future will unsubscribe it from any future value and release any resources.
Versioning
----------

View File

@ -9,7 +9,6 @@ import net.corda.core.messaging.startFlow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.toFuture
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.node.driver.driver
@ -87,7 +86,7 @@ class IntegrationTestingTutorial {
amount = i.DOLLARS.issuedBy(alice.nodeInfo.legalIdentity.ref(issueRef)),
recipient = alice.nodeInfo.legalIdentity
))
flowHandle.returnValue.toFuture().getOrThrow()
flowHandle.returnValue.getOrThrow()
}
aliceVaultUpdates.expectEvents {

View File

@ -54,9 +54,11 @@ task buildCordaJAR(type: FatCapsule, dependsOn: ['buildCertSigningRequestUtility
applicationSource = files(project.tasks.findByName('jar'), '../build/classes/main/CordaCaplet.class', 'config/dev/log4j2.xml')
capsuleManifest {
applicationVersion = corda_version
appClassPath = ["jolokia-agent-war-${project.rootProject.ext.jolokia_version}.war"]
javaAgents = ["quasar-core-${quasar_version}-jdk8.jar"]
systemProperties['visualvm.display.name'] = 'Corda'
systemProperties['corda.version'] = corda_version
minJavaVersion = '1.8.0'
// This version is known to work and avoids earlier 8u versions that have bugs.
minUpdateVersion['1.8'] = '102'

View File

@ -11,7 +11,6 @@ import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.toFuture
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.node.driver.DriverBasedTest
@ -138,13 +137,13 @@ class DistributedServiceTests : DriverBasedTest() {
val issueHandle = aliceProxy.startFlow(
::CashFlow,
CashCommand.IssueCash(amount, OpaqueBytes.of(0), alice.nodeInfo.legalIdentity, raftNotaryIdentity))
issueHandle.returnValue.toFuture().getOrThrow()
issueHandle.returnValue.getOrThrow()
}
private fun paySelf(amount: Amount<Currency>) {
val payHandle = aliceProxy.startFlow(
::CashFlow,
CashCommand.PayCash(amount.issuedBy(alice.nodeInfo.legalIdentity.ref(0)), alice.nodeInfo.legalIdentity))
payHandle.returnValue.toFuture().getOrThrow()
payHandle.returnValue.getOrThrow()
}
}

View File

@ -380,7 +380,7 @@ open class DriverDSL(
registerProcess(processFuture)
return processFuture.flatMap { process ->
establishRpc(messagingAddress, configuration).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().toFuture().map {
rpc.waitUntilRegisteredWithNetworkMap().map {
NodeHandle(rpc.nodeIdentity(), rpc, configuration, process)
}
}

View File

@ -15,7 +15,6 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.toObservable
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.messaging.requirePermission
import net.corda.node.services.startFlowPermission
@ -97,7 +96,7 @@ class CordaRPCOpsImpl(
return FlowHandle(
id = stateMachine.id,
progress = stateMachine.logic.track()?.second ?: Observable.empty(),
returnValue = stateMachine.resultFuture.toObservable()
returnValue = stateMachine.resultFuture
)
}
@ -111,7 +110,7 @@ class CordaRPCOpsImpl(
}
}
override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered.toObservable()
override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered
override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key)
override fun partyFromName(name: String) = services.identityService.partyFromName(name)

View File

@ -31,6 +31,8 @@ import org.jetbrains.exposed.sql.Database
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.nio.channels.FileLock
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Clock
import javax.management.ObjectName
import javax.servlet.*
@ -100,6 +102,27 @@ class Node(override val configuration: FullNodeConfiguration,
private lateinit var userService: RPCUserService
init {
checkVersionUnchanged()
}
/**
* Abort starting the node if an existing deployment with a different version is detected in the current directory.
* The current version is expected to be specified as a system property. If not provided, the check will be ignored.
*/
private fun checkVersionUnchanged() {
val currentVersion = System.getProperty("corda.version") ?: return
val versionFile = Paths.get("version")
if (Files.exists(versionFile)) {
val existingVersion = Files.readAllLines(versionFile)[0]
check(existingVersion == currentVersion) {
"Version change detected - current: $currentVersion, existing: $existingVersion. Node upgrades are not yet supported."
}
} else {
Files.write(versionFile, currentVersion.toByteArray())
}
}
override fun makeMessagingService(): MessagingServiceInternal {
userService = RPCUserServiceImpl(configuration)

View File

@ -8,9 +8,10 @@ import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.JavaSerializer
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import de.javakaffee.kryoserializers.ArraysAsListSerializer
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer
import de.javakaffee.kryoserializers.guava.*
import net.corda.contracts.asset.Cash
import net.corda.core.ErrorOr
@ -19,6 +20,8 @@ import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
import net.corda.core.flows.IllegalFlowLogicException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.StateMachineInfo
@ -26,12 +29,15 @@ import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.serialization.*
import net.corda.core.toFuture
import net.corda.core.toObservable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.internal.AbstractNode
import net.corda.node.services.User
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.node.services.statemachine.FlowSessionException
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.SimpleString
@ -138,6 +144,7 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(Array<Any>(0,{}).javaClass)
register(Class::class.java, ClassSerializer)
UnmodifiableCollectionsSerializer.registerSerializers(this)
ImmutableListSerializer.registerSerializers(this)
ImmutableSetSerializer.registerSerializers(this)
ImmutableSortedSetSerializer.registerSerializers(this)
@ -207,16 +214,18 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(SimpleString::class.java)
register(ServiceEntry::class.java)
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(Array<StackTraceElement>::class, read = { kryo, input -> emptyArray() }, write = { kryo, output, obj -> })
register(FlowException::class.java)
register(FlowSessionException::class.java)
register(IllegalFlowLogicException::class.java)
register(RuntimeException::class.java)
register(IllegalArgumentException::class.java)
register(ArrayIndexOutOfBoundsException::class.java)
register(IndexOutOfBoundsException::class.java)
// Kryo couldn't serialize Collections.unmodifiableCollection in Throwable correctly, causing null pointer exception when try to access the deserialize object.
register(NoSuchElementException::class.java, JavaSerializer())
register(NoSuchElementException::class.java)
register(RPCException::class.java)
register(Array<StackTraceElement>::class.java, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> })
register(Collections.unmodifiableList(emptyList<String>()).javaClass)
register(PermissionException::class.java)
register(Throwable::class.java)
register(FlowHandle::class.java)
register(KryoException::class.java)
register(StringBuffer::class.java)
@ -229,20 +238,45 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
pluginRegistries.forEach { it.registerRPCKryoTypes(this) }
}
// Helper method, attempt to reduce boiler plate code
private fun <T> register(type: Class<T>, read: (Kryo, Input) -> T, write: (Kryo, Output, T) -> Unit) {
register(type, object : Serializer<T>() {
override fun read(kryo: Kryo, input: Input, type: Class<T>): T = read(kryo, input)
override fun write(kryo: Kryo, output: Output, o: T) = write(kryo, output, o)
})
// TODO: workaround to prevent Observable registration conflict when using plugin registered kyro classes
private val observableRegistration: Registration? = observableSerializer?.let { register(Observable::class.java, it, 10000) }
private val listenableFutureRegistration: Registration? = observableSerializer?.let {
// Register ListenableFuture by making use of Observable serialisation.
// TODO Serialisation could be made more efficient as a future can only emit one value (or exception)
@Suppress("UNCHECKED_CAST")
register(ListenableFuture::class,
read = { kryo, input -> it.read(kryo, input, Observable::class.java as Class<Observable<Any>>).toFuture() },
write = { kryo, output, obj -> it.write(kryo, output, obj.toObservable()) }
)
}
// TODO: workaround to prevent Observable registration conflict when using plugin registered kyro classes
val observableRegistration: Registration? = if (observableSerializer != null) register(Observable::class.java, observableSerializer, 10000) else null
// Avoid having to worry about the subtypes of FlowException by converting all of them to just FlowException.
// This is a temporary hack until a proper serialisation mechanism is in place.
private val flowExceptionRegistration: Registration = register(
FlowException::class,
read = { kryo, input ->
val message = input.readString()
val cause = kryo.readObjectOrNull(input, Throwable::class.java)
FlowException(message, cause)
},
write = { kryo, output, obj ->
// The subclass may have overridden toString so we use that
val message = if (obj.javaClass != FlowException::class.java) obj.toString() else obj.message
output.writeString(message)
kryo.writeObjectOrNull(output, obj.cause, Throwable::class.java)
}
)
override fun getRegistration(type: Class<*>): Registration {
if (Observable::class.java.isAssignableFrom(type))
return observableRegistration ?: throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables")
return observableRegistration ?:
throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables")
if (ListenableFuture::class.java.isAssignableFrom(type))
return listenableFutureRegistration ?:
throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables")
if (FlowException::class.java.isAssignableFrom(type))
return flowExceptionRegistration
return super.getRegistration(type)
}
}

View File

@ -4,6 +4,7 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Party
import net.corda.core.map
import net.corda.core.messaging.MessagingService
@ -70,6 +71,8 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
return null
}
override fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? = registeredNodes[Party("", compositeKey)]
override fun track(): Pair<List<NodeInfo>, Observable<MapChange>> {
synchronized(_changed) {
return Pair(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())

View File

@ -368,9 +368,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, errorResponse: Pair<FlowException, Boolean>?) {
// TODO Blanking the stack trace prevents the receiving flow from filling in its own stack trace
// @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
// (errorResponse?.first as java.lang.Throwable?)?.stackTrace = emptyArray()
openSessions.values.removeIf { session ->
if (session.fiber == fiber) {
session.endSession(errorResponse)

View File

@ -1,8 +1,13 @@
package net.corda.node.messaging
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.getOrThrow
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.serialization.SerializedBytes
import net.corda.core.success
import net.corda.core.utilities.LogHelper
import net.corda.node.services.RPCUserService
import net.corda.node.services.User
@ -120,30 +125,31 @@ class ClientRPCInfrastructureTests {
@RPCReturnsObservables
fun makeComplicatedObservable(): Observable<Pair<String, Observable<String>>>
@RPCReturnsObservables
fun makeListenableFuture(): ListenableFuture<Int>
@RPCReturnsObservables
fun makeComplicatedListenableFuture(): ListenableFuture<Pair<String, ListenableFuture<String>>>
@RPCSinceVersion(2)
fun addedLater()
fun captureUser(): String
}
lateinit var complicatedObservable: Observable<Pair<String, Observable<String>>>
private lateinit var complicatedObservable: Observable<Pair<String, Observable<String>>>
private lateinit var complicatedListenableFuturee: ListenableFuture<Pair<String, ListenableFuture<String>>>
inner class TestOpsImpl : TestOps {
override val protocolVersion = 1
override fun barf(): Unit = throw IllegalArgumentException("Barf!")
override fun void() {
}
override fun void() {}
override fun someCalculation(str: String, num: Int) = "$str $num"
override fun makeObservable(): Observable<Int> = Observable.just(1, 2, 3, 4)
override fun makeListenableFuture(): ListenableFuture<Int> = Futures.immediateFuture(1)
override fun makeComplicatedObservable() = complicatedObservable
override fun makeComplicatedListenableFuture(): ListenableFuture<Pair<String, ListenableFuture<String>>> = complicatedListenableFuturee
override fun addedLater(): Unit = throw UnsupportedOperationException("not implemented")
override fun captureUser(): String = CURRENT_RPC_USER.get().username
}
@ -212,6 +218,41 @@ class ClientRPCInfrastructureTests {
assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
}
@Test
fun `simple ListenableFuture`() {
val value = proxy.makeListenableFuture().getOrThrow()
assertThat(value).isEqualTo(1)
}
@Test
fun `complex ListenableFuture`() {
val serverQuote = SettableFuture.create<Pair<String, ListenableFuture<String>>>()
complicatedListenableFuturee = serverQuote
val twainQuote = "Mark Twain" to Futures.immediateFuture("I have never let my schooling interfere with my education.")
val clientQuotes = LinkedBlockingQueue<String>()
val clientFuture = proxy.makeComplicatedListenableFuture()
clientFuture.success {
val name = it.first
it.second.success {
clientQuotes += "Quote by $name: $it"
}
}
val rpcQueuesQuery = SimpleString("clients.${authenticatedUser.username}.rpc.*")
assertEquals(2, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
assertThat(clientQuotes).isEmpty()
serverQuote.set(twainQuote)
assertThat(clientQuotes.take()).isEqualTo("Quote by Mark Twain: I have never let my schooling interfere with my education.")
// TODO This final assert sometimes fails because the relevant queue hasn't been removed yet
// assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
}
@Test
fun versioning() {
assertFailsWith<UnsupportedOperationException> { proxy.addedLater() }
@ -221,5 +262,4 @@ class ClientRPCInfrastructureTests {
fun `authenticated user is available to RPC`() {
assertThat(proxy.captureUser()).isEqualTo(authenticatedUser.username)
}
}

View File

@ -4,7 +4,6 @@ import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.expect
import net.corda.testing.node.MockNetwork
import org.junit.Test
import java.math.BigInteger
@ -34,12 +33,7 @@ class InMemoryNetworkMapCacheTest {
databaseTransaction(nodeA.database) {
nodeA.netMapCache.addNode(nodeB.info)
}
// Now both nodes match, so it throws an error
expect<IllegalStateException> {
nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey)
}
expect<IllegalStateException> {
nodeA.netMapCache.getNodeByLegalIdentityKey(nodeB.info.legalIdentity.owningKey)
}
// The details of node B write over those for node A
assertEquals(nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeB.info)
}
}

View File

@ -390,9 +390,8 @@ class StateMachineManagerTests {
node2 sent sessionConfirm to node1,
node2 sent sessionEnd(errorFlow.exceptionThrown) to node1
)
// TODO see StateMachineManager.endAllFiberSessions
// // Make sure the original stack trace isn't sent down the wire
// assertThat((sessionTransfers.last().message as SessionEnd).errorResponse!!.stackTrace).isEmpty()
// Make sure the original stack trace isn't sent down the wire
assertThat((sessionTransfers.last().message as SessionEnd).errorResponse!!.stackTrace).isEmpty()
}
@Test

View File

@ -6,6 +6,7 @@ import net.corda.core.contracts.TransactionType
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.Emoji
@ -83,9 +84,9 @@ fun sender(rpc: CordaRPCOps) {
// Send the transaction to the other recipient
val stx = ptx.toSignedTransaction()
println("Sending ${stx.id}")
val protocolHandle = rpc.startFlow(::FinalityFlow, stx, setOf(otherSide))
protocolHandle.progress.subscribe(::println)
protocolHandle.returnValue.toBlocking().first()
val flowHandle = rpc.startFlow(::FinalityFlow, stx, setOf(otherSide))
flowHandle.progress.subscribe(::println)
flowHandle.returnValue.getOrThrow()
}
fun recipient(rpc: CordaRPCOps) {

View File

@ -5,7 +5,6 @@ import net.corda.core.contracts.DOLLARS
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.node.driver.driver
import net.corda.node.services.User
@ -16,7 +15,6 @@ import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.sequence
import org.junit.Test
import kotlin.test.assertTrue
class BankOfCordaRPCClientTest {
@Test
@ -45,13 +43,12 @@ class BankOfCordaRPCClientTest {
val vaultUpdatesBigCorp = bigCorpProxy.vaultAndUpdates().second
// Kick-off actual Issuer Flow
val result = bocProxy.startFlow(
bocProxy.startFlow(
::IssuanceRequester,
1000.DOLLARS,
nodeBigCorporation.nodeInfo.legalIdentity,
BOC_PARTY_REF,
nodeBankOfCorda.nodeInfo.legalIdentity).returnValue.toBlocking().first()
assertTrue { result is SignedTransaction }
nodeBankOfCorda.nodeInfo.legalIdentity).returnValue.getOrThrow()
// Check Bank of Corda Vault Updates
vaultUpdatesBoc.expectEvents {

View File

@ -2,14 +2,15 @@ package net.corda.bank.api
import com.google.common.net.HostAndPort
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.core.contracts.Amount
import net.corda.core.contracts.currency
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.node.services.config.configureTestSSL
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.testing.http.HttpApi
/**
@ -43,6 +44,6 @@ class BankOfCordaClientApi(val hostAndPort: HostAndPort) {
val amount = Amount(params.amount, currency(params.currency))
val issuerToPartyRef = OpaqueBytes.of(params.issueToPartyRefAsString.toByte())
return proxy.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.toBlocking().first()
return proxy.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.getOrThrow()
}
}

View File

@ -1,13 +1,14 @@
package net.corda.bank.api
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.core.contracts.Amount
import net.corda.core.contracts.currency
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.loggerFor
import net.corda.flows.IssuerFlow.IssuanceRequester
import java.time.LocalDateTime
import javax.ws.rs.*
import javax.ws.rs.core.MediaType
@ -46,12 +47,13 @@ class BankOfCordaWebApi(val rpc: CordaRPCOps) {
// invoke client side of Issuer Flow: IssuanceRequester
// The line below blocks and waits for the future to resolve.
val result = rpc.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.toBlocking().first()
if (result is SignedTransaction) {
logger.info("Issue request completed successfully: ${params}")
return Response.status(Response.Status.CREATED).build()
} else {
return Response.status(Response.Status.BAD_REQUEST).build()
val status = try {
rpc.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.getOrThrow()
logger.info("Issue request completed successfully: $params")
Response.Status.CREATED
} catch (e: FlowException) {
Response.Status.BAD_REQUEST
}
return Response.status(status).build()
}
}

View File

@ -1,6 +1,7 @@
package net.corda.irs.api
import net.corda.core.contracts.filterStatesOfType
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.loggerFor
@ -66,12 +67,12 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) {
@Path("deals")
@Consumes(MediaType.APPLICATION_JSON)
fun storeDeal(newDeal: InterestRateSwap.State): Response {
try {
rpc.startFlow(AutoOfferFlow::Requester, newDeal).returnValue.toBlocking().first()
return Response.created(URI.create(generateDealLink(newDeal))).build()
return try {
rpc.startFlow(AutoOfferFlow::Requester, newDeal).returnValue.getOrThrow()
Response.created(URI.create(generateDealLink(newDeal))).build()
} catch (ex: Throwable) {
logger.info("Exception when creating deal: $ex")
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ex.toString()).build()
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ex.toString()).build()
}
}
@ -94,7 +95,7 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) {
val priorDemoDate = fetchDemoDate()
// Can only move date forwards
if (newDemoDate.isAfter(priorDemoDate)) {
rpc.startFlow(UpdateBusinessDayFlow::Broadcast, newDemoDate).returnValue.toBlocking().first()
rpc.startFlow(UpdateBusinessDayFlow::Broadcast, newDemoDate).returnValue.getOrThrow()
return Response.ok().build()
}
val msg = "demodate is already $priorDemoDate and can only be updated with a later date"
@ -113,7 +114,7 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) {
@Path("restart")
@Consumes(MediaType.APPLICATION_JSON)
fun exitServer(): Response {
rpc.startFlow(ExitServerFlow::Broadcast, 83).returnValue.toBlocking().first()
rpc.startFlow(ExitServerFlow::Broadcast, 83).returnValue.getOrThrow()
return Response.ok().build()
}
}

View File

@ -1,8 +1,10 @@
package net.corda.notarydemo
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import net.corda.core.crypto.toStringShort
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
@ -60,9 +62,9 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
*/
private fun buildTransactions(count: Int): List<SignedTransaction> {
val moveTransactions = (1..count).map {
rpc.startFlow(::DummyIssueAndMove, notary, counterpartyNode.legalIdentity).returnValue.toBlocking().toFuture()
rpc.startFlow(::DummyIssueAndMove, notary, counterpartyNode.legalIdentity).returnValue
}
return moveTransactions.map { it.get() }
return Futures.allAsList(moveTransactions).getOrThrow()
}
/**
@ -72,10 +74,8 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
* @return a list of encoded signer public keys - one for every transaction
*/
private fun notariseTransactions(transactions: List<SignedTransaction>): List<String> {
val signatureFutures = transactions.map {
rpc.startFlow(NotaryFlow::Client, it).returnValue.toBlocking().toFuture()
}
return signatureFutures.map { it.get().by.toStringShort() }
val signatureFutures = transactions.map { rpc.startFlow(NotaryFlow::Client, it).returnValue }
return Futures.allAsList(signatureFutures).getOrThrow().map { it.by.toStringShort() }
}
}

View File

@ -42,8 +42,8 @@ class SimmValuationTest : IntegrationTestCategory {
}
}
private fun getPartyWithName(partyApi: HttpApi, countryparty: String): PortfolioApi.ApiParty =
getAvailablePartiesFor(partyApi).counterparties.single { it.text == countryparty }
private fun getPartyWithName(partyApi: HttpApi, counterparty: String): PortfolioApi.ApiParty =
getAvailablePartiesFor(partyApi).counterparties.single { it.text == counterparty }
private fun getAvailablePartiesFor(partyApi: HttpApi): PortfolioApi.AvailableParties {
return partyApi.getJson<PortfolioApi.AvailableParties>("whoami")
@ -68,4 +68,4 @@ class SimmValuationTest : IntegrationTestCategory {
val valuations = partyApi.getJson<PortfolioApiUtils.ValuationsView>("${counterparty.id}/portfolio/valuations")
return (valuations.initialMargin.call["total"] != 0.0)
}
}
}

View File

@ -6,6 +6,7 @@ import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.filterStatesOfType
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Party
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.vega.analytics.InitialMarginTriple
@ -159,7 +160,7 @@ class PortfolioApi(val rpc: CordaRPCOps) {
return withParty(partyName) {
val buyer = if (swap.buySell.isBuy) ownParty else it
val seller = if (swap.buySell.isSell) ownParty else it
rpc.startFlow(IRSTradeFlow::Requester, swap.toData(buyer, seller), it).returnValue.toBlocking().first()
rpc.startFlow(IRSTradeFlow::Requester, swap.toData(buyer, seller), it).returnValue.getOrThrow()
Response.accepted().entity("{}").build()
}
}
@ -266,12 +267,12 @@ class PortfolioApi(val rpc: CordaRPCOps) {
fun startPortfolioCalculations(params: ValuationCreationParams = ValuationCreationParams(LocalDate.of(2016, 6, 6)), @PathParam("party") partyName: String): Response {
return withParty(partyName) { otherParty ->
val existingSwap = getPortfolioWith(otherParty)
if (existingSwap == null) {
rpc.startFlow(SimmFlow::Requester, otherParty, params.valuationDate).returnValue.toBlocking().first()
val flowHandle = if (existingSwap == null) {
rpc.startFlow(SimmFlow::Requester, otherParty, params.valuationDate)
} else {
val handle = rpc.startFlow(SimmRevaluation::Initiator, getPortfolioStateAndRefWith(otherParty).ref, params.valuationDate)
handle.returnValue.toBlocking().first()
rpc.startFlow(SimmRevaluation::Initiator, getPortfolioStateAndRefWith(otherParty).ref, params.valuationDate)
}
flowHandle.returnValue.getOrThrow()
withPortfolio(otherParty) { portfolioState ->
val portfolio = portfolioState.portfolio.toStateAndRef<IRSState>(rpc).toPortfolio()

View File

@ -1,19 +1,17 @@
package net.corda.traderdemo
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import net.corda.contracts.testing.calculateRandomlySizedAmounts
import net.corda.core.contracts.Amount
import net.corda.core.contracts.DOLLARS
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Emoji
import net.corda.core.utilities.loggerFor
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.testing.BOC
import net.corda.testing.http.HttpApi
import net.corda.traderdemo.flow.SellerFlow
import java.util.*
import kotlin.test.assertEquals
@ -26,20 +24,17 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) {
val logger = loggerFor<TraderDemoClientApi>()
}
fun runBuyer(amount: Amount<Currency> = 30000.0.DOLLARS, notary: String = "Notary"): Boolean {
fun runBuyer(amount: Amount<Currency> = 30000.0.DOLLARS): Boolean {
val bankOfCordaParty = rpc.partyFromName(BOC.name)
?: throw Exception("Unable to locate ${BOC.name} in Network Map Service")
val me = rpc.nodeIdentity()
// TODO: revert back to multiple issue request amounts (3,10) when soft locking implemented
val amounts = calculateRandomlySizedAmounts(amount, 1, 1, Random())
val handles = amounts.map {
rpc.startFlow(::IssuanceRequester, amount, me.legalIdentity, OpaqueBytes.of(1), bankOfCordaParty)
}
handles.forEach {
require(it.returnValue.toBlocking().first() is SignedTransaction)
val resultFutures = amounts.map {
rpc.startFlow(::IssuanceRequester, amount, me.legalIdentity, OpaqueBytes.of(1), bankOfCordaParty).returnValue
}
Futures.allAsList(resultFutures).getOrThrow()
return true
}
@ -60,7 +55,7 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) {
}
// The line below blocks and waits for the future to resolve.
val stx = rpc.startFlow(::SellerFlow, otherParty, amount).returnValue.toBlocking().first()
val stx = rpc.startFlow(::SellerFlow, otherParty, amount).returnValue.getOrThrow()
logger.info("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(stx.tx)}")
return true
} else {

View File

@ -21,7 +21,6 @@ import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.explorer.model.CashTransaction
import net.corda.explorer.model.IssuerModel
@ -101,7 +100,7 @@ class NewTransaction : Fragment() {
rpcProxy.value!!.startFlow(::CashFlow, it)
}
val response = try {
handle?.returnValue?.toFuture()?.getOrThrow()
handle?.returnValue?.getOrThrow()
} catch (e: FlowException) {
e
}

View File

@ -2,15 +2,20 @@ package net.corda.explorer.views
import org.junit.Assert.assertEquals
import org.junit.Test
import java.text.DecimalFormatSymbols
import java.util.*
class GuiUtilitiesKtTest {
@Test
fun `test to string with suffix`() {
assertEquals("10.5k", 10500.toStringWithSuffix())
//Required for this test to be independent of the default Locale.
val ds = DecimalFormatSymbols(Locale.getDefault()).decimalSeparator
assertEquals("10${ds}5k", 10500.toStringWithSuffix())
assertEquals("100", 100.toStringWithSuffix())
assertEquals("5.0M", 5000000.toStringWithSuffix())
assertEquals("1.0B", 1000000000.toStringWithSuffix())
assertEquals("1.5T", 1500000000000.toStringWithSuffix())
assertEquals("1000.0T", 1000000000000000.toStringWithSuffix())
assertEquals("5${ds}0M", 5000000.toStringWithSuffix())
assertEquals("1${ds}0B", 1000000000.toStringWithSuffix())
assertEquals("1${ds}5T", 1500000000000.toStringWithSuffix())
assertEquals("1000${ds}0T", 1000000000000000.toStringWithSuffix())
}
}

View File

@ -7,12 +7,11 @@ import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.USD
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.toFuture
import net.corda.flows.CashCommand
import net.corda.flows.CashException
import net.corda.flows.CashFlow
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeHandle
@ -208,9 +207,9 @@ val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
execute = { command ->
try {
val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toFuture().getOrThrow()
val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.getOrThrow()
log.info("Success: $result")
} catch (e: CashException) {
} catch (e: FlowException) {
log.error("Failure", e)
}
},

View File

@ -7,11 +7,10 @@ import net.corda.client.mock.replicatePoisson
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.USD
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.toFuture
import net.corda.flows.CashCommand
import net.corda.flows.CashException
import net.corda.flows.CashFlow
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeHandle
@ -63,9 +62,9 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
execute = { command ->
try {
val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toFuture().getOrThrow()
val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.getOrThrow()
log.info("Success: $result")
} catch (e: CashException) {
} catch (e: FlowException) {
log.error("Failure", e)
}
},