Flow registration takes in a Class object rather than a KClass

This commit is contained in:
Shams Asari 2017-03-15 11:55:20 +00:00
parent 6a6698b598
commit f581844f3f
21 changed files with 52 additions and 58 deletions

View File

@ -10,18 +10,20 @@ import kotlin.reflect.KClass
interface PluginServiceHub : ServiceHub {
/**
* Register the flow factory we wish to use when a initiating party attempts to communicate with us. The
* registration is done against a marker [KClass] which is sent in the session handshake by the other party. If this
* registration is done against a marker [Class] which is sent in the session handshake by the other party. If this
* marker class has been registered then the corresponding factory will be used to create the flow which will
* communicate with the other side. If there is no mapping then the session attempt is rejected.
* @param markerClass The marker [KClass] present in a session initiation attempt, which is a 1:1 mapping to a [Class]
* using the <pre>::class</pre> construct. Conventionally this is a [FlowLogic] subclass, however any class can
* be used, with the default being the class of the initiating flow. This enables the registration to be of the
* form: registerFlowInitiator(InitiatorFlow::class, ::InitiatedFlow)
* @param markerClass The marker [Class] present in a session initiation attempt. Conventionally this is a [FlowLogic]
* subclass, however any class can be used, with the default being the class of the initiating flow. This enables
* the registration to be of the form: `registerFlowInitiator(InitiatorFlow.class, InitiatedFlow::new)`
* @param flowFactory The flow factory generating the initiated flow.
*/
fun registerFlowInitiator(markerClass: Class<*>, flowFactory: (Party) -> FlowLogic<*>)
// TODO: remove dependency on Kotlin relfection (Kotlin KClass -> Java Class).
fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>)
@Deprecated(message = "Use overloaded method which uses Class instead of KClass. This is scheduled for removal in a future release.")
fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
registerFlowInitiator(markerClass.java, flowFactory)
}
/**
* Return the flow factory that has been registered with [markerClass], or null if no factory is found.

View File

@ -24,7 +24,7 @@ import java.util.*
object FxTransactionDemoTutorial {
// Would normally be called by custom service init in a CorDapp
fun registerFxProtocols(pluginHub: PluginServiceHub) {
pluginHub.registerFlowInitiator(ForeignExchangeFlow::class, ::ForeignExchangeRemoteFlow)
pluginHub.registerFlowInitiator(ForeignExchangeFlow::class.java, ::ForeignExchangeRemoteFlow)
}
}

View File

@ -17,7 +17,7 @@ import java.time.Duration
object WorkflowTransactionBuildTutorial {
// Would normally be called by custom service init in a CorDapp
fun registerWorkflowProtocols(pluginHub: PluginServiceHub) {
pluginHub.registerFlowInitiator(SubmitCompletionFlow::class, ::RecordCompletionFlow)
pluginHub.registerFlowInitiator(SubmitCompletionFlow::class.java, ::RecordCompletionFlow)
}
}

View File

@ -97,7 +97,7 @@ object IssuerFlow {
class Service(services: PluginServiceHub) {
init {
services.registerFlowInitiator(IssuanceRequester::class, ::Issuer)
services.registerFlowInitiator(IssuanceRequester::class.java, ::Issuer)
}
}
}

View File

@ -229,7 +229,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
private fun startBobAndCommunicateWithAlice(): Party {
val bob = startNode("Bob").getOrThrow()
bob.services.registerFlowInitiator(SendFlow::class, ::ReceiveFlow)
bob.services.registerFlowInitiator(SendFlow::class.java, ::ReceiveFlow)
val bobParty = bob.info.legalIdentity
// Perform a protocol exchange to force the peer queue to be created
alice.services.startFlow(SendFlow(bobParty, 0)).resultFuture.getOrThrow()

View File

@ -59,7 +59,6 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import kotlin.reflect.KClass
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
/**
@ -126,10 +125,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
return serverThread.fetchFrom { smm.add(logic) }
}
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
require(markerClass !in flowFactories) { "${markerClass.java.name} has already been used to register a flow" }
log.info("Registering flow ${markerClass.java.name}")
flowFactories[markerClass.java] = flowFactory
override fun registerFlowInitiator(markerClass: Class<*>, flowFactory: (Party) -> FlowLogic<*>) {
require(markerClass !in flowFactories) { "${markerClass.name} has already been used to register a flow" }
log.info("Registering flow ${markerClass.name}")
flowFactories[markerClass] = flowFactory
}
override fun getFlowFactory(markerClass: Class<*>): ((Party) -> FlowLogic<*>)? {
@ -224,7 +223,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
false
}
startMessagingService(CordaRPCOpsImpl(services, smm, database))
services.registerFlowInitiator(ContractUpgradeFlow.Instigator::class) { ContractUpgradeFlow.Acceptor(it) }
services.registerFlowInitiator(ContractUpgradeFlow.Instigator::class.java) { ContractUpgradeFlow.Acceptor(it) }
runOnStop += Runnable { net.stop() }
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
smm.start()

View File

@ -1,9 +1,9 @@
package net.corda.node.services
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.flows.NotaryChangeFlow
import net.corda.core.node.CordaPluginRegistry
import java.util.function.Function
object NotaryChange {
@ -17,7 +17,7 @@ object NotaryChange {
*/
class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
init {
services.registerFlowInitiator(NotaryChangeFlow.Instigator::class) { NotaryChangeFlow.Acceptor(it) }
services.registerFlowInitiator(NotaryChangeFlow.Instigator::class.java) { NotaryChangeFlow.Acceptor(it) }
}
}
}

View File

@ -7,7 +7,6 @@ import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.recordTransactions
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.unwrap
@ -35,9 +34,9 @@ object DataVending {
@ThreadSafe
class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
init {
services.registerFlowInitiator(FetchTransactionsFlow::class, ::FetchTransactionsHandler)
services.registerFlowInitiator(FetchAttachmentsFlow::class, ::FetchAttachmentsHandler)
services.registerFlowInitiator(BroadcastTransactionFlow::class, ::NotifyTransactionHandler)
services.registerFlowInitiator(FetchTransactionsFlow::class.java, ::FetchTransactionsHandler)
services.registerFlowInitiator(FetchAttachmentsFlow::class.java, ::FetchAttachmentsHandler)
services.registerFlowInitiator(BroadcastTransactionFlow::class.java, ::NotifyTransactionHandler)
}
private class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {

View File

@ -18,7 +18,7 @@ import net.corda.node.services.api.ServiceHubInternal
abstract class NotaryService(services: ServiceHubInternal) : SingletonSerializeAsToken() {
init {
services.registerFlowInitiator(NotaryFlow.Client::class) { createFlow(it) }
services.registerFlowInitiator(NotaryFlow.Client::class.java) { createFlow(it) }
}
/** Implement a factory that specifies the transaction commit flow for the notary service to use */

View File

@ -21,7 +21,6 @@ import net.corda.testing.node.MockNetworkMapCache
import net.corda.testing.node.MockStorageService
import java.time.Clock
import java.util.concurrent.ConcurrentHashMap
import kotlin.reflect.KClass
open class MockServiceHubInternal(
val customVault: VaultService? = null,
@ -82,8 +81,8 @@ open class MockServiceHubInternal(
return smm.executor.fetchFrom { smm.add(logic) }
}
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
flowFactories[markerClass.java] = flowFactory
override fun registerFlowInitiator(markerClass: Class<*>, flowFactory: (Party) -> FlowLogic<*>) {
flowFactories[markerClass] = flowFactory
}
override fun getFlowFactory(markerClass: Class<*>): ((Party) -> FlowLogic<*>)? {

View File

@ -89,7 +89,7 @@ class DataVendingServiceTests {
}
private fun MockNode.sendNotifyTx(tx: SignedTransaction, walletServiceNode: MockNode) {
walletServiceNode.services.registerFlowInitiator(NotifyTxFlow::class, ::NotifyTransactionHandler)
walletServiceNode.services.registerFlowInitiator(NotifyTxFlow::class.java, ::NotifyTransactionHandler)
services.startFlow(NotifyTxFlow(walletServiceNode.info.legalIdentity, tx))
network.runNetwork()
}

View File

@ -109,7 +109,7 @@ class StateMachineManagerTests {
@Test
fun `exception while fiber suspended`() {
node2.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow("Hello", it) }
node2.services.registerFlowInitiator(ReceiveFlow::class.java) { SendFlow("Hello", it) }
val flow = ReceiveFlow(node2.info.legalIdentity)
val fiber = node1.services.startFlow(flow) as FlowStateMachineImpl
// Before the flow runs change the suspend action to throw an exception
@ -128,7 +128,7 @@ class StateMachineManagerTests {
@Test
fun `flow restarted just after receiving payload`() {
node2.services.registerFlowInitiator(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
node2.services.registerFlowInitiator(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() }
node1.services.startFlow(SendFlow("Hello", node2.info.legalIdentity))
// We push through just enough messages to get only the payload sent
@ -178,7 +178,7 @@ class StateMachineManagerTests {
@Test
fun `flow loaded from checkpoint will respond to messages from before start`() {
node1.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow("Hello", it) }
node1.services.registerFlowInitiator(ReceiveFlow::class.java) { SendFlow("Hello", it) }
node2.services.startFlow(ReceiveFlow(node1.info.legalIdentity).nonTerminating()) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
@ -242,8 +242,8 @@ class StateMachineManagerTests {
fun `sending to multiple parties`() {
val node3 = net.createNode(node1.info.address)
net.runNetwork()
node2.services.registerFlowInitiator(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
node3.services.registerFlowInitiator(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
node2.services.registerFlowInitiator(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() }
node3.services.registerFlowInitiator(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() }
val payload = "Hello World"
node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity))
net.runNetwork()
@ -276,8 +276,8 @@ class StateMachineManagerTests {
net.runNetwork()
val node2Payload = "Test 1"
val node3Payload = "Test 2"
node2.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(node2Payload, it) }
node3.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(node3Payload, it) }
node2.services.registerFlowInitiator(ReceiveFlow::class.java) { SendFlow(node2Payload, it) }
node3.services.registerFlowInitiator(ReceiveFlow::class.java) { SendFlow(node3Payload, it) }
val multiReceiveFlow = ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity).nonTerminating()
node1.services.startFlow(multiReceiveFlow)
node1.acceptableLiveFiberCountOnStop = 1
@ -302,7 +302,7 @@ class StateMachineManagerTests {
@Test
fun `both sides do a send as their first IO request`() {
node2.services.registerFlowInitiator(PingPongFlow::class) { PingPongFlow(it, 20L) }
node2.services.registerFlowInitiator(PingPongFlow::class.java) { PingPongFlow(it, 20L) }
node1.services.startFlow(PingPongFlow(node2.info.legalIdentity, 10L))
net.runNetwork()
@ -375,7 +375,7 @@ class StateMachineManagerTests {
@Test
fun `other side ends before doing expected send`() {
node2.services.registerFlowInitiator(ReceiveFlow::class) { NoOpFlow() }
node2.services.registerFlowInitiator(ReceiveFlow::class.java) { NoOpFlow() }
val resultFuture = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy {
@ -535,7 +535,7 @@ class StateMachineManagerTests {
}
}
node2.services.registerFlowInitiator(AskForExceptionFlow::class) { ConditionalExceptionFlow(it, "Hello") }
node2.services.registerFlowInitiator(AskForExceptionFlow::class.java) { ConditionalExceptionFlow(it, "Hello") }
val resultFuture = node1.services.startFlow(RetryOnExceptionFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello")
@ -563,7 +563,7 @@ class StateMachineManagerTests {
ptx.signWith(node1.services.legalIdentityKey)
val stx = ptx.toSignedTransaction()
node1.services.registerFlowInitiator(WaitingFlows.Waiter::class) {
node1.services.registerFlowInitiator(WaitingFlows.Waiter::class.java) {
WaitingFlows.Committer(it) { throw Exception("Error") }
}
val waiter = node2.services.startFlow(WaitingFlows.Waiter(stx, node1.info.legalIdentity)).resultFuture

View File

@ -3,7 +3,10 @@ package net.corda.irs.api
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.RetryableException
import net.corda.core.contracts.*
import net.corda.core.crypto.*
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.MerkleTreeException
import net.corda.core.crypto.Party
import net.corda.core.crypto.signWithECDSA
import net.corda.core.flows.FlowLogic
import net.corda.core.math.CubicSplineInterpolator
import net.corda.core.math.Interpolator
@ -71,8 +74,8 @@ object NodeInterestRates {
// Note: access to the singleton oracle property is via the registered SingletonSerializeAsToken Service.
// Otherwise the Kryo serialisation of the call stack in the Quasar Fiber extends to include
// the framework Oracle and the flow will crash.
services.registerFlowInitiator(RatesFixFlow.FixSignFlow::class) { FixSignHandler(it, this) }
services.registerFlowInitiator(RatesFixFlow.FixQueryFlow::class) { FixQueryHandler(it, this) }
services.registerFlowInitiator(RatesFixFlow.FixSignFlow::class.java) { FixSignHandler(it, this) }
services.registerFlowInitiator(RatesFixFlow.FixQueryFlow::class.java) { FixQueryHandler(it, this) }
}
private class FixSignHandler(val otherParty: Party, val service: Service) : FlowLogic<Unit>() {

View File

@ -30,13 +30,8 @@ object AutoOfferFlow {
class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
object DEALING : ProgressTracker.Step("Starting the deal flow") {
override fun childProgressTracker(): ProgressTracker = TwoPartyDealFlow.Secondary.tracker()
}
init {
services.registerFlowInitiator(Instigator::class) { Acceptor(it) }
services.registerFlowInitiator(Instigator::class.java) { Acceptor(it) }
}
}

View File

@ -27,7 +27,7 @@ object ExitServerFlow {
class Service(services: PluginServiceHub) {
init {
services.registerFlowInitiator(Broadcast::class, ::ExitServerHandler)
services.registerFlowInitiator(Broadcast::class.java, ::ExitServerHandler)
enabled = true
}
}

View File

@ -22,7 +22,7 @@ object FixingFlow {
class Service(services: PluginServiceHub) {
init {
services.registerFlowInitiator(Floater::class) { Fixer(it) }
services.registerFlowInitiator(Floater::class.java) { Fixer(it) }
}
}

View File

@ -30,7 +30,7 @@ object UpdateBusinessDayFlow {
class Service(services: PluginServiceHub) {
init {
services.registerFlowInitiator(Broadcast::class, ::UpdateBusinessDayHandler)
services.registerFlowInitiator(Broadcast::class.java, ::UpdateBusinessDayHandler)
}
}

View File

@ -44,7 +44,7 @@ object IRSTradeFlow {
class Service(services: PluginServiceHub) {
init {
services.registerFlowInitiator(Requester::class, ::Receiver)
services.registerFlowInitiator(Requester::class.java, ::Receiver)
}
}

View File

@ -184,7 +184,7 @@ object SimmFlow {
*/
class Service(services: PluginServiceHub) {
init {
services.registerFlowInitiator(Requester::class, ::Receiver)
services.registerFlowInitiator(Requester::class.java, ::Receiver)
}
}

View File

@ -31,7 +31,7 @@ class BuyerFlow(val otherParty: Party,
it.automaticallyExtractAttachments = true
it.storePath
}
services.registerFlowInitiator(SellerFlow::class) { BuyerFlow(it, attachmentsPath) }
services.registerFlowInitiator(SellerFlow::class.java) { BuyerFlow(it, attachmentsPath) }
}
}

View File

@ -5,7 +5,6 @@ package net.corda.testing
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import com.typesafe.config.Config
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.*
import net.corda.core.flows.FlowLogic
@ -148,7 +147,7 @@ inline fun <reified P : FlowLogic<*>> AbstractNode.initiateSingleShotFlow(
markerClass: KClass<out FlowLogic<*>>,
noinline flowFactory: (Party) -> P): ListenableFuture<P> {
val future = smm.changes.filter { it.addOrRemove == ADD && it.logic is P }.map { it.logic as P }.toFuture()
services.registerFlowInitiator(markerClass, flowFactory)
services.registerFlowInitiator(markerClass.java, flowFactory)
return future
}
@ -164,5 +163,3 @@ data class TestNodeConfiguration(
override val exportJMXto: String = "",
override val devMode: Boolean = true,
override val certificateSigningService: URL = URL("http://localhost")) : NodeConfiguration
fun Config.getHostAndPort(name: String) = HostAndPort.fromString(getString(name))