From fe4635a81e4f4cf216a8f3cdecd762cfe79317ce Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Tue, 3 Oct 2017 17:47:47 +0100 Subject: [PATCH] Start Flows from services Update tests Add a test to show progress tracking works Include service startable flow in Cordapp data structures. Respond to PR comments Fixup change of api inline with PR comments. Address PR comments --- .../kotlin/net/corda/core/cordapp/Cordapp.kt | 2 + .../net/corda/core/flows/FlowInitiator.kt | 4 + .../corda/core/flows/StartableByService.kt | 12 ++ .../core/internal/cordapp/CordappImpl.kt | 1 + .../net/corda/core/node/AppServiceHub.kt | 30 ++++ .../corda/core/node/services/CordaService.kt | 2 +- .../net/corda/node/internal/AbstractNode.kt | 60 +++++++- .../node/internal/cordapp/CordappLoader.kt | 14 +- .../node/shell/FlowWatchPrintingSubscriber.kt | 1 + .../corda/node/internal/CordaServiceTest.kt | 132 ++++++++++++++++++ .../internal/cordapp/CordappLoaderTest.kt | 5 + .../corda/testing/node/MockCordappProvider.kt | 2 +- 12 files changed, 254 insertions(+), 11 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/flows/StartableByService.kt create mode 100644 core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt create mode 100644 node/src/test/kotlin/net/corda/node/internal/CordaServiceTest.kt diff --git a/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt b/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt index 32c011866e..8f7f5f8d56 100644 --- a/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt +++ b/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt @@ -17,6 +17,7 @@ import java.net.URL * @property contractClassNames List of contracts * @property initiatedFlows List of initiatable flow classes * @property rpcFlows List of RPC initiable flows classes + * @property serviceFlows List of [CordaService] initiable flows classes * @property schedulableFlows List of flows startable by the scheduler * @property servies List of RPC services * @property serializationWhitelists List of Corda plugin registries @@ -28,6 +29,7 @@ interface Cordapp { val contractClassNames: List val initiatedFlows: List>> val rpcFlows: List>> + val serviceFlows: List>> val schedulableFlows: List>> val services: List> val serializationWhitelists: List diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowInitiator.kt b/core/src/main/kotlin/net/corda/core/flows/FlowInitiator.kt index 3860e73a06..549af46dcf 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowInitiator.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowInitiator.kt @@ -20,6 +20,10 @@ sealed class FlowInitiator : Principal { data class Peer(val party: Party) : FlowInitiator() { override fun getName(): String = party.name.toString() } + /** Started by a CordaService. */ + data class Service(val serviceClassName: String) : FlowInitiator() { + override fun getName(): String = serviceClassName + } /** Started as scheduled activity. */ data class Scheduled(val scheduledState: ScheduledStateRef) : FlowInitiator() { override fun getName(): String = "Scheduler" diff --git a/core/src/main/kotlin/net/corda/core/flows/StartableByService.kt b/core/src/main/kotlin/net/corda/core/flows/StartableByService.kt new file mode 100644 index 0000000000..3674c53e04 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/flows/StartableByService.kt @@ -0,0 +1,12 @@ +package net.corda.core.flows + +import kotlin.annotation.AnnotationTarget.CLASS + +/** + * Any [FlowLogic] which is to be started by the [AppServiceHub] interface from + * within a [CordaService] must have this annotation. If it's missing the + * flow will not be allowed to start and an exception will be thrown. + */ +@Target(CLASS) +@MustBeDocumented +annotation class StartableByService \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt b/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt index e03e83f92c..02410db37a 100644 --- a/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt +++ b/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt @@ -12,6 +12,7 @@ data class CordappImpl( override val contractClassNames: List, override val initiatedFlows: List>>, override val rpcFlows: List>>, + override val serviceFlows: List>>, override val schedulableFlows: List>>, override val services: List>, override val serializationWhitelists: List, diff --git a/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt new file mode 100644 index 0000000000..94ab6da2e9 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt @@ -0,0 +1,30 @@ +package net.corda.core.node + +import net.corda.core.flows.FlowLogic +import net.corda.core.messaging.FlowHandle +import net.corda.core.messaging.FlowProgressHandle +import rx.Observable + +/** + * A [CordaService] annotated class requires a constructor taking a + * single parameter of type [AppServiceHub]. + * With the [AppServiceHub] parameter a [CordaService] is able to access to privileged operations. + * In particular such a [CordaService] can initiate and track flows marked with [net.corda.core.flows.StartableByService]. + */ +interface AppServiceHub : ServiceHub { + + /** + * Start the given flow with the given arguments. [flow] must be annotated + * with [net.corda.core.flows.StartableByService]. + * TODO it is assumed here that the flow object has an appropriate classloader. + */ + fun startFlow(flow: FlowLogic): FlowHandle + + /** + * Start the given flow with the given arguments, returning an [Observable] with a single observation of the + * result of running the flow. [flow] must be annotated with [net.corda.core.flows.StartableByService]. + * TODO it is assumed here that the flow object has an appropriate classloader. + */ + fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle + +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/node/services/CordaService.kt b/core/src/main/kotlin/net/corda/core/node/services/CordaService.kt index 0e40e3acce..cdefa92037 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/CordaService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/CordaService.kt @@ -7,7 +7,7 @@ import kotlin.annotation.AnnotationTarget.CLASS /** * Annotate any class that needs to be a long-lived service within the node, such as an oracle, with this annotation. - * Such a class needs to have a constructor with a single parameter of type [ServiceHub]. This constructor will be invoked + * Such a class needs to have a constructor with a single parameter of type [AppServiceHub]. This constructor will be invoked * during node start to initialise the service. The service hub provided can be used to get information about the node * that may be necessary for the service. Corda services are created as singletons within the node and are available * to flows via [ServiceHub.cordaService]. diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index d8146ff1fe..15ed414620 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -20,9 +20,8 @@ import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.toX509CertHolder import net.corda.core.internal.uncheckedCast -import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.RPCOps -import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.messaging.* +import net.corda.core.node.AppServiceHub import net.corda.core.node.NodeInfo import net.corda.core.node.ServiceHub import net.corda.core.node.services.* @@ -264,6 +263,49 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } } + /** + * This customizes the ServiceHub for each CordaService that is initiating flows + */ + private class AppServiceHubImpl(val serviceHub: ServiceHubInternal): AppServiceHub, ServiceHub by serviceHub { + lateinit var serviceInstance: T + override fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle { + val stateMachine = startFlowChecked(flow) + return FlowProgressHandleImpl( + id = stateMachine.id, + returnValue = stateMachine.resultFuture, + progress = stateMachine.logic.track()?.updates ?: Observable.empty() + ) + } + + override fun startFlow(flow: FlowLogic): FlowHandle { + val stateMachine = startFlowChecked(flow) + return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture) + } + + private fun startFlowChecked(flow: FlowLogic): FlowStateMachineImpl { + val logicType = flow.javaClass + require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" } + val currentUser = FlowInitiator.Service(serviceInstance.javaClass.name) + return serviceHub.startFlow(flow, currentUser) + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is AppServiceHubImpl<*>) return false + + if (serviceHub != other.serviceHub) return false + if (serviceInstance != other.serviceInstance) return false + + return true + } + + override fun hashCode(): Int { + var result = serviceHub.hashCode() + result = 31 * result + serviceInstance.hashCode() + return result + } + } + /** * Use this method to install your Corda services in your tests. This is automatically done by the node when it * starts up for all classes it finds which are annotated with [CordaService]. @@ -276,8 +318,16 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java, PublicKey::class.java).apply { isAccessible = true } constructor.newInstance(services, myNotaryIdentity!!.owningKey) } else { - val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java).apply { isAccessible = true } - constructor.newInstance(services) + try { + val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true } + val serviceContext = AppServiceHubImpl(services) + serviceContext.serviceInstance = extendedServiceConstructor.newInstance(serviceContext) + serviceContext.serviceInstance + } catch (ex: NoSuchMethodException) { + val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java).apply { isAccessible = true } + log.warn("${serviceClass.name} is using legacy CordaService constructor with ServiceHub parameter. Upgrade to an AppServiceHub parameter to enable updated API features.") + constructor.newInstance(services) + } } } catch (e: InvocationTargetException) { throw ServiceInstantiationException(e.cause) diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt index 1449a32fb8..e1cec24244 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/CordappLoader.kt @@ -169,6 +169,7 @@ class CordappLoader private constructor(private val cordappJarPaths: List) listOf(), listOf(), listOf(), + listOf(), setOf(), ContractUpgradeFlow.javaClass.protectionDomain.codeSource.location // Core JAR location ) @@ -180,6 +181,7 @@ class CordappLoader private constructor(private val cordappJarPaths: List) CordappImpl(findContractClassNames(scanResult), findInitiatedFlows(scanResult), findRPCFlows(scanResult), + findServiceFlows(scanResult), findSchedulableFlows(scanResult), findServices(scanResult), findPlugins(it), @@ -207,14 +209,18 @@ class CordappLoader private constructor(private val cordappJarPaths: List) } } - private fun findRPCFlows(scanResult: ScanResult): List>> { - fun Class>.isUserInvokable(): Boolean { - return Modifier.isPublic(modifiers) && !isLocalClass && !isAnonymousClass && (!isMemberClass || Modifier.isStatic(modifiers)) - } + private fun Class>.isUserInvokable(): Boolean { + return Modifier.isPublic(modifiers) && !isLocalClass && !isAnonymousClass && (!isMemberClass || Modifier.isStatic(modifiers)) + } + private fun findRPCFlows(scanResult: ScanResult): List>> { return scanResult.getClassesWithAnnotation(FlowLogic::class, StartableByRPC::class).filter { it.isUserInvokable() } } + private fun findServiceFlows(scanResult: ScanResult): List>> { + return scanResult.getClassesWithAnnotation(FlowLogic::class, StartableByService::class) + } + private fun findSchedulableFlows(scanResult: ScanResult): List>> { return scanResult.getClassesWithAnnotation(FlowLogic::class, SchedulableFlow::class) } diff --git a/node/src/main/kotlin/net/corda/node/shell/FlowWatchPrintingSubscriber.kt b/node/src/main/kotlin/net/corda/node/shell/FlowWatchPrintingSubscriber.kt index 0c6db1a7b8..32920ffd23 100644 --- a/node/src/main/kotlin/net/corda/node/shell/FlowWatchPrintingSubscriber.kt +++ b/node/src/main/kotlin/net/corda/node/shell/FlowWatchPrintingSubscriber.kt @@ -111,6 +111,7 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub is FlowInitiator.Shell -> "Shell" // TODO Change when we will have more information on shell user. is FlowInitiator.Peer -> flowInitiator.party.name.organisation is FlowInitiator.RPC -> "RPC: " + flowInitiator.username + is FlowInitiator.Service -> "Service: " + flowInitiator.name } } diff --git a/node/src/test/kotlin/net/corda/node/internal/CordaServiceTest.kt b/node/src/test/kotlin/net/corda/node/internal/CordaServiceTest.kt new file mode 100644 index 0000000000..579b865c1f --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/CordaServiceTest.kt @@ -0,0 +1,132 @@ +package net.corda.node.internal + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowInitiator +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByService +import net.corda.core.node.AppServiceHub +import net.corda.core.node.ServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.ProgressTracker +import net.corda.finance.DOLLARS +import net.corda.finance.flows.CashIssueFlow +import net.corda.node.internal.cordapp.DummyRPCFlow +import net.corda.node.services.transactions.ValidatingNotaryService +import net.corda.nodeapi.internal.ServiceInfo +import net.corda.testing.DUMMY_NOTARY +import net.corda.testing.node.MockNetwork +import net.corda.testing.setCordappPackages +import net.corda.testing.unsetCordappPackages +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.* + +@StartableByService +class DummyServiceFlow : FlowLogic() { + companion object { + object TEST_STEP : ProgressTracker.Step("Custom progress step") + } + override val progressTracker: ProgressTracker = ProgressTracker(TEST_STEP) + + @Suspendable + override fun call(): FlowInitiator { + // We call a subFlow, otehrwise there is no chance to subscribe to the ProgressTracker + subFlow(CashIssueFlow(100.DOLLARS, OpaqueBytes.of(1), serviceHub.networkMapCache.notaryIdentities.first())) + progressTracker.currentStep = TEST_STEP + return stateMachine.flowInitiator + } +} + +@CordaService +class TestCordaService(val appServiceHub: AppServiceHub): SingletonSerializeAsToken() { + fun startServiceFlow() { + val handle = appServiceHub.startFlow(DummyServiceFlow()) + val initiator = handle.returnValue.get() + initiator as FlowInitiator.Service + assertEquals(this.javaClass.name, initiator.serviceClassName) + } + + fun startServiceFlowAndTrack() { + val handle = appServiceHub.startTrackedFlow(DummyServiceFlow()) + val count = AtomicInteger(0) + val subscriber = handle.progress.subscribe { count.incrementAndGet() } + handle.returnValue.get() + // Simply prove some progress was made. + // The actual number is currently 11, but don't want to hard code an implementation detail. + assertTrue(count.get() > 1) + subscriber.unsubscribe() + } + +} + +@CordaService +class TestCordaService2(val appServiceHub: AppServiceHub): SingletonSerializeAsToken() { + fun startInvalidRPCFlow() { + val handle = appServiceHub.startFlow(DummyRPCFlow()) + handle.returnValue.get() + } + +} + +@CordaService +class LegacyCordaService(val simpleServiceHub: ServiceHub): SingletonSerializeAsToken() { + +} + +class CordaServiceTest { + lateinit var mockNet: MockNetwork + lateinit var notaryNode: StartedNode + lateinit var nodeA: StartedNode + + @Before + fun start() { + setCordappPackages("net.corda.node.internal","net.corda.finance") + mockNet = MockNetwork(threadPerNode = true) + notaryNode = mockNet.createNode( + legalName = DUMMY_NOTARY.name, + advertisedServices = *arrayOf(ServiceInfo(ValidatingNotaryService.type))) + nodeA = mockNet.createNode(notaryNode.network.myAddress) + mockNet.startNodes() + } + + @After + fun cleanUp() { + mockNet.stopNodes() + unsetCordappPackages() + } + + @Test + fun `Can find distinct services on node`() { + val service = nodeA.services.cordaService(TestCordaService::class.java) + val service2 = nodeA.services.cordaService(TestCordaService2::class.java) + val legacyService = nodeA.services.cordaService(LegacyCordaService::class.java) + assertEquals(TestCordaService::class.java, service.javaClass) + assertEquals(TestCordaService2::class.java, service2.javaClass) + assertNotEquals(service.appServiceHub, service2.appServiceHub) // Each gets a customised AppServiceHub + assertEquals(LegacyCordaService::class.java, legacyService.javaClass) + } + + @Test + fun `Can start StartableByService flows`() { + val service = nodeA.services.cordaService(TestCordaService::class.java) + service.startServiceFlow() + } + + @Test + fun `Can't start StartableByRPC flows`() { + val service = nodeA.services.cordaService(TestCordaService2::class.java) + assertFailsWith { service.startInvalidRPCFlow() } + } + + + @Test + fun `Test flow with progress tracking`() { + val service = nodeA.services.cordaService(TestCordaService::class.java) + service.startServiceFlowAndTrack() + } + +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/cordapp/CordappLoaderTest.kt b/node/src/test/kotlin/net/corda/node/internal/cordapp/CordappLoaderTest.kt index f2352d475d..c1a1b6e5b0 100644 --- a/node/src/test/kotlin/net/corda/node/internal/cordapp/CordappLoaderTest.kt +++ b/node/src/test/kotlin/net/corda/node/internal/cordapp/CordappLoaderTest.kt @@ -1,5 +1,6 @@ package net.corda.node.internal.cordapp +import co.paralleluniverse.fibers.Suspendable import net.corda.core.flows.* import org.assertj.core.api.Assertions.assertThat import org.junit.Test @@ -7,21 +8,25 @@ import java.nio.file.Paths @InitiatingFlow class DummyFlow : FlowLogic() { + @Suspendable override fun call() { } } @InitiatedBy(DummyFlow::class) class LoaderTestFlow(unusedSession: FlowSession) : FlowLogic() { + @Suspendable override fun call() { } } @SchedulableFlow class DummySchedulableFlow : FlowLogic() { + @Suspendable override fun call() { } } @StartableByRPC class DummyRPCFlow : FlowLogic() { + @Suspendable override fun call() { } } diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/node/MockCordappProvider.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/node/MockCordappProvider.kt index f51d59d20d..976d595ec6 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/node/MockCordappProvider.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/node/MockCordappProvider.kt @@ -14,7 +14,7 @@ class MockCordappProvider(cordappLoader: CordappLoader) : CordappProviderImpl(co val cordappRegistry = mutableListOf>() fun addMockCordapp(contractClassName: ContractClassName, services: ServiceHub) { - val cordapp = CordappImpl(listOf(contractClassName), emptyList(), emptyList(), emptyList(), emptyList(), emptyList(), emptySet(), Paths.get(".").toUri().toURL()) + val cordapp = CordappImpl(listOf(contractClassName), emptyList(), emptyList(), emptyList(), emptyList(), emptyList(), emptyList(), emptySet(), Paths.get(".").toUri().toURL()) if (cordappRegistry.none { it.first.contractClassNames.contains(contractClassName) }) { cordappRegistry.add(Pair(cordapp, findOrImportAttachment(contractClassName.toByteArray(), services))) }