From 913487cb3265d503e5bc8752d06a6c21c51ba007 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Tue, 25 Apr 2017 12:21:15 +0100 Subject: [PATCH] Deprecated FlowLogic.getCounterpartyMarker as it's complicated and probably not used (replacement is to use sub-flows). Also made flow registration require the client flow class rather than any old class. --- .../kotlin/net/corda/core/flows/FlowLogic.kt | 15 +++---- .../net/corda/core/node/PluginServiceHub.kt | 31 ++++++------- .../serialization/DefaultKryoCustomizer.kt | 1 + .../net/corda/core/serialization/Kryo.kt | 11 +++++ .../net/corda/flows/TwoPartyDealFlow.kt | 22 +-------- .../net/corda/core/flows/FlowsInJavaTest.java | 2 +- .../kotlin/net/corda/core/flows/TxKeyFlow.kt | 2 +- .../corda/docs/FxTransactionBuildTutorial.kt | 2 +- .../docs/WorkflowTransactionBuildTutorial.kt | 2 +- .../main/kotlin/net/corda/flows/IssuerFlow.kt | 2 +- .../kotlin/net/corda/nodeapi/RPCStructures.kt | 15 ------- .../services/messaging/MQSecurityTest.kt | 2 +- .../net/corda/node/internal/AbstractNode.kt | 17 ++++--- .../node/services/NotaryChangeService.kt | 2 +- .../node/services/api/ServiceHubInternal.kt | 6 ++- .../persistence/DataVendingService.kt | 6 +-- .../statemachine/FlowStateMachineImpl.kt | 21 ++++++--- .../services/statemachine/SessionMessage.kt | 5 ++- .../statemachine/StateMachineManager.kt | 22 +++------ .../services/transactions/NotaryService.kt | 2 +- .../node/services/MockServiceHubInternal.kt | 11 +---- .../persistence/DataVendingServiceTests.kt | 2 +- .../statemachine/StateMachineManagerTests.kt | 45 ++++++++++++------- .../net/corda/irs/api/NodeInterestRates.kt | 4 +- .../net/corda/irs/flows/AutoOfferFlow.kt | 2 +- .../kotlin/net/corda/irs/flows/FixingFlow.kt | 2 +- .../corda/irs/flows/UpdateBusinessDayFlow.kt | 2 +- .../net/corda/vega/flows/IRSTradeFlow.kt | 2 +- .../kotlin/net/corda/vega/flows/SimmFlow.kt | 2 +- .../net/corda/traderdemo/flow/BuyerFlow.kt | 2 +- .../kotlin/net/corda/testing/CoreTestUtils.kt | 6 +-- 31 files changed, 123 insertions(+), 145 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 55c8bac354..c4a6d96ca4 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -42,11 +42,9 @@ abstract class FlowLogic { */ val serviceHub: ServiceHub get() = stateMachine.serviceHub - /** - * Return the marker [Class] which [party] has used to register the counterparty flow that is to execute on the - * other side. The default implementation returns the class object of this FlowLogic, but any [Class] instance - * will do as long as the other side registers with it. - */ + @Deprecated("This is no longer used and will be removed in a future release. If you are using this to communicate " + + "with the same party but for two different message streams, then the correct way of doing that is to use sub-flows", + level = DeprecationLevel.ERROR) open fun getCounterpartyMarker(party: Party): Class<*> = javaClass /** @@ -190,9 +188,10 @@ abstract class FlowLogic { private var _stateMachine: FlowStateMachine<*>? = null /** - * Internal only. Reference to the [Fiber] instance that is the top level controller for the entire flow. When - * inside a flow this is equivalent to [Strand.currentStrand]. This is public only because it must be accessed - * across module boundaries. + * @suppress + * Internal only. Reference to the [co.paralleluniverse.fibers.Fiber] instance that is the top level controller for + * the entire flow. When inside a flow this is equivalent to [co.paralleluniverse.strands.Strand.currentStrand]. This + * is public only because it must be accessed across module boundaries. */ var stateMachine: FlowStateMachine<*> get() = _stateMachine ?: throw IllegalStateException("This can only be done after the flow has been started.") diff --git a/core/src/main/kotlin/net/corda/core/node/PluginServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/PluginServiceHub.kt index d27756a355..f8b6ede9e9 100644 --- a/core/src/main/kotlin/net/corda/core/node/PluginServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/PluginServiceHub.kt @@ -2,31 +2,26 @@ package net.corda.core.node import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic -import kotlin.reflect.KClass /** * A service hub to be used by the [CordaPluginRegistry] */ interface PluginServiceHub : ServiceHub { /** - * Register the flow factory we wish to use when a initiating party attempts to communicate with us. The - * registration is done against a marker [Class] which is sent in the session handshake by the other party. If this - * marker class has been registered then the corresponding factory will be used to create the flow which will - * communicate with the other side. If there is no mapping then the session attempt is rejected. - * @param markerClass The marker [Class] present in a session initiation attempt. Conventionally this is a [FlowLogic] - * subclass, however any class can be used, with the default being the class of the initiating flow. This enables - * the registration to be of the form: `registerFlowInitiator(InitiatorFlow.class, InitiatedFlow::new)` - * @param flowFactory The flow factory generating the initiated flow. + * Register the service flow factory to use when an initiating party attempts to communicate with us. The registration + * is done against the [Class] object of the client flow to the service flow. What this means is if a counterparty + * starts a [FlowLogic] represented by [clientFlowClass] and starts communication with us, we will execute the service + * flow produced by [serviceFlowFactory]. This service flow has respond correctly to the sends and receives the client + * does. + * @param clientFlowClass [Class] of the client flow involved in this client-server communication. + * @param serviceFlowFactory Lambda which produces a new service flow for each new client flow communication. The + * [Party] parameter of the factory is the client's identity. */ - fun registerFlowInitiator(markerClass: Class<*>, flowFactory: (Party) -> FlowLogic<*>) + fun registerServiceFlow(clientFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) - @Deprecated(message = "Use overloaded method which uses Class instead of KClass. This is scheduled for removal in a future release.") - fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) { - registerFlowInitiator(markerClass.java, flowFactory) + @Suppress("UNCHECKED_CAST") + @Deprecated("This is scheduled to be removed in a future release", ReplaceWith("registerServiceFlow")) + fun registerFlowInitiator(markerClass: Class<*>, flowFactory: (Party) -> FlowLogic<*>) { + registerServiceFlow(markerClass as Class>, flowFactory) } - - /** - * Return the flow factory that has been registered with [markerClass], or null if no factory is found. - */ - fun getFlowFactory(markerClass: Class<*>): ((Party) -> FlowLogic<*>)? } diff --git a/core/src/main/kotlin/net/corda/core/serialization/DefaultKryoCustomizer.kt b/core/src/main/kotlin/net/corda/core/serialization/DefaultKryoCustomizer.kt index 4b9735c2b0..0bac6a20cb 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/DefaultKryoCustomizer.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/DefaultKryoCustomizer.kt @@ -82,6 +82,7 @@ object DefaultKryoCustomizer { register(MetaData::class.java, MetaDataSerializer) register(BitSet::class.java, BitSetSerializer()) + register(Class::class.java, ClassSerializer) addDefaultSerializer(Logger::class.java, LoggerSerializer) diff --git a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt index 470a88aa5e..153a1ac496 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt @@ -565,6 +565,17 @@ object LoggerSerializer : Serializer() { } } +object ClassSerializer : Serializer>() { + override fun read(kryo: Kryo, input: Input, type: Class>): Class<*> { + val className = input.readString() + return Class.forName(className) + } + + override fun write(kryo: Kryo, output: Output, clazz: Class<*>) { + output.writeString(clazz.name) + } +} + /** * For serialising an [X500Name] without touching Sun internal classes. */ diff --git a/core/src/main/kotlin/net/corda/flows/TwoPartyDealFlow.kt b/core/src/main/kotlin/net/corda/flows/TwoPartyDealFlow.kt index 3adbe4551e..cc31c183a9 100644 --- a/core/src/main/kotlin/net/corda/flows/TwoPartyDealFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/TwoPartyDealFlow.kt @@ -1,13 +1,10 @@ package net.corda.flows import co.paralleluniverse.fibers.Suspendable -import net.corda.core.contracts.ContractState import net.corda.core.contracts.DealState -import net.corda.core.contracts.StateRef import net.corda.core.crypto.* import net.corda.core.flows.FlowLogic import net.corda.core.node.NodeInfo -import net.corda.core.node.services.ServiceType import net.corda.core.seconds import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction @@ -38,12 +35,6 @@ object TwoPartyDealFlow { @CordaSerializable class SignaturesFromPrimary(val sellerSig: DigitalSignature.WithKey, val notarySigs: List) - /** - * [Primary] at the end sends the signed tx to all the regulator parties. This a seperate workflow which needs a - * sepearate session with the regulator. This interface is used to do that in [Primary.getCounterpartyMarker]. - */ - interface MarkerForBogusRegulatorFlow - /** * Abstracted bilateral deal flow participant that initiates communication/handshake. * @@ -69,14 +60,6 @@ object TwoPartyDealFlow { abstract val otherParty: Party abstract val myKeyPair: KeyPair - override fun getCounterpartyMarker(party: Party): Class<*> { - return if (serviceHub.networkMapCache.regulatorNodes.any { it.legalIdentity == party }) { - MarkerForBogusRegulatorFlow::class.java - } else { - super.getCounterpartyMarker(party) - } - } - @Suspendable fun getPartialTransaction(): UntrustworthyData { progressTracker.currentStep = AWAITING_PROPOSAL @@ -146,9 +129,8 @@ object TwoPartyDealFlow { progressTracker.currentStep = COPYING_TO_REGULATOR val regulators = serviceHub.networkMapCache.regulatorNodes if (regulators.isNotEmpty()) { - // Copy the transaction to every regulator in the network. This is obviously completely bogus, it's - // just for demo purposes. - regulators.forEach { send(it.serviceIdentities(ServiceType.regulator).first(), fullySigned) } + // If there are regulators in the network, then we could copy them in on the transaction via a sub-flow + // which would simply send them the transaction. } return fullySigned diff --git a/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java b/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java index 1d204f9a31..197c3bb79d 100644 --- a/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java +++ b/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java @@ -30,7 +30,7 @@ public class FlowsInJavaTest { @Test public void suspendableActionInsideUnwrap() throws Exception { - node2.getServices().registerFlowInitiator(SendInUnwrapFlow.class, (otherParty) -> new OtherFlow(otherParty, "Hello")); + node2.getServices().registerServiceFlow(SendInUnwrapFlow.class, (otherParty) -> new OtherFlow(otherParty, "Hello")); Future result = node1.getServices().startFlow(new SendInUnwrapFlow(node2.getInfo().getLegalIdentity())).getResultFuture(); net.runNetwork(); assertThat(result.get()).isEqualTo("Hello"); diff --git a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt index 603369ac9a..921bb2abdb 100644 --- a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt +++ b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt @@ -15,7 +15,7 @@ import java.security.cert.Certificate */ object TxKeyFlow { fun registerFlowInitiator(services: PluginServiceHub) { - services.registerFlowInitiator(Requester::class.java, ::Provider) + services.registerServiceFlow(Requester::class.java, ::Provider) } class Requester(val otherSide: Party, diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt index 98f13339b3..0181264fd5 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt @@ -24,7 +24,7 @@ import java.util.* object FxTransactionDemoTutorial { // Would normally be called by custom service init in a CorDapp fun registerFxProtocols(pluginHub: PluginServiceHub) { - pluginHub.registerFlowInitiator(ForeignExchangeFlow::class.java, ::ForeignExchangeRemoteFlow) + pluginHub.registerServiceFlow(ForeignExchangeFlow::class.java, ::ForeignExchangeRemoteFlow) } } diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt index 53cfff65f4..371891fe32 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt @@ -17,7 +17,7 @@ import java.time.Duration object WorkflowTransactionBuildTutorial { // Would normally be called by custom service init in a CorDapp fun registerWorkflowProtocols(pluginHub: PluginServiceHub) { - pluginHub.registerFlowInitiator(SubmitCompletionFlow::class.java, ::RecordCompletionFlow) + pluginHub.registerServiceFlow(SubmitCompletionFlow::class.java, ::RecordCompletionFlow) } } diff --git a/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt b/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt index 8dcbd20226..4e74498b7d 100644 --- a/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt @@ -96,7 +96,7 @@ object IssuerFlow { class Service(services: PluginServiceHub) { init { - services.registerFlowInitiator(IssuanceRequester::class.java, ::Issuer) + services.registerServiceFlow(IssuanceRequester::class.java, ::Issuer) } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt b/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt index 396027eb74..171a1151dd 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt @@ -2,11 +2,8 @@ package net.corda.nodeapi -import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Registration import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output import com.google.common.util.concurrent.ListenableFuture import net.corda.core.flows.FlowException import net.corda.core.serialization.* @@ -71,17 +68,6 @@ open class RPCException(msg: String, cause: Throwable?) : RuntimeException(msg, class DeadlineExceeded(rpcName: String) : RPCException("Deadline exceeded on call to $rpcName") } -object ClassSerializer : Serializer>() { - override fun read(kryo: Kryo, input: Input, type: Class>): Class<*> { - val className = input.readString() - return Class.forName(className) - } - - override fun write(kryo: Kryo, output: Output, clazz: Class<*>) { - output.writeString(clazz.name) - } -} - @CordaSerializable class PermissionException(msg: String) : RuntimeException(msg) @@ -99,7 +85,6 @@ class RPCKryo(observableSerializer: Serializer>) : CordaKryo(mak DefaultKryoCustomizer.customize(this) // RPC specific classes - register(Class::class.java, ClassSerializer) register(MultipartStream.ItemInputStream::class.java, InputStreamSerializer) register(MarshalledObservation::class.java, ImmutableClassSerializer(MarshalledObservation::class)) register(Observable::class.java, observableSerializer) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index ad58b4a7e2..db794bba72 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -230,7 +230,7 @@ abstract class MQSecurityTest : NodeBasedTest() { private fun startBobAndCommunicateWithAlice(): Party { val bob = startNode(BOB.name).getOrThrow() - bob.services.registerFlowInitiator(SendFlow::class.java, ::ReceiveFlow) + bob.services.registerServiceFlow(SendFlow::class.java, ::ReceiveFlow) val bobParty = bob.info.legalIdentity // Perform a protocol exchange to force the peer queue to be created alice.services.startFlow(SendFlow(bobParty, 0)).resultFuture.getOrThrow() diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index b9550ad036..d51081a738 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -52,7 +52,6 @@ import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.transaction import org.apache.activemq.artemis.utils.ReusableLatch -import org.bouncycastle.asn1.x500.X500Name import org.jetbrains.exposed.sql.Database import org.slf4j.Logger import java.io.IOException @@ -108,7 +107,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, // low-performance prototyping period. protected abstract val serverThread: AffinityExecutor - private val flowFactories = ConcurrentHashMap, (Party) -> FlowLogic<*>>() + private val serviceFlowFactories = ConcurrentHashMap, (Party) -> FlowLogic<*>>() protected val partyKeys = mutableSetOf() val services = object : ServiceHubInternal() { @@ -132,14 +131,14 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, return serverThread.fetchFrom { smm.add(logic, flowInitiator) } } - override fun registerFlowInitiator(markerClass: Class<*>, flowFactory: (Party) -> FlowLogic<*>) { - require(markerClass !in flowFactories) { "${markerClass.name} has already been used to register a flow" } - log.info("Registering flow ${markerClass.name}") - flowFactories[markerClass] = flowFactory + override fun registerServiceFlow(clientFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) { + require(clientFlowClass !in serviceFlowFactories) { "${clientFlowClass.name} has already been used to register a service flow" } + log.info("Registering service flow for ${clientFlowClass.name}") + serviceFlowFactories[clientFlowClass] = serviceFlowFactory } - override fun getFlowFactory(markerClass: Class<*>): ((Party) -> FlowLogic<*>)? { - return flowFactories[markerClass] + override fun getServiceFlowFactory(clientFlowClass: Class>): ((Party) -> FlowLogic<*>)? { + return serviceFlowFactories[clientFlowClass] } override fun recordTransactions(txs: Iterable) { @@ -236,7 +235,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, false } startMessagingService(rpcOps) - services.registerFlowInitiator(ContractUpgradeFlow.Instigator::class.java) { ContractUpgradeFlow.Acceptor(it) } + services.registerServiceFlow(ContractUpgradeFlow.Instigator::class.java) { ContractUpgradeFlow.Acceptor(it) } runOnStop += Runnable { net.stop() } _networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured()) smm.start() diff --git a/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt b/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt index be52e5ffe6..4115ff6dfd 100644 --- a/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt +++ b/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt @@ -17,7 +17,7 @@ object NotaryChange { */ class Service(services: PluginServiceHub) : SingletonSerializeAsToken() { init { - services.registerFlowInitiator(NotaryChangeFlow.Instigator::class.java) { NotaryChangeFlow.Acceptor(it) } + services.registerServiceFlow(NotaryChangeFlow.Instigator::class.java) { NotaryChangeFlow.Acceptor(it) } } } } diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 6be557f461..56c5c7247e 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -2,6 +2,7 @@ package net.corda.node.services.api import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ListenableFuture +import net.corda.core.crypto.Party import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRefFactory @@ -85,7 +86,8 @@ abstract class ServiceHubInternal : PluginServiceHub { * Note that you must be on the server thread to call this method. [flowInitiator] points how flow was started, * See: [FlowInitiator]. * - * @throws IllegalFlowLogicException or IllegalArgumentException if there are problems with the [logicType] or [args]. + * @throws net.corda.core.flows.IllegalFlowLogicException or IllegalArgumentException if there are problems with the + * [logicType] or [args]. */ fun invokeFlowAsync( logicType: Class>, @@ -96,4 +98,6 @@ abstract class ServiceHubInternal : PluginServiceHub { val logic = flowLogicRefFactory.toFlowLogic(logicRef) as FlowLogic return startFlow(logic, flowInitiator) } + + abstract fun getServiceFlowFactory(clientFlowClass: Class>): ((Party) -> FlowLogic<*>)? } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt index 1c881236f6..e4f8c77ed2 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt @@ -34,9 +34,9 @@ object DataVending { @ThreadSafe class Service(services: PluginServiceHub) : SingletonSerializeAsToken() { init { - services.registerFlowInitiator(FetchTransactionsFlow::class.java, ::FetchTransactionsHandler) - services.registerFlowInitiator(FetchAttachmentsFlow::class.java, ::FetchAttachmentsHandler) - services.registerFlowInitiator(BroadcastTransactionFlow::class.java, ::NotifyTransactionHandler) + services.registerServiceFlow(FetchTransactionsFlow::class.java, ::FetchTransactionsHandler) + services.registerServiceFlow(FetchAttachmentsFlow::class.java, ::FetchAttachmentsHandler) + services.registerServiceFlow(BroadcastTransactionFlow::class.java, ::NotifyTransactionHandler) } private class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler(otherParty) { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 9089d3ac72..67fcb9a621 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -11,11 +11,7 @@ import net.corda.core.ErrorOr import net.corda.core.abbreviate import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash -import net.corda.core.flows.FlowException -import net.corda.core.flows.FlowInitiator -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowStateMachine -import net.corda.core.flows.StateMachineRunId +import net.corda.core.flows.* import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowProgressHandle import net.corda.core.random63BitValue @@ -34,6 +30,7 @@ import org.jetbrains.exposed.sql.transactions.TransactionManager import org.slf4j.Logger import org.slf4j.LoggerFactory import rx.Observable +import java.lang.reflect.Modifier import java.sql.Connection import java.sql.SQLException import java.util.* @@ -304,8 +301,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, logger.trace { "Initiating a new session with $otherParty" } val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty)) openSessions[Pair(sessionFlow, otherParty)] = session - val counterpartyFlow = sessionFlow.getCounterpartyMarker(otherParty).name - val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload) + // We get the top-most concrete class object to cater for the case where the client flow is customised via a sub-class + val clientFlowClass = sessionFlow.topConcreteFlowClass + val sessionInit = SessionInit(session.ourSessionId, clientFlowClass, firstPayload) sendInternal(session, sessionInit) if (waitForConfirmation) { session.waitForConfirmation() @@ -313,6 +311,15 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, return session } + @Suppress("UNCHECKED_CAST") + private val FlowLogic<*>.topConcreteFlowClass: Class> get() { + var current: Class> = javaClass + while (!Modifier.isAbstract(current.superclass.modifiers)) { + current = current.superclass as Class> + } + return current + } + @Suspendable private fun waitForMessage(receiveRequest: ReceiveRequest): ReceivedSessionMessage { return receiveRequest.suspendAndExpectReceive().confirmReceiveType(receiveRequest) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt index 5b74b0330d..d84dab7988 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt @@ -2,6 +2,7 @@ package net.corda.node.services.statemachine import net.corda.core.crypto.Party import net.corda.core.flows.FlowException +import net.corda.core.flows.FlowLogic import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.UntrustworthyData @@ -12,7 +13,9 @@ import net.corda.core.utilities.UntrustworthyData @CordaSerializable interface SessionMessage -data class SessionInit(val initiatorSessionId: Long, val flowName: String, val firstPayload: Any?) : SessionMessage +data class SessionInit(val initiatorSessionId: Long, + val clientFlowClass: Class>, + val firstPayload: Any?) : SessionMessage interface ExistingSessionMessage : SessionMessage { val recipientSessionId: Long diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 811af28e35..d5a4a41f4b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -19,11 +19,7 @@ import net.corda.core.bufferUntilSubscribed import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.crypto.commonName -import net.corda.core.flows.FlowInitiator -import net.corda.core.flows.FlowException -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowStateMachine -import net.corda.core.flows.StateMachineRunId +import net.corda.core.flows.* import net.corda.core.messaging.ReceivedMessage import net.corda.core.messaging.TopicSession import net.corda.core.messaging.send @@ -345,18 +341,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message)) - val markerClass = try { - Class.forName(sessionInit.flowName) - } catch (e: Exception) { - logger.warn("Received invalid $sessionInit", e) - sendSessionReject("Don't know ${sessionInit.flowName}") - return - } - - val flowFactory = serviceHub.getFlowFactory(markerClass) + val flowFactory = serviceHub.getServiceFlowFactory(sessionInit.clientFlowClass) if (flowFactory == null) { - logger.warn("Unknown flow marker class in $sessionInit") - sendSessionReject("Don't know ${markerClass.name}") + logger.warn("${sessionInit.clientFlowClass} has not been registered with a service flow: $sessionInit") + sendSessionReject("Don't know ${sessionInit.clientFlowClass.name}") return } @@ -378,7 +366,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), session.fiber) - session.fiber.logger.debug { "Initiated by $sender using marker ${markerClass.name}" } + session.fiber.logger.debug { "Initiated by $sender using ${sessionInit.clientFlowClass.name}" } session.fiber.logger.trace { "Initiated from $sessionInit on $session" } resumeFiber(session.fiber) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt index 7b80da2bd4..621e27737d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt @@ -18,7 +18,7 @@ import net.corda.node.services.api.ServiceHubInternal abstract class NotaryService(services: ServiceHubInternal) : SingletonSerializeAsToken() { init { - services.registerFlowInitiator(NotaryFlow.Client::class.java) { createFlow(it) } + services.registerServiceFlow(NotaryFlow.Client::class.java) { createFlow(it) } } /** Implement a factory that specifies the transaction commit flow for the notary service to use */ diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt index dededeada0..8fe78212f5 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt @@ -22,7 +22,6 @@ import net.corda.testing.MOCK_IDENTITY_SERVICE import net.corda.testing.node.MockNetworkMapCache import net.corda.testing.node.MockStorageService import java.time.Clock -import java.util.concurrent.ConcurrentHashMap open class MockServiceHubInternal( val customVault: VaultService? = null, @@ -68,8 +67,6 @@ open class MockServiceHubInternal( private val txStorageService: TxWritableStorageService get() = storage ?: throw UnsupportedOperationException() - private val flowFactories = ConcurrentHashMap, (Party) -> FlowLogic<*>>() - lateinit var smm: StateMachineManager init { @@ -86,11 +83,7 @@ open class MockServiceHubInternal( return smm.executor.fetchFrom { smm.add(logic, flowInitiator) } } - override fun registerFlowInitiator(markerClass: Class<*>, flowFactory: (Party) -> FlowLogic<*>) { - flowFactories[markerClass] = flowFactory - } + override fun registerServiceFlow(clientFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit - override fun getFlowFactory(markerClass: Class<*>): ((Party) -> FlowLogic<*>)? { - return flowFactories[markerClass] - } + override fun getServiceFlowFactory(clientFlowClass: Class>): ((Party) -> FlowLogic<*>)? = null } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt index 4538d64654..97f60a1ccb 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt @@ -89,7 +89,7 @@ class DataVendingServiceTests { } private fun MockNode.sendNotifyTx(tx: SignedTransaction, walletServiceNode: MockNode) { - walletServiceNode.services.registerFlowInitiator(NotifyTxFlow::class.java, ::NotifyTransactionHandler) + walletServiceNode.services.registerServiceFlow(NotifyTxFlow::class.java, ::NotifyTransactionHandler) services.startFlow(NotifyTxFlow(walletServiceNode.info.legalIdentity, tx)) network.runNetwork() } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index cc5c5f2b03..2b903c9b60 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -8,7 +8,6 @@ import net.corda.core.* import net.corda.core.contracts.DOLLARS import net.corda.core.contracts.DummyState import net.corda.core.crypto.Party -import net.corda.core.crypto.X509Utilities import net.corda.core.crypto.generateKeyPair import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic @@ -111,7 +110,7 @@ class StateMachineManagerTests { @Test fun `exception while fiber suspended`() { - node2.services.registerFlowInitiator(ReceiveFlow::class.java) { SendFlow("Hello", it) } + node2.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow("Hello", it) } val flow = ReceiveFlow(node2.info.legalIdentity) val fiber = node1.services.startFlow(flow) as FlowStateMachineImpl // Before the flow runs change the suspend action to throw an exception @@ -130,7 +129,7 @@ class StateMachineManagerTests { @Test fun `flow restarted just after receiving payload`() { - node2.services.registerFlowInitiator(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() } + node2.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() } node1.services.startFlow(SendFlow("Hello", node2.info.legalIdentity)) // We push through just enough messages to get only the payload sent @@ -180,7 +179,7 @@ class StateMachineManagerTests { @Test fun `flow loaded from checkpoint will respond to messages from before start`() { - node1.services.registerFlowInitiator(ReceiveFlow::class.java) { SendFlow("Hello", it) } + node1.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow("Hello", it) } node2.services.startFlow(ReceiveFlow(node1.info.legalIdentity).nonTerminating()) // Prepare checkpointed receive flow // Make sure the add() has finished initial processing. node2.smm.executor.flush() @@ -244,8 +243,8 @@ class StateMachineManagerTests { fun `sending to multiple parties`() { val node3 = net.createNode(node1.info.address) net.runNetwork() - node2.services.registerFlowInitiator(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() } - node3.services.registerFlowInitiator(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() } + node2.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() } + node3.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() } val payload = "Hello World" node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity)) net.runNetwork() @@ -278,8 +277,8 @@ class StateMachineManagerTests { net.runNetwork() val node2Payload = "Test 1" val node3Payload = "Test 2" - node2.services.registerFlowInitiator(ReceiveFlow::class.java) { SendFlow(node2Payload, it) } - node3.services.registerFlowInitiator(ReceiveFlow::class.java) { SendFlow(node3Payload, it) } + node2.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow(node2Payload, it) } + node3.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow(node3Payload, it) } val multiReceiveFlow = ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity).nonTerminating() node1.services.startFlow(multiReceiveFlow) node1.acceptableLiveFiberCountOnStop = 1 @@ -304,7 +303,7 @@ class StateMachineManagerTests { @Test fun `both sides do a send as their first IO request`() { - node2.services.registerFlowInitiator(PingPongFlow::class.java) { PingPongFlow(it, 20L) } + node2.services.registerServiceFlow(PingPongFlow::class.java) { PingPongFlow(it, 20L) } node1.services.startFlow(PingPongFlow(node2.info.legalIdentity, 10L)) net.runNetwork() @@ -340,7 +339,7 @@ class StateMachineManagerTests { sessionTransfers.expectEvents(isStrict = false) { sequence( // First Pay - expect(match = { it.message is SessionInit && it.message.flowName == NotaryFlow.Client::class.java.name }) { + expect(match = { it.message is SessionInit && it.message.clientFlowClass == NotaryFlow.Client::class.java }) { it.message as SessionInit assertEquals(node1.id, it.from) assertEquals(notary1Address, it.to) @@ -350,7 +349,7 @@ class StateMachineManagerTests { assertEquals(notary1.id, it.from) }, // Second pay - expect(match = { it.message is SessionInit && it.message.flowName == NotaryFlow.Client::class.java.name }) { + expect(match = { it.message is SessionInit && it.message.clientFlowClass == NotaryFlow.Client::class.java }) { it.message as SessionInit assertEquals(node1.id, it.from) assertEquals(notary1Address, it.to) @@ -360,7 +359,7 @@ class StateMachineManagerTests { assertEquals(notary2.id, it.from) }, // Third pay - expect(match = { it.message is SessionInit && it.message.flowName == NotaryFlow.Client::class.java.name }) { + expect(match = { it.message is SessionInit && it.message.clientFlowClass == NotaryFlow.Client::class.java }) { it.message as SessionInit assertEquals(node1.id, it.from) assertEquals(notary1Address, it.to) @@ -375,7 +374,7 @@ class StateMachineManagerTests { @Test fun `other side ends before doing expected send`() { - node2.services.registerFlowInitiator(ReceiveFlow::class.java) { NoOpFlow() } + node2.services.registerServiceFlow(ReceiveFlow::class.java) { NoOpFlow() } val resultFuture = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)).resultFuture net.runNetwork() assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy { @@ -535,7 +534,7 @@ class StateMachineManagerTests { } } - node2.services.registerFlowInitiator(AskForExceptionFlow::class.java) { ConditionalExceptionFlow(it, "Hello") } + node2.services.registerServiceFlow(AskForExceptionFlow::class.java) { ConditionalExceptionFlow(it, "Hello") } val resultFuture = node1.services.startFlow(RetryOnExceptionFlow(node2.info.legalIdentity)).resultFuture net.runNetwork() assertThat(resultFuture.getOrThrow()).isEqualTo("Hello") @@ -563,7 +562,7 @@ class StateMachineManagerTests { ptx.signWith(node1.services.legalIdentityKey) val stx = ptx.toSignedTransaction() - node1.services.registerFlowInitiator(WaitingFlows.Waiter::class.java) { + node1.services.registerServiceFlow(WaitingFlows.Waiter::class.java) { WaitingFlows.Committer(it) { throw Exception("Error") } } val waiter = node2.services.startFlow(WaitingFlows.Waiter(stx, node1.info.legalIdentity)).resultFuture @@ -580,6 +579,14 @@ class StateMachineManagerTests { assertThatThrownBy { result.getOrThrow() }.hasMessageContaining("Vault").hasMessageContaining("private method") } + @Test + fun `custom client flow`() { + val receiveFlowFuture = node2.initiateSingleShotFlow(SendFlow::class) { ReceiveFlow(it) } + node1.services.startFlow(CustomSendFlow("Hello", node2.info.legalIdentity)).resultFuture + net.runNetwork() + assertThat(receiveFlowFuture.getOrThrow().receivedPayloads).containsOnly("Hello") + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////// //region Helpers @@ -598,7 +605,9 @@ class StateMachineManagerTests { return smm.findStateMachines(P::class.java).single() } - private fun sessionInit(flowMarker: KClass<*>, payload: Any? = null) = SessionInit(0, flowMarker.java.name, payload) + private fun sessionInit(clientFlowClass: KClass>, payload: Any? = null): SessionInit { + return SessionInit(0, clientFlowClass.java, payload) + } private val sessionConfirm = SessionConfirm(0, 0) private fun sessionData(payload: Any) = SessionData(0, payload) private val normalEnd = NormalSessionEnd(0) @@ -663,7 +672,7 @@ class StateMachineManagerTests { } - private class SendFlow(val payload: String, vararg val otherParties: Party) : FlowLogic() { + private open class SendFlow(val payload: String, vararg val otherParties: Party) : FlowLogic() { init { require(otherParties.isNotEmpty()) } @@ -672,6 +681,8 @@ class StateMachineManagerTests { override fun call() = otherParties.forEach { send(it, payload) } } + private interface CustomInterface + private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic() { object START_STEP : ProgressTracker.Step("Starting") diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt index 3698e18a85..23de6aa059 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt @@ -75,8 +75,8 @@ object NodeInterestRates { // Note: access to the singleton oracle property is via the registered SingletonSerializeAsToken Service. // Otherwise the Kryo serialisation of the call stack in the Quasar Fiber extends to include // the framework Oracle and the flow will crash. - services.registerFlowInitiator(RatesFixFlow.FixSignFlow::class.java) { FixSignHandler(it, this) } - services.registerFlowInitiator(RatesFixFlow.FixQueryFlow::class.java) { FixQueryHandler(it, this) } + services.registerServiceFlow(RatesFixFlow.FixSignFlow::class.java) { FixSignHandler(it, this) } + services.registerServiceFlow(RatesFixFlow.FixQueryFlow::class.java) { FixQueryHandler(it, this) } } private class FixSignHandler(val otherParty: Party, val service: Service) : FlowLogic() { diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt index 026c731135..b68a23f39e 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt @@ -31,7 +31,7 @@ object AutoOfferFlow { class Service(services: PluginServiceHub) : SingletonSerializeAsToken() { init { - services.registerFlowInitiator(Instigator::class.java) { Acceptor(it) } + services.registerServiceFlow(Instigator::class.java) { Acceptor(it) } } } diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/FixingFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/FixingFlow.kt index 79a853ca38..918c45eb1e 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/FixingFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/FixingFlow.kt @@ -24,7 +24,7 @@ object FixingFlow { class Service(services: PluginServiceHub) { init { - services.registerFlowInitiator(Floater::class.java) { Fixer(it) } + services.registerServiceFlow(Floater::class.java) { Fixer(it) } } } diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt index 036d840d1f..ffbbdb795e 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt @@ -30,7 +30,7 @@ object UpdateBusinessDayFlow { class Service(services: PluginServiceHub) { init { - services.registerFlowInitiator(Broadcast::class.java, ::UpdateBusinessDayHandler) + services.registerServiceFlow(Broadcast::class.java, ::UpdateBusinessDayHandler) } } diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/IRSTradeFlow.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/IRSTradeFlow.kt index 8ca7780e14..b8cf1a3ec6 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/IRSTradeFlow.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/IRSTradeFlow.kt @@ -44,7 +44,7 @@ object IRSTradeFlow { class Service(services: PluginServiceHub) { init { - services.registerFlowInitiator(Requester::class.java, ::Receiver) + services.registerServiceFlow(Requester::class.java, ::Receiver) } } diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt index c4cc3100bc..f660b6a102 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt @@ -184,7 +184,7 @@ object SimmFlow { */ class Service(services: PluginServiceHub) { init { - services.registerFlowInitiator(Requester::class.java, ::Receiver) + services.registerServiceFlow(Requester::class.java, ::Receiver) } } diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt index 95916b9a32..d7824dfec5 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt @@ -31,7 +31,7 @@ class BuyerFlow(val otherParty: Party, it.automaticallyExtractAttachments = true it.storePath } - services.registerFlowInitiator(SellerFlow::class.java) { BuyerFlow(it, attachmentsPath.toString()) } + services.registerServiceFlow(SellerFlow::class.java) { BuyerFlow(it, attachmentsPath.toString()) } } } diff --git a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt index 28ca2a8637..b64f505a62 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt @@ -140,14 +140,14 @@ fun getFreeLocalPorts(hostName: String, numberToAlloc: Int): List { /** * The given flow factory will be used to initiate just one instance of a flow of type [P] when a counterparty - * flow requests for it using [markerClass]. + * flow requests for it using [clientFlowClass]. * @return Returns a [ListenableFuture] holding the single [FlowStateMachineImpl] created by the request. */ inline fun > AbstractNode.initiateSingleShotFlow( - markerClass: KClass>, + clientFlowClass: KClass>, noinline flowFactory: (Party) -> P): ListenableFuture

{ val future = smm.changes.filter { it is StateMachineManager.Change.Add && it.logic is P }.map { it.logic as P }.toFuture() - services.registerFlowInitiator(markerClass.java, flowFactory) + services.registerServiceFlow(clientFlowClass.java, flowFactory) return future }