diff --git a/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt b/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt index 95baaca2ca..fd5f01ebc4 100644 --- a/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt +++ b/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt @@ -2,6 +2,7 @@ package net.corda.client import net.corda.core.contracts.DOLLARS import net.corda.core.getOrThrow +import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo import net.corda.core.random63BitValue import net.corda.core.serialization.OpaqueBytes @@ -12,7 +13,7 @@ import net.corda.node.driver.driver import net.corda.node.services.User import net.corda.node.services.config.configureTestSSL import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.toHostAndPort -import net.corda.node.services.messaging.startFlow +import net.corda.node.services.messaging.CordaRPCClient import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.ValidatingNotaryService import org.apache.activemq.artemis.api.core.ActiveMQSecurityException diff --git a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt index 215ad570a4..2f85a897f1 100644 --- a/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt +++ b/client/src/integration-test/kotlin/net/corda/client/NodeMonitorModelTest.kt @@ -9,6 +9,7 @@ import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.USD import net.corda.core.flows.StateMachineRunId import net.corda.core.getOrThrow +import net.corda.core.messaging.StateMachineUpdate import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.ServiceInfo @@ -22,7 +23,6 @@ import net.corda.node.driver.driver import net.corda.node.services.User import net.corda.node.services.config.configureTestSSL import net.corda.node.services.messaging.ArtemisMessagingComponent -import net.corda.node.services.messaging.StateMachineUpdate import net.corda.node.services.network.NetworkMapService import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.SimpleNotaryService diff --git a/client/src/main/kotlin/net/corda/client/model/GatheredTransactionDataModel.kt b/client/src/main/kotlin/net/corda/client/model/GatheredTransactionDataModel.kt index 5132b3b2c1..69a466b39b 100644 --- a/client/src/main/kotlin/net/corda/client/model/GatheredTransactionDataModel.kt +++ b/client/src/main/kotlin/net/corda/client/model/GatheredTransactionDataModel.kt @@ -10,8 +10,8 @@ import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash import net.corda.core.flows.StateMachineRunId +import net.corda.core.messaging.StateMachineUpdate import net.corda.core.transactions.SignedTransaction -import net.corda.node.services.messaging.StateMachineUpdate import org.fxmisc.easybind.EasyBind data class GatheredTransactionData( diff --git a/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt b/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt index 5fbf0a30d6..afcb7ebcad 100644 --- a/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt +++ b/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt @@ -2,8 +2,11 @@ package net.corda.client.model import com.google.common.net.HostAndPort import javafx.beans.property.SimpleObjectProperty -import net.corda.client.CordaRPCClient import net.corda.core.flows.StateMachineRunId +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.StateMachineInfo +import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.messaging.startFlow import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.Vault @@ -11,10 +14,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.flows.CashCommand import net.corda.flows.CashFlow import net.corda.node.services.config.NodeSSLConfiguration -import net.corda.node.services.messaging.CordaRPCOps -import net.corda.node.services.messaging.StateMachineInfo -import net.corda.node.services.messaging.StateMachineUpdate -import net.corda.node.services.messaging.startFlow +import net.corda.node.services.messaging.CordaRPCClient import rx.Observable import rx.subjects.PublishSubject diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt similarity index 77% rename from node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCOps.kt rename to core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index c9afdd6117..56f9384747 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -1,7 +1,9 @@ -package net.corda.node.services.messaging +package net.corda.core.messaging import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef +import net.corda.core.crypto.CompositeKey +import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId @@ -10,49 +12,19 @@ import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.Vault import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.ProgressTracker -import net.corda.node.services.statemachine.FlowStateMachineImpl -import net.corda.node.services.statemachine.StateMachineManager -import net.corda.node.utilities.AddOrRemove import rx.Observable +import java.io.InputStream +import java.time.Instant data class StateMachineInfo( val id: StateMachineRunId, val flowLogicClassName: String, val progressTrackerStepAndUpdates: Pair>? -) { - companion object { - fun fromFlowStateMachineImpl(psm: FlowStateMachineImpl<*>): StateMachineInfo { - return StateMachineInfo( - id = psm.id, - flowLogicClassName = psm.logic.javaClass.simpleName, - progressTrackerStepAndUpdates = psm.logic.track() - ) - } - } -} +) sealed class StateMachineUpdate(val id: StateMachineRunId) { class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate(stateMachineInfo.id) class Removed(id: StateMachineRunId) : StateMachineUpdate(id) - - companion object { - fun fromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate { - return when (change.addOrRemove) { - AddOrRemove.ADD -> { - val stateMachineInfo = StateMachineInfo( - id = change.id, - flowLogicClassName = change.logic.javaClass.simpleName, - progressTrackerStepAndUpdates = change.logic.track() - ) - StateMachineUpdate.Added(stateMachineInfo) - } - AddOrRemove.REMOVE -> { - StateMachineUpdate.Removed(change.id) - } - } - } - } } /** @@ -112,6 +84,33 @@ interface CordaRPCOps : RPCOps { * Retrieve existing note(s) for a given Vault transaction */ fun getVaultTransactionNotes(txnId: SecureHash): Iterable + + /** + * Checks whether an attachment with the given hash is stored on the node. + */ + fun attachmentExists(id: SecureHash): Boolean + + /** + * Uploads a jar to the node, returns it's hash. + */ + fun uploadAttachment(jar: InputStream): SecureHash + + /** + * Returns the node-local current time. + */ + fun currentNodeTime(): Instant + + // TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of + // the node's state locally and query that directly. + /** + * Returns the [Party] corresponding to the given key, if found. + */ + fun partyFromKey(key: CompositeKey): Party? + + /** + * Returns the [Party] with the given name as it's [Party.name] + */ + fun partyFromName(name: String): Party? } /** @@ -158,8 +157,17 @@ inline fun > CordaRPCOps.startFlow arg3: D ) = startFlowDynamic(R::class.java, arg0, arg1, arg2, arg3) +/** + * [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value. + * + * @param id The started state machine's ID. + * @param progress The stream of progress tracker events. + * @param returnValue An Observable emitting a single event containing the flow's return value. + * To block on this value: + * val returnValue = rpc.startFlow(::MyFlow).returnValue.toBlocking().first() + */ data class FlowHandle( val id: StateMachineRunId, - val progress: Observable, + val progress: Observable, val returnValue: Observable ) diff --git a/core/src/main/kotlin/net/corda/core/messaging/RPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/RPCOps.kt new file mode 100644 index 0000000000..6a3eaa4c1d --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/messaging/RPCOps.kt @@ -0,0 +1,10 @@ +package net.corda.core.messaging + +/** + * Base interface that all RPC servers must implement. Note: in Corda there's only one RPC interface. This base + * interface is here in case we split the RPC system out into a separate library one day. + */ +interface RPCOps { + /** Returns the RPC protocol version. Exists since version 0 so guaranteed to be present. */ + val protocolVersion: Int +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/messaging/RPCReturnsObservables.kt b/core/src/main/kotlin/net/corda/core/messaging/RPCReturnsObservables.kt new file mode 100644 index 0000000000..1c749b82a4 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/messaging/RPCReturnsObservables.kt @@ -0,0 +1,10 @@ +package net.corda.core.messaging + +/** + * If an RPC is tagged with this annotation it may return one or more observables anywhere in its response graph. + * Calling such a method comes with consequences: it's slower, and consumes server side resources as observations + * will buffer up on the server until they're consumed by the client. + */ +@Target(AnnotationTarget.FUNCTION) +@MustBeDocumented +annotation class RPCReturnsObservables \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/node/CordaPluginRegistry.kt b/core/src/main/kotlin/net/corda/core/node/CordaPluginRegistry.kt index b3b434c586..797e1fe275 100644 --- a/core/src/main/kotlin/net/corda/core/node/CordaPluginRegistry.kt +++ b/core/src/main/kotlin/net/corda/core/node/CordaPluginRegistry.kt @@ -1,52 +1,53 @@ package net.corda.core.node import com.esotericsoftware.kryo.Kryo +import net.corda.core.messaging.CordaRPCOps +import java.util.function.Function /** * Implement this interface on a class advertised in a META-INF/services/net.corda.core.node.CordaPluginRegistry file * to extend a Corda node with additional application services. */ -abstract class CordaPluginRegistry { - /** - * List of JAX-RS classes inside the contract jar. They are expected to have a single parameter constructor that takes a ServiceHub as input. - * These are listed as Class<*>, because in the future they will be instantiated inside a ClassLoader so that - * Cordapp code can be loaded dynamically. - */ - open val webApis: List> = emptyList() +abstract class CordaPluginRegistry( + /** + * List of lambdas returning JAX-RS objects. They may only depend on the RPC interface, as the webserver should + * potentially be able to live in a process separate from the node itself. + */ + open val webApis: List> = emptyList(), - /** - * Map of static serving endpoints to the matching resource directory. All endpoints will be prefixed with "/web" and postfixed with "\*. - * Resource directories can be either on disk directories (especially when debugging) in the form "a/b/c". Serving from a JAR can - * be specified with: javaClass.getResource("").toExternalForm() - */ - open val staticServeDirs: Map = emptyMap() + /** + * Map of static serving endpoints to the matching resource directory. All endpoints will be prefixed with "/web" and postfixed with "\*. + * Resource directories can be either on disk directories (especially when debugging) in the form "a/b/c". Serving from a JAR can + * be specified with: javaClass.getResource("").toExternalForm() + */ + open val staticServeDirs: Map = emptyMap(), - /** - * A Map with an entry for each consumed flow used by the webAPIs. - * The key of each map entry should contain the FlowLogic class name. - * The associated map values are the union of all concrete class names passed to the flow constructor. - * Standard java.lang.* and kotlin.* types do not need to be included explicitly. - * This is used to extend the white listed flows that can be initiated from the ServiceHub invokeFlowAsync method. - */ - open val requiredFlows: Map> = emptyMap() + /** + * A Map with an entry for each consumed Flow used by the webAPIs. + * The key of each map entry should contain the FlowLogic class name. + * The associated map values are the union of all concrete class names passed to the Flow constructor. + * Standard java.lang.* and kotlin.* types do not need to be included explicitly. + * This is used to extend the white listed Flows that can be initiated from the ServiceHub invokeFlowAsync method. + */ + open val requiredFlows: Map> = emptyMap(), - /** - * List of additional long lived services to be hosted within the node. - * They are expected to have a single parameter constructor that takes a [PluginServiceHub] as input. - * The [PluginServiceHub] will be fully constructed before the plugin service is created and will - * allow access to the flow factory and flow initiation entry points there. - */ - open val servicePlugins: List> = emptyList() - - /** - * Optionally register types with [Kryo] for use over RPC, as we lock down the types that can be serialised in this - * particular use case. - * For example, if you add an RPC interface that carries some contract states back and forth, you need to register - * those classes here using the [register] method on Kryo. - * - * TODO: Kryo and likely the requirement to register classes here will go away when we replace the serialization implementation. - * - * @return true if you register types, otherwise you will be filtered out of the list of plugins considered in future. - */ - open fun registerRPCKryoTypes(kryo: Kryo): Boolean = false -} + /** + * List of lambdas constructing additional long lived services to be hosted within the node. + * They expect a single [PluginServiceHub] parameter as input. + * The [PluginServiceHub] will be fully constructed before the plugin service is created and will + * allow access to the Flow factory and Flow initiation entry points there. + */ + open val servicePlugins: List> = emptyList() +) { + /** + * Optionally register types with [Kryo] for use over RPC, as we lock down the types that can be serialised in this + * particular use case. + * For example, if you add an RPC interface that carries some contract states back and forth, you need to register + * those classes here using the [register] method on Kryo. + * + * TODO: Kryo and likely the requirement to register classes here will go away when we replace the serialization implementation. + * + * @return true if you register types, otherwise you will be filtered out of the list of plugins considered in future. + */ + open fun registerRPCKryoTypes(kryo: Kryo): Boolean = false +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt index e4f0e86f9d..b7d3e37e68 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt @@ -25,9 +25,7 @@ import net.i2p.crypto.eddsa.EdDSAPublicKey import net.i2p.crypto.eddsa.spec.EdDSAPrivateKeySpec import net.i2p.crypto.eddsa.spec.EdDSAPublicKeySpec import org.objenesis.strategy.StdInstantiatorStrategy -import java.io.ByteArrayOutputStream -import java.io.ObjectInputStream -import java.io.ObjectOutputStream +import java.io.* import java.lang.reflect.InvocationTargetException import java.nio.file.Files import java.nio.file.Path @@ -202,6 +200,55 @@ class ImmutableClassSerializer(val klass: KClass) : Serializer() } } +// TODO This is a temporary inefficient serialiser for sending InputStreams through RPC. This may be done much more +// efficiently using Artemis's large message feature. +class InputStreamSerializer : Serializer() { + override fun write(kryo: Kryo, output: Output, stream: InputStream) { + val buffer = ByteArray(4096) + while (true) { + val numberOfBytesRead = stream.read(buffer) + if (numberOfBytesRead > 0) { + output.writeInt(numberOfBytesRead, true) + output.writeBytes(buffer, 0, numberOfBytesRead) + } else { + output.writeInt(0) + break + } + } + } + + override fun read(kryo: Kryo, input: Input, type: Class): InputStream { + val chunks = ArrayList() + while (true) { + val chunk = input.readBytesWithLength() + if (chunk.isEmpty()) { + break + } else { + chunks.add(chunk) + } + } + + return object : InputStream() { + var offset = 0 + override fun read(): Int { + while (!chunks.isEmpty()) { + val chunk = chunks[0] + if (offset >= chunk.size) { + offset = 0 + chunks.removeAt(0) + } else { + val byte = chunk[offset] + offset++ + return byte.toInt() and 0xFF + } + } + return -1 + } + } + } + +} + inline fun Kryo.useClassLoader(cl: ClassLoader, body: () -> T): T { val tmp = this.classLoader ?: ClassLoader.getSystemClassLoader() this.classLoader = cl @@ -405,6 +452,8 @@ fun createKryo(k: Kryo = Kryo()): Kryo { /** This ensures any kotlin objects that implement [DeserializeAsKotlinObjectDef] are read back in as singletons. */ addDefaultSerializer(DeserializeAsKotlinObjectDef::class.java, KotlinObjectSerializer) + addDefaultSerializer(InputStream::class.java, InputStreamSerializer()) + ImmutableListSerializer.registerSerializers(k) ImmutableSetSerializer.registerSerializers(k) ImmutableSortedSetSerializer.registerSerializers(k) diff --git a/core/src/main/kotlin/net/corda/core/utilities/ApiUtils.kt b/core/src/main/kotlin/net/corda/core/utilities/ApiUtils.kt index 7a0083a537..d8502eb06d 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/ApiUtils.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/ApiUtils.kt @@ -2,13 +2,13 @@ package net.corda.core.utilities import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party -import net.corda.core.node.ServiceHub +import net.corda.core.messaging.CordaRPCOps import javax.ws.rs.core.Response /** * Utility functions to reduce boilerplate when developing HTTP APIs */ -class ApiUtils(val services: ServiceHub) { +class ApiUtils(val rpc: CordaRPCOps) { private val defaultNotFound = { msg: String -> Response.status(Response.Status.NOT_FOUND).entity(msg).build() } /** @@ -18,7 +18,7 @@ class ApiUtils(val services: ServiceHub) { fun withParty(partyKeyStr: String, notFound: (String) -> Response = defaultNotFound, found: (Party) -> Response): Response { return try { val partyKey = CompositeKey.parseFromBase58(partyKeyStr) - val party = services.identityService.partyFromKey(partyKey) + val party = rpc.partyFromKey(partyKey) if (party == null) notFound("Unknown party") else found(party) } catch (e: IllegalArgumentException) { notFound("Invalid base58 key passed for party key") diff --git a/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt b/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt index 9f4df48ba2..f6556f5217 100644 --- a/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt @@ -16,7 +16,9 @@ import net.corda.core.utilities.ProgressTracker */ class FinalityFlow(val transaction: SignedTransaction, val participants: Set, - override val progressTracker: ProgressTracker = tracker()) : FlowLogic() { + override val progressTracker: ProgressTracker) : FlowLogic() { + constructor(transaction: SignedTransaction, participants: Set) : this(transaction, participants, tracker()) + companion object { object NOTARISING : ProgressTracker.Step("Requesting signature by notary service") object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants") diff --git a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt index 2d1586b1ae..1fef4059d5 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt @@ -22,7 +22,8 @@ object NotaryFlow { * by another transaction or the timestamp is invalid. */ open class Client(private val stx: SignedTransaction, - override val progressTracker: ProgressTracker = Client.tracker()) : FlowLogic() { + override val progressTracker: ProgressTracker) : FlowLogic() { + constructor(stx: SignedTransaction) : this(stx, Client.tracker()) companion object { diff --git a/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt b/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt index f0207f9cc4..d0ace823e7 100644 --- a/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt +++ b/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt @@ -7,8 +7,10 @@ import net.corda.core.messaging.Ack import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.Test +import java.io.InputStream import java.time.Instant import java.util.* +import kotlin.test.assertEquals class KryoTests { @@ -82,6 +84,16 @@ class KryoTests { assertThat(tokenizableAfter).isSameAs(tokenizableBefore) } + @Test + fun `InputStream serialisation`() { + val rubbish = ByteArray(12345, { (it * it * 0.12345).toByte() }) + val readRubbishStream: InputStream = rubbish.inputStream().serialize(kryo).deserialize(kryo) + for (i in 0 .. 12344) { + assertEquals(rubbish[i], readRubbishStream.read().toByte()) + } + assertEquals(-1, readRubbishStream.read()) + } + private data class Person(val name: String, val birthday: Instant?) @Suppress("unused") diff --git a/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt index 64b2902048..fd551faf71 100644 --- a/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt +++ b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt @@ -1,9 +1,9 @@ package net.corda.docs -import net.corda.client.CordaRPCClient import net.corda.contracts.asset.Cash import net.corda.core.contracts.DOLLARS import net.corda.core.contracts.issuedBy +import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.Vault import net.corda.core.serialization.OpaqueBytes @@ -14,7 +14,7 @@ import net.corda.node.driver.driver import net.corda.node.services.User import net.corda.node.services.config.configureTestSSL import net.corda.node.services.messaging.ArtemisMessagingComponent -import net.corda.node.services.messaging.startFlow +import net.corda.node.services.messaging.CordaRPCClient import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.testing.expect diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt index 27cb6bf166..04fc07e226 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt @@ -1,13 +1,14 @@ package net.corda.docs import com.esotericsoftware.kryo.Kryo -import net.corda.client.CordaRPCClient import net.corda.contracts.asset.Cash import net.corda.core.contracts.Amount import net.corda.core.contracts.Issued import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.USD import net.corda.core.div +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.services.ServiceInfo import net.corda.core.serialization.OpaqueBytes @@ -18,8 +19,7 @@ import net.corda.node.driver.driver import net.corda.node.services.User import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.NodeSSLConfiguration -import net.corda.node.services.messaging.CordaRPCOps -import net.corda.node.services.messaging.startFlow +import net.corda.node.services.messaging.CordaRPCClient import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.ValidatingNotaryService import org.graphstream.graph.Edge diff --git a/finance/src/main/kotlin/net/corda/contracts/testing/VaultFiller.kt b/finance/src/main/kotlin/net/corda/contracts/testing/VaultFiller.kt index c8355d9b33..a4284a41a3 100644 --- a/finance/src/main/kotlin/net/corda/contracts/testing/VaultFiller.kt +++ b/finance/src/main/kotlin/net/corda/contracts/testing/VaultFiller.kt @@ -63,7 +63,7 @@ fun ServiceHub.fillWithSomeTestCash(howMuch: Amount, return Vault(states) } -private fun calculateRandomlySizedAmounts(howMuch: Amount, min: Int, max: Int, rng: Random): LongArray { +fun calculateRandomlySizedAmounts(howMuch: Amount, min: Int, max: Int, rng: Random): LongArray { val numSlots = min + Math.floor(rng.nextDouble() * (max - min)).toInt() val baseSize = howMuch.quantity / numSlots check(baseSize > 0) { baseSize } diff --git a/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt b/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt index 4209904d32..22fcacc56e 100644 --- a/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt @@ -11,7 +11,7 @@ import org.junit.Test class DriverTests { companion object { - fun nodeMustBeUp(nodeInfo: NodeInfo, nodeName: String) { + fun nodeMustBeUp(nodeInfo: NodeInfo) { val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) // Check that the port is bound addressMustBeBound(hostAndPort) @@ -30,8 +30,8 @@ class DriverTests { val notary = startNode("TestNotary", setOf(ServiceInfo(SimpleNotaryService.type))) val regulator = startNode("Regulator", setOf(ServiceInfo(RegulatorService.type))) - nodeMustBeUp(notary.getOrThrow().nodeInfo, "TestNotary") - nodeMustBeUp(regulator.getOrThrow().nodeInfo, "Regulator") + nodeMustBeUp(notary.getOrThrow().nodeInfo) + nodeMustBeUp(regulator.getOrThrow().nodeInfo) Pair(notary.getOrThrow(), regulator.getOrThrow()) } nodeMustBeDown(notary.nodeInfo) @@ -42,7 +42,7 @@ class DriverTests { fun startingNodeWithNoServicesWorks() { val noService = driver { val noService = startNode("NoService") - nodeMustBeUp(noService.getOrThrow().nodeInfo, "NoService") + nodeMustBeUp(noService.getOrThrow().nodeInfo) noService.getOrThrow() } nodeMustBeDown(noService.nodeInfo) @@ -52,7 +52,7 @@ class DriverTests { fun randomFreePortAllocationWorks() { val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) { val nodeInfo = startNode("NoService") - nodeMustBeUp(nodeInfo.getOrThrow().nodeInfo, "NoService") + nodeMustBeUp(nodeInfo.getOrThrow().nodeInfo) nodeInfo.getOrThrow() } nodeMustBeDown(nodeInfo.nodeInfo) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index 571b937a62..c9bcacdf1f 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -2,12 +2,12 @@ package net.corda.services.messaging import co.paralleluniverse.fibers.Suspendable import com.google.common.net.HostAndPort -import net.corda.client.impl.CordaRPCClientImpl import net.corda.core.crypto.Party import net.corda.core.crypto.composite import net.corda.core.crypto.generateKeyPair import net.corda.core.flows.FlowLogic import net.corda.core.getOrThrow +import net.corda.core.messaging.CordaRPCOps import net.corda.core.random63BitValue import net.corda.core.seconds import net.corda.node.internal.Node @@ -18,7 +18,7 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOT import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE -import net.corda.node.services.messaging.CordaRPCOps +import net.corda.node.services.messaging.CordaRPCClientImpl import net.corda.node.services.messaging.NodeMessagingClient.Companion.RPC_QUEUE_REMOVALS_QUEUE import net.corda.testing.messaging.SimpleMQClient import net.corda.testing.node.NodeBasedTest diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 8a3afb46f0..ac5bea5091 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -11,6 +11,7 @@ import net.corda.core.crypto.X509Utilities import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.flows.FlowStateMachine +import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.* import net.corda.core.node.services.* @@ -21,6 +22,7 @@ import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction import net.corda.flows.CashCommand import net.corda.flows.CashFlow +import net.corda.flows.FinalityFlow import net.corda.flows.sendRequest import net.corda.node.api.APIServer import net.corda.node.services.api.* @@ -30,7 +32,6 @@ import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.events.ScheduledActivityObserver import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.keys.PersistentKeyManagementService -import net.corda.node.services.messaging.RPCOps import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC @@ -82,7 +83,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo CashCommand.IssueCash::class.java, CashCommand.PayCash::class.java, CashCommand.ExitCash::class.java - ) + ), + FinalityFlow::class.java to emptySet() ) } @@ -340,8 +342,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo private fun buildPluginServices(tokenizableServices: MutableList): List { val pluginServices = pluginRegistries.flatMap { x -> x.servicePlugins } val serviceList = mutableListOf() - for (serviceClass in pluginServices) { - val service = serviceClass.getConstructor(PluginServiceHub::class.java).newInstance(services) + for (serviceConstructor in pluginServices) { + val service = serviceConstructor.apply(services) serviceList.add(service) tokenizableServices.add(service) if (service is AcceptsFileUpload) { diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 933028ce7e..36cdf90443 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -2,23 +2,36 @@ package net.corda.node.internal import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef +import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.FlowHandle +import net.corda.core.messaging.StateMachineInfo +import net.corda.core.messaging.StateMachineUpdate import net.corda.core.node.NodeInfo import net.corda.core.node.ServiceHub import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.Vault +import net.corda.core.serialization.serialize +import net.corda.node.services.messaging.requirePermission import net.corda.core.toObservable import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.ProgressTracker -import net.corda.node.services.messaging.* +import net.corda.node.services.messaging.createRPCKryo import net.corda.node.services.startFlowPermission import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.StateMachineManager +import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.databaseTransaction import org.jetbrains.exposed.sql.Database import rx.Observable +import java.io.BufferedInputStream +import java.io.File +import java.io.FileInputStream +import java.io.InputStream +import java.time.Instant +import java.time.LocalDateTime /** * Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server @@ -51,8 +64,8 @@ class CordaRPCOpsImpl( override fun stateMachinesAndUpdates(): Pair, Observable> { val (allStateMachines, changes) = smm.track() return Pair( - allStateMachines.map { StateMachineInfo.fromFlowStateMachineImpl(it) }, - changes.map { StateMachineUpdate.fromStateMachineChange(it) } + allStateMachines.map { stateMachineInfoFromFlowStateMachineImpl(it) }, + changes.map { stateMachineUpdateFromStateMachineChange(it) } ) } @@ -84,8 +97,41 @@ class CordaRPCOpsImpl( val stateMachine = services.invokeFlowAsync(logicType, *args) as FlowStateMachineImpl return FlowHandle( id = stateMachine.id, - progress = stateMachine.logic.progressTracker?.changes ?: Observable.empty(), + progress = stateMachine.logic.track()?.second ?: Observable.empty(), returnValue = stateMachine.resultFuture.toObservable() ) } + + override fun attachmentExists(id: SecureHash) = services.storageService.attachments.openAttachment(id) != null + override fun uploadAttachment(jar: InputStream) = services.storageService.attachments.importAttachment(jar) + override fun currentNodeTime(): Instant = Instant.now(services.clock) + + override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key) + override fun partyFromName(name: String) = services.identityService.partyFromName(name) + + companion object { + fun stateMachineInfoFromFlowStateMachineImpl(stateMachine: FlowStateMachineImpl<*>): StateMachineInfo { + return StateMachineInfo( + id = stateMachine.id, + flowLogicClassName = stateMachine.logic.javaClass.name, + progressTrackerStepAndUpdates = stateMachine.logic.track() + ) + } + + fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate { + return when (change.addOrRemove) { + AddOrRemove.ADD -> { + val stateMachineInfo = StateMachineInfo( + id = change.id, + flowLogicClassName = change.logic.javaClass.name, + progressTrackerStepAndUpdates = change.logic.track() + ) + StateMachineUpdate.Added(stateMachineInfo) + } + AddOrRemove.REMOVE -> { + StateMachineUpdate.Removed(change.id) + } + } + } + } } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 9adc82ff3a..48778eb305 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -2,12 +2,14 @@ package net.corda.node.internal import com.codahale.metrics.JmxReporter import net.corda.core.div +import net.corda.core.getOrThrow +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.ServiceHub import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType import net.corda.core.node.services.UniquenessProvider -import net.corda.core.then import net.corda.core.utilities.loggerFor import net.corda.node.printBasicNodeInfo import net.corda.node.serialization.NodeClock @@ -17,8 +19,9 @@ import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.node.services.messaging.CordaRPCClient import net.corda.node.services.messaging.NodeMessagingClient -import net.corda.node.services.messaging.RPCOps +import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.RaftUniquenessProvider import net.corda.node.services.transactions.RaftValidatingNotaryService @@ -50,6 +53,7 @@ import java.util.* import javax.management.ObjectName import javax.servlet.* import kotlin.concurrent.thread +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER class ConfigurationException(message: String) : Exception(message) @@ -120,6 +124,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: override fun makeMessagingService(): MessagingServiceInternal { userService = RPCUserServiceImpl(configuration) + val serverAddr = with(configuration) { messagingServerAddress ?: { messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService) @@ -146,7 +151,8 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: net.start(rpcOps, userService) } - private fun initWebServer(): Server { + // TODO: add flag to enable/disable webserver + private fun initWebServer(localRpc: CordaRPCOps): Server { // Note that the web server handlers will all run concurrently, and not on the node thread. val handlerCollection = HandlerCollection() @@ -167,7 +173,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: } // API, data upload and download to services (attachments, rates oracles etc) - handlerCollection.addHandler(buildServletContextHandler()) + handlerCollection.addHandler(buildServletContextHandler(localRpc)) val server = Server() @@ -204,7 +210,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: return server } - private fun buildServletContextHandler(): ServletContextHandler { + private fun buildServletContextHandler(localRpc: CordaRPCOps): ServletContextHandler { return ServletContextHandler().apply { contextPath = "/" setAttribute("node", this@Node) @@ -219,17 +225,11 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: val webAPIsOnClasspath = pluginRegistries.flatMap { x -> x.webApis } for (webapi in webAPIsOnClasspath) { - log.info("Add plugin web API from attachment ${webapi.name}") - val constructor = try { - webapi.getConstructor(ServiceHub::class.java) - } catch (ex: NoSuchMethodException) { - log.error("Missing constructor ${webapi.name}(ServiceHub)") - continue - } + log.info("Add plugin web API from attachment $webapi") val customAPI = try { - constructor.newInstance(services) + webapi.apply(localRpc) } catch (ex: InvocationTargetException) { - log.error("Constructor ${webapi.name}(ServiceHub) threw an error: ", ex.targetException) + log.error("Constructor $webapi threw an error: ", ex.targetException) continue } resourceConfig.register(customAPI) @@ -299,13 +299,20 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: super.initialiseDatabasePersistence(insideTransaction) } + private fun connectLocalRpcAsNodeUser(): CordaRPCOps { + val client = CordaRPCClient(configuration.artemisAddress, configuration) + client.start(NODE_USER, NODE_USER) + return client.proxy() + } + override fun start(): Node { alreadyRunningNodeCheck() super.start() // Only start the service API requests once the network map registration is complete - networkMapRegistrationFuture.then { + thread(name = "WebServer") { + networkMapRegistrationFuture.getOrThrow() try { - webServer = initWebServer() + webServer = initWebServer(connectLocalRpcAsNodeUser()) } catch(ex: Exception) { // TODO: We need to decide if this is a fatal error, given the API is unavailable, or whether the API // is not critical and we continue anyway. diff --git a/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt b/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt index f6954cab6c..5ef0ab4973 100644 --- a/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt +++ b/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt @@ -1,13 +1,14 @@ package net.corda.node.services -import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.PluginServiceHub import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.flows.NotaryChangeFlow +import net.corda.core.node.CordaPluginRegistry +import java.util.function.Function object NotaryChange { class Plugin : CordaPluginRegistry() { - override val servicePlugins: List> = listOf(Service::class.java) + override val servicePlugins = listOf(Function(::Service)) } /** diff --git a/node/src/main/kotlin/net/corda/node/services/RPCUserService.kt b/node/src/main/kotlin/net/corda/node/services/RPCUserService.kt index e515717155..760f3fe8d3 100644 --- a/node/src/main/kotlin/net/corda/node/services/RPCUserService.kt +++ b/node/src/main/kotlin/net/corda/node/services/RPCUserService.kt @@ -28,5 +28,6 @@ data class User(val username: String, val password: String, val permissions: Set override fun toString(): String = "${javaClass.simpleName}($username, permissions=$permissions)" } -fun

> startFlowPermission(clazz: Class

) = "StartFlow.${clazz.name}" +fun startFlowPermission(className: String) = "StartFlow.$className" +fun

> startFlowPermission(clazz: Class

) = startFlowPermission(clazz.name) inline fun > startFlowPermission(): String = startFlowPermission(P::class.java) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index a6f6bd5a8f..076418a2c2 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -184,7 +184,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true)) securityRoles[RPC_REQUESTS_QUEUE] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true)) - for ((username) in userService.users) { + // TODO remove NODE_USER once webserver doesn't need it + val possibleClientUserNames = userService.users.map { it.username } + listOf(NODE_USER) + for (username in possibleClientUserNames) { securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf( nodeInternalRole, restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)) @@ -344,7 +346,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, principals += RolePrincipal("$CLIENTS_PREFIX$username") // This enables the RPC client to receive responses username } - principals += UserPrincipal(validatedUser) loginSucceeded = true diff --git a/client/src/main/kotlin/net/corda/client/CordaRPCClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt similarity index 91% rename from client/src/main/kotlin/net/corda/client/CordaRPCClient.kt rename to node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt index 6f28a20ffc..b5485a333c 100644 --- a/client/src/main/kotlin/net/corda/client/CordaRPCClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt @@ -1,26 +1,17 @@ -package net.corda.client +package net.corda.node.services.messaging import com.google.common.net.HostAndPort -import net.corda.client.impl.CordaRPCClientImpl import net.corda.core.ThreadBox +import net.corda.core.messaging.CordaRPCOps import net.corda.node.services.config.NodeSSLConfiguration -import net.corda.node.services.messaging.ArtemisMessagingComponent -import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX -import net.corda.node.services.messaging.CordaRPCOps -import net.corda.node.services.messaging.RPCException -import net.corda.node.services.messaging.rpcLog import org.apache.activemq.artemis.api.core.ActiveMQException -import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSessionFactory -import org.slf4j.LoggerFactory import rx.Observable import java.io.Closeable -import java.nio.file.Path import java.time.Duration import javax.annotation.concurrent.ThreadSafe -import kotlin.concurrent.thread /** * An RPC client connects to the specified server and allows you to make calls to the server that perform various diff --git a/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt similarity index 99% rename from client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt rename to node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt index dec85330fc..6d614d15e8 100644 --- a/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt @@ -1,4 +1,4 @@ -package net.corda.client.impl +package net.corda.node.services.messaging import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.KryoException @@ -6,14 +6,14 @@ import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import com.google.common.cache.CacheBuilder -import net.corda.client.CordaRPCClient import net.corda.core.ErrorOr import net.corda.core.bufferUntilSubscribed +import net.corda.core.messaging.RPCOps +import net.corda.core.messaging.RPCReturnsObservables import net.corda.core.random63BitValue import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.debug -import net.corda.node.services.messaging.* import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.SimpleString diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 7143569d7c..4ae5f53bb0 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -158,7 +158,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, session.createTemporaryQueue(NOTIFICATIONS_ADDRESS, RPC_QUEUE_REMOVALS_QUEUE, "_AMQ_NotifType = 1") rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE) rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE) - rpcDispatcher = createRPCDispatcher(rpcOps, userService) + rpcDispatcher = createRPCDispatcher(rpcOps, userService, config.myLegalName) } } @@ -436,16 +436,17 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } - private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService) = object : RPCDispatcher(ops, userService) { - override fun send(data: SerializedBytes<*>, toAddress: String) { - state.locked { - val msg = session!!.createMessage(false).apply { - writeBodyBufferBytes(data.bytes) - // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService, nodeLegalName: String) = + object : RPCDispatcher(ops, userService, nodeLegalName) { + override fun send(data: SerializedBytes<*>, toAddress: String) { + state.locked { + val msg = session!!.createMessage(false).apply { + writeBodyBufferBytes(data.bytes) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } + producer!!.send(toAddress, msg) + } } - producer!!.send(toAddress, msg) } - } - } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt index 36abd15042..00cd7a657f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt @@ -8,16 +8,22 @@ import com.esotericsoftware.kryo.io.Output import com.google.common.annotations.VisibleForTesting import com.google.common.collect.HashMultimap import net.corda.core.ErrorOr +import net.corda.core.messaging.RPCOps +import net.corda.core.messaging.RPCReturnsObservables import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.debug import net.corda.node.services.RPCUserService import net.corda.node.services.User +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.node.utilities.AffinityExecutor import org.apache.activemq.artemis.api.core.Message import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.bouncycastle.asn1.x500.X500Name +import org.bouncycastle.asn1.x500.style.BCStyle import rx.Notification import rx.Observable import rx.Subscription @@ -30,7 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger * wrong system (you could just send a message). If you want complex customisation of how requests/responses * are handled, this is probably the wrong system. */ -abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) { +// TODO remove the nodeLegalName parameter once the webserver doesn't need special privileges +abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, val nodeLegalName: String) { // Throw an exception if there are overloaded methods private val methodTable = ops.javaClass.declaredMethods.groupBy { it.name }.mapValues { it.value.single() } @@ -153,9 +160,19 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) { return ClientRPCRequestMessage(SerializedBytes(argBytes), replyTo, observationsTo, methodName, user) } + // TODO remove this User once webserver doesn't need it + val nodeUser = User(NODE_USER, NODE_USER, setOf()) @VisibleForTesting protected open fun getUser(message: ClientMessage): User { - return userService.getUser(message.requiredString(Message.HDR_VALIDATED_USER.toString()))!! + val validatedUser = message.requiredString(Message.HDR_VALIDATED_USER.toString()) + val rpcUser = userService.getUser(validatedUser) + if (rpcUser != null) { + return rpcUser + } else if (X500Name(validatedUser).getRDNs(BCStyle.CN).first().first.value.toString() == nodeLegalName) { + return nodeUser + } else { + throw IllegalArgumentException("Validated user '$validatedUser' is not an RPC user nor the NODE user") + } } private fun ClientMessage.getReturnAddress(user: User, property: String, required: Boolean): String? { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt index bbe9285964..5c433e1458 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt @@ -20,6 +20,9 @@ import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.flows.StateMachineRunId +import net.corda.core.messaging.FlowHandle +import net.corda.core.messaging.StateMachineInfo +import net.corda.core.messaging.StateMachineUpdate import net.corda.core.node.* import net.corda.core.node.services.* import net.corda.core.serialization.* @@ -28,6 +31,7 @@ import net.corda.core.transactions.WireTransaction import net.corda.flows.CashFlowResult import net.corda.node.internal.AbstractNode import net.corda.node.services.User +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER import net.i2p.crypto.eddsa.EdDSAPrivateKey import net.i2p.crypto.eddsa.EdDSAPublicKey import org.apache.activemq.artemis.api.core.SimpleString @@ -37,6 +41,7 @@ import org.slf4j.LoggerFactory import rx.Notification import rx.Observable import java.time.Instant +import java.time.LocalDateTime import java.util.* /** Global RPC logger */ @@ -45,15 +50,6 @@ val rpcLog: Logger by lazy { LoggerFactory.getLogger("net.corda.rpc") } /** Used in the RPC wire protocol to wrap an observation with the handle of the observable it's intended for. */ data class MarshalledObservation(val forHandle: Int, val what: Notification<*>) -/** - * If an RPC is tagged with this annotation it may return one or more observables anywhere in its response graph. - * Calling such a method comes with consequences: it's slower, and consumes server side resources as observations - * will buffer up on the server until they're consumed by the client. - */ -@Target(AnnotationTarget.FUNCTION) -@MustBeDocumented -annotation class RPCReturnsObservables - /** Records the protocol version in which this RPC was added. */ @Target(AnnotationTarget.FUNCTION) @MustBeDocumented @@ -74,15 +70,6 @@ data class ClientRPCRequestMessage( } } -/** - * Base interface that all RPC servers must implement. Note: in Corda there's only one RPC interface. This base - * interface is here in case we split the RPC system out into a separate library one day. - */ -interface RPCOps { - /** Returns the RPC protocol version. Exists since version 0 so guaranteed to be present. */ - val protocolVersion: Int -} - /** * This is available to RPC implementations to query the validated [User] that is calling it. Each user has a set of * permissions they're entitled to which can be used to control access. @@ -92,8 +79,11 @@ val CURRENT_RPC_USER: ThreadLocal = ThreadLocal() /** Helper method which checks that the current RPC user is entitled for the given permission. Throws a [PermissionException] otherwise. */ fun requirePermission(permission: String) { - if (permission !in CURRENT_RPC_USER.get().permissions) { - throw PermissionException("User not permissioned for $permission") + // TODO remove the NODE_USER condition once webserver doesn't need it + val currentUser = CURRENT_RPC_USER.get() + val currentUserPermissions = currentUser.permissions + if (currentUser.username != NODE_USER && permission !in currentUserPermissions) { + throw PermissionException("User not permissioned for $permission, permissions are $currentUserPermissions") } } @@ -236,6 +226,7 @@ private class RPCKryo(observableSerializer: Serializer>? = null) register(FlowHandle::class.java) register(KryoException::class.java) register(StringBuffer::class.java) + register(Unit::class.java) for ((_flow, argumentTypes) in AbstractNode.defaultFlowWhiteList) { for (type in argumentTypes) { register(type) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt index 7f6f559014..e9f56ec295 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt @@ -3,19 +3,20 @@ package net.corda.node.services.persistence import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic -import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.PluginServiceHub import net.corda.core.node.recordTransactions import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.loggerFor import net.corda.flows.* +import net.corda.core.node.CordaPluginRegistry import java.io.InputStream import javax.annotation.concurrent.ThreadSafe +import java.util.function.Function object DataVending { class Plugin : CordaPluginRegistry() { - override val servicePlugins: List> = listOf(Service::class.java) + override val servicePlugins = listOf(Function(::Service)) } /** @@ -37,8 +38,6 @@ object DataVending { val logger = loggerFor() } - class TransactionRejectedError(msg: String) : Exception(msg) - init { services.registerFlowInitiator(FetchTransactionsFlow::class, ::FetchTransactionsHandler) services.registerFlowInitiator(FetchAttachmentsFlow::class, ::FetchAttachmentsHandler) diff --git a/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry b/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry index 5e1ec153c6..a76441b592 100644 --- a/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry +++ b/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry @@ -1,3 +1,3 @@ -# Register a ServiceLoader service extending from net.corda.node.CordaPluginRegistry +# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry net.corda.node.services.NotaryChange$Plugin net.corda.node.services.persistence.DataVending$Plugin diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index 718f398c6c..13588beab5 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -3,6 +3,8 @@ package net.corda.node import net.corda.contracts.asset.Cash import net.corda.core.contracts.* import net.corda.core.flows.StateMachineRunId +import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.Vault import net.corda.core.serialization.OpaqueBytes @@ -13,8 +15,6 @@ import net.corda.node.internal.CordaRPCOpsImpl import net.corda.node.services.User import net.corda.node.services.messaging.CURRENT_RPC_USER import net.corda.node.services.messaging.PermissionException -import net.corda.node.services.messaging.StateMachineUpdate -import net.corda.node.services.messaging.startFlow import net.corda.node.services.network.NetworkMapService import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.SimpleNotaryService diff --git a/client/src/test/kotlin/net/corda/client/ClientRPCInfrastructureTests.kt b/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt similarity index 95% rename from client/src/test/kotlin/net/corda/client/ClientRPCInfrastructureTests.kt rename to node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt index 83a6b6321d..6aeb31f049 100644 --- a/client/src/test/kotlin/net/corda/client/ClientRPCInfrastructureTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/ClientRPCInfrastructureTests.kt @@ -1,12 +1,16 @@ -package net.corda.client +package net.corda.node.messaging -import net.corda.client.impl.CordaRPCClientImpl +import net.corda.core.messaging.RPCOps +import net.corda.core.messaging.RPCReturnsObservables import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.LogHelper import net.corda.node.services.RPCUserService import net.corda.node.services.User -import net.corda.node.services.messaging.* import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE +import net.corda.node.services.messaging.CURRENT_RPC_USER +import net.corda.node.services.messaging.CordaRPCClientImpl +import net.corda.node.services.messaging.RPCDispatcher +import net.corda.node.services.messaging.RPCSinceVersion import net.corda.node.utilities.AffinityExecutor import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.SimpleString @@ -67,7 +71,7 @@ class ClientRPCInfrastructureTests { override fun getUser(username: String): User? = throw UnsupportedOperationException() override val users: List get() = throw UnsupportedOperationException() } - val dispatcher = object : RPCDispatcher(TestOpsImpl(), userService) { + val dispatcher = object : RPCDispatcher(TestOpsImpl(), userService, "SomeName") { override fun send(data: SerializedBytes<*>, toAddress: String) { val msg = serverSession.createMessage(false).apply { writeBodyBufferBytes(data.bytes) diff --git a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt index 04a22a9d0a..b474ce1138 100644 --- a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt @@ -16,7 +16,7 @@ import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.NodeMessagingClient -import net.corda.node.services.messaging.RPCOps +import net.corda.core.messaging.RPCOps import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.PersistentUniquenessProvider diff --git a/samples/attachment-demo/build.gradle b/samples/attachment-demo/build.gradle index a0f09c3d9e..ae1132d1de 100644 --- a/samples/attachment-demo/build.gradle +++ b/samples/attachment-demo/build.gradle @@ -47,8 +47,6 @@ dependencies { // Corda integration dependencies runtime project(path: ":node", configuration: 'runtimeArtifacts') compile project(':core') - compile project(':client') - compile project(':node') compile project(':test-utils') // Javax is required for webapis diff --git a/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt b/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt index 75d2083896..35e8fb701b 100644 --- a/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt +++ b/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt @@ -12,10 +12,12 @@ class AttachmentDemoTest { @Test fun `runs attachment demo`() { driver(dsl = { startNode("Notary", setOf(ServiceInfo(SimpleNotaryService.Companion.type))) - val nodeA = startNode("Bank A").getOrThrow() - val nodeAApiAddr = nodeA.config.getHostAndPort("webAddress") + val nodeAFuture = startNode("Bank A") val nodeBApiAddr = startNode("Bank B").getOrThrow().config.getHostAndPort("webAddress") + val nodeA = nodeAFuture.getOrThrow() + val nodeAApiAddr = nodeA.config.getHostAndPort("webAddress") + var recipientReturn: Boolean? = null var senderReturn: Boolean? = null val recipientThread = thread { diff --git a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/api/AttachmentDemoApi.kt b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/api/AttachmentDemoApi.kt index 6d85e8cabd..34c0aa177e 100644 --- a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/api/AttachmentDemoApi.kt +++ b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/api/AttachmentDemoApi.kt @@ -2,9 +2,8 @@ package net.corda.attachmentdemo.api import net.corda.core.contracts.TransactionType import net.corda.core.crypto.SecureHash -import net.corda.core.failure -import net.corda.core.node.ServiceHub -import net.corda.core.success +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow import net.corda.core.utilities.ApiUtils import net.corda.core.utilities.Emoji import net.corda.core.utilities.loggerFor @@ -17,8 +16,8 @@ import javax.ws.rs.core.Response import kotlin.test.assertEquals @Path("attachmentdemo") -class AttachmentDemoApi(val services: ServiceHub) { - private val utils = ApiUtils(services) +class AttachmentDemoApi(val rpc: CordaRPCOps) { + private val utils = ApiUtils(rpc) private companion object { val PROSPECTUS_HASH = SecureHash.parse("decd098666b9657314870e192ced0c3519c2c9d395507a238338f8d003929de9") @@ -32,9 +31,9 @@ class AttachmentDemoApi(val services: ServiceHub) { return utils.withParty(partyKey) { // Make sure we have the file in storage // TODO: We should have our own demo file, not share the trader demo file - if (services.storageService.attachments.openAttachment(PROSPECTUS_HASH) == null) { + if (!rpc.attachmentExists(PROSPECTUS_HASH)) { javaClass.classLoader.getResourceAsStream("bank-of-london-cp.jar").use { - val id = services.storageService.attachments.importAttachment(it) + val id = rpc.uploadAttachment(it) assertEquals(PROSPECTUS_HASH, id) } } @@ -42,18 +41,16 @@ class AttachmentDemoApi(val services: ServiceHub) { // Create a trivial transaction that just passes across the attachment - in normal cases there would be // inputs, outputs and commands that refer to this attachment. val ptx = TransactionType.General.Builder(notary = null) - ptx.addAttachment(services.storageService.attachments.openAttachment(PROSPECTUS_HASH)!!.id) + require(rpc.attachmentExists(PROSPECTUS_HASH)) + ptx.addAttachment(PROSPECTUS_HASH) // Despite not having any states, we have to have at least one signature on the transaction ptx.signWith(ALICE_KEY) // Send the transaction to the other recipient val tx = ptx.toSignedTransaction() - services.invokeFlowAsync(FinalityFlow::class.java, tx, setOf(it)).resultFuture.success { - println("Successfully sent attachment with the FinalityFlow") - }.failure { - logger.error("Failed to send attachment with the FinalityFlow") - } + val protocolHandle = rpc.startFlow(::FinalityFlow, tx, setOf(it)) + protocolHandle.returnValue.toBlocking().first() Response.accepted().build() } @@ -66,14 +63,14 @@ class AttachmentDemoApi(val services: ServiceHub) { val future = CompletableFuture() // Normally we would receive the transaction from a more specific flow, but in this case we let [FinalityFlow] // handle receiving it for us. - services.storageService.validatedTransactions.updates.subscribe { event -> + rpc.verifiedTransactions().second.subscribe { event -> // When the transaction is received, it's passed through [ResolveTransactionsFlow], which first fetches any // attachments for us, then verifies the transaction. As such, by the time it hits the validated transaction store, // we have a copy of the attachment. val tx = event.tx val response = if (tx.attachments.isNotEmpty()) { - val attachment = services.storageService.attachments.openAttachment(tx.attachments.first()) - assertEquals(PROSPECTUS_HASH, attachment?.id) + assertEquals(PROSPECTUS_HASH, tx.attachments.first()) + require(rpc.attachmentExists(PROSPECTUS_HASH)) println("File received - we're happy!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(event.tx)}") Response.ok().entity("Final transaction is: ${Emoji.renderIfSupported(event.tx)}").build() @@ -93,7 +90,8 @@ class AttachmentDemoApi(val services: ServiceHub) { @Path("other-side-key") @Produces(MediaType.APPLICATION_JSON) fun getOtherSide(): Response? { - val key = services.networkMapCache.partyNodes.first { it != services.myInfo }.legalIdentity.owningKey.toBase58String() + val myInfo = rpc.nodeIdentity() + val key = rpc.networkMapUpdates().first.first { it != myInfo }.legalIdentity.owningKey.toBase58String() return Response.ok().entity(key).build() } } diff --git a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/plugin/AttachmentDemoPlugin.kt b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/plugin/AttachmentDemoPlugin.kt index a5bfd72082..dc1c5206c7 100644 --- a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/plugin/AttachmentDemoPlugin.kt +++ b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/plugin/AttachmentDemoPlugin.kt @@ -1,16 +1,16 @@ package net.corda.attachmentdemo.plugin import net.corda.attachmentdemo.api.AttachmentDemoApi -import net.corda.core.node.CordaPluginRegistry import net.corda.core.transactions.SignedTransaction import net.corda.flows.FinalityFlow +import net.corda.core.node.CordaPluginRegistry +import java.util.function.Function class AttachmentDemoPlugin : CordaPluginRegistry() { // A list of classes that expose web APIs. - override val webApis: List> = listOf(AttachmentDemoApi::class.java) - // A list of flows that are required for this cordapp + override val webApis = listOf(Function(::AttachmentDemoApi)) + // A list of Flows that are required for this cordapp override val requiredFlows: Map> = mapOf( FinalityFlow::class.java.name to setOf(SignedTransaction::class.java.name, setOf(Unit).javaClass.name, setOf(Unit).javaClass.name) ) - override val servicePlugins: List> = listOf() } diff --git a/samples/attachment-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry b/samples/attachment-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry index 0e2e16a975..2c117fcac4 100644 --- a/samples/attachment-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry +++ b/samples/attachment-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry @@ -1,2 +1,2 @@ -# Register a ServiceLoader service extending from net.corda.node.CordaPluginRegistry +# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry net.corda.attachmentdemo.plugin.AttachmentDemoPlugin diff --git a/samples/irs-demo/build.gradle b/samples/irs-demo/build.gradle index 27d3cd3ede..ff76afd800 100644 --- a/samples/irs-demo/build.gradle +++ b/samples/irs-demo/build.gradle @@ -50,8 +50,6 @@ dependencies { // Corda integration dependencies runtime project(path: ":node", configuration: 'runtimeArtifacts') compile project(':core') - compile project(':client') - compile project(':node') compile project(':finance') compile project(':test-utils') diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt index a23cbd59f4..559c163e6d 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/InterestRateSwapAPI.kt @@ -1,7 +1,8 @@ package net.corda.irs.api -import net.corda.core.node.ServiceHub -import net.corda.core.node.services.linearHeadsOfType +import net.corda.core.contracts.filterStatesOfType +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow import net.corda.core.utilities.loggerFor import net.corda.irs.contract.InterestRateSwap import net.corda.irs.flows.AutoOfferFlow @@ -10,6 +11,7 @@ import net.corda.irs.flows.UpdateBusinessDayFlow import java.net.URI import java.time.LocalDate import java.time.LocalDateTime +import java.time.ZoneId import javax.ws.rs.* import javax.ws.rs.core.MediaType import javax.ws.rs.core.Response @@ -35,23 +37,23 @@ import javax.ws.rs.core.Response * or if the demodate or population of deals should be reset (will only work while persistence is disabled). */ @Path("irs") -class InterestRateSwapAPI(val services: ServiceHub) { +class InterestRateSwapAPI(val rpc: CordaRPCOps) { private val logger = loggerFor() private fun generateDealLink(deal: InterestRateSwap.State) = "/api/irs/deals/" + deal.common.tradeID private fun getDealByRef(ref: String): InterestRateSwap.State? { - val states = services.vaultService.linearHeadsOfType().filterValues { it.state.data.ref == ref } + val states = rpc.vaultAndUpdates().first.filterStatesOfType().filter { it.state.data.ref == ref } return if (states.isEmpty()) null else { - val deals = states.values.map { it.state.data } + val deals = states.map { it.state.data } return if (deals.isEmpty()) null else deals[0] } } private fun getAllDeals(): Array { - val states = services.vaultService.linearHeadsOfType() - val swaps = states.values.map { it.state.data }.toTypedArray() + val states = rpc.vaultAndUpdates().first.filterStatesOfType() + val swaps = states.map { it.state.data }.toTypedArray() return swaps } @@ -65,7 +67,7 @@ class InterestRateSwapAPI(val services: ServiceHub) { @Consumes(MediaType.APPLICATION_JSON) fun storeDeal(newDeal: InterestRateSwap.State): Response { try { - services.invokeFlowAsync(AutoOfferFlow.Requester::class.java, newDeal).resultFuture.get() + rpc.startFlow(AutoOfferFlow::Requester, newDeal).returnValue.toBlocking().first() return Response.created(URI.create(generateDealLink(newDeal))).build() } catch (ex: Throwable) { logger.info("Exception when creating deal: $ex") @@ -92,7 +94,7 @@ class InterestRateSwapAPI(val services: ServiceHub) { val priorDemoDate = fetchDemoDate() // Can only move date forwards if (newDemoDate.isAfter(priorDemoDate)) { - services.invokeFlowAsync(UpdateBusinessDayFlow.Broadcast::class.java, newDemoDate).resultFuture.get() + rpc.startFlow(UpdateBusinessDayFlow::Broadcast, newDemoDate).returnValue.toBlocking().first() return Response.ok().build() } val msg = "demodate is already $priorDemoDate and can only be updated with a later date" @@ -104,14 +106,14 @@ class InterestRateSwapAPI(val services: ServiceHub) { @Path("demodate") @Produces(MediaType.APPLICATION_JSON) fun fetchDemoDate(): LocalDate { - return LocalDateTime.now(services.clock).toLocalDate() + return LocalDateTime.ofInstant(rpc.currentNodeTime(), ZoneId.systemDefault()).toLocalDate() } @PUT @Path("restart") @Consumes(MediaType.APPLICATION_JSON) fun exitServer(): Response { - services.invokeFlowAsync(ExitServerFlow.Broadcast::class.java, 83).resultFuture.get() + rpc.startFlow(ExitServerFlow::Broadcast, 83).returnValue.toBlocking().first() return Response.ok().build() } } diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt index 077def22e4..b172a9768d 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt @@ -31,6 +31,7 @@ import java.time.Duration import java.time.Instant import java.time.LocalDate import java.util.* +import java.util.function.Function import javax.annotation.concurrent.ThreadSafe /** @@ -49,8 +50,8 @@ object NodeInterestRates { * Register the flow that is used with the Fixing integration tests. */ class Plugin : CordaPluginRegistry() { - override val requiredFlows: Map> = mapOf(Pair(FixingFlow.FixingRoleDecider::class.java.name, setOf(Duration::class.java.name, StateRef::class.java.name))) - override val servicePlugins: List> = listOf(Service::class.java) + override val requiredFlows = mapOf(Pair(FixingFlow.FixingRoleDecider::class.java.name, setOf(Duration::class.java.name, StateRef::class.java.name))) + override val servicePlugins = listOf(Function(::Service)) } /** diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt index 4f800808aa..2d1cf59900 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt @@ -13,6 +13,7 @@ import net.corda.flows.TwoPartyDealFlow import net.corda.flows.TwoPartyDealFlow.Acceptor import net.corda.flows.TwoPartyDealFlow.AutoOffer import net.corda.flows.TwoPartyDealFlow.Instigator +import java.util.function.Function /** * This whole class is really part of a demo just to initiate the agreement of a deal with a simple @@ -24,7 +25,7 @@ import net.corda.flows.TwoPartyDealFlow.Instigator object AutoOfferFlow { class Plugin : CordaPluginRegistry() { - override val servicePlugins: List> = listOf(Service::class.java) + override val servicePlugins = listOf(Function(::Service)) } diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/ExitServerFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/ExitServerFlow.kt index f15f34e5fd..044036d19c 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/ExitServerFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/ExitServerFlow.kt @@ -9,6 +9,7 @@ import net.corda.core.node.NodeInfo import net.corda.core.node.PluginServiceHub import net.corda.testing.node.MockNetworkMapCache import java.util.concurrent.TimeUnit +import java.util.function.Function object ExitServerFlow { @@ -20,7 +21,7 @@ object ExitServerFlow { data class ExitMessage(val exitCode: Int) class Plugin : CordaPluginRegistry() { - override val servicePlugins: List> = listOf(Service::class.java) + override val servicePlugins = listOf(Function(::Service)) } class Service(services: PluginServiceHub) { diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt index ba8e9abde2..cd4a14b0d3 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt @@ -10,6 +10,7 @@ import net.corda.core.utilities.ProgressTracker import net.corda.node.utilities.TestClock import net.corda.testing.node.MockNetworkMapCache import java.time.LocalDate +import java.util.function.Function /** * This is a less temporary, demo-oriented way of initiating processing of temporal events. @@ -21,7 +22,7 @@ object UpdateBusinessDayFlow { data class UpdateBusinessDayMessage(val date: LocalDate) class Plugin : CordaPluginRegistry() { - override val servicePlugins: List> = listOf(Service::class.java) + override val servicePlugins = listOf(Function(::Service)) } class Service(services: PluginServiceHub) { @@ -38,8 +39,8 @@ object UpdateBusinessDayFlow { } - class Broadcast(val date: LocalDate, - override val progressTracker: ProgressTracker = Broadcast.tracker()) : FlowLogic() { + class Broadcast(val date: LocalDate, override val progressTracker: ProgressTracker) : FlowLogic() { + constructor(date: LocalDate) : this(date, tracker()) companion object { object NOTIFYING : ProgressTracker.Step("Notifying peers") diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/plugin/IRSPlugin.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/plugin/IRSPlugin.kt index a4cf88a8b7..9cecf80a61 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/plugin/IRSPlugin.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/plugin/IRSPlugin.kt @@ -10,17 +10,18 @@ import net.corda.irs.flows.ExitServerFlow import net.corda.irs.flows.FixingFlow import net.corda.irs.flows.UpdateBusinessDayFlow import java.time.Duration +import java.util.function.Function class IRSPlugin : CordaPluginRegistry() { - override val webApis: List> = listOf(InterestRateSwapAPI::class.java) + override val webApis = listOf(Function(::InterestRateSwapAPI)) override val staticServeDirs: Map = mapOf( "irsdemo" to javaClass.classLoader.getResource("irsweb").toExternalForm() ) - override val servicePlugins: List> = listOf(FixingFlow.Service::class.java) + override val servicePlugins = listOf(Function(FixingFlow::Service)) override val requiredFlows: Map> = mapOf( - Pair(AutoOfferFlow.Requester::class.java.name, setOf(InterestRateSwap.State::class.java.name)), - Pair(UpdateBusinessDayFlow.Broadcast::class.java.name, setOf(java.time.LocalDate::class.java.name)), - Pair(ExitServerFlow.Broadcast::class.java.name, setOf(kotlin.Int::class.java.name)), - Pair(FixingFlow.FixingRoleDecider::class.java.name, setOf(StateRef::class.java.name, Duration::class.java.name)), - Pair(FixingFlow.Floater::class.java.name, setOf(Party::class.java.name, FixingFlow.FixingSession::class.java.name))) + AutoOfferFlow.Requester::class.java.name to setOf(InterestRateSwap.State::class.java.name), + UpdateBusinessDayFlow.Broadcast::class.java.name to setOf(java.time.LocalDate::class.java.name), + ExitServerFlow.Broadcast::class.java.name to setOf(kotlin.Int::class.java.name), + FixingFlow.FixingRoleDecider::class.java.name to setOf(StateRef::class.java.name, Duration::class.java.name), + FixingFlow.Floater::class.java.name to setOf(Party::class.java.name, FixingFlow.FixingSession::class.java.name)) } diff --git a/samples/irs-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry b/samples/irs-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry index c3eae17451..27d759838b 100644 --- a/samples/irs-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry +++ b/samples/irs-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry @@ -1,4 +1,4 @@ -# Register a ServiceLoader service extending from net.corda.node.CordaPluginRegistry +# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry net.corda.irs.plugin.IRSPlugin net.corda.irs.api.NodeInterestRates$Plugin net.corda.irs.flows.AutoOfferFlow$Plugin diff --git a/samples/network-visualiser/build.gradle b/samples/network-visualiser/build.gradle index fcfbfbde06..f070c6d410 100644 --- a/samples/network-visualiser/build.gradle +++ b/samples/network-visualiser/build.gradle @@ -23,8 +23,6 @@ dependencies { // Corda integration dependencies runtime project(path: ":node", configuration: 'runtimeArtifacts') compile project(':core') - compile project(':client') - compile project(':node') compile project(':finance') testCompile project(':test-utils') diff --git a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/api/NotaryDemoApi.kt b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/api/NotaryDemoApi.kt index e388310cc2..6f9e82ee93 100644 --- a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/api/NotaryDemoApi.kt +++ b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/api/NotaryDemoApi.kt @@ -3,10 +3,13 @@ package net.corda.notarydemo.api import net.corda.core.contracts.DummyContract import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.toStringShort +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow import net.corda.core.node.ServiceHub import net.corda.core.node.recordTransactions import net.corda.core.transactions.SignedTransaction import net.corda.flows.NotaryFlow +import net.corda.notarydemo.flows.DummyIssueAndMove import java.util.* import javax.ws.rs.GET import javax.ws.rs.Path @@ -14,17 +17,15 @@ import javax.ws.rs.PathParam import javax.ws.rs.core.Response @Path("notarydemo") -class NotaryDemoApi(val services: ServiceHub) { +class NotaryDemoApi(val rpc: CordaRPCOps) { private val notary by lazy { - services.networkMapCache.getAnyNotary() ?: throw IllegalStateException("No notary found on the network") + rpc.networkMapUpdates().first.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity } private val counterpartyNode by lazy { - services.networkMapCache.getNodeByLegalName("Counterparty") ?: throw IllegalStateException("Counterparty not found") + rpc.networkMapUpdates().first.first { it.legalIdentity.name == "Counterparty" } } - private val random = Random() - @GET @Path("/notarise/{count}") fun notarise(@PathParam("count") count: Int): Response { @@ -41,25 +42,10 @@ class NotaryDemoApi(val services: ServiceHub) { * as it consumes the original asset and creates a copy with the new owner as its output. */ private fun buildTransactions(count: Int): List { - val myIdentity = services.myInfo.legalIdentity - val myKeyPair = services.legalIdentityKey val moveTransactions = (1..count).map { - // Self issue an asset - val issueTx = DummyContract.generateInitial(myIdentity.ref(0), random.nextInt(), notary).apply { - signWith(myKeyPair) - } - services.recordTransactions(issueTx.toSignedTransaction()) - // Move ownership of the asset to the counterparty - val counterPartyKey = counterpartyNode.legalIdentity.owningKey - val asset = issueTx.toWireTransaction().outRef(0) - val moveTx = DummyContract.move(asset, counterPartyKey).apply { - signWith(myKeyPair) - } - // We don't check signatures because we know that the notary's signature is missing - moveTx.toSignedTransaction(checkSufficientSignatures = false) + rpc.startFlow(::DummyIssueAndMove, notary, counterpartyNode.legalIdentity).returnValue.toBlocking().toFuture() } - - return moveTransactions + return moveTransactions.map { it.get() } } /** @@ -70,8 +56,7 @@ class NotaryDemoApi(val services: ServiceHub) { */ private fun notariseTransactions(transactions: List): List { val signatureFutures = transactions.map { - val protocol = NotaryFlow.Client::class.java - services.invokeFlowAsync(protocol, it).resultFuture + rpc.startFlow(NotaryFlow::Client, it).returnValue.toBlocking().toFuture() } val signers = signatureFutures.map { it.get().by.toStringShort() } return signers diff --git a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/flows/DummyIssueAndMove.kt b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/flows/DummyIssueAndMove.kt new file mode 100644 index 0000000000..6858396298 --- /dev/null +++ b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/flows/DummyIssueAndMove.kt @@ -0,0 +1,30 @@ +package net.corda.notarydemo.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.contracts.DummyContract +import net.corda.core.crypto.Party +import net.corda.core.flows.FlowLogic +import net.corda.core.node.recordTransactions +import net.corda.core.transactions.SignedTransaction +import java.util.* + +class DummyIssueAndMove(private val notary: Party, private val counterpartyNode: Party) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val random = Random() + val myKeyPair = serviceHub.legalIdentityKey + // Self issue an asset + val issueTx = DummyContract.generateInitial(serviceHub.myInfo.legalIdentity.ref(0), random.nextInt(), notary).apply { + signWith(myKeyPair) + } + serviceHub.recordTransactions(issueTx.toSignedTransaction()) + // Move ownership of the asset to the counterparty + val counterPartyKey = counterpartyNode.owningKey + val asset = issueTx.toWireTransaction().outRef(0) + val moveTx = DummyContract.move(asset, counterPartyKey).apply { + signWith(myKeyPair) + } + // We don't check signatures because we know that the notary's signature is missing + return moveTx.toSignedTransaction(checkSufficientSignatures = false) + } +} diff --git a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/plugin/NotaryDemoPlugin.kt b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/plugin/NotaryDemoPlugin.kt index be78442533..598de289dc 100644 --- a/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/plugin/NotaryDemoPlugin.kt +++ b/samples/raft-notary-demo/src/main/kotlin/net/corda/notarydemo/plugin/NotaryDemoPlugin.kt @@ -1,16 +1,19 @@ package net.corda.notarydemo.plugin +import net.corda.core.crypto.Party import net.corda.core.node.CordaPluginRegistry import net.corda.core.transactions.SignedTransaction import net.corda.flows.NotaryFlow import net.corda.notarydemo.api.NotaryDemoApi +import net.corda.notarydemo.flows.DummyIssueAndMove +import java.util.function.Function class NotaryDemoPlugin : CordaPluginRegistry() { // A list of classes that expose web APIs. - override val webApis: List> = listOf(NotaryDemoApi::class.java) + override val webApis = listOf(Function(::NotaryDemoApi)) // A list of protocols that are required for this cordapp - override val requiredFlows: Map> = mapOf( - NotaryFlow.Client::class.java.name to setOf(SignedTransaction::class.java.name, setOf(Unit).javaClass.name) + override val requiredFlows = mapOf( + NotaryFlow.Client::class.java.name to setOf(SignedTransaction::class.java.name, setOf(Unit).javaClass.name), + DummyIssueAndMove::class.java.name to setOf(Party::class.java.name) ) - override val servicePlugins: List> = listOf() } diff --git a/samples/simm-valuation-demo/build.gradle b/samples/simm-valuation-demo/build.gradle index 23c54c3359..9a1a4f829e 100644 --- a/samples/simm-valuation-demo/build.gradle +++ b/samples/simm-valuation-demo/build.gradle @@ -43,7 +43,6 @@ dependencies { // Corda integration dependencies runtime project(path: ":node", configuration: 'runtimeArtifacts') compile project(':core') - compile project(':client') compile project(':node') compile project(':finance') testCompile project(':test-utils') diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt index 205923ad82..fb1bac56df 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/api/PortfolioApi.kt @@ -1,11 +1,13 @@ package net.corda.vega.api import com.opengamma.strata.basics.currency.MultiCurrencyAmount +import net.corda.core.contracts.DealState import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.filterStatesOfType import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party -import net.corda.core.node.ServiceHub -import net.corda.core.node.services.dealsWith +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow import net.corda.vega.analytics.InitialMarginTriple import net.corda.vega.contracts.IRSState import net.corda.vega.contracts.PortfolioState @@ -17,23 +19,29 @@ import net.corda.vega.portfolio.toPortfolio import net.corda.vega.portfolio.toStateAndRef import java.time.LocalDate import java.time.LocalDateTime +import java.time.ZoneId import javax.ws.rs.* import javax.ws.rs.core.MediaType import javax.ws.rs.core.Response //TODO: Change import namespaces vega -> .... + @Path("simmvaluationdemo") -class PortfolioApi(val services: ServiceHub) { - private val ownParty: Party get() = services.myInfo.legalIdentity +class PortfolioApi(val rpc: CordaRPCOps) { + private val ownParty: Party get() = rpc.nodeIdentity().legalIdentity private val portfolioUtils = PortfolioApiUtils(ownParty) + private inline fun dealsWith(party: Party): List> { + return rpc.vaultAndUpdates().first.filterStatesOfType().filter { it.state.data.parties.any { it == party } } + } + /** * DSL to get a party and then executing the passed function with the party as a parameter. * Used as such: withParty(name) { doSomethingWith(it) } */ private fun withParty(partyName: String, func: (Party) -> Response): Response { - val otherParty = services.identityService.partyFromKey(CompositeKey.parseFromBase58(partyName)) + val otherParty = rpc.partyFromKey(CompositeKey.parseFromBase58(partyName)) return if (otherParty != null) { func(otherParty) } else { @@ -57,13 +65,13 @@ class PortfolioApi(val services: ServiceHub) { /** * Gets all existing IRSStates with the party provided. */ - private fun getTradesWith(party: Party) = services.vaultService.dealsWith(party) + private fun getTradesWith(party: Party) = dealsWith(party) /** * Gets the most recent portfolio state, or null if not extant, with the party provided. */ private fun getPortfolioWith(party: Party): PortfolioState? { - val portfolios = services.vaultService.dealsWith(party) + val portfolios = dealsWith(party) // Can have at most one between any two parties with the current no split portfolio model require(portfolios.size < 2) { "This API currently only supports one portfolio with a counterparty" } return portfolios.firstOrNull()?.state?.data @@ -75,7 +83,7 @@ class PortfolioApi(val services: ServiceHub) { * @warning Do not call if you have not agreed a portfolio with the other party. */ private fun getPortfolioStateAndRefWith(party: Party): StateAndRef { - val portfolios = services.vaultService.dealsWith(party) + val portfolios = dealsWith(party) // Can have at most one between any two parties with the current no split portfolio model require(portfolios.size < 2) { "This API currently only supports one portfolio with a counterparty" } return portfolios.first() @@ -92,7 +100,7 @@ class PortfolioApi(val services: ServiceHub) { fun getBusinessDate(): Any { return json { obj( - "business-date" to LocalDateTime.now(services.clock).toLocalDate() + "business-date" to LocalDateTime.ofInstant(rpc.currentNodeTime(), ZoneId.systemDefault()).toLocalDate() ) } } @@ -106,13 +114,13 @@ class PortfolioApi(val services: ServiceHub) { @Produces(MediaType.APPLICATION_JSON) fun getPartyTrades(@PathParam("party") partyName: String): Response { return withParty(partyName) { - val states = services.vaultService.dealsWith(it) + val states = dealsWith(it) val latestPortfolioStateRef: StateAndRef var latestPortfolioStateData: PortfolioState? = null var PVs: Map? = null var IMs: Map? = null - if (services.vaultService.dealsWith(it).isNotEmpty()) { - latestPortfolioStateRef = services.vaultService.dealsWith(it).last() + if (dealsWith(it).isNotEmpty()) { + latestPortfolioStateRef = dealsWith(it).last() latestPortfolioStateData = latestPortfolioStateRef.state.data PVs = latestPortfolioStateData.valuation?.presentValues IMs = latestPortfolioStateData.valuation?.imContributionMap @@ -121,9 +129,9 @@ class PortfolioApi(val services: ServiceHub) { val swaps = states.map { it.state.data.swap } Response.ok().entity(swaps.map { it.toView(ownParty, - latestPortfolioStateData?.portfolio?.toStateAndRef(services)?.toPortfolio(), - PVs?.get(it.id.second) ?: MultiCurrencyAmount.empty(), - IMs?.get(it.id.second) ?: InitialMarginTriple.zero() + latestPortfolioStateData?.portfolio?.toStateAndRef(rpc)?.toPortfolio(), + PVs?.get(it.id.second.toString()) ?: MultiCurrencyAmount.empty(), + IMs?.get(it.id.second.toString()) ?: InitialMarginTriple.zero() ) }).build() } @@ -137,7 +145,7 @@ class PortfolioApi(val services: ServiceHub) { @Produces(MediaType.APPLICATION_JSON) fun getPartyTrade(@PathParam("party") partyName: String, @PathParam("tradeId") tradeId: String): Response { return withParty(partyName) { - val states = services.vaultService.dealsWith(it) + val states = dealsWith(it) val tradeState = states.first { it.state.data.swap.id.second == tradeId }.state.data Response.ok().entity(portfolioUtils.createTradeView(tradeState)).build() } @@ -153,7 +161,7 @@ class PortfolioApi(val services: ServiceHub) { return withParty(partyName) { val buyer = if (swap.buySell.isBuy) ownParty else it val seller = if (swap.buySell.isSell) ownParty else it - services.invokeFlowAsync(IRSTradeFlow.Requester::class.java, swap.toData(buyer, seller), it).resultFuture.get() + rpc.startFlow(IRSTradeFlow::Requester, swap.toData(buyer, seller), it).returnValue.toBlocking().first() Response.accepted().entity("{}").build() } } @@ -169,7 +177,7 @@ class PortfolioApi(val services: ServiceHub) { fun getPartyPortfolioValuations(@PathParam("party") partyName: String): Response { return withParty(partyName) { otherParty -> withPortfolio(otherParty) { portfolioState -> - val portfolio = portfolioState.portfolio.toStateAndRef(services).toPortfolio() + val portfolio = portfolioState.portfolio.toStateAndRef(rpc).toPortfolio() Response.ok().entity(portfolioUtils.createValuations(portfolioState, portfolio)).build() } } @@ -238,7 +246,7 @@ class PortfolioApi(val services: ServiceHub) { @Path("whoami") @Produces(MediaType.APPLICATION_JSON) fun getWhoAmI(): Any { - val counterParties = services.networkMapCache.partyNodes.filter { it.legalIdentity.name != "NetworkMapService" && it.legalIdentity.name != "Notary" && it.legalIdentity.name != ownParty.name } + val counterParties = rpc.networkMapUpdates().first.filter { it.legalIdentity.name != "NetworkMapService" && it.legalIdentity.name != "Notary" && it.legalIdentity.name != ownParty.name } return json { obj( "self" to obj( @@ -268,13 +276,14 @@ class PortfolioApi(val services: ServiceHub) { return withParty(partyName) { otherParty -> val existingSwap = getPortfolioWith(otherParty) if (existingSwap == null) { - services.invokeFlowAsync(SimmFlow.Requester::class.java, otherParty, params.valuationDate).resultFuture.get() + rpc.startFlow(SimmFlow::Requester, otherParty, params.valuationDate).returnValue.toBlocking().first() } else { - services.invokeFlowAsync(SimmRevaluation.Initiator::class.java, getPortfolioStateAndRefWith(otherParty).ref, params.valuationDate).resultFuture.get() + val handle = rpc.startFlow(SimmRevaluation::Initiator, getPortfolioStateAndRefWith(otherParty).ref, params.valuationDate) + handle.returnValue.toBlocking().first() } withPortfolio(otherParty) { portfolioState -> - val portfolio = portfolioState.portfolio.toStateAndRef(services).toPortfolio() + val portfolio = portfolioState.portfolio.toStateAndRef(rpc).toPortfolio() Response.ok().entity(portfolioUtils.createValuations(portfolioState, portfolio)).build() } } diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt index 0eebdf2fac..1da82f1f5d 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt @@ -47,8 +47,10 @@ object SimmFlow { */ class Requester(val otherParty: Party, val valuationDate: LocalDate, - val existing: StateAndRef? = null) - : FlowLogic>() { + val existing: StateAndRef?) + : FlowLogic>() { + constructor(otherParty: Party, valuationDate: LocalDate) : this(otherParty, valuationDate, null) + lateinit var myIdentity: Party lateinit var notary: Party @@ -69,7 +71,7 @@ object SimmFlow { val portfolioStateRef = serviceHub.vaultService.dealsWith(otherParty).first() val state = updateValuation(portfolioStateRef) logger.info("SimmFlow done") - return state; + return state } @Suspendable diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/portfolio/Portfolio.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/portfolio/Portfolio.kt index 3b6be1c0d7..b7b326035f 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/portfolio/Portfolio.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/portfolio/Portfolio.kt @@ -1,10 +1,8 @@ package net.corda.vega.portfolio -import net.corda.core.contracts.ContractState -import net.corda.core.contracts.StateAndRef -import net.corda.core.contracts.StateRef -import net.corda.core.contracts.TransactionState +import net.corda.core.contracts.* import net.corda.core.crypto.Party +import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.ServiceHub import net.corda.core.sum import net.corda.vega.contracts.IRSState @@ -34,6 +32,11 @@ fun List>.toPortfolio(): Portfolio { return Portfolio(this) } +inline fun List.toStateAndRef(rpc: CordaRPCOps): List> { + val stateRefs = rpc.vaultAndUpdates().first.associateBy { it.ref } + return mapNotNull { stateRefs[it] }.filterStatesOfType() +} + // TODO: This should probably have its generics fixed and moved into the core platform API. @Suppress("UNCHECKED_CAST") fun List.toStateAndRef(services: ServiceHub): List> { diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/services/SimmService.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/services/SimmService.kt index 47e0d59562..cf75f41f88 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/services/SimmService.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/services/SimmService.kt @@ -9,6 +9,7 @@ import net.corda.vega.flows.IRSTradeFlow import net.corda.vega.flows.SimmFlow import net.corda.vega.flows.SimmRevaluation import java.time.LocalDate +import java.util.function.Function /** * [SimmService] is the object that makes available the flows and services for the Simm agreement / evaluation flow @@ -17,14 +18,12 @@ import java.time.LocalDate */ object SimmService { class Plugin : CordaPluginRegistry() { - override val webApis: List> = listOf(PortfolioApi::class.java) + override val webApis = listOf(Function(::PortfolioApi)) override val requiredFlows: Map> = mapOf( SimmFlow.Requester::class.java.name to setOf(Party::class.java.name, LocalDate::class.java.name), SimmRevaluation.Initiator::class.java.name to setOf(StateRef::class.java.name, LocalDate::class.java.name), IRSTradeFlow.Requester::class.java.name to setOf(SwapData::class.java.name, Party::class.java.name)) - override val servicePlugins: List> = listOf( - SimmFlow.Service::class.java, - IRSTradeFlow.Service::class.java) override val staticServeDirs: Map = mapOf("simmvaluationdemo" to javaClass.classLoader.getResource("simmvaluationweb").toExternalForm()) + override val servicePlugins = listOf(Function(SimmFlow::Service), Function(IRSTradeFlow::Service)) } } diff --git a/samples/simm-valuation-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry b/samples/simm-valuation-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry index 7ea1f8d0e6..bdfd21fedd 100644 --- a/samples/simm-valuation-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry +++ b/samples/simm-valuation-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry @@ -1,2 +1,2 @@ -# Register a ServiceLoader service extending from net.corda.node.CordaPluginRegistry +# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry net.corda.vega.services.SimmService$Plugin diff --git a/samples/trader-demo/build.gradle b/samples/trader-demo/build.gradle index c1be618df3..9fa03f942e 100644 --- a/samples/trader-demo/build.gradle +++ b/samples/trader-demo/build.gradle @@ -47,8 +47,6 @@ dependencies { // Corda integration dependencies runtime project(path: ":node", configuration: 'runtimeArtifacts') compile project(':core') - compile project(':client') - compile project(':node') compile project(':finance') compile project(':test-utils') diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/api/TraderDemoApi.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/api/TraderDemoApi.kt index a812e561bb..b089ec6862 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/api/TraderDemoApi.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/api/TraderDemoApi.kt @@ -1,12 +1,17 @@ package net.corda.traderdemo.api -import net.corda.contracts.testing.fillWithSomeTestCash +import net.corda.contracts.testing.calculateRandomlySizedAmounts import net.corda.core.contracts.DOLLARS -import net.corda.core.node.ServiceHub -import net.corda.core.transactions.SignedTransaction +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow +import net.corda.core.serialization.OpaqueBytes import net.corda.core.utilities.Emoji import net.corda.core.utilities.loggerFor +import net.corda.flows.CashCommand +import net.corda.flows.CashFlow +import net.corda.flows.CashFlowResult import net.corda.traderdemo.flow.SellerFlow +import java.util.* import javax.ws.rs.* import javax.ws.rs.core.MediaType import javax.ws.rs.core.Response @@ -14,7 +19,7 @@ import kotlin.test.assertEquals // API is accessible from /api/traderdemo. All paths specified below are relative to it. @Path("traderdemo") -class TraderDemoApi(val services: ServiceHub) { +class TraderDemoApi(val rpc: CordaRPCOps) { data class TestCashParams(val amount: Int, val notary: String) data class SellParams(val amount: Int) private companion object { @@ -29,10 +34,20 @@ class TraderDemoApi(val services: ServiceHub) { @Path("create-test-cash") @Consumes(MediaType.APPLICATION_JSON) fun createTestCash(params: TestCashParams): Response { - val notary = services.networkMapCache.notaryNodes.single { it.legalIdentity.name == params.notary }.notaryIdentity - services.fillWithSomeTestCash(params.amount.DOLLARS, - outputNotary = notary, - ownedBy = services.myInfo.legalIdentity.owningKey) + val notary = rpc.networkMapUpdates().first.first { it.legalIdentity.name == params.notary } + val me = rpc.nodeIdentity() + val amounts = calculateRandomlySizedAmounts(params.amount.DOLLARS, 3, 10, Random()) + val handles = amounts.map { + rpc.startFlow(::CashFlow, CashCommand.IssueCash( + amount = params.amount.DOLLARS, + issueRef = OpaqueBytes.of(1), + recipient = me.legalIdentity, + notary = notary.notaryIdentity + )) + } + handles.forEach { + require(it.returnValue.toBlocking().first() is CashFlowResult.Success) + } return Response.status(Response.Status.CREATED).build() } @@ -40,7 +55,7 @@ class TraderDemoApi(val services: ServiceHub) { @Path("{party}/sell-cash") @Consumes(MediaType.APPLICATION_JSON) fun sellCash(params: SellParams, @PathParam("party") partyName: String): Response { - val otherParty = services.identityService.partyFromName(partyName) + val otherParty = rpc.partyFromName(partyName) if (otherParty != null) { // The seller will sell some commercial paper to the buyer, who will pay with (self issued) cash. // @@ -48,15 +63,15 @@ class TraderDemoApi(val services: ServiceHub) { // attachment. Make sure we have the transaction prospectus attachment loaded into our store. // // This can also be done via an HTTP upload, but here we short-circuit and do it from code. - if (services.storageService.attachments.openAttachment(SellerFlow.PROSPECTUS_HASH) == null) { + if (!rpc.attachmentExists(SellerFlow.PROSPECTUS_HASH)) { javaClass.classLoader.getResourceAsStream("bank-of-london-cp.jar").use { - val id = services.storageService.attachments.importAttachment(it) + val id = rpc.uploadAttachment(it) assertEquals(SellerFlow.PROSPECTUS_HASH, id) } } // The line below blocks and waits for the future to resolve. - val stx = services.invokeFlowAsync(SellerFlow::class.java, otherParty, params.amount.DOLLARS).resultFuture.get() + val stx = rpc.startFlow(::SellerFlow, otherParty, params.amount.DOLLARS).returnValue.toBlocking().first() logger.info("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(stx.tx)}") return Response.status(Response.Status.OK).build() } else { diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt index 9966f23a12..6ae1c8f5d7 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt @@ -18,7 +18,9 @@ import java.util.* class SellerFlow(val otherParty: Party, val amount: Amount, - override val progressTracker: ProgressTracker = tracker()) : FlowLogic() { + override val progressTracker: ProgressTracker) : FlowLogic() { + constructor(otherParty: Party, amount: Amount) : this(otherParty, amount, tracker()) + companion object { val PROSPECTUS_HASH = SecureHash.parse("decd098666b9657314870e192ced0c3519c2c9d395507a238338f8d003929de9") diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/plugin/TraderDemoPlugin.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/plugin/TraderDemoPlugin.kt index d0f0e31761..b429a80b1a 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/plugin/TraderDemoPlugin.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/plugin/TraderDemoPlugin.kt @@ -6,13 +6,14 @@ import net.corda.core.node.CordaPluginRegistry import net.corda.traderdemo.api.TraderDemoApi import net.corda.traderdemo.flow.BuyerFlow import net.corda.traderdemo.flow.SellerFlow +import java.util.function.Function class TraderDemoPlugin : CordaPluginRegistry() { // A list of classes that expose web APIs. - override val webApis: List> = listOf(TraderDemoApi::class.java) - // A list of flows that are required for this cordapp + override val webApis = listOf(Function(::TraderDemoApi)) + // A list of Flows that are required for this cordapp override val requiredFlows: Map> = mapOf( SellerFlow::class.java.name to setOf(Party::class.java.name, Amount::class.java.name) ) - override val servicePlugins: List> = listOf(BuyerFlow.Service::class.java) + override val servicePlugins = listOf(Function(BuyerFlow::Service)) } diff --git a/samples/trader-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry b/samples/trader-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry index 8fdc026ec2..8ac62a0cd8 100644 --- a/samples/trader-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry +++ b/samples/trader-demo/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry @@ -1,2 +1,2 @@ -# Register a ServiceLoader service extending from net.corda.node.CordaPluginRegistry +# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry net.corda.traderdemo.plugin.TraderDemoPlugin diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index c90a0d75db..1bafa5fc3f 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -15,7 +15,7 @@ import net.corda.node.internal.AbstractNode import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.keys.E2ETestKeyManagementService -import net.corda.node.services.messaging.RPCOps +import net.corda.core.messaging.RPCOps import net.corda.node.services.network.InMemoryNetworkMapService import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.InMemoryUniquenessProvider diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt index 381c5edeae..abb2564860 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt @@ -46,11 +46,11 @@ abstract class NodeBasedTest { )) } - private fun startNode(legalName: String, config: Map): Node { + private fun startNode(legalName: String, configOverrides: Map): Node { val config = ConfigHelper.loadConfig( baseDirectoryPath = tempFolder.newFolder(legalName).toPath(), allowMissingConfig = true, - configOverrides = config + mapOf( + configOverrides = configOverrides + mapOf( "myLegalName" to legalName, "artemisAddress" to freeLocalHostAndPort().toString(), "extraAdvertisedServiceIds" to "" diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt index 6d78a66cb7..ed7238d0b8 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt @@ -8,10 +8,10 @@ import javafx.scene.control.ButtonType import javafx.scene.image.Image import javafx.stage.Stage import jfxtras.resources.JFXtrasFontRoboto -import net.corda.client.CordaRPCClient import net.corda.client.mock.EventGenerator import net.corda.client.model.Models import net.corda.client.model.observableValue +import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType import net.corda.explorer.model.CordaViewModel @@ -24,7 +24,7 @@ import net.corda.node.driver.driver import net.corda.node.services.User import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.messaging.ArtemisMessagingComponent -import net.corda.node.services.messaging.startFlow +import net.corda.node.services.messaging.CordaRPCClient import net.corda.node.services.startFlowPermission import net.corda.node.services.transactions.SimpleNotaryService import org.apache.commons.lang.SystemUtils diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt index aea56e6014..1aae570fcf 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt @@ -12,6 +12,7 @@ import net.corda.client.fxutils.unique import net.corda.client.model.* import net.corda.core.contracts.* import net.corda.core.crypto.Party +import net.corda.core.messaging.startFlow import net.corda.core.node.NodeInfo import net.corda.core.serialization.OpaqueBytes import net.corda.explorer.model.CashTransaction @@ -21,7 +22,6 @@ import net.corda.explorer.views.stringConverter import net.corda.flows.CashCommand import net.corda.flows.CashFlow import net.corda.flows.CashFlowResult -import net.corda.node.services.messaging.startFlow import org.controlsfx.dialog.ExceptionDialog import tornadofx.Fragment import tornadofx.booleanBinding diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/ConnectionManager.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/ConnectionManager.kt index e3ff259c11..f44a16b744 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/ConnectionManager.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/ConnectionManager.kt @@ -7,12 +7,12 @@ import com.jcraft.jsch.agentproxy.connector.SSHAgentConnector import com.jcraft.jsch.agentproxy.usocket.JNAUSocketFactory import kotlinx.support.jdk8.collections.parallelStream import kotlinx.support.jdk8.streams.toList -import net.corda.client.CordaRPCClient import net.corda.core.createDirectories import net.corda.core.div +import net.corda.core.messaging.CordaRPCOps import net.corda.node.driver.PortAllocation import net.corda.node.services.config.NodeSSLConfiguration -import net.corda.node.services.messaging.CordaRPCOps +import net.corda.node.services.messaging.CordaRPCClient import org.slf4j.LoggerFactory import java.io.ByteArrayOutputStream import java.io.Closeable diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt index 111cb963ca..e50256132e 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt @@ -7,13 +7,13 @@ import net.corda.core.contracts.Issued import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.USD import net.corda.core.crypto.Party +import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes import net.corda.flows.CashCommand import net.corda.flows.CashFlow import net.corda.flows.CashFlowResult import net.corda.loadtest.LoadTest import net.corda.loadtest.NodeHandle -import net.corda.node.services.messaging.startFlow import org.slf4j.LoggerFactory import java.util.* diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt index a6cb04a9c9..d8201bab0d 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt @@ -7,12 +7,12 @@ import net.corda.client.mock.replicatePoisson import net.corda.contracts.asset.Cash import net.corda.core.contracts.USD import net.corda.core.crypto.Party +import net.corda.core.messaging.startFlow import net.corda.flows.CashCommand import net.corda.flows.CashFlow import net.corda.flows.CashFlowResult import net.corda.loadtest.LoadTest import net.corda.loadtest.NodeHandle -import net.corda.node.services.messaging.startFlow import org.slf4j.LoggerFactory import java.util.*