diff --git a/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt b/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt index db5092e396..f4aaac98a8 100644 --- a/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt +++ b/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt @@ -1,6 +1,8 @@ package net.corda.client import net.corda.core.contracts.DOLLARS +import net.corda.core.contracts.issuedBy +import net.corda.core.flows.FlowException import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo @@ -8,26 +10,27 @@ import net.corda.core.random63BitValue import net.corda.core.serialization.OpaqueBytes import net.corda.flows.CashCommand import net.corda.flows.CashFlow -import net.corda.node.driver.DriverBasedTest -import net.corda.node.driver.NodeHandle -import net.corda.node.driver.driver +import net.corda.node.internal.Node import net.corda.node.services.User +import net.corda.node.services.config.configureTestSSL import net.corda.node.services.messaging.CordaRPCClient import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.ValidatingNotaryService +import net.corda.testing.node.NodeBasedTest import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.junit.Before import org.junit.Test -class CordaRPCClientTest : DriverBasedTest() { +class CordaRPCClientTest : NodeBasedTest() { private val rpcUser = User("user1", "test", permissions = setOf(startFlowPermission())) - private lateinit var node: NodeHandle + private lateinit var node: Node private lateinit var client: CordaRPCClient - override fun setup() = driver(isDebug = true) { - node = startNode(rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow() - client = node.rpcClientToNode() - runTest() + @Before + fun setUp() { + node = startNode("Alice", rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow() + client = CordaRPCClient(node.configuration.artemisAddress, configureTestSSL()) } @Test @@ -50,7 +53,7 @@ class CordaRPCClientTest : DriverBasedTest() { } @Test - fun `indefinite block bug`() { + fun `close-send deadlock and premature shutdown on empty observable`() { println("Starting client") client.start(rpcUser.username, rpcUser.password) println("Creating proxy") @@ -58,11 +61,25 @@ class CordaRPCClientTest : DriverBasedTest() { println("Starting flow") val flowHandle = proxy.startFlow( ::CashFlow, - CashCommand.IssueCash(20.DOLLARS, OpaqueBytes.of(0), node.nodeInfo.legalIdentity, node.nodeInfo.legalIdentity)) + CashCommand.IssueCash(20.DOLLARS, OpaqueBytes.of(0), node.info.legalIdentity, node.info.legalIdentity)) println("Started flow, waiting on result") flowHandle.progress.subscribe { println("PROGRESS $it") } - println("Result: ${flowHandle.returnValue.toBlocking().first()}") + println("Result: ${flowHandle.returnValue.getOrThrow()}") + } + + @Test + fun `FlowException thrown by flow`() { + client.start(rpcUser.username, rpcUser.password) + val proxy = client.proxy() + val handle = proxy.startFlow(::CashFlow, CashCommand.PayCash( + amount = 100.DOLLARS.issuedBy(node.info.legalIdentity.ref(1)), + recipient = node.info.legalIdentity + )) + // TODO Restrict this to CashException once RPC serialisation has been fixed + assertThatExceptionOfType(FlowException::class.java).isThrownBy { + handle.returnValue.getOrThrow() + } } } diff --git a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt index e0a8d6b1b3..12eab3fb5e 100644 --- a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt +++ b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt @@ -124,7 +124,7 @@ class NodeMonitorModelTest : DriverBasedTest() { issueRef = OpaqueBytes(ByteArray(1, { 1 })), recipient = aliceNode.legalIdentity, notary = notaryNode.notaryIdentity - )).returnValue.toBlocking().first() + )).returnValue.getOrThrow() rpc.startFlow(::CashFlow, CashCommand.PayCash( amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt index d5ca5f4312..abb9107a2e 100644 --- a/core/src/main/kotlin/net/corda/core/Utils.kt +++ b/core/src/main/kotlin/net/corda/core/Utils.kt @@ -6,10 +6,7 @@ package net.corda.core import com.google.common.base.Function import com.google.common.base.Throwables import com.google.common.io.ByteStreams -import com.google.common.util.concurrent.Futures -import com.google.common.util.concurrent.ListenableFuture -import com.google.common.util.concurrent.MoreExecutors -import com.google.common.util.concurrent.SettableFuture +import com.google.common.util.concurrent.* import kotlinx.support.jdk7.use import net.corda.core.crypto.newSecureRandom import org.slf4j.Logger @@ -115,7 +112,7 @@ inline fun SettableFuture.catch(block: () -> T) { } } -fun ListenableFuture.toObservable(): Observable { +fun ListenableFuture.toObservable(): Observable { return Observable.create { subscriber -> success { subscriber.onNext(it) @@ -384,15 +381,24 @@ fun Observer.tee(vararg teeTo: Observer): Observer { /** * Returns a [ListenableFuture] bound to the *first* item emitted by this Observable. The future will complete with a - * NoSuchElementException if no items are emitted or any other error thrown by the Observable. + * NoSuchElementException if no items are emitted or any other error thrown by the Observable. If it's cancelled then + * it will unsubscribe from the observable. */ -fun Observable.toFuture(): ListenableFuture { - val future = SettableFuture.create() - first().subscribe( - { future.set(it) }, - { future.setException(it) } - ) - return future +fun Observable.toFuture(): ListenableFuture = ObservableToFuture(this) + +private class ObservableToFuture(observable: Observable) : AbstractFuture(), Observer { + private val subscription = observable.first().subscribe(this) + override fun onNext(value: T) { + set(value) + } + override fun onError(e: Throwable) { + setException(e) + } + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { + subscription.unsubscribe() + return super.cancel(mayInterruptIfRunning) + } + override fun onCompleted() {} } /** Return the sum of an Iterable of [BigDecimal]s. */ diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index fdb5939dfb..aba12670e0 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -1,5 +1,6 @@ package net.corda.core.messaging +import com.google.common.util.concurrent.ListenableFuture import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef import net.corda.core.crypto.CompositeKey @@ -107,15 +108,16 @@ interface CordaRPCOps : RPCOps { fun uploadFile(dataType: String, name: String?, file: InputStream): String /** - * Returns the node-local current time. + * Returns the node's current time. */ fun currentNodeTime(): Instant /** - * Returns an Observable emitting a single Unit once the node is registered with the network map. + * Returns a [ListenableFuture] which completes when the node has registered wih the network map service. It can also + * complete with an exception if it is unable to. */ @RPCReturnsObservables - fun waitUntilRegisteredWithNetworkMap(): Observable + fun waitUntilRegisteredWithNetworkMap(): ListenableFuture // TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of // the node's state locally and query that directly. @@ -179,13 +181,10 @@ inline fun > CordaRPCOps.startFlow * * @param id The started state machine's ID. * @param progress The stream of progress tracker events. - * @param returnValue An Observable emitting a single event containing the flow's return value. - * To block on this value: - * val returnValue = rpc.startFlow(::MyFlow).returnValue.toBlocking().first() + * @param returnValue A [ListenableFuture] of the flow's return value. */ data class FlowHandle( val id: StateMachineRunId, val progress: Observable, - // TODO This should be ListenableFuture - val returnValue: Observable + val returnValue: ListenableFuture ) diff --git a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt index 86c6fdbae3..3687381a68 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @@ -72,12 +72,7 @@ interface NetworkMapCache { */ /** Look up the node info for a specific peer key. */ - fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? { - // Although we should never have more than one match, it is theoretically possible. Report an error if it happens. - val candidates = partyNodes.filter { it.legalIdentity.owningKey == compositeKey } - check(candidates.size <= 1) { "Found more than one match for key $compositeKey" } - return candidates.singleOrNull() - } + fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? /** Look up all nodes advertising the service owned by [compositeKey] */ fun getNodesByAdvertisedServiceIdentityKey(compositeKey: CompositeKey): List { return partyNodes.filter { it.advertisedServices.any { it.identity.owningKey == compositeKey } } diff --git a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt index d02e0b35bc..bee92be51f 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt @@ -5,6 +5,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy import com.esotericsoftware.kryo.KryoException +import com.esotericsoftware.kryo.Registration import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output @@ -66,7 +67,7 @@ import kotlin.reflect.jvm.javaType */ // A convenient instance of Kryo pre-configured with some useful things. Used as a default by various functions. -val THREAD_LOCAL_KRYO = ThreadLocal.withInitial { createKryo() } +val THREAD_LOCAL_KRYO: ThreadLocal = ThreadLocal.withInitial { createKryo() } /** * A type safe wrapper around a byte array that contains a serialised object. You can call [SerializedBytes.deserialize] @@ -76,7 +77,7 @@ class SerializedBytes(bytes: ByteArray) : OpaqueBytes(bytes) { // It's OK to use lazy here because SerializedBytes is configured to use the ImmutableClassSerializer. val hash: SecureHash by lazy { bytes.sha256() } - fun writeToFile(path: Path) = Files.write(path, bytes) + fun writeToFile(path: Path): Path = Files.write(path, bytes) } // Some extension functions that make deserialisation convenient and provide auto-casting of the result. @@ -385,8 +386,7 @@ object KotlinObjectSerializer : Serializer() { return type.getField("INSTANCE").get(null) as DeserializeAsKotlinObjectDef } - override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) { - } + override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) {} } fun createKryo(k: Kryo = Kryo()): Kryo { @@ -402,12 +402,10 @@ fun createKryo(k: Kryo = Kryo()): Kryo { // Because we like to stick a Kryo object in a ThreadLocal to speed things up a bit, we can end up trying to // serialise the Kryo object itself when suspending a fiber. That's dumb, useless AND can cause crashes, so // we avoid it here. - register(Kryo::class.java, object : Serializer() { - override fun read(kryo: Kryo, input: Input, type: Class): Kryo { - return createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo) - } - override fun write(kryo: Kryo, output: Output, obj: Kryo) {} - }) + register(Kryo::class, + read = { kryo, input -> createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo) }, + write = { kryo, output, obj -> } + ) register(EdDSAPublicKey::class.java, Ed25519PublicKeySerializer) register(EdDSAPrivateKey::class.java, Ed25519PrivateKeySerializer) @@ -435,6 +433,8 @@ fun createKryo(k: Kryo = Kryo()): Kryo { // This ensures a NonEmptySetSerializer is constructed with an initial value. register(NonEmptySet::class.java, NonEmptySetSerializer) + register(Array::class, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> }) + /** This ensures any kotlin objects that implement [DeserializeAsKotlinObjectDef] are read back in as singletons. */ addDefaultSerializer(DeserializeAsKotlinObjectDef::class.java, KotlinObjectSerializer) @@ -451,6 +451,19 @@ fun createKryo(k: Kryo = Kryo()): Kryo { } } +inline fun Kryo.register( + type: KClass, + crossinline read: (Kryo, Input) -> T, + crossinline write: (Kryo, Output, T) -> Unit): Registration { + return register( + type.java, + object : Serializer() { + override fun read(kryo: Kryo, input: Input, type: Class): T = read(kryo, input) + override fun write(kryo: Kryo, output: Output, obj: T) = write(kryo, output, obj) + } + ) +} + /** * Use this method to mark any types which can have the same instance within it more than once. This will make sure * the serialised form is stable across multiple serialise-deserialise cycles. Using this on a type with internal cyclic @@ -517,7 +530,7 @@ var Kryo.attachmentStorage: AttachmentStorage? //Used in Merkle tree calculation. It doesn't cover all the cases of unstable serialization format. fun extendKryoHash(kryo: Kryo): Kryo { return kryo.apply { - setReferences(false) + references = false register(LinkedHashMap::class.java, MapSerializer()) register(HashMap::class.java, OrderedSerializer) } diff --git a/core/src/test/kotlin/net/corda/core/UtilsTest.kt b/core/src/test/kotlin/net/corda/core/UtilsTest.kt index f1e4e587c8..7988d7eadc 100644 --- a/core/src/test/kotlin/net/corda/core/UtilsTest.kt +++ b/core/src/test/kotlin/net/corda/core/UtilsTest.kt @@ -4,6 +4,7 @@ import org.assertj.core.api.Assertions.* import org.junit.Test import rx.subjects.PublishSubject import java.util.* +import java.util.concurrent.CancellationException class UtilsTest { @Test @@ -44,4 +45,16 @@ class UtilsTest { future.getOrThrow() }.isSameAs(exception) } + + @Test + fun `toFuture - cancel`() { + val subject = PublishSubject.create() + val future = subject.toFuture() + future.cancel(false) + assertThat(subject.hasObservers()).isFalse() + subject.onNext("Hello") + assertThatExceptionOfType(CancellationException::class.java).isThrownBy { + future.get() + } + } } \ No newline at end of file diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index 5bf460fd04..3c27d24a92 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -46,6 +46,14 @@ through to the server where the corresponding server-side observables are also u a warning printed to the logs and the proxy will be closed for you. But don't rely on this, as garbage collection is non-deterministic. +Futures +------- + +A method can also return a ``ListenableFuture`` in its object graph and it will be treated in a similar manner to +observables, including needing to mark the RPC with the ``@RPCReturnsObservables`` annotation. Unlike for an observable, +once the single value (or an exception) has been received all server-side resources will be released automatically. Calling +the ``cancel`` method on the future will unsubscribe it from any future value and release any resources. + Versioning ---------- diff --git a/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt index 687bc46515..d08bcf7f76 100644 --- a/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt +++ b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt @@ -9,7 +9,6 @@ import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.Vault import net.corda.core.serialization.OpaqueBytes -import net.corda.core.toFuture import net.corda.flows.CashCommand import net.corda.flows.CashFlow import net.corda.node.driver.driver @@ -87,7 +86,7 @@ class IntegrationTestingTutorial { amount = i.DOLLARS.issuedBy(alice.nodeInfo.legalIdentity.ref(issueRef)), recipient = alice.nodeInfo.legalIdentity )) - flowHandle.returnValue.toFuture().getOrThrow() + flowHandle.returnValue.getOrThrow() } aliceVaultUpdates.expectEvents { diff --git a/node/capsule/build.gradle b/node/capsule/build.gradle index 039639535e..e6f1ddc61f 100644 --- a/node/capsule/build.gradle +++ b/node/capsule/build.gradle @@ -54,9 +54,11 @@ task buildCordaJAR(type: FatCapsule, dependsOn: ['buildCertSigningRequestUtility applicationSource = files(project.tasks.findByName('jar'), '../build/classes/main/CordaCaplet.class', 'config/dev/log4j2.xml') capsuleManifest { + applicationVersion = corda_version appClassPath = ["jolokia-agent-war-${project.rootProject.ext.jolokia_version}.war"] javaAgents = ["quasar-core-${quasar_version}-jdk8.jar"] systemProperties['visualvm.display.name'] = 'Corda' + systemProperties['corda.version'] = corda_version minJavaVersion = '1.8.0' // This version is known to work and avoids earlier 8u versions that have bugs. minUpdateVersion['1.8'] = '102' diff --git a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt index 3004eceb21..03461b23be 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt @@ -11,7 +11,6 @@ import net.corda.core.messaging.StateMachineUpdate import net.corda.core.messaging.startFlow import net.corda.core.node.NodeInfo import net.corda.core.serialization.OpaqueBytes -import net.corda.core.toFuture import net.corda.flows.CashCommand import net.corda.flows.CashFlow import net.corda.node.driver.DriverBasedTest @@ -138,13 +137,13 @@ class DistributedServiceTests : DriverBasedTest() { val issueHandle = aliceProxy.startFlow( ::CashFlow, CashCommand.IssueCash(amount, OpaqueBytes.of(0), alice.nodeInfo.legalIdentity, raftNotaryIdentity)) - issueHandle.returnValue.toFuture().getOrThrow() + issueHandle.returnValue.getOrThrow() } private fun paySelf(amount: Amount) { val payHandle = aliceProxy.startFlow( ::CashFlow, CashCommand.PayCash(amount.issuedBy(alice.nodeInfo.legalIdentity.ref(0)), alice.nodeInfo.legalIdentity)) - payHandle.returnValue.toFuture().getOrThrow() + payHandle.returnValue.getOrThrow() } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index fef25192e9..0120e62026 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -380,7 +380,7 @@ open class DriverDSL( registerProcess(processFuture) return processFuture.flatMap { process -> establishRpc(messagingAddress, configuration).flatMap { rpc -> - rpc.waitUntilRegisteredWithNetworkMap().toFuture().map { + rpc.waitUntilRegisteredWithNetworkMap().map { NodeHandle(rpc.nodeIdentity(), rpc, configuration, process) } } diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 736b1eeff1..6bf007ddf3 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -15,7 +15,6 @@ import net.corda.core.node.ServiceHub import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.Vault -import net.corda.core.toObservable import net.corda.core.transactions.SignedTransaction import net.corda.node.services.messaging.requirePermission import net.corda.node.services.startFlowPermission @@ -97,7 +96,7 @@ class CordaRPCOpsImpl( return FlowHandle( id = stateMachine.id, progress = stateMachine.logic.track()?.second ?: Observable.empty(), - returnValue = stateMachine.resultFuture.toObservable() + returnValue = stateMachine.resultFuture ) } @@ -111,7 +110,7 @@ class CordaRPCOpsImpl( } } - override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered.toObservable() + override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key) override fun partyFromName(name: String) = services.identityService.partyFromName(name) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index b4b48d0101..b7bbb6f491 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -31,6 +31,8 @@ import org.jetbrains.exposed.sql.Database import java.io.RandomAccessFile import java.lang.management.ManagementFactory import java.nio.channels.FileLock +import java.nio.file.Files +import java.nio.file.Paths import java.time.Clock import javax.management.ObjectName import javax.servlet.* @@ -100,6 +102,27 @@ class Node(override val configuration: FullNodeConfiguration, private lateinit var userService: RPCUserService + init { + checkVersionUnchanged() + } + + /** + * Abort starting the node if an existing deployment with a different version is detected in the current directory. + * The current version is expected to be specified as a system property. If not provided, the check will be ignored. + */ + private fun checkVersionUnchanged() { + val currentVersion = System.getProperty("corda.version") ?: return + val versionFile = Paths.get("version") + if (Files.exists(versionFile)) { + val existingVersion = Files.readAllLines(versionFile)[0] + check(existingVersion == currentVersion) { + "Version change detected - current: $currentVersion, existing: $existingVersion. Node upgrades are not yet supported." + } + } else { + Files.write(versionFile, currentVersion.toByteArray()) + } + } + override fun makeMessagingService(): MessagingServiceInternal { userService = RPCUserServiceImpl(configuration) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt index 431c9f7d0d..d602a5ac3d 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt @@ -8,9 +8,10 @@ import com.esotericsoftware.kryo.Registration import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output -import com.esotericsoftware.kryo.serializers.JavaSerializer import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.ListenableFuture import de.javakaffee.kryoserializers.ArraysAsListSerializer +import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer import de.javakaffee.kryoserializers.guava.* import net.corda.contracts.asset.Cash import net.corda.core.ErrorOr @@ -19,6 +20,8 @@ import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowException +import net.corda.core.flows.IllegalFlowLogicException import net.corda.core.flows.StateMachineRunId import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.StateMachineInfo @@ -26,12 +29,15 @@ import net.corda.core.messaging.StateMachineUpdate import net.corda.core.node.* import net.corda.core.node.services.* import net.corda.core.serialization.* +import net.corda.core.toFuture +import net.corda.core.toObservable import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.node.internal.AbstractNode import net.corda.node.services.User import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress +import net.corda.node.services.statemachine.FlowSessionException import net.i2p.crypto.eddsa.EdDSAPrivateKey import net.i2p.crypto.eddsa.EdDSAPublicKey import org.apache.activemq.artemis.api.core.SimpleString @@ -138,6 +144,7 @@ private class RPCKryo(observableSerializer: Serializer>? = null) register(Array(0,{}).javaClass) register(Class::class.java, ClassSerializer) + UnmodifiableCollectionsSerializer.registerSerializers(this) ImmutableListSerializer.registerSerializers(this) ImmutableSetSerializer.registerSerializers(this) ImmutableSortedSetSerializer.registerSerializers(this) @@ -207,16 +214,18 @@ private class RPCKryo(observableSerializer: Serializer>? = null) register(SimpleString::class.java) register(ServiceEntry::class.java) // Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway. + register(Array::class, read = { kryo, input -> emptyArray() }, write = { kryo, output, obj -> }) + register(FlowException::class.java) + register(FlowSessionException::class.java) + register(IllegalFlowLogicException::class.java) register(RuntimeException::class.java) register(IllegalArgumentException::class.java) register(ArrayIndexOutOfBoundsException::class.java) register(IndexOutOfBoundsException::class.java) - // Kryo couldn't serialize Collections.unmodifiableCollection in Throwable correctly, causing null pointer exception when try to access the deserialize object. - register(NoSuchElementException::class.java, JavaSerializer()) + register(NoSuchElementException::class.java) register(RPCException::class.java) - register(Array::class.java, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> }) - register(Collections.unmodifiableList(emptyList()).javaClass) register(PermissionException::class.java) + register(Throwable::class.java) register(FlowHandle::class.java) register(KryoException::class.java) register(StringBuffer::class.java) @@ -229,20 +238,45 @@ private class RPCKryo(observableSerializer: Serializer>? = null) pluginRegistries.forEach { it.registerRPCKryoTypes(this) } } - // Helper method, attempt to reduce boiler plate code - private fun register(type: Class, read: (Kryo, Input) -> T, write: (Kryo, Output, T) -> Unit) { - register(type, object : Serializer() { - override fun read(kryo: Kryo, input: Input, type: Class): T = read(kryo, input) - override fun write(kryo: Kryo, output: Output, o: T) = write(kryo, output, o) - }) + // TODO: workaround to prevent Observable registration conflict when using plugin registered kyro classes + private val observableRegistration: Registration? = observableSerializer?.let { register(Observable::class.java, it, 10000) } + + private val listenableFutureRegistration: Registration? = observableSerializer?.let { + // Register ListenableFuture by making use of Observable serialisation. + // TODO Serialisation could be made more efficient as a future can only emit one value (or exception) + @Suppress("UNCHECKED_CAST") + register(ListenableFuture::class, + read = { kryo, input -> it.read(kryo, input, Observable::class.java as Class>).toFuture() }, + write = { kryo, output, obj -> it.write(kryo, output, obj.toObservable()) } + ) } - // TODO: workaround to prevent Observable registration conflict when using plugin registered kyro classes - val observableRegistration: Registration? = if (observableSerializer != null) register(Observable::class.java, observableSerializer, 10000) else null + // Avoid having to worry about the subtypes of FlowException by converting all of them to just FlowException. + // This is a temporary hack until a proper serialisation mechanism is in place. + private val flowExceptionRegistration: Registration = register( + FlowException::class, + read = { kryo, input -> + val message = input.readString() + val cause = kryo.readObjectOrNull(input, Throwable::class.java) + FlowException(message, cause) + }, + write = { kryo, output, obj -> + // The subclass may have overridden toString so we use that + val message = if (obj.javaClass != FlowException::class.java) obj.toString() else obj.message + output.writeString(message) + kryo.writeObjectOrNull(output, obj.cause, Throwable::class.java) + } + ) override fun getRegistration(type: Class<*>): Registration { if (Observable::class.java.isAssignableFrom(type)) - return observableRegistration ?: throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") + return observableRegistration ?: + throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") + if (ListenableFuture::class.java.isAssignableFrom(type)) + return listenableFutureRegistration ?: + throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") + if (FlowException::class.java.isAssignableFrom(type)) + return flowExceptionRegistration return super.getRegistration(type) } } diff --git a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt index 7c823b4e9b..892194c24f 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt @@ -4,6 +4,7 @@ import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture import net.corda.core.bufferUntilSubscribed +import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party import net.corda.core.map import net.corda.core.messaging.MessagingService @@ -70,6 +71,8 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach return null } + override fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? = registeredNodes[Party("", compositeKey)] + override fun track(): Pair, Observable> { synchronized(_changed) { return Pair(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index ee214162ef..85a890f459 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -368,9 +368,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, errorResponse: Pair?) { - // TODO Blanking the stack trace prevents the receiving flow from filling in its own stack trace -// @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") -// (errorResponse?.first as java.lang.Throwable?)?.stackTrace = emptyArray() openSessions.values.removeIf { session -> if (session.fiber == fiber) { session.endSession(errorResponse) diff --git a/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt b/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt index 6aeb31f049..42ef891a00 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt @@ -1,8 +1,13 @@ package net.corda.node.messaging +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture +import net.corda.core.getOrThrow import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCReturnsObservables import net.corda.core.serialization.SerializedBytes +import net.corda.core.success import net.corda.core.utilities.LogHelper import net.corda.node.services.RPCUserService import net.corda.node.services.User @@ -120,30 +125,31 @@ class ClientRPCInfrastructureTests { @RPCReturnsObservables fun makeComplicatedObservable(): Observable>> + @RPCReturnsObservables + fun makeListenableFuture(): ListenableFuture + + @RPCReturnsObservables + fun makeComplicatedListenableFuture(): ListenableFuture>> + @RPCSinceVersion(2) fun addedLater() fun captureUser(): String } - lateinit var complicatedObservable: Observable>> + private lateinit var complicatedObservable: Observable>> + private lateinit var complicatedListenableFuturee: ListenableFuture>> inner class TestOpsImpl : TestOps { override val protocolVersion = 1 - override fun barf(): Unit = throw IllegalArgumentException("Barf!") - - override fun void() { - } - + override fun void() {} override fun someCalculation(str: String, num: Int) = "$str $num" - override fun makeObservable(): Observable = Observable.just(1, 2, 3, 4) - + override fun makeListenableFuture(): ListenableFuture = Futures.immediateFuture(1) override fun makeComplicatedObservable() = complicatedObservable - + override fun makeComplicatedListenableFuture(): ListenableFuture>> = complicatedListenableFuturee override fun addedLater(): Unit = throw UnsupportedOperationException("not implemented") - override fun captureUser(): String = CURRENT_RPC_USER.get().username } @@ -212,6 +218,41 @@ class ClientRPCInfrastructureTests { assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size) } + @Test + fun `simple ListenableFuture`() { + val value = proxy.makeListenableFuture().getOrThrow() + assertThat(value).isEqualTo(1) + } + + @Test + fun `complex ListenableFuture`() { + val serverQuote = SettableFuture.create>>() + complicatedListenableFuturee = serverQuote + + val twainQuote = "Mark Twain" to Futures.immediateFuture("I have never let my schooling interfere with my education.") + + val clientQuotes = LinkedBlockingQueue() + val clientFuture = proxy.makeComplicatedListenableFuture() + + clientFuture.success { + val name = it.first + it.second.success { + clientQuotes += "Quote by $name: $it" + } + } + + val rpcQueuesQuery = SimpleString("clients.${authenticatedUser.username}.rpc.*") + assertEquals(2, clientSession.addressQuery(rpcQueuesQuery).queueNames.size) + + assertThat(clientQuotes).isEmpty() + + serverQuote.set(twainQuote) + assertThat(clientQuotes.take()).isEqualTo("Quote by Mark Twain: I have never let my schooling interfere with my education.") + + // TODO This final assert sometimes fails because the relevant queue hasn't been removed yet +// assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size) + } + @Test fun versioning() { assertFailsWith { proxy.addedLater() } @@ -221,5 +262,4 @@ class ClientRPCInfrastructureTests { fun `authenticated user is available to RPC`() { assertThat(proxy.captureUser()).isEqualTo(authenticatedUser.username) } - } diff --git a/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapCacheTest.kt index bee9b114f2..acac6c76ee 100644 --- a/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapCacheTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapCacheTest.kt @@ -4,7 +4,6 @@ import net.corda.core.getOrThrow import net.corda.core.node.services.ServiceInfo import net.corda.node.services.network.NetworkMapService import net.corda.node.utilities.databaseTransaction -import net.corda.testing.expect import net.corda.testing.node.MockNetwork import org.junit.Test import java.math.BigInteger @@ -34,12 +33,7 @@ class InMemoryNetworkMapCacheTest { databaseTransaction(nodeA.database) { nodeA.netMapCache.addNode(nodeB.info) } - // Now both nodes match, so it throws an error - expect { - nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey) - } - expect { - nodeA.netMapCache.getNodeByLegalIdentityKey(nodeB.info.legalIdentity.owningKey) - } + // The details of node B write over those for node A + assertEquals(nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeB.info) } } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index ea6a94aef0..d8fdb57330 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -390,9 +390,8 @@ class StateMachineManagerTests { node2 sent sessionConfirm to node1, node2 sent sessionEnd(errorFlow.exceptionThrown) to node1 ) - // TODO see StateMachineManager.endAllFiberSessions -// // Make sure the original stack trace isn't sent down the wire -// assertThat((sessionTransfers.last().message as SessionEnd).errorResponse!!.stackTrace).isEmpty() + // Make sure the original stack trace isn't sent down the wire + assertThat((sessionTransfers.last().message as SessionEnd).errorResponse!!.stackTrace).isEmpty() } @Test diff --git a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt index 2f84383c2f..577b024995 100644 --- a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt +++ b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt @@ -6,6 +6,7 @@ import net.corda.core.contracts.TransactionType import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.div +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.utilities.Emoji @@ -83,9 +84,9 @@ fun sender(rpc: CordaRPCOps) { // Send the transaction to the other recipient val stx = ptx.toSignedTransaction() println("Sending ${stx.id}") - val protocolHandle = rpc.startFlow(::FinalityFlow, stx, setOf(otherSide)) - protocolHandle.progress.subscribe(::println) - protocolHandle.returnValue.toBlocking().first() + val flowHandle = rpc.startFlow(::FinalityFlow, stx, setOf(otherSide)) + flowHandle.progress.subscribe(::println) + flowHandle.returnValue.getOrThrow() } fun recipient(rpc: CordaRPCOps) { diff --git a/samples/bank-of-corda-demo/src/integration-test/kotlin/net/corda/bank/BankOfCordaRPCClientTest.kt b/samples/bank-of-corda-demo/src/integration-test/kotlin/net/corda/bank/BankOfCordaRPCClientTest.kt index 5337fface2..504006d755 100644 --- a/samples/bank-of-corda-demo/src/integration-test/kotlin/net/corda/bank/BankOfCordaRPCClientTest.kt +++ b/samples/bank-of-corda-demo/src/integration-test/kotlin/net/corda/bank/BankOfCordaRPCClientTest.kt @@ -5,7 +5,6 @@ import net.corda.core.contracts.DOLLARS import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo -import net.corda.core.transactions.SignedTransaction import net.corda.flows.IssuerFlow.IssuanceRequester import net.corda.node.driver.driver import net.corda.node.services.User @@ -16,7 +15,6 @@ import net.corda.testing.expect import net.corda.testing.expectEvents import net.corda.testing.sequence import org.junit.Test -import kotlin.test.assertTrue class BankOfCordaRPCClientTest { @Test @@ -45,13 +43,12 @@ class BankOfCordaRPCClientTest { val vaultUpdatesBigCorp = bigCorpProxy.vaultAndUpdates().second // Kick-off actual Issuer Flow - val result = bocProxy.startFlow( + bocProxy.startFlow( ::IssuanceRequester, 1000.DOLLARS, nodeBigCorporation.nodeInfo.legalIdentity, BOC_PARTY_REF, - nodeBankOfCorda.nodeInfo.legalIdentity).returnValue.toBlocking().first() - assertTrue { result is SignedTransaction } + nodeBankOfCorda.nodeInfo.legalIdentity).returnValue.getOrThrow() // Check Bank of Corda Vault Updates vaultUpdatesBoc.expectEvents { diff --git a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt index c84e23d628..4069f988de 100644 --- a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt +++ b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt @@ -2,14 +2,15 @@ package net.corda.bank.api import com.google.common.net.HostAndPort import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams -import net.corda.flows.IssuerFlow.IssuanceRequester -import net.corda.node.services.messaging.CordaRPCClient import net.corda.core.contracts.Amount import net.corda.core.contracts.currency +import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction +import net.corda.flows.IssuerFlow.IssuanceRequester import net.corda.node.services.config.configureTestSSL +import net.corda.node.services.messaging.CordaRPCClient import net.corda.testing.http.HttpApi /** @@ -43,6 +44,6 @@ class BankOfCordaClientApi(val hostAndPort: HostAndPort) { val amount = Amount(params.amount, currency(params.currency)) val issuerToPartyRef = OpaqueBytes.of(params.issueToPartyRefAsString.toByte()) - return proxy.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.toBlocking().first() + return proxy.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.getOrThrow() } } diff --git a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaWebApi.kt b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaWebApi.kt index 877a782bcc..cc00506df8 100644 --- a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaWebApi.kt +++ b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaWebApi.kt @@ -1,13 +1,14 @@ package net.corda.bank.api -import net.corda.flows.IssuerFlow.IssuanceRequester import net.corda.core.contracts.Amount import net.corda.core.contracts.currency +import net.corda.core.flows.FlowException +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes -import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.loggerFor +import net.corda.flows.IssuerFlow.IssuanceRequester import java.time.LocalDateTime import javax.ws.rs.* import javax.ws.rs.core.MediaType @@ -46,12 +47,13 @@ class BankOfCordaWebApi(val rpc: CordaRPCOps) { // invoke client side of Issuer Flow: IssuanceRequester // The line below blocks and waits for the future to resolve. - val result = rpc.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.toBlocking().first() - if (result is SignedTransaction) { - logger.info("Issue request completed successfully: ${params}") - return Response.status(Response.Status.CREATED).build() - } else { - return Response.status(Response.Status.BAD_REQUEST).build() + val status = try { + rpc.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.getOrThrow() + logger.info("Issue request completed successfully: $params") + Response.Status.CREATED + } catch (e: FlowException) { + Response.Status.BAD_REQUEST } + return Response.status(status).build() } } \ No newline at end of file diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt index 559c163e6d..09e586f3b3 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt @@ -1,6 +1,7 @@ package net.corda.irs.api import net.corda.core.contracts.filterStatesOfType +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.utilities.loggerFor @@ -66,12 +67,12 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) { @Path("deals") @Consumes(MediaType.APPLICATION_JSON) fun storeDeal(newDeal: InterestRateSwap.State): Response { - try { - rpc.startFlow(AutoOfferFlow::Requester, newDeal).returnValue.toBlocking().first() - return Response.created(URI.create(generateDealLink(newDeal))).build() + return try { + rpc.startFlow(AutoOfferFlow::Requester, newDeal).returnValue.getOrThrow() + Response.created(URI.create(generateDealLink(newDeal))).build() } catch (ex: Throwable) { logger.info("Exception when creating deal: $ex") - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ex.toString()).build() + Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ex.toString()).build() } } @@ -94,7 +95,7 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) { val priorDemoDate = fetchDemoDate() // Can only move date forwards if (newDemoDate.isAfter(priorDemoDate)) { - rpc.startFlow(UpdateBusinessDayFlow::Broadcast, newDemoDate).returnValue.toBlocking().first() + rpc.startFlow(UpdateBusinessDayFlow::Broadcast, newDemoDate).returnValue.getOrThrow() return Response.ok().build() } val msg = "demodate is already $priorDemoDate and can only be updated with a later date" @@ -113,7 +114,7 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) { @Path("restart") @Consumes(MediaType.APPLICATION_JSON) fun exitServer(): Response { - rpc.startFlow(ExitServerFlow::Broadcast, 83).returnValue.toBlocking().first() + rpc.startFlow(ExitServerFlow::Broadcast, 83).returnValue.getOrThrow() return Response.ok().build() } } diff --git a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt index be4f1f03b4..c88f917d43 100644 --- a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt +++ b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt @@ -1,8 +1,10 @@ package net.corda.notarydemo import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.Futures import net.corda.core.crypto.toStringShort import net.corda.core.div +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.transactions.SignedTransaction @@ -60,9 +62,9 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) { */ private fun buildTransactions(count: Int): List { val moveTransactions = (1..count).map { - rpc.startFlow(::DummyIssueAndMove, notary, counterpartyNode.legalIdentity).returnValue.toBlocking().toFuture() + rpc.startFlow(::DummyIssueAndMove, notary, counterpartyNode.legalIdentity).returnValue } - return moveTransactions.map { it.get() } + return Futures.allAsList(moveTransactions).getOrThrow() } /** @@ -72,10 +74,8 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) { * @return a list of encoded signer public keys - one for every transaction */ private fun notariseTransactions(transactions: List): List { - val signatureFutures = transactions.map { - rpc.startFlow(NotaryFlow::Client, it).returnValue.toBlocking().toFuture() - } - return signatureFutures.map { it.get().by.toStringShort() } + val signatureFutures = transactions.map { rpc.startFlow(NotaryFlow::Client, it).returnValue } + return Futures.allAsList(signatureFutures).getOrThrow().map { it.by.toStringShort() } } } diff --git a/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt b/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt index 2deef657f4..e3254bf057 100644 --- a/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt +++ b/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt @@ -42,8 +42,8 @@ class SimmValuationTest : IntegrationTestCategory { } } - private fun getPartyWithName(partyApi: HttpApi, countryparty: String): PortfolioApi.ApiParty = - getAvailablePartiesFor(partyApi).counterparties.single { it.text == countryparty } + private fun getPartyWithName(partyApi: HttpApi, counterparty: String): PortfolioApi.ApiParty = + getAvailablePartiesFor(partyApi).counterparties.single { it.text == counterparty } private fun getAvailablePartiesFor(partyApi: HttpApi): PortfolioApi.AvailableParties { return partyApi.getJson("whoami") @@ -68,4 +68,4 @@ class SimmValuationTest : IntegrationTestCategory { val valuations = partyApi.getJson("${counterparty.id}/portfolio/valuations") return (valuations.initialMargin.call["total"] != 0.0) } -} \ No newline at end of file +} diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt index 1c45d087b9..1127fc6c3f 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt @@ -6,6 +6,7 @@ import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.filterStatesOfType import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.vega.analytics.InitialMarginTriple @@ -159,7 +160,7 @@ class PortfolioApi(val rpc: CordaRPCOps) { return withParty(partyName) { val buyer = if (swap.buySell.isBuy) ownParty else it val seller = if (swap.buySell.isSell) ownParty else it - rpc.startFlow(IRSTradeFlow::Requester, swap.toData(buyer, seller), it).returnValue.toBlocking().first() + rpc.startFlow(IRSTradeFlow::Requester, swap.toData(buyer, seller), it).returnValue.getOrThrow() Response.accepted().entity("{}").build() } } @@ -266,12 +267,12 @@ class PortfolioApi(val rpc: CordaRPCOps) { fun startPortfolioCalculations(params: ValuationCreationParams = ValuationCreationParams(LocalDate.of(2016, 6, 6)), @PathParam("party") partyName: String): Response { return withParty(partyName) { otherParty -> val existingSwap = getPortfolioWith(otherParty) - if (existingSwap == null) { - rpc.startFlow(SimmFlow::Requester, otherParty, params.valuationDate).returnValue.toBlocking().first() + val flowHandle = if (existingSwap == null) { + rpc.startFlow(SimmFlow::Requester, otherParty, params.valuationDate) } else { - val handle = rpc.startFlow(SimmRevaluation::Initiator, getPortfolioStateAndRefWith(otherParty).ref, params.valuationDate) - handle.returnValue.toBlocking().first() + rpc.startFlow(SimmRevaluation::Initiator, getPortfolioStateAndRefWith(otherParty).ref, params.valuationDate) } + flowHandle.returnValue.getOrThrow() withPortfolio(otherParty) { portfolioState -> val portfolio = portfolioState.portfolio.toStateAndRef(rpc).toPortfolio() diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt index bc63f62c23..366940a3b6 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt @@ -1,19 +1,17 @@ package net.corda.traderdemo -import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.Futures import net.corda.contracts.testing.calculateRandomlySizedAmounts import net.corda.core.contracts.Amount import net.corda.core.contracts.DOLLARS +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes -import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.Emoji import net.corda.core.utilities.loggerFor import net.corda.flows.IssuerFlow.IssuanceRequester -import net.corda.node.services.messaging.CordaRPCClient import net.corda.testing.BOC -import net.corda.testing.http.HttpApi import net.corda.traderdemo.flow.SellerFlow import java.util.* import kotlin.test.assertEquals @@ -26,20 +24,17 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) { val logger = loggerFor() } - fun runBuyer(amount: Amount = 30000.0.DOLLARS, notary: String = "Notary"): Boolean { + fun runBuyer(amount: Amount = 30000.0.DOLLARS): Boolean { val bankOfCordaParty = rpc.partyFromName(BOC.name) ?: throw Exception("Unable to locate ${BOC.name} in Network Map Service") val me = rpc.nodeIdentity() // TODO: revert back to multiple issue request amounts (3,10) when soft locking implemented val amounts = calculateRandomlySizedAmounts(amount, 1, 1, Random()) - val handles = amounts.map { - rpc.startFlow(::IssuanceRequester, amount, me.legalIdentity, OpaqueBytes.of(1), bankOfCordaParty) - } - - handles.forEach { - require(it.returnValue.toBlocking().first() is SignedTransaction) + val resultFutures = amounts.map { + rpc.startFlow(::IssuanceRequester, amount, me.legalIdentity, OpaqueBytes.of(1), bankOfCordaParty).returnValue } + Futures.allAsList(resultFutures).getOrThrow() return true } @@ -60,7 +55,7 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) { } // The line below blocks and waits for the future to resolve. - val stx = rpc.startFlow(::SellerFlow, otherParty, amount).returnValue.toBlocking().first() + val stx = rpc.startFlow(::SellerFlow, otherParty, amount).returnValue.getOrThrow() logger.info("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(stx.tx)}") return true } else { diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt index 17661dfa81..64ee18f7ca 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt @@ -21,7 +21,6 @@ import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.node.NodeInfo import net.corda.core.serialization.OpaqueBytes -import net.corda.core.toFuture import net.corda.core.transactions.SignedTransaction import net.corda.explorer.model.CashTransaction import net.corda.explorer.model.IssuerModel @@ -101,7 +100,7 @@ class NewTransaction : Fragment() { rpcProxy.value!!.startFlow(::CashFlow, it) } val response = try { - handle?.returnValue?.toFuture()?.getOrThrow() + handle?.returnValue?.getOrThrow() } catch (e: FlowException) { e } diff --git a/tools/explorer/src/test/kotlin/net/corda/explorer/views/GuiUtilitiesKtTest.kt b/tools/explorer/src/test/kotlin/net/corda/explorer/views/GuiUtilitiesKtTest.kt index 7e830bc914..4902fb69df 100644 --- a/tools/explorer/src/test/kotlin/net/corda/explorer/views/GuiUtilitiesKtTest.kt +++ b/tools/explorer/src/test/kotlin/net/corda/explorer/views/GuiUtilitiesKtTest.kt @@ -2,15 +2,20 @@ package net.corda.explorer.views import org.junit.Assert.assertEquals import org.junit.Test +import java.text.DecimalFormatSymbols +import java.util.* class GuiUtilitiesKtTest { @Test fun `test to string with suffix`() { - assertEquals("10.5k", 10500.toStringWithSuffix()) + //Required for this test to be independent of the default Locale. + val ds = DecimalFormatSymbols(Locale.getDefault()).decimalSeparator + + assertEquals("10${ds}5k", 10500.toStringWithSuffix()) assertEquals("100", 100.toStringWithSuffix()) - assertEquals("5.0M", 5000000.toStringWithSuffix()) - assertEquals("1.0B", 1000000000.toStringWithSuffix()) - assertEquals("1.5T", 1500000000000.toStringWithSuffix()) - assertEquals("1000.0T", 1000000000000000.toStringWithSuffix()) + assertEquals("5${ds}0M", 5000000.toStringWithSuffix()) + assertEquals("1${ds}0B", 1000000000.toStringWithSuffix()) + assertEquals("1${ds}5T", 1500000000000.toStringWithSuffix()) + assertEquals("1000${ds}0T", 1000000000000000.toStringWithSuffix()) } } \ No newline at end of file diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt index 42ed3b0182..495f468a7d 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt @@ -7,12 +7,11 @@ import net.corda.core.contracts.Issued import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.USD import net.corda.core.crypto.Party +import net.corda.core.flows.FlowException import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes -import net.corda.core.toFuture import net.corda.flows.CashCommand -import net.corda.flows.CashException import net.corda.flows.CashFlow import net.corda.loadtest.LoadTest import net.corda.loadtest.NodeHandle @@ -208,9 +207,9 @@ val crossCashTest = LoadTest( execute = { command -> try { - val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toFuture().getOrThrow() + val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.getOrThrow() log.info("Success: $result") - } catch (e: CashException) { + } catch (e: FlowException) { log.error("Failure", e) } }, diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt index 2abc1a1270..51b0767b1c 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt @@ -7,11 +7,10 @@ import net.corda.client.mock.replicatePoisson import net.corda.contracts.asset.Cash import net.corda.core.contracts.USD import net.corda.core.crypto.Party +import net.corda.core.flows.FlowException import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow -import net.corda.core.toFuture import net.corda.flows.CashCommand -import net.corda.flows.CashException import net.corda.flows.CashFlow import net.corda.loadtest.LoadTest import net.corda.loadtest.NodeHandle @@ -63,9 +62,9 @@ val selfIssueTest = LoadTest( execute = { command -> try { - val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toFuture().getOrThrow() + val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.getOrThrow() log.info("Success: $result") - } catch (e: CashException) { + } catch (e: FlowException) { log.error("Failure", e) } },