From 3499e06e2766ba12375d9f5a9aa6d43f0175c565 Mon Sep 17 00:00:00 2001 From: Ross Nicoll Date: Wed, 1 Feb 2017 18:36:35 +0000 Subject: [PATCH 1/5] Correct typo - "countryparty" to "counterparty" --- .../kotlin/net/corda/vega/SimmValuationTest.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt b/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt index 2deef657f4..e3254bf057 100644 --- a/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt +++ b/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt @@ -42,8 +42,8 @@ class SimmValuationTest : IntegrationTestCategory { } } - private fun getPartyWithName(partyApi: HttpApi, countryparty: String): PortfolioApi.ApiParty = - getAvailablePartiesFor(partyApi).counterparties.single { it.text == countryparty } + private fun getPartyWithName(partyApi: HttpApi, counterparty: String): PortfolioApi.ApiParty = + getAvailablePartiesFor(partyApi).counterparties.single { it.text == counterparty } private fun getAvailablePartiesFor(partyApi: HttpApi): PortfolioApi.AvailableParties { return partyApi.getJson("whoami") @@ -68,4 +68,4 @@ class SimmValuationTest : IntegrationTestCategory { val valuations = partyApi.getJson("${counterparty.id}/portfolio/valuations") return (valuations.initialMargin.call["total"] != 0.0) } -} \ No newline at end of file +} From bc9407d2c88ad2c2d12151fb160afe1fffd7e014 Mon Sep 17 00:00:00 2001 From: Ross Nicoll Date: Wed, 1 Feb 2017 17:37:11 +0000 Subject: [PATCH 2/5] Correct network map cache tests InMemoryNetworkMapCacheTest was not actually asserting that an expected exception was thrown, which meant when earlier changes to the service changed the operation it wasn't caught. The service now overwrites previous node if a new matching node is added, and this updates the test to follow that design. --- .../net/corda/core/node/services/NetworkMapCache.kt | 7 +------ .../node/services/network/InMemoryNetworkMapCache.kt | 3 +++ .../corda/node/services/InMemoryNetworkMapCacheTest.kt | 10 ++-------- 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt index 86c6fdbae3..3687381a68 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @@ -72,12 +72,7 @@ interface NetworkMapCache { */ /** Look up the node info for a specific peer key. */ - fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? { - // Although we should never have more than one match, it is theoretically possible. Report an error if it happens. - val candidates = partyNodes.filter { it.legalIdentity.owningKey == compositeKey } - check(candidates.size <= 1) { "Found more than one match for key $compositeKey" } - return candidates.singleOrNull() - } + fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? /** Look up all nodes advertising the service owned by [compositeKey] */ fun getNodesByAdvertisedServiceIdentityKey(compositeKey: CompositeKey): List { return partyNodes.filter { it.advertisedServices.any { it.identity.owningKey == compositeKey } } diff --git a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt index 7c823b4e9b..892194c24f 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt @@ -4,6 +4,7 @@ import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture import net.corda.core.bufferUntilSubscribed +import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party import net.corda.core.map import net.corda.core.messaging.MessagingService @@ -70,6 +71,8 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach return null } + override fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? = registeredNodes[Party("", compositeKey)] + override fun track(): Pair, Observable> { synchronized(_changed) { return Pair(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) diff --git a/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapCacheTest.kt index bee9b114f2..acac6c76ee 100644 --- a/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapCacheTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/InMemoryNetworkMapCacheTest.kt @@ -4,7 +4,6 @@ import net.corda.core.getOrThrow import net.corda.core.node.services.ServiceInfo import net.corda.node.services.network.NetworkMapService import net.corda.node.utilities.databaseTransaction -import net.corda.testing.expect import net.corda.testing.node.MockNetwork import org.junit.Test import java.math.BigInteger @@ -34,12 +33,7 @@ class InMemoryNetworkMapCacheTest { databaseTransaction(nodeA.database) { nodeA.netMapCache.addNode(nodeB.info) } - // Now both nodes match, so it throws an error - expect { - nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey) - } - expect { - nodeA.netMapCache.getNodeByLegalIdentityKey(nodeB.info.legalIdentity.owningKey) - } + // The details of node B write over those for node A + assertEquals(nodeA.netMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeB.info) } } From 087570e74df29f6c951f922e8b44ad19b119230d Mon Sep 17 00:00:00 2001 From: chalkido Date: Thu, 2 Feb 2017 18:06:48 +0000 Subject: [PATCH 3/5] toStringWithSuffix decimal mark unit-test issue on non UK/US Locale (#209) Make use of default Locale to temporarily bypass unit-testing of toStringWithSuffix failing due to different decimal marks on non anglo saxon Locales. --- .../corda/explorer/views/GuiUtilitiesKtTest.kt | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tools/explorer/src/test/kotlin/net/corda/explorer/views/GuiUtilitiesKtTest.kt b/tools/explorer/src/test/kotlin/net/corda/explorer/views/GuiUtilitiesKtTest.kt index 7e830bc914..4902fb69df 100644 --- a/tools/explorer/src/test/kotlin/net/corda/explorer/views/GuiUtilitiesKtTest.kt +++ b/tools/explorer/src/test/kotlin/net/corda/explorer/views/GuiUtilitiesKtTest.kt @@ -2,15 +2,20 @@ package net.corda.explorer.views import org.junit.Assert.assertEquals import org.junit.Test +import java.text.DecimalFormatSymbols +import java.util.* class GuiUtilitiesKtTest { @Test fun `test to string with suffix`() { - assertEquals("10.5k", 10500.toStringWithSuffix()) + //Required for this test to be independent of the default Locale. + val ds = DecimalFormatSymbols(Locale.getDefault()).decimalSeparator + + assertEquals("10${ds}5k", 10500.toStringWithSuffix()) assertEquals("100", 100.toStringWithSuffix()) - assertEquals("5.0M", 5000000.toStringWithSuffix()) - assertEquals("1.0B", 1000000000.toStringWithSuffix()) - assertEquals("1.5T", 1500000000000.toStringWithSuffix()) - assertEquals("1000.0T", 1000000000000000.toStringWithSuffix()) + assertEquals("5${ds}0M", 5000000.toStringWithSuffix()) + assertEquals("1${ds}0B", 1000000000.toStringWithSuffix()) + assertEquals("1${ds}5T", 1500000000000.toStringWithSuffix()) + assertEquals("1000${ds}0T", 1000000000000000.toStringWithSuffix()) } } \ No newline at end of file From 56dbf1e8445bba8493a6f4d50e3a26b1b5914141 Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Thu, 2 Feb 2017 18:21:00 +0000 Subject: [PATCH 4/5] Prevent node from starting across upgrades (until we support it better) (#199) * Prevent node from starting across upgrades (until we support it better). On first run a version file is created in the node dir, and on subsequent runs the node version is matched against it. * Move version check from caplet to node. --- node/capsule/build.gradle | 2 ++ .../kotlin/net/corda/node/internal/Node.kt | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/node/capsule/build.gradle b/node/capsule/build.gradle index 039639535e..e6f1ddc61f 100644 --- a/node/capsule/build.gradle +++ b/node/capsule/build.gradle @@ -54,9 +54,11 @@ task buildCordaJAR(type: FatCapsule, dependsOn: ['buildCertSigningRequestUtility applicationSource = files(project.tasks.findByName('jar'), '../build/classes/main/CordaCaplet.class', 'config/dev/log4j2.xml') capsuleManifest { + applicationVersion = corda_version appClassPath = ["jolokia-agent-war-${project.rootProject.ext.jolokia_version}.war"] javaAgents = ["quasar-core-${quasar_version}-jdk8.jar"] systemProperties['visualvm.display.name'] = 'Corda' + systemProperties['corda.version'] = corda_version minJavaVersion = '1.8.0' // This version is known to work and avoids earlier 8u versions that have bugs. minUpdateVersion['1.8'] = '102' diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index b4b48d0101..b7bbb6f491 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -31,6 +31,8 @@ import org.jetbrains.exposed.sql.Database import java.io.RandomAccessFile import java.lang.management.ManagementFactory import java.nio.channels.FileLock +import java.nio.file.Files +import java.nio.file.Paths import java.time.Clock import javax.management.ObjectName import javax.servlet.* @@ -100,6 +102,27 @@ class Node(override val configuration: FullNodeConfiguration, private lateinit var userService: RPCUserService + init { + checkVersionUnchanged() + } + + /** + * Abort starting the node if an existing deployment with a different version is detected in the current directory. + * The current version is expected to be specified as a system property. If not provided, the check will be ignored. + */ + private fun checkVersionUnchanged() { + val currentVersion = System.getProperty("corda.version") ?: return + val versionFile = Paths.get("version") + if (Files.exists(versionFile)) { + val existingVersion = Files.readAllLines(versionFile)[0] + check(existingVersion == currentVersion) { + "Version change detected - current: $currentVersion, existing: $existingVersion. Node upgrades are not yet supported." + } + } else { + Files.write(versionFile, currentVersion.toByteArray()) + } + } + override fun makeMessagingService(): MessagingServiceInternal { userService = RPCUserServiceImpl(configuration) From b86c80691efb85aad75c4048b18b2c473835817c Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Wed, 1 Feb 2017 21:27:06 +0000 Subject: [PATCH 5/5] FlowException serialised over RPC (subtypes are flattened), and improvement to startFlow RPC for correct exception handling --- .../net/corda/client/CordaRPCClientTest.kt | 41 ++++++++---- .../net/corda/client/NodeMonitorModelTest.kt | 2 +- core/src/main/kotlin/net/corda/core/Utils.kt | 32 ++++++---- .../net/corda/core/messaging/CordaRPCOps.kt | 15 +++-- .../net/corda/core/serialization/Kryo.kt | 35 +++++++---- .../test/kotlin/net/corda/core/UtilsTest.kt | 13 ++++ docs/source/clientrpc.rst | 8 +++ .../corda/docs/IntegrationTestingTutorial.kt | 3 +- .../node/services/DistributedServiceTests.kt | 5 +- .../kotlin/net/corda/node/driver/Driver.kt | 2 +- .../corda/node/internal/CordaRPCOpsImpl.kt | 5 +- .../node/services/messaging/RPCStructures.kt | 62 ++++++++++++++----- .../statemachine/StateMachineManager.kt | 3 - .../messaging/ClientRPCInfrastructureTests.kt | 62 +++++++++++++++---- .../statemachine/StateMachineManagerTests.kt | 5 +- .../corda/attachmentdemo/AttachmentDemo.kt | 7 ++- .../corda/bank/BankOfCordaRPCClientTest.kt | 7 +-- .../corda/bank/api/BankOfCordaClientApi.kt | 7 ++- .../net/corda/bank/api/BankOfCordaWebApi.kt | 18 +++--- .../net/corda/irs/api/InterestRateSwapAPI.kt | 13 ++-- .../kotlin/net/corda/notarydemo/NotaryDemo.kt | 12 ++-- .../kotlin/net/corda/vega/api/PortfolioApi.kt | 11 ++-- .../corda/traderdemo/TraderDemoClientApi.kt | 19 +++--- .../views/cordapps/cash/NewTransaction.kt | 3 +- .../net/corda/loadtest/tests/CrossCashTest.kt | 7 +-- .../net/corda/loadtest/tests/SelfIssueTest.kt | 7 +-- 26 files changed, 261 insertions(+), 143 deletions(-) diff --git a/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt b/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt index db5092e396..f4aaac98a8 100644 --- a/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt +++ b/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt @@ -1,6 +1,8 @@ package net.corda.client import net.corda.core.contracts.DOLLARS +import net.corda.core.contracts.issuedBy +import net.corda.core.flows.FlowException import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo @@ -8,26 +10,27 @@ import net.corda.core.random63BitValue import net.corda.core.serialization.OpaqueBytes import net.corda.flows.CashCommand import net.corda.flows.CashFlow -import net.corda.node.driver.DriverBasedTest -import net.corda.node.driver.NodeHandle -import net.corda.node.driver.driver +import net.corda.node.internal.Node import net.corda.node.services.User +import net.corda.node.services.config.configureTestSSL import net.corda.node.services.messaging.CordaRPCClient import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.ValidatingNotaryService +import net.corda.testing.node.NodeBasedTest import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.junit.Before import org.junit.Test -class CordaRPCClientTest : DriverBasedTest() { +class CordaRPCClientTest : NodeBasedTest() { private val rpcUser = User("user1", "test", permissions = setOf(startFlowPermission())) - private lateinit var node: NodeHandle + private lateinit var node: Node private lateinit var client: CordaRPCClient - override fun setup() = driver(isDebug = true) { - node = startNode(rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow() - client = node.rpcClientToNode() - runTest() + @Before + fun setUp() { + node = startNode("Alice", rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow() + client = CordaRPCClient(node.configuration.artemisAddress, configureTestSSL()) } @Test @@ -50,7 +53,7 @@ class CordaRPCClientTest : DriverBasedTest() { } @Test - fun `indefinite block bug`() { + fun `close-send deadlock and premature shutdown on empty observable`() { println("Starting client") client.start(rpcUser.username, rpcUser.password) println("Creating proxy") @@ -58,11 +61,25 @@ class CordaRPCClientTest : DriverBasedTest() { println("Starting flow") val flowHandle = proxy.startFlow( ::CashFlow, - CashCommand.IssueCash(20.DOLLARS, OpaqueBytes.of(0), node.nodeInfo.legalIdentity, node.nodeInfo.legalIdentity)) + CashCommand.IssueCash(20.DOLLARS, OpaqueBytes.of(0), node.info.legalIdentity, node.info.legalIdentity)) println("Started flow, waiting on result") flowHandle.progress.subscribe { println("PROGRESS $it") } - println("Result: ${flowHandle.returnValue.toBlocking().first()}") + println("Result: ${flowHandle.returnValue.getOrThrow()}") + } + + @Test + fun `FlowException thrown by flow`() { + client.start(rpcUser.username, rpcUser.password) + val proxy = client.proxy() + val handle = proxy.startFlow(::CashFlow, CashCommand.PayCash( + amount = 100.DOLLARS.issuedBy(node.info.legalIdentity.ref(1)), + recipient = node.info.legalIdentity + )) + // TODO Restrict this to CashException once RPC serialisation has been fixed + assertThatExceptionOfType(FlowException::class.java).isThrownBy { + handle.returnValue.getOrThrow() + } } } diff --git a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt index e0a8d6b1b3..12eab3fb5e 100644 --- a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt +++ b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt @@ -124,7 +124,7 @@ class NodeMonitorModelTest : DriverBasedTest() { issueRef = OpaqueBytes(ByteArray(1, { 1 })), recipient = aliceNode.legalIdentity, notary = notaryNode.notaryIdentity - )).returnValue.toBlocking().first() + )).returnValue.getOrThrow() rpc.startFlow(::CashFlow, CashCommand.PayCash( amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt index d5ca5f4312..abb9107a2e 100644 --- a/core/src/main/kotlin/net/corda/core/Utils.kt +++ b/core/src/main/kotlin/net/corda/core/Utils.kt @@ -6,10 +6,7 @@ package net.corda.core import com.google.common.base.Function import com.google.common.base.Throwables import com.google.common.io.ByteStreams -import com.google.common.util.concurrent.Futures -import com.google.common.util.concurrent.ListenableFuture -import com.google.common.util.concurrent.MoreExecutors -import com.google.common.util.concurrent.SettableFuture +import com.google.common.util.concurrent.* import kotlinx.support.jdk7.use import net.corda.core.crypto.newSecureRandom import org.slf4j.Logger @@ -115,7 +112,7 @@ inline fun SettableFuture.catch(block: () -> T) { } } -fun ListenableFuture.toObservable(): Observable { +fun ListenableFuture.toObservable(): Observable { return Observable.create { subscriber -> success { subscriber.onNext(it) @@ -384,15 +381,24 @@ fun Observer.tee(vararg teeTo: Observer): Observer { /** * Returns a [ListenableFuture] bound to the *first* item emitted by this Observable. The future will complete with a - * NoSuchElementException if no items are emitted or any other error thrown by the Observable. + * NoSuchElementException if no items are emitted or any other error thrown by the Observable. If it's cancelled then + * it will unsubscribe from the observable. */ -fun Observable.toFuture(): ListenableFuture { - val future = SettableFuture.create() - first().subscribe( - { future.set(it) }, - { future.setException(it) } - ) - return future +fun Observable.toFuture(): ListenableFuture = ObservableToFuture(this) + +private class ObservableToFuture(observable: Observable) : AbstractFuture(), Observer { + private val subscription = observable.first().subscribe(this) + override fun onNext(value: T) { + set(value) + } + override fun onError(e: Throwable) { + setException(e) + } + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { + subscription.unsubscribe() + return super.cancel(mayInterruptIfRunning) + } + override fun onCompleted() {} } /** Return the sum of an Iterable of [BigDecimal]s. */ diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index fdb5939dfb..aba12670e0 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -1,5 +1,6 @@ package net.corda.core.messaging +import com.google.common.util.concurrent.ListenableFuture import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef import net.corda.core.crypto.CompositeKey @@ -107,15 +108,16 @@ interface CordaRPCOps : RPCOps { fun uploadFile(dataType: String, name: String?, file: InputStream): String /** - * Returns the node-local current time. + * Returns the node's current time. */ fun currentNodeTime(): Instant /** - * Returns an Observable emitting a single Unit once the node is registered with the network map. + * Returns a [ListenableFuture] which completes when the node has registered wih the network map service. It can also + * complete with an exception if it is unable to. */ @RPCReturnsObservables - fun waitUntilRegisteredWithNetworkMap(): Observable + fun waitUntilRegisteredWithNetworkMap(): ListenableFuture // TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of // the node's state locally and query that directly. @@ -179,13 +181,10 @@ inline fun > CordaRPCOps.startFlow * * @param id The started state machine's ID. * @param progress The stream of progress tracker events. - * @param returnValue An Observable emitting a single event containing the flow's return value. - * To block on this value: - * val returnValue = rpc.startFlow(::MyFlow).returnValue.toBlocking().first() + * @param returnValue A [ListenableFuture] of the flow's return value. */ data class FlowHandle( val id: StateMachineRunId, val progress: Observable, - // TODO This should be ListenableFuture - val returnValue: Observable + val returnValue: ListenableFuture ) diff --git a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt index d02e0b35bc..bee92be51f 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt @@ -5,6 +5,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy import com.esotericsoftware.kryo.KryoException +import com.esotericsoftware.kryo.Registration import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output @@ -66,7 +67,7 @@ import kotlin.reflect.jvm.javaType */ // A convenient instance of Kryo pre-configured with some useful things. Used as a default by various functions. -val THREAD_LOCAL_KRYO = ThreadLocal.withInitial { createKryo() } +val THREAD_LOCAL_KRYO: ThreadLocal = ThreadLocal.withInitial { createKryo() } /** * A type safe wrapper around a byte array that contains a serialised object. You can call [SerializedBytes.deserialize] @@ -76,7 +77,7 @@ class SerializedBytes(bytes: ByteArray) : OpaqueBytes(bytes) { // It's OK to use lazy here because SerializedBytes is configured to use the ImmutableClassSerializer. val hash: SecureHash by lazy { bytes.sha256() } - fun writeToFile(path: Path) = Files.write(path, bytes) + fun writeToFile(path: Path): Path = Files.write(path, bytes) } // Some extension functions that make deserialisation convenient and provide auto-casting of the result. @@ -385,8 +386,7 @@ object KotlinObjectSerializer : Serializer() { return type.getField("INSTANCE").get(null) as DeserializeAsKotlinObjectDef } - override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) { - } + override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) {} } fun createKryo(k: Kryo = Kryo()): Kryo { @@ -402,12 +402,10 @@ fun createKryo(k: Kryo = Kryo()): Kryo { // Because we like to stick a Kryo object in a ThreadLocal to speed things up a bit, we can end up trying to // serialise the Kryo object itself when suspending a fiber. That's dumb, useless AND can cause crashes, so // we avoid it here. - register(Kryo::class.java, object : Serializer() { - override fun read(kryo: Kryo, input: Input, type: Class): Kryo { - return createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo) - } - override fun write(kryo: Kryo, output: Output, obj: Kryo) {} - }) + register(Kryo::class, + read = { kryo, input -> createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo) }, + write = { kryo, output, obj -> } + ) register(EdDSAPublicKey::class.java, Ed25519PublicKeySerializer) register(EdDSAPrivateKey::class.java, Ed25519PrivateKeySerializer) @@ -435,6 +433,8 @@ fun createKryo(k: Kryo = Kryo()): Kryo { // This ensures a NonEmptySetSerializer is constructed with an initial value. register(NonEmptySet::class.java, NonEmptySetSerializer) + register(Array::class, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> }) + /** This ensures any kotlin objects that implement [DeserializeAsKotlinObjectDef] are read back in as singletons. */ addDefaultSerializer(DeserializeAsKotlinObjectDef::class.java, KotlinObjectSerializer) @@ -451,6 +451,19 @@ fun createKryo(k: Kryo = Kryo()): Kryo { } } +inline fun Kryo.register( + type: KClass, + crossinline read: (Kryo, Input) -> T, + crossinline write: (Kryo, Output, T) -> Unit): Registration { + return register( + type.java, + object : Serializer() { + override fun read(kryo: Kryo, input: Input, type: Class): T = read(kryo, input) + override fun write(kryo: Kryo, output: Output, obj: T) = write(kryo, output, obj) + } + ) +} + /** * Use this method to mark any types which can have the same instance within it more than once. This will make sure * the serialised form is stable across multiple serialise-deserialise cycles. Using this on a type with internal cyclic @@ -517,7 +530,7 @@ var Kryo.attachmentStorage: AttachmentStorage? //Used in Merkle tree calculation. It doesn't cover all the cases of unstable serialization format. fun extendKryoHash(kryo: Kryo): Kryo { return kryo.apply { - setReferences(false) + references = false register(LinkedHashMap::class.java, MapSerializer()) register(HashMap::class.java, OrderedSerializer) } diff --git a/core/src/test/kotlin/net/corda/core/UtilsTest.kt b/core/src/test/kotlin/net/corda/core/UtilsTest.kt index f1e4e587c8..7988d7eadc 100644 --- a/core/src/test/kotlin/net/corda/core/UtilsTest.kt +++ b/core/src/test/kotlin/net/corda/core/UtilsTest.kt @@ -4,6 +4,7 @@ import org.assertj.core.api.Assertions.* import org.junit.Test import rx.subjects.PublishSubject import java.util.* +import java.util.concurrent.CancellationException class UtilsTest { @Test @@ -44,4 +45,16 @@ class UtilsTest { future.getOrThrow() }.isSameAs(exception) } + + @Test + fun `toFuture - cancel`() { + val subject = PublishSubject.create() + val future = subject.toFuture() + future.cancel(false) + assertThat(subject.hasObservers()).isFalse() + subject.onNext("Hello") + assertThatExceptionOfType(CancellationException::class.java).isThrownBy { + future.get() + } + } } \ No newline at end of file diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index 5bf460fd04..3c27d24a92 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -46,6 +46,14 @@ through to the server where the corresponding server-side observables are also u a warning printed to the logs and the proxy will be closed for you. But don't rely on this, as garbage collection is non-deterministic. +Futures +------- + +A method can also return a ``ListenableFuture`` in its object graph and it will be treated in a similar manner to +observables, including needing to mark the RPC with the ``@RPCReturnsObservables`` annotation. Unlike for an observable, +once the single value (or an exception) has been received all server-side resources will be released automatically. Calling +the ``cancel`` method on the future will unsubscribe it from any future value and release any resources. + Versioning ---------- diff --git a/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt index 687bc46515..d08bcf7f76 100644 --- a/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt +++ b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt @@ -9,7 +9,6 @@ import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.Vault import net.corda.core.serialization.OpaqueBytes -import net.corda.core.toFuture import net.corda.flows.CashCommand import net.corda.flows.CashFlow import net.corda.node.driver.driver @@ -87,7 +86,7 @@ class IntegrationTestingTutorial { amount = i.DOLLARS.issuedBy(alice.nodeInfo.legalIdentity.ref(issueRef)), recipient = alice.nodeInfo.legalIdentity )) - flowHandle.returnValue.toFuture().getOrThrow() + flowHandle.returnValue.getOrThrow() } aliceVaultUpdates.expectEvents { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt index 3004eceb21..03461b23be 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt @@ -11,7 +11,6 @@ import net.corda.core.messaging.StateMachineUpdate import net.corda.core.messaging.startFlow import net.corda.core.node.NodeInfo import net.corda.core.serialization.OpaqueBytes -import net.corda.core.toFuture import net.corda.flows.CashCommand import net.corda.flows.CashFlow import net.corda.node.driver.DriverBasedTest @@ -138,13 +137,13 @@ class DistributedServiceTests : DriverBasedTest() { val issueHandle = aliceProxy.startFlow( ::CashFlow, CashCommand.IssueCash(amount, OpaqueBytes.of(0), alice.nodeInfo.legalIdentity, raftNotaryIdentity)) - issueHandle.returnValue.toFuture().getOrThrow() + issueHandle.returnValue.getOrThrow() } private fun paySelf(amount: Amount) { val payHandle = aliceProxy.startFlow( ::CashFlow, CashCommand.PayCash(amount.issuedBy(alice.nodeInfo.legalIdentity.ref(0)), alice.nodeInfo.legalIdentity)) - payHandle.returnValue.toFuture().getOrThrow() + payHandle.returnValue.getOrThrow() } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index fef25192e9..0120e62026 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -380,7 +380,7 @@ open class DriverDSL( registerProcess(processFuture) return processFuture.flatMap { process -> establishRpc(messagingAddress, configuration).flatMap { rpc -> - rpc.waitUntilRegisteredWithNetworkMap().toFuture().map { + rpc.waitUntilRegisteredWithNetworkMap().map { NodeHandle(rpc.nodeIdentity(), rpc, configuration, process) } } diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 736b1eeff1..6bf007ddf3 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -15,7 +15,6 @@ import net.corda.core.node.ServiceHub import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.Vault -import net.corda.core.toObservable import net.corda.core.transactions.SignedTransaction import net.corda.node.services.messaging.requirePermission import net.corda.node.services.startFlowPermission @@ -97,7 +96,7 @@ class CordaRPCOpsImpl( return FlowHandle( id = stateMachine.id, progress = stateMachine.logic.track()?.second ?: Observable.empty(), - returnValue = stateMachine.resultFuture.toObservable() + returnValue = stateMachine.resultFuture ) } @@ -111,7 +110,7 @@ class CordaRPCOpsImpl( } } - override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered.toObservable() + override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key) override fun partyFromName(name: String) = services.identityService.partyFromName(name) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt index 431c9f7d0d..d602a5ac3d 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt @@ -8,9 +8,10 @@ import com.esotericsoftware.kryo.Registration import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output -import com.esotericsoftware.kryo.serializers.JavaSerializer import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.ListenableFuture import de.javakaffee.kryoserializers.ArraysAsListSerializer +import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer import de.javakaffee.kryoserializers.guava.* import net.corda.contracts.asset.Cash import net.corda.core.ErrorOr @@ -19,6 +20,8 @@ import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowException +import net.corda.core.flows.IllegalFlowLogicException import net.corda.core.flows.StateMachineRunId import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.StateMachineInfo @@ -26,12 +29,15 @@ import net.corda.core.messaging.StateMachineUpdate import net.corda.core.node.* import net.corda.core.node.services.* import net.corda.core.serialization.* +import net.corda.core.toFuture +import net.corda.core.toObservable import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.node.internal.AbstractNode import net.corda.node.services.User import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress +import net.corda.node.services.statemachine.FlowSessionException import net.i2p.crypto.eddsa.EdDSAPrivateKey import net.i2p.crypto.eddsa.EdDSAPublicKey import org.apache.activemq.artemis.api.core.SimpleString @@ -138,6 +144,7 @@ private class RPCKryo(observableSerializer: Serializer>? = null) register(Array(0,{}).javaClass) register(Class::class.java, ClassSerializer) + UnmodifiableCollectionsSerializer.registerSerializers(this) ImmutableListSerializer.registerSerializers(this) ImmutableSetSerializer.registerSerializers(this) ImmutableSortedSetSerializer.registerSerializers(this) @@ -207,16 +214,18 @@ private class RPCKryo(observableSerializer: Serializer>? = null) register(SimpleString::class.java) register(ServiceEntry::class.java) // Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway. + register(Array::class, read = { kryo, input -> emptyArray() }, write = { kryo, output, obj -> }) + register(FlowException::class.java) + register(FlowSessionException::class.java) + register(IllegalFlowLogicException::class.java) register(RuntimeException::class.java) register(IllegalArgumentException::class.java) register(ArrayIndexOutOfBoundsException::class.java) register(IndexOutOfBoundsException::class.java) - // Kryo couldn't serialize Collections.unmodifiableCollection in Throwable correctly, causing null pointer exception when try to access the deserialize object. - register(NoSuchElementException::class.java, JavaSerializer()) + register(NoSuchElementException::class.java) register(RPCException::class.java) - register(Array::class.java, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> }) - register(Collections.unmodifiableList(emptyList()).javaClass) register(PermissionException::class.java) + register(Throwable::class.java) register(FlowHandle::class.java) register(KryoException::class.java) register(StringBuffer::class.java) @@ -229,20 +238,45 @@ private class RPCKryo(observableSerializer: Serializer>? = null) pluginRegistries.forEach { it.registerRPCKryoTypes(this) } } - // Helper method, attempt to reduce boiler plate code - private fun register(type: Class, read: (Kryo, Input) -> T, write: (Kryo, Output, T) -> Unit) { - register(type, object : Serializer() { - override fun read(kryo: Kryo, input: Input, type: Class): T = read(kryo, input) - override fun write(kryo: Kryo, output: Output, o: T) = write(kryo, output, o) - }) + // TODO: workaround to prevent Observable registration conflict when using plugin registered kyro classes + private val observableRegistration: Registration? = observableSerializer?.let { register(Observable::class.java, it, 10000) } + + private val listenableFutureRegistration: Registration? = observableSerializer?.let { + // Register ListenableFuture by making use of Observable serialisation. + // TODO Serialisation could be made more efficient as a future can only emit one value (or exception) + @Suppress("UNCHECKED_CAST") + register(ListenableFuture::class, + read = { kryo, input -> it.read(kryo, input, Observable::class.java as Class>).toFuture() }, + write = { kryo, output, obj -> it.write(kryo, output, obj.toObservable()) } + ) } - // TODO: workaround to prevent Observable registration conflict when using plugin registered kyro classes - val observableRegistration: Registration? = if (observableSerializer != null) register(Observable::class.java, observableSerializer, 10000) else null + // Avoid having to worry about the subtypes of FlowException by converting all of them to just FlowException. + // This is a temporary hack until a proper serialisation mechanism is in place. + private val flowExceptionRegistration: Registration = register( + FlowException::class, + read = { kryo, input -> + val message = input.readString() + val cause = kryo.readObjectOrNull(input, Throwable::class.java) + FlowException(message, cause) + }, + write = { kryo, output, obj -> + // The subclass may have overridden toString so we use that + val message = if (obj.javaClass != FlowException::class.java) obj.toString() else obj.message + output.writeString(message) + kryo.writeObjectOrNull(output, obj.cause, Throwable::class.java) + } + ) override fun getRegistration(type: Class<*>): Registration { if (Observable::class.java.isAssignableFrom(type)) - return observableRegistration ?: throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") + return observableRegistration ?: + throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") + if (ListenableFuture::class.java.isAssignableFrom(type)) + return listenableFutureRegistration ?: + throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") + if (FlowException::class.java.isAssignableFrom(type)) + return flowExceptionRegistration return super.getRegistration(type) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index ee214162ef..85a890f459 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -368,9 +368,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, errorResponse: Pair?) { - // TODO Blanking the stack trace prevents the receiving flow from filling in its own stack trace -// @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") -// (errorResponse?.first as java.lang.Throwable?)?.stackTrace = emptyArray() openSessions.values.removeIf { session -> if (session.fiber == fiber) { session.endSession(errorResponse) diff --git a/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt b/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt index 6aeb31f049..42ef891a00 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt @@ -1,8 +1,13 @@ package net.corda.node.messaging +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture +import net.corda.core.getOrThrow import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCReturnsObservables import net.corda.core.serialization.SerializedBytes +import net.corda.core.success import net.corda.core.utilities.LogHelper import net.corda.node.services.RPCUserService import net.corda.node.services.User @@ -120,30 +125,31 @@ class ClientRPCInfrastructureTests { @RPCReturnsObservables fun makeComplicatedObservable(): Observable>> + @RPCReturnsObservables + fun makeListenableFuture(): ListenableFuture + + @RPCReturnsObservables + fun makeComplicatedListenableFuture(): ListenableFuture>> + @RPCSinceVersion(2) fun addedLater() fun captureUser(): String } - lateinit var complicatedObservable: Observable>> + private lateinit var complicatedObservable: Observable>> + private lateinit var complicatedListenableFuturee: ListenableFuture>> inner class TestOpsImpl : TestOps { override val protocolVersion = 1 - override fun barf(): Unit = throw IllegalArgumentException("Barf!") - - override fun void() { - } - + override fun void() {} override fun someCalculation(str: String, num: Int) = "$str $num" - override fun makeObservable(): Observable = Observable.just(1, 2, 3, 4) - + override fun makeListenableFuture(): ListenableFuture = Futures.immediateFuture(1) override fun makeComplicatedObservable() = complicatedObservable - + override fun makeComplicatedListenableFuture(): ListenableFuture>> = complicatedListenableFuturee override fun addedLater(): Unit = throw UnsupportedOperationException("not implemented") - override fun captureUser(): String = CURRENT_RPC_USER.get().username } @@ -212,6 +218,41 @@ class ClientRPCInfrastructureTests { assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size) } + @Test + fun `simple ListenableFuture`() { + val value = proxy.makeListenableFuture().getOrThrow() + assertThat(value).isEqualTo(1) + } + + @Test + fun `complex ListenableFuture`() { + val serverQuote = SettableFuture.create>>() + complicatedListenableFuturee = serverQuote + + val twainQuote = "Mark Twain" to Futures.immediateFuture("I have never let my schooling interfere with my education.") + + val clientQuotes = LinkedBlockingQueue() + val clientFuture = proxy.makeComplicatedListenableFuture() + + clientFuture.success { + val name = it.first + it.second.success { + clientQuotes += "Quote by $name: $it" + } + } + + val rpcQueuesQuery = SimpleString("clients.${authenticatedUser.username}.rpc.*") + assertEquals(2, clientSession.addressQuery(rpcQueuesQuery).queueNames.size) + + assertThat(clientQuotes).isEmpty() + + serverQuote.set(twainQuote) + assertThat(clientQuotes.take()).isEqualTo("Quote by Mark Twain: I have never let my schooling interfere with my education.") + + // TODO This final assert sometimes fails because the relevant queue hasn't been removed yet +// assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size) + } + @Test fun versioning() { assertFailsWith { proxy.addedLater() } @@ -221,5 +262,4 @@ class ClientRPCInfrastructureTests { fun `authenticated user is available to RPC`() { assertThat(proxy.captureUser()).isEqualTo(authenticatedUser.username) } - } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index ea6a94aef0..d8fdb57330 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -390,9 +390,8 @@ class StateMachineManagerTests { node2 sent sessionConfirm to node1, node2 sent sessionEnd(errorFlow.exceptionThrown) to node1 ) - // TODO see StateMachineManager.endAllFiberSessions -// // Make sure the original stack trace isn't sent down the wire -// assertThat((sessionTransfers.last().message as SessionEnd).errorResponse!!.stackTrace).isEmpty() + // Make sure the original stack trace isn't sent down the wire + assertThat((sessionTransfers.last().message as SessionEnd).errorResponse!!.stackTrace).isEmpty() } @Test diff --git a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt index 2f84383c2f..577b024995 100644 --- a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt +++ b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt @@ -6,6 +6,7 @@ import net.corda.core.contracts.TransactionType import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.div +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.utilities.Emoji @@ -83,9 +84,9 @@ fun sender(rpc: CordaRPCOps) { // Send the transaction to the other recipient val stx = ptx.toSignedTransaction() println("Sending ${stx.id}") - val protocolHandle = rpc.startFlow(::FinalityFlow, stx, setOf(otherSide)) - protocolHandle.progress.subscribe(::println) - protocolHandle.returnValue.toBlocking().first() + val flowHandle = rpc.startFlow(::FinalityFlow, stx, setOf(otherSide)) + flowHandle.progress.subscribe(::println) + flowHandle.returnValue.getOrThrow() } fun recipient(rpc: CordaRPCOps) { diff --git a/samples/bank-of-corda-demo/src/integration-test/kotlin/net/corda/bank/BankOfCordaRPCClientTest.kt b/samples/bank-of-corda-demo/src/integration-test/kotlin/net/corda/bank/BankOfCordaRPCClientTest.kt index 5337fface2..504006d755 100644 --- a/samples/bank-of-corda-demo/src/integration-test/kotlin/net/corda/bank/BankOfCordaRPCClientTest.kt +++ b/samples/bank-of-corda-demo/src/integration-test/kotlin/net/corda/bank/BankOfCordaRPCClientTest.kt @@ -5,7 +5,6 @@ import net.corda.core.contracts.DOLLARS import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo -import net.corda.core.transactions.SignedTransaction import net.corda.flows.IssuerFlow.IssuanceRequester import net.corda.node.driver.driver import net.corda.node.services.User @@ -16,7 +15,6 @@ import net.corda.testing.expect import net.corda.testing.expectEvents import net.corda.testing.sequence import org.junit.Test -import kotlin.test.assertTrue class BankOfCordaRPCClientTest { @Test @@ -45,13 +43,12 @@ class BankOfCordaRPCClientTest { val vaultUpdatesBigCorp = bigCorpProxy.vaultAndUpdates().second // Kick-off actual Issuer Flow - val result = bocProxy.startFlow( + bocProxy.startFlow( ::IssuanceRequester, 1000.DOLLARS, nodeBigCorporation.nodeInfo.legalIdentity, BOC_PARTY_REF, - nodeBankOfCorda.nodeInfo.legalIdentity).returnValue.toBlocking().first() - assertTrue { result is SignedTransaction } + nodeBankOfCorda.nodeInfo.legalIdentity).returnValue.getOrThrow() // Check Bank of Corda Vault Updates vaultUpdatesBoc.expectEvents { diff --git a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt index c84e23d628..4069f988de 100644 --- a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt +++ b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt @@ -2,14 +2,15 @@ package net.corda.bank.api import com.google.common.net.HostAndPort import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams -import net.corda.flows.IssuerFlow.IssuanceRequester -import net.corda.node.services.messaging.CordaRPCClient import net.corda.core.contracts.Amount import net.corda.core.contracts.currency +import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction +import net.corda.flows.IssuerFlow.IssuanceRequester import net.corda.node.services.config.configureTestSSL +import net.corda.node.services.messaging.CordaRPCClient import net.corda.testing.http.HttpApi /** @@ -43,6 +44,6 @@ class BankOfCordaClientApi(val hostAndPort: HostAndPort) { val amount = Amount(params.amount, currency(params.currency)) val issuerToPartyRef = OpaqueBytes.of(params.issueToPartyRefAsString.toByte()) - return proxy.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.toBlocking().first() + return proxy.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.getOrThrow() } } diff --git a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaWebApi.kt b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaWebApi.kt index 877a782bcc..cc00506df8 100644 --- a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaWebApi.kt +++ b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaWebApi.kt @@ -1,13 +1,14 @@ package net.corda.bank.api -import net.corda.flows.IssuerFlow.IssuanceRequester import net.corda.core.contracts.Amount import net.corda.core.contracts.currency +import net.corda.core.flows.FlowException +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes -import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.loggerFor +import net.corda.flows.IssuerFlow.IssuanceRequester import java.time.LocalDateTime import javax.ws.rs.* import javax.ws.rs.core.MediaType @@ -46,12 +47,13 @@ class BankOfCordaWebApi(val rpc: CordaRPCOps) { // invoke client side of Issuer Flow: IssuanceRequester // The line below blocks and waits for the future to resolve. - val result = rpc.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.toBlocking().first() - if (result is SignedTransaction) { - logger.info("Issue request completed successfully: ${params}") - return Response.status(Response.Status.CREATED).build() - } else { - return Response.status(Response.Status.BAD_REQUEST).build() + val status = try { + rpc.startFlow(::IssuanceRequester, amount, issueToParty, issuerToPartyRef, issuerBankParty).returnValue.getOrThrow() + logger.info("Issue request completed successfully: $params") + Response.Status.CREATED + } catch (e: FlowException) { + Response.Status.BAD_REQUEST } + return Response.status(status).build() } } \ No newline at end of file diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt index 559c163e6d..09e586f3b3 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt @@ -1,6 +1,7 @@ package net.corda.irs.api import net.corda.core.contracts.filterStatesOfType +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.utilities.loggerFor @@ -66,12 +67,12 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) { @Path("deals") @Consumes(MediaType.APPLICATION_JSON) fun storeDeal(newDeal: InterestRateSwap.State): Response { - try { - rpc.startFlow(AutoOfferFlow::Requester, newDeal).returnValue.toBlocking().first() - return Response.created(URI.create(generateDealLink(newDeal))).build() + return try { + rpc.startFlow(AutoOfferFlow::Requester, newDeal).returnValue.getOrThrow() + Response.created(URI.create(generateDealLink(newDeal))).build() } catch (ex: Throwable) { logger.info("Exception when creating deal: $ex") - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ex.toString()).build() + Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(ex.toString()).build() } } @@ -94,7 +95,7 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) { val priorDemoDate = fetchDemoDate() // Can only move date forwards if (newDemoDate.isAfter(priorDemoDate)) { - rpc.startFlow(UpdateBusinessDayFlow::Broadcast, newDemoDate).returnValue.toBlocking().first() + rpc.startFlow(UpdateBusinessDayFlow::Broadcast, newDemoDate).returnValue.getOrThrow() return Response.ok().build() } val msg = "demodate is already $priorDemoDate and can only be updated with a later date" @@ -113,7 +114,7 @@ class InterestRateSwapAPI(val rpc: CordaRPCOps) { @Path("restart") @Consumes(MediaType.APPLICATION_JSON) fun exitServer(): Response { - rpc.startFlow(ExitServerFlow::Broadcast, 83).returnValue.toBlocking().first() + rpc.startFlow(ExitServerFlow::Broadcast, 83).returnValue.getOrThrow() return Response.ok().build() } } diff --git a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt index be4f1f03b4..c88f917d43 100644 --- a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt +++ b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/NotaryDemo.kt @@ -1,8 +1,10 @@ package net.corda.notarydemo import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.Futures import net.corda.core.crypto.toStringShort import net.corda.core.div +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.transactions.SignedTransaction @@ -60,9 +62,9 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) { */ private fun buildTransactions(count: Int): List { val moveTransactions = (1..count).map { - rpc.startFlow(::DummyIssueAndMove, notary, counterpartyNode.legalIdentity).returnValue.toBlocking().toFuture() + rpc.startFlow(::DummyIssueAndMove, notary, counterpartyNode.legalIdentity).returnValue } - return moveTransactions.map { it.get() } + return Futures.allAsList(moveTransactions).getOrThrow() } /** @@ -72,10 +74,8 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) { * @return a list of encoded signer public keys - one for every transaction */ private fun notariseTransactions(transactions: List): List { - val signatureFutures = transactions.map { - rpc.startFlow(NotaryFlow::Client, it).returnValue.toBlocking().toFuture() - } - return signatureFutures.map { it.get().by.toStringShort() } + val signatureFutures = transactions.map { rpc.startFlow(NotaryFlow::Client, it).returnValue } + return Futures.allAsList(signatureFutures).getOrThrow().map { it.by.toStringShort() } } } diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt index 1c45d087b9..1127fc6c3f 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt @@ -6,6 +6,7 @@ import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.filterStatesOfType import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.vega.analytics.InitialMarginTriple @@ -159,7 +160,7 @@ class PortfolioApi(val rpc: CordaRPCOps) { return withParty(partyName) { val buyer = if (swap.buySell.isBuy) ownParty else it val seller = if (swap.buySell.isSell) ownParty else it - rpc.startFlow(IRSTradeFlow::Requester, swap.toData(buyer, seller), it).returnValue.toBlocking().first() + rpc.startFlow(IRSTradeFlow::Requester, swap.toData(buyer, seller), it).returnValue.getOrThrow() Response.accepted().entity("{}").build() } } @@ -266,12 +267,12 @@ class PortfolioApi(val rpc: CordaRPCOps) { fun startPortfolioCalculations(params: ValuationCreationParams = ValuationCreationParams(LocalDate.of(2016, 6, 6)), @PathParam("party") partyName: String): Response { return withParty(partyName) { otherParty -> val existingSwap = getPortfolioWith(otherParty) - if (existingSwap == null) { - rpc.startFlow(SimmFlow::Requester, otherParty, params.valuationDate).returnValue.toBlocking().first() + val flowHandle = if (existingSwap == null) { + rpc.startFlow(SimmFlow::Requester, otherParty, params.valuationDate) } else { - val handle = rpc.startFlow(SimmRevaluation::Initiator, getPortfolioStateAndRefWith(otherParty).ref, params.valuationDate) - handle.returnValue.toBlocking().first() + rpc.startFlow(SimmRevaluation::Initiator, getPortfolioStateAndRefWith(otherParty).ref, params.valuationDate) } + flowHandle.returnValue.getOrThrow() withPortfolio(otherParty) { portfolioState -> val portfolio = portfolioState.portfolio.toStateAndRef(rpc).toPortfolio() diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt index bc63f62c23..366940a3b6 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/TraderDemoClientApi.kt @@ -1,19 +1,17 @@ package net.corda.traderdemo -import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.Futures import net.corda.contracts.testing.calculateRandomlySizedAmounts import net.corda.core.contracts.Amount import net.corda.core.contracts.DOLLARS +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes -import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.Emoji import net.corda.core.utilities.loggerFor import net.corda.flows.IssuerFlow.IssuanceRequester -import net.corda.node.services.messaging.CordaRPCClient import net.corda.testing.BOC -import net.corda.testing.http.HttpApi import net.corda.traderdemo.flow.SellerFlow import java.util.* import kotlin.test.assertEquals @@ -26,20 +24,17 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) { val logger = loggerFor() } - fun runBuyer(amount: Amount = 30000.0.DOLLARS, notary: String = "Notary"): Boolean { + fun runBuyer(amount: Amount = 30000.0.DOLLARS): Boolean { val bankOfCordaParty = rpc.partyFromName(BOC.name) ?: throw Exception("Unable to locate ${BOC.name} in Network Map Service") val me = rpc.nodeIdentity() // TODO: revert back to multiple issue request amounts (3,10) when soft locking implemented val amounts = calculateRandomlySizedAmounts(amount, 1, 1, Random()) - val handles = amounts.map { - rpc.startFlow(::IssuanceRequester, amount, me.legalIdentity, OpaqueBytes.of(1), bankOfCordaParty) - } - - handles.forEach { - require(it.returnValue.toBlocking().first() is SignedTransaction) + val resultFutures = amounts.map { + rpc.startFlow(::IssuanceRequester, amount, me.legalIdentity, OpaqueBytes.of(1), bankOfCordaParty).returnValue } + Futures.allAsList(resultFutures).getOrThrow() return true } @@ -60,7 +55,7 @@ class TraderDemoClientApi(val rpc: CordaRPCOps) { } // The line below blocks and waits for the future to resolve. - val stx = rpc.startFlow(::SellerFlow, otherParty, amount).returnValue.toBlocking().first() + val stx = rpc.startFlow(::SellerFlow, otherParty, amount).returnValue.getOrThrow() logger.info("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(stx.tx)}") return true } else { diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt index 17661dfa81..64ee18f7ca 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt @@ -21,7 +21,6 @@ import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.node.NodeInfo import net.corda.core.serialization.OpaqueBytes -import net.corda.core.toFuture import net.corda.core.transactions.SignedTransaction import net.corda.explorer.model.CashTransaction import net.corda.explorer.model.IssuerModel @@ -101,7 +100,7 @@ class NewTransaction : Fragment() { rpcProxy.value!!.startFlow(::CashFlow, it) } val response = try { - handle?.returnValue?.toFuture()?.getOrThrow() + handle?.returnValue?.getOrThrow() } catch (e: FlowException) { e } diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt index 42ed3b0182..495f468a7d 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt @@ -7,12 +7,11 @@ import net.corda.core.contracts.Issued import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.USD import net.corda.core.crypto.Party +import net.corda.core.flows.FlowException import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes -import net.corda.core.toFuture import net.corda.flows.CashCommand -import net.corda.flows.CashException import net.corda.flows.CashFlow import net.corda.loadtest.LoadTest import net.corda.loadtest.NodeHandle @@ -208,9 +207,9 @@ val crossCashTest = LoadTest( execute = { command -> try { - val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toFuture().getOrThrow() + val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.getOrThrow() log.info("Success: $result") - } catch (e: CashException) { + } catch (e: FlowException) { log.error("Failure", e) } }, diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt index 2abc1a1270..51b0767b1c 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt @@ -7,11 +7,10 @@ import net.corda.client.mock.replicatePoisson import net.corda.contracts.asset.Cash import net.corda.core.contracts.USD import net.corda.core.crypto.Party +import net.corda.core.flows.FlowException import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow -import net.corda.core.toFuture import net.corda.flows.CashCommand -import net.corda.flows.CashException import net.corda.flows.CashFlow import net.corda.loadtest.LoadTest import net.corda.loadtest.NodeHandle @@ -63,9 +62,9 @@ val selfIssueTest = LoadTest( execute = { command -> try { - val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toFuture().getOrThrow() + val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.getOrThrow() log.info("Success: $result") - } catch (e: CashException) { + } catch (e: FlowException) { log.error("Failure", e) } },