Introducing StartableByRPC and SchedulableFlow annotations, needed by flows started via RPC and schedulable flows respectively.

CordaPluginRegistry.requiredFlows is no longer needed as a result.
This commit is contained in:
Shams Asari
2017-05-10 11:28:25 +01:00
parent b155764023
commit 48f58b6dbc
52 changed files with 401 additions and 434 deletions

View File

@ -150,6 +150,9 @@ dependencies {
// Requery: object mapper for Kotlin
compile "io.requery:requery-kotlin:$requery_version"
// FastClasspathScanner: classpath scanning
compile 'io.github.lukehutch:fast-classpath-scanner:2.0.19'
// Integration test helpers
integrationTestCompile "junit:junit:$junit_version"
}

View File

@ -1,10 +1,11 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.div
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.utilities.ALICE
import net.corda.node.driver.driver
import net.corda.node.services.startFlowPermission
@ -48,18 +49,12 @@ class BootTests {
}
}
@StartableByRPC
class ObjectInputStreamFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
System.clearProperty("jdk.serialFilter") // This checks that the node has already consumed the property.
val data = ByteArrayOutputStream().apply { ObjectOutputStream(this).use { it.writeObject(object : Serializable {}) } }.toByteArray()
ObjectInputStream(data.inputStream()).use { it.readObject() }
}
}
class BootTestsPlugin : CordaPluginRegistry() {
override val requiredFlows: Map<String, Set<String>> = mapOf(ObjectInputStreamFlow::class.java.name to setOf())
}

View File

@ -5,15 +5,16 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner
import io.github.lukehutch.fastclasspathscanner.scanner.ClassInfo
import net.corda.core.*
import net.corda.core.contracts.Amount
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.KeyStoreUtilities
import net.corda.core.crypto.X509Utilities
import net.corda.core.crypto.replaceCommonName
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
@ -21,7 +22,6 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -47,7 +47,6 @@ import net.corda.node.services.network.PersistentNetworkMapService
import net.corda.node.services.persistence.*
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.flowVersion
@ -64,8 +63,11 @@ import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
import org.slf4j.Logger
import java.io.IOException
import java.lang.reflect.Modifier.*
import java.net.URL
import java.nio.file.FileAlreadyExistsException
import java.nio.file.Path
import java.nio.file.Paths
import java.security.KeyPair
import java.security.KeyStoreException
import java.time.Clock
@ -73,6 +75,7 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit.SECONDS
import kotlin.collections.ArrayList
import kotlin.reflect.KClass
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
@ -93,14 +96,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
companion object {
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
val defaultFlowWhiteList: Map<Class<out FlowLogic<*>>, Set<Class<*>>> = mapOf(
CashExitFlow::class.java to setOf(Amount::class.java, PartyAndReference::class.java),
CashIssueFlow::class.java to setOf(Amount::class.java, OpaqueBytes::class.java, Party::class.java),
CashPaymentFlow::class.java to setOf(Amount::class.java, Party::class.java),
FinalityFlow::class.java to setOf(LinkedHashSet::class.java),
ContractUpgradeFlow::class.java to emptySet()
)
}
// TODO: Persist this, as well as whether the node is registered.
@ -133,10 +128,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override val schemaService: SchemaService get() = schemas
override val transactionVerifierService: TransactionVerifierService get() = txVerifierService
override val auditService: AuditService get() = auditService
override val rpcFlows: List<Class<out FlowLogic<*>>> get() = this@AbstractNode.rpcFlows
// Internal only
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val flowLogicRefFactory: FlowLogicRefFactoryInternal get() = flowLogicFactory
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {
return serverThread.fetchFrom { smm.add(logic, flowInitiator) }
@ -176,13 +171,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
lateinit var net: MessagingService
lateinit var netMapCache: NetworkMapCacheInternal
lateinit var scheduler: NodeSchedulerService
lateinit var flowLogicFactory: FlowLogicRefFactoryInternal
lateinit var schemas: SchemaService
lateinit var auditService: AuditService
val customServices: ArrayList<Any> = ArrayList()
protected val runOnStop: ArrayList<Runnable> = ArrayList()
lateinit var database: Database
protected var dbCloser: Runnable? = null
private lateinit var rpcFlows: List<Class<out FlowLogic<*>>>
/** Locates and returns a service of the given type if loaded, or throws an exception if not found. */
inline fun <reified T : Any> findService() = customServices.filterIsInstance<T>().single()
@ -250,6 +245,16 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
startMessagingService(rpcOps)
installCoreFlows()
fun Class<out FlowLogic<*>>.isUserInvokable(): Boolean {
return isPublic(modifiers) && !isLocalClass && !isAnonymousClass && (!isMemberClass || isStatic(modifiers))
}
val flows = scanForFlows()
rpcFlows = flows.filter { it.isUserInvokable() && it.isAnnotationPresent(StartableByRPC::class.java) } +
// Add any core flows here
listOf(ContractUpgradeFlow::class.java)
runOnStop += Runnable { net.stop() }
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
smm.start()
@ -305,8 +310,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
// the identity key. But the infrastructure to make that easy isn't here yet.
keyManagement = makeKeyManagementService()
flowLogicFactory = initialiseFlowLogicFactory()
scheduler = NodeSchedulerService(services, database, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
scheduler = NodeSchedulerService(services, database, unfinishedSchedules = busyNodeLatch)
val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler)
makeAdvertisedServices(tokenizableServices)
@ -318,6 +322,53 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
return tokenizableServices
}
private fun scanForFlows(): List<Class<out FlowLogic<*>>> {
val pluginsDir = configuration.baseDirectory / "plugins"
log.info("Scanning plugins in $pluginsDir ...")
if (!pluginsDir.exists()) return emptyList()
val pluginJars = pluginsDir.list {
it.filter { it.isRegularFile() && it.toString().endsWith(".jar") }.toArray()
}
val scanResult = FastClasspathScanner().overrideClasspath(*pluginJars).scan() // This will only scan the plugin jars and nothing else
fun loadFlowClass(className: String): Class<out FlowLogic<*>>? {
return try {
// TODO Make sure this is loaded by the correct class loader
@Suppress("UNCHECKED_CAST")
Class.forName(className, false, javaClass.classLoader) as Class<out FlowLogic<*>>
} catch (e: Exception) {
log.warn("Unable to load flow class $className", e)
null
}
}
val flowClasses = scanResult.getNamesOfSubclassesOf(FlowLogic::class.java)
.mapNotNull { loadFlowClass(it) }
.filterNot { isAbstract(it.modifiers) }
fun URL.pluginName(): String {
return try {
Paths.get(toURI()).fileName.toString()
} catch (e: Exception) {
toString()
}
}
val classpathURLsField = ClassInfo::class.java.getDeclaredField("classpathElementURLs").apply { isAccessible = true }
flowClasses.groupBy {
val classInfo = scanResult.classNameToClassInfo[it.name]
@Suppress("UNCHECKED_CAST")
(classpathURLsField.get(classInfo) as Set<URL>).first()
}.forEach { url, classes ->
log.info("Found flows in plugin ${url.pluginName()}: ${classes.joinToString { it.name }}")
}
return flowClasses
}
private fun initUploaders(storageServices: Pair<TxWritableStorageService, CheckpointStorage>) {
val uploaders: List<FileUploader> = listOf(storageServices.first.attachments as NodeAttachmentService) +
customServices.filterIsInstance(AcceptsFileUpload::class.java)
@ -386,26 +437,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
}
private fun initialiseFlowLogicFactory(): FlowLogicRefFactoryInternal {
val flowWhitelist = HashMap<String, Set<String>>()
for ((flowClass, extraArgumentTypes) in defaultFlowWhiteList) {
val argumentWhitelistClassNames = HashSet(extraArgumentTypes.map { it.name })
flowClass.constructors.forEach {
it.parameters.mapTo(argumentWhitelistClassNames) { it.type.name }
}
flowWhitelist.merge(flowClass.name, argumentWhitelistClassNames, { x, y -> x + y })
}
for (plugin in pluginRegistries) {
for ((className, classWhitelist) in plugin.requiredFlows) {
flowWhitelist.merge(className, classWhitelist, { x, y -> x + y })
}
}
return FlowLogicRefFactoryImpl(flowWhitelist)
}
private fun makePluginServices(tokenizableServices: MutableList<Any>): List<Any> {
val pluginServices = pluginRegistries.flatMap { it.servicePlugins }.map { it.apply(services) }
tokenizableServices.addAll(pluginServices)

View File

@ -7,6 +7,7 @@ import net.corda.core.contracts.UpgradedContract
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
@ -20,6 +21,7 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.getRpcContext
import net.corda.node.services.messaging.requirePermission
import net.corda.node.services.startFlowPermission
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.transaction
import org.bouncycastle.asn1.x500.X500Name
@ -39,8 +41,6 @@ class CordaRPCOpsImpl(
private val smm: StateMachineManager,
private val database: Database
) : CordaRPCOps {
override val protocolVersion: Int = 0
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
return database.transaction {
services.networkMapCache.track()
@ -115,12 +115,8 @@ class CordaRPCOpsImpl(
}
}
// TODO: Check that this flow is annotated as being intended for RPC invocation
override fun <T : Any> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
val rpcContext = getRpcContext()
rpcContext.requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(rpcContext.currentUser.username)
val stateMachine = services.invokeFlowAsync(logicType, currentUser, *args)
val stateMachine = startFlow(logicType, args)
return FlowProgressHandleImpl(
id = stateMachine.id,
returnValue = stateMachine.resultFuture,
@ -128,13 +124,17 @@ class CordaRPCOpsImpl(
)
}
// TODO: Check that this flow is annotated as being intended for RPC invocation
override fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> {
val stateMachine = startFlow(logicType, args)
return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture)
}
private fun <T : Any> startFlow(logicType: Class<out FlowLogic<T>>, args: Array<out Any?>): FlowStateMachineImpl<T> {
require(logicType.isAnnotationPresent(StartableByRPC::class.java)) { "${logicType.name} was not designed for RPC" }
val rpcContext = getRpcContext()
rpcContext.requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(rpcContext.currentUser.username)
val stateMachine = services.invokeFlowAsync(logicType, currentUser, *args)
return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture)
return services.invokeFlowAsync(logicType, currentUser, *args)
}
override fun attachmentExists(id: SecureHash): Boolean {
@ -171,11 +171,12 @@ class CordaRPCOpsImpl(
override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered
override fun partyFromKey(key: PublicKey) = services.identityService.partyFromKey(key)
@Suppress("DEPRECATION")
@Deprecated("Use partyFromX500Name instead")
override fun partyFromName(name: String) = services.identityService.partyFromName(name)
override fun partyFromX500Name(x500Name: X500Name)= services.identityService.partyFromX500Name(x500Name)
override fun registeredFlows(): List<String> = services.flowLogicRefFactory.flowWhitelist.keys.sorted()
override fun registeredFlows(): List<String> = services.rpcFlows.map { it.name }.sorted()
companion object {
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {

View File

@ -2,7 +2,9 @@ package net.corda.node.services.api
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.flows.*
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.PluginServiceHub
@ -13,6 +15,7 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.ServiceFlowInfo
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
interface NetworkMapCacheInternal : NetworkMapCache {
@ -47,11 +50,6 @@ interface NetworkMapCacheInternal : NetworkMapCache {
}
interface FlowLogicRefFactoryInternal : FlowLogicRefFactory {
val flowWhitelist: Map<String, Set<String>>
fun toFlowLogic(ref: FlowLogicRef): FlowLogic<*>
}
@CordaSerializable
sealed class NetworkCacheError : Exception() {
/** Indicates a failure to deregister, because of a rejected request from the remote node */
@ -64,12 +62,11 @@ abstract class ServiceHubInternal : PluginServiceHub {
}
abstract val monitoringService: MonitoringService
abstract val flowLogicRefFactory: FlowLogicRefFactoryInternal
abstract val schemaService: SchemaService
abstract override val networkMapCache: NetworkMapCacheInternal
abstract val schedulerService: SchedulerService
abstract val auditService: AuditService
abstract val rpcFlows: List<Class<out FlowLogic<*>>>
abstract val networkService: MessagingService
/**
@ -119,9 +116,9 @@ abstract class ServiceHubInternal : PluginServiceHub {
logicType: Class<out FlowLogic<T>>,
flowInitiator: FlowInitiator,
vararg args: Any?): FlowStateMachineImpl<T> {
val logicRef = flowLogicRefFactory.create(logicType, *args)
val logicRef = FlowLogicRefFactoryImpl.createForRPC(logicType, *args)
@Suppress("UNCHECKED_CAST")
val logic = flowLogicRefFactory.toFlowLogic(logicRef) as FlowLogic<T>
val logic = FlowLogicRefFactoryImpl.toFlowLogic(logicRef) as FlowLogic<T>
return startFlow(logic, flowInitiator)
}

View File

@ -12,9 +12,9 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.then
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.services.api.FlowLogicRefFactoryInternal
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
@ -38,14 +38,12 @@ import javax.annotation.concurrent.ThreadSafe
* but that starts to sound a lot like off-ledger state.
*
* @param services Core node services.
* @param flowLogicRefFactory Factory for restoring [FlowLogic] instances from references.
* @param schedulerTimerExecutor The executor the scheduler blocks on waiting for the clock to advance to the next
* activity. Only replace this for unit testing purposes. This is not the executor the [FlowLogic] is launched on.
*/
@ThreadSafe
class NodeSchedulerService(private val services: ServiceHubInternal,
private val database: Database,
private val flowLogicRefFactory: FlowLogicRefFactoryInternal,
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
private val unfinishedSchedules: ReusableLatch = ReusableLatch())
: SchedulerService, SingletonSerializeAsToken() {
@ -192,7 +190,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
} else {
// TODO: FlowLogicRefFactory needs to sort out the class loader etc
val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
val flowLogic = FlowLogicRefFactoryImpl.toFlowLogic(scheduledActivity.logicRef)
log.trace { "Scheduler starting FlowLogic $flowLogic" }
scheduledFlow = flowLogic
null
@ -213,7 +211,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
val state = txState.data as SchedulableState
return try {
// This can throw as running contract code.
state.nextScheduledActivity(scheduledState.ref, flowLogicRefFactory)
state.nextScheduledActivity(scheduledState.ref, FlowLogicRefFactoryImpl)
} catch (e: Exception) {
log.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
null

View File

@ -4,8 +4,8 @@ import net.corda.core.contracts.ContractState
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateAndRef
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
/**
* This observes the vault and schedules and unschedules activities appropriately based on state production and
@ -13,16 +13,16 @@ import net.corda.node.services.api.ServiceHubInternal
*/
class ScheduledActivityObserver(val services: ServiceHubInternal) {
init {
services.vaultService.rawUpdates.subscribe { update ->
update.consumed.forEach { services.schedulerService.unscheduleStateActivity(it.ref) }
update.produced.forEach { scheduleStateActivity(it, services.flowLogicRefFactory) }
services.vaultService.rawUpdates.subscribe { (consumed, produced) ->
consumed.forEach { services.schedulerService.unscheduleStateActivity(it.ref) }
produced.forEach { scheduleStateActivity(it) }
}
}
private fun scheduleStateActivity(produced: StateAndRef<ContractState>, flowLogicRefFactory: FlowLogicRefFactory) {
private fun scheduleStateActivity(produced: StateAndRef<ContractState>) {
val producedState = produced.state.data
if (producedState is SchedulableState) {
val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, flowLogicRefFactory)?.scheduledAt } ?: return
val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, FlowLogicRefFactoryImpl)?.scheduledAt } ?: return
services.schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt))
}
}

View File

@ -1,14 +1,10 @@
package net.corda.node.services.statemachine
import com.google.common.annotations.VisibleForTesting
import com.google.common.primitives.Primitives
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.AppContext
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRef
import net.corda.core.flows.IllegalFlowLogicException
import net.corda.core.flows.*
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.services.api.FlowLogicRefFactoryInternal
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.util.*
@ -29,59 +25,26 @@ data class FlowLogicRefImpl internal constructor(val flowLogicClassName: String,
* Validation of types is performed on the way in and way out in case this object is passed between JVMs which might have differing
* whitelists.
*
* TODO: Ways to populate whitelist of "blessed" flows per node/party
* TODO: Ways to populate argument types whitelist. Per node/party or global?
* TODO: Align with API related logic for passing in FlowLogic references (FlowRef)
* TODO: Actual support for AppContext / AttachmentsClassLoader
* TODO: at some point check whether there is permission, beyond the annotations, to start flows. For example, as a security
* measure we might want the ability for the node admin to blacklist a flow such that it moves immediately to the "Flow Hospital"
* in response to a potential malicious use or buggy update to an app etc.
*/
class FlowLogicRefFactoryImpl(override val flowWhitelist: Map<String, Set<String>>) : SingletonSerializeAsToken(), FlowLogicRefFactoryInternal {
constructor() : this(mapOf())
// Pending real dependence on AppContext for class loading etc
@Suppress("UNUSED_PARAMETER")
private fun validateFlowClassName(className: String, appContext: AppContext) {
// TODO: make this specific to the attachments in the [AppContext] by including [SecureHash] in whitelist check
require(flowWhitelist.containsKey(className)) { "${FlowLogic::class.java.simpleName} of ${FlowLogicRef::class.java.simpleName} must have type on the whitelist: $className" }
}
// Pending real dependence on AppContext for class loading etc
@Suppress("UNUSED_PARAMETER")
private fun validateArgClassName(className: String, argClassName: String, appContext: AppContext) {
// TODO: consider more carefully what to whitelist and how to secure flows
// For now automatically accept standard java.lang.* and kotlin.* types.
// All other types require manual specification at FlowLogicRefFactory construction time.
if (argClassName.startsWith("java.lang.") || argClassName.startsWith("kotlin.")) {
return
object FlowLogicRefFactoryImpl : SingletonSerializeAsToken(), FlowLogicRefFactory {
override fun create(flowClass: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef {
if (!flowClass.isAnnotationPresent(SchedulableFlow::class.java)) {
throw IllegalFlowLogicException(flowClass, "because it's not a schedulable flow")
}
// TODO: make this specific to the attachments in the [AppContext] by including [SecureHash] in whitelist check
require(flowWhitelist[className]!!.contains(argClassName)) { "Args to $className must have types on the args whitelist: $argClassName, but it has ${flowWhitelist[className]}" }
return createForRPC(flowClass, *args)
}
/**
* Create a [FlowLogicRef] for the Kotlin primary constructor of a named [FlowLogic]
*/
fun createKotlin(flowLogicClassName: String, args: Map<String, Any?>, attachments: List<SecureHash> = emptyList()): FlowLogicRef {
val context = AppContext(attachments)
validateFlowClassName(flowLogicClassName, context)
for (arg in args.values.filterNotNull()) {
validateArgClassName(flowLogicClassName, arg.javaClass.name, context)
}
val clazz = Class.forName(flowLogicClassName)
require(FlowLogic::class.java.isAssignableFrom(clazz)) { "$flowLogicClassName is not a FlowLogic" }
@Suppress("UNCHECKED_CAST")
val logic = clazz as Class<FlowLogic<FlowLogic<*>>>
return createKotlin(logic, args)
}
/**
* Create a [FlowLogicRef] by assuming a single constructor and the given args.
*/
override fun create(type: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef {
fun createForRPC(flowClass: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef {
// TODO: This is used via RPC but it's probably better if we pass in argument names and values explicitly
// to avoid requiring only a single constructor.
val argTypes = args.map { it?.javaClass }
val constructor = try {
type.kotlin.constructors.single { ctor ->
flowClass.kotlin.constructors.single { ctor ->
// Get the types of the arguments, always boxed (as that's what we get in the invocation).
val ctorTypes = ctor.javaConstructor!!.parameterTypes.map { Primitives.wrap(it) }
if (argTypes.size != ctorTypes.size)
@ -93,14 +56,14 @@ class FlowLogicRefFactoryImpl(override val flowWhitelist: Map<String, Set<String
true
}
} catch (e: IllegalArgumentException) {
throw IllegalFlowLogicException(type, "due to ambiguous match against the constructors: $argTypes")
throw IllegalFlowLogicException(flowClass, "due to ambiguous match against the constructors: $argTypes")
} catch (e: NoSuchElementException) {
throw IllegalFlowLogicException(type, "due to missing constructor for arguments: $argTypes")
throw IllegalFlowLogicException(flowClass, "due to missing constructor for arguments: $argTypes")
}
// Build map of args from array
val argsMap = args.zip(constructor.parameters).map { Pair(it.second.name!!, it.first) }.toMap()
return createKotlin(type, argsMap)
return createKotlin(flowClass, argsMap)
}
/**
@ -108,44 +71,32 @@ class FlowLogicRefFactoryImpl(override val flowWhitelist: Map<String, Set<String
*
* TODO: Rethink language specific naming.
*/
fun createKotlin(type: Class<out FlowLogic<*>>, args: Map<String, Any?>): FlowLogicRef {
@VisibleForTesting
internal fun createKotlin(type: Class<out FlowLogic<*>>, args: Map<String, Any?>): FlowLogicRef {
// TODO: we need to capture something about the class loader or "application context" into the ref,
// perhaps as some sort of ThreadLocal style object. For now, just create an empty one.
val appContext = AppContext(emptyList())
validateFlowClassName(type.name, appContext)
// Check we can find a constructor and populate the args to it, but don't call it
createConstructor(appContext, type, args)
createConstructor(type, args)
return FlowLogicRefImpl(type.name, appContext, args)
}
/**
* Create a [FlowLogicRef] by trying to find a Java constructor that matches the given args.
*/
private fun createJava(type: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef {
// Build map for each
val argsMap = HashMap<String, Any?>(args.size)
var index = 0
args.forEach { argsMap["arg${index++}"] = it }
return createKotlin(type, argsMap)
}
override fun toFlowLogic(ref: FlowLogicRef): FlowLogic<*> {
fun toFlowLogic(ref: FlowLogicRef): FlowLogic<*> {
if (ref !is FlowLogicRefImpl) throw IllegalFlowLogicException(ref.javaClass, "FlowLogicRef was not created via correct FlowLogicRefFactory interface")
validateFlowClassName(ref.flowLogicClassName, ref.appContext)
val klass = Class.forName(ref.flowLogicClassName, true, ref.appContext.classLoader).asSubclass(FlowLogic::class.java)
return createConstructor(ref.appContext, klass, ref.args)()
return createConstructor(klass, ref.args)()
}
private fun createConstructor(appContext: AppContext, clazz: Class<out FlowLogic<*>>, args: Map<String, Any?>): () -> FlowLogic<*> {
private fun createConstructor(clazz: Class<out FlowLogic<*>>, args: Map<String, Any?>): () -> FlowLogic<*> {
for (constructor in clazz.kotlin.constructors) {
val params = buildParams(appContext, clazz, constructor, args) ?: continue
val params = buildParams(constructor, args) ?: continue
// If we get here then we matched every parameter
return { constructor.callBy(params) }
}
throw IllegalFlowLogicException(clazz, "as could not find matching constructor for: $args")
}
private fun buildParams(appContext: AppContext, clazz: Class<out FlowLogic<*>>, constructor: KFunction<FlowLogic<*>>, args: Map<String, Any?>): HashMap<KParameter, Any?>? {
private fun buildParams(constructor: KFunction<FlowLogic<*>>, args: Map<String, Any?>): HashMap<KParameter, Any?>? {
val params = hashMapOf<KParameter, Any?>()
val usedKeys = hashSetOf<String>()
for (parameter in constructor.parameters) {
@ -159,7 +110,6 @@ class FlowLogicRefFactoryImpl(override val flowWhitelist: Map<String, Set<String
// Not all args were used
return null
}
params.values.forEach { if (it is Any) validateArgClassName(clazz.name, it.javaClass.name, appContext) }
return params
}

View File

@ -210,7 +210,7 @@ object InteractiveShell {
*/
@JvmStatic
fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter) {
val matches = node.flowLogicFactory.flowWhitelist.keys.filter { nameFragment in it }
val matches = node.services.rpcFlows.filter { nameFragment in it.name }
if (matches.isEmpty()) {
output.println("No matching flow found, run 'flow list' to see your options.", Color.red)
return
@ -219,14 +219,12 @@ object InteractiveShell {
matches.forEachIndexed { i, s -> output.println("${i + 1}. $s", Color.yellow) }
return
}
val match = matches.single()
val clazz = Class.forName(match)
if (!FlowLogic::class.java.isAssignableFrom(clazz))
throw IllegalStateException("Found a non-FlowLogic class in the whitelist? $clazz")
@Suppress("UNCHECKED_CAST")
val clazz = matches.single() as Class<FlowLogic<*>>
try {
// TODO Flow invocation should use startFlowDynamic.
@Suppress("UNCHECKED_CAST")
val fsm = runFlowFromString({ node.services.startFlow(it, FlowInitiator.Shell) }, inputData, clazz as Class<FlowLogic<*>>)
val fsm = runFlowFromString({ node.services.startFlow(it, FlowInitiator.Shell) }, inputData, clazz)
// Show the progress tracker on the console until the flow completes or is interrupted with a
// Ctrl-C keypress.
val latch = CountDownLatch(1)
@ -275,10 +273,7 @@ object InteractiveShell {
var paramNamesFromConstructor: List<String>? = null
fun getPrototype(ctor: Constructor<*>): List<String> {
val argTypes = ctor.parameterTypes.map { it.simpleName }
val prototype = paramNamesFromConstructor!!.zip(argTypes).map { pair ->
val (name, type) = pair
"$name: $type"
}
val prototype = paramNamesFromConstructor!!.zip(argTypes).map { (name, type) -> "$name: $type" }
return prototype
}
try {

View File

@ -1,15 +1,9 @@
package net.corda.node.services.events;
import net.corda.core.flows.FlowLogic;
import net.corda.core.flows.FlowLogicRefFactory;
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl;
import org.junit.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class FlowLogicRefFromJavaTest {
@SuppressWarnings("unused")
@ -56,20 +50,11 @@ public class FlowLogicRefFromJavaTest {
@Test
public void test() {
Map<String, Set<String>> whiteList = new HashMap<>();
Set<String> argsList = new HashSet<>();
argsList.add(ParamType1.class.getName());
argsList.add(ParamType2.class.getName());
whiteList.put(JavaFlowLogic.class.getName(), argsList);
FlowLogicRefFactory factory = new FlowLogicRefFactoryImpl(whiteList);
factory.create(JavaFlowLogic.class, new ParamType1(1), new ParamType2("Hello Jack"));
FlowLogicRefFactoryImpl.INSTANCE.createForRPC(JavaFlowLogic.class, new ParamType1(1), new ParamType2("Hello Jack"));
}
@Test
public void testNoArg() {
Map<String, Set<String>> whiteList = new HashMap<>();
whiteList.put(JavaNoArgFlowLogic.class.getName(), new HashSet<>());
FlowLogicRefFactory factory = new FlowLogicRefFactoryImpl(whiteList);
factory.create(JavaNoArgFlowLogic.class);
FlowLogicRefFactoryImpl.INSTANCE.createForRPC(JavaNoArgFlowLogic.class);
}
}

View File

@ -1,9 +1,11 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.*
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.keys
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
@ -35,7 +37,9 @@ import org.junit.Before
import org.junit.Test
import rx.Observable
import java.io.ByteArrayOutputStream
import kotlin.test.*
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class CordaRPCOpsImplTest {
@ -118,7 +122,6 @@ class CordaRPCOpsImplTest {
@Test
fun `issue and move`() {
rpc.startFlow(::CashIssueFlow,
Amount(100, USD),
OpaqueBytes(ByteArray(1, { 1 })),
@ -225,4 +228,19 @@ class CordaRPCOpsImplTest {
assertArrayEquals(bufferFile.toByteArray(), bufferRpc.toByteArray())
}
@Test
fun `attempt to start non-RPC flow`() {
CURRENT_RPC_CONTEXT.set(RpcContext(User("user", "pwd", permissions = setOf(
startFlowPermission<NonRPCFlow>()
))))
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
rpc.startFlow(::NonRPCFlow)
}
}
class NonRPCFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() = Unit
}
}

View File

@ -12,7 +12,6 @@ import net.corda.node.serialization.NodeClock
import net.corda.node.services.api.*
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
@ -30,7 +29,6 @@ open class MockServiceHubInternal(
val mapCache: NetworkMapCacheInternal? = MockNetworkMapCache(),
val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(),
val flowFactory: FlowLogicRefFactoryInternal? = FlowLogicRefFactoryImpl(),
val schemas: SchemaService? = NodeSchemaService(),
val customTransactionVerifierService: TransactionVerifierService? = InMemoryTransactionVerifierService(2)
) : ServiceHubInternal() {
@ -56,8 +54,8 @@ open class MockServiceHubInternal(
get() = throw UnsupportedOperationException()
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val flowLogicRefFactory: FlowLogicRefFactoryInternal
get() = flowFactory ?: throw UnsupportedOperationException()
override val rpcFlows: List<Class<out FlowLogic<*>>>
get() = throw UnsupportedOperationException()
override val schemaService: SchemaService
get() = schemas ?: throw UnsupportedOperationException()
override val auditService: AuditService = DummyAuditService()

View File

@ -1,9 +1,8 @@
package net.corda.node.services.events
import net.corda.core.days
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.IllegalFlowLogicException
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import org.junit.Before
import org.junit.Test
import java.time.Duration
@ -31,67 +30,51 @@ class FlowLogicRefTest {
override fun call() = Unit
}
@Suppress("UNUSED_PARAMETER") // We will never use A or b
class NotWhiteListedKotlinFlowLogic(A: Int, b: String) : FlowLogic<Unit>() {
class NonSchedulableFlow : FlowLogic<Unit>() {
override fun call() = Unit
}
lateinit var factory: FlowLogicRefFactoryImpl
@Before
fun setup() {
// We have to allow Java boxed primitives but Kotlin warns we shouldn't be using them
factory = FlowLogicRefFactoryImpl(mapOf(Pair(KotlinFlowLogic::class.java.name, setOf(ParamType1::class.java.name, ParamType2::class.java.name)),
Pair(KotlinNoArgFlowLogic::class.java.name, setOf())))
@Test
fun `create kotlin no arg`() {
FlowLogicRefFactoryImpl.createForRPC(KotlinNoArgFlowLogic::class.java)
}
@Test
fun testCreateKotlinNoArg() {
factory.create(KotlinNoArgFlowLogic::class.java)
}
@Test
fun testCreateKotlin() {
fun `create kotlin`() {
val args = mapOf(Pair("A", ParamType1(1)), Pair("b", ParamType2("Hello Jack")))
factory.createKotlin(KotlinFlowLogic::class.java, args)
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args)
}
@Test
fun testCreatePrimary() {
factory.create(KotlinFlowLogic::class.java, ParamType1(1), ParamType2("Hello Jack"))
}
@Test(expected = IllegalArgumentException::class)
fun testCreateNotWhiteListed() {
factory.create(NotWhiteListedKotlinFlowLogic::class.java, ParamType1(1), ParamType2("Hello Jack"))
fun `create primary`() {
FlowLogicRefFactoryImpl.createForRPC(KotlinFlowLogic::class.java, ParamType1(1), ParamType2("Hello Jack"))
}
@Test
fun testCreateKotlinVoid() {
factory.createKotlin(KotlinFlowLogic::class.java, emptyMap())
fun `create kotlin void`() {
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, emptyMap())
}
@Test
fun testCreateKotlinNonPrimary() {
fun `create kotlin non primary`() {
val args = mapOf(Pair("C", ParamType2("Hello Jack")))
factory.createKotlin(KotlinFlowLogic::class.java, args)
}
@Test(expected = IllegalArgumentException::class)
fun testCreateArgNotWhiteListed() {
val args = mapOf(Pair("illegal", 1.days))
factory.createKotlin(KotlinFlowLogic::class.java, args)
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args)
}
@Test
fun testCreateJavaPrimitiveNoRegistrationRequired() {
fun `create java primitive no registration required`() {
val args = mapOf(Pair("primitive", "A string"))
factory.createKotlin(KotlinFlowLogic::class.java, args)
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args)
}
@Test
fun testCreateKotlinPrimitiveNoRegistrationRequired() {
fun `create kotlin primitive no registration required`() {
val args = mapOf(Pair("kotlinType", 3))
factory.createKotlin(KotlinFlowLogic::class.java, args)
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args)
}
@Test(expected = IllegalFlowLogicException::class)
fun `create for non-schedulable flow logic`() {
FlowLogicRefFactoryImpl.create(NonSchedulableFlow::class.java)
}
}

View File

@ -44,10 +44,6 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
val schedulerGatedExecutor = AffinityExecutor.Gate(true)
// We have to allow Java boxed primitives but Kotlin warns we shouldn't be using them
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
val factory = FlowLogicRefFactoryImpl(mapOf(Pair(TestFlowLogic::class.java.name, setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name))))
lateinit var services: MockServiceHubInternal
lateinit var scheduler: NodeSchedulerService
@ -82,12 +78,16 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
database.transaction {
val kms = MockKeyManagementService(ALICE_KEY)
val nullIdentity = X500Name("cn=None")
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.PeerHandle(0, nullIdentity), AffinityExecutor.ServiceAffinityExecutor("test", 1), database)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(
false,
InMemoryMessagingNetwork.PeerHandle(0, nullIdentity),
AffinityExecutor.ServiceAffinityExecutor("test", 1),
database)
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
override val testReference = this@NodeSchedulerServiceTest
}
scheduler = NodeSchedulerService(services, database, factory, schedulerGatedExecutor)
scheduler = NodeSchedulerService(services, database, schedulerGatedExecutor)
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database)
mockSMM.changes.subscribe { change ->
@ -269,7 +269,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
database.transaction {
apply {
val freshKey = services.keyManagementService.freshKey()
val state = TestState(factory.create(TestFlowLogic::class.java, increment), instant)
val state = TestState(FlowLogicRefFactoryImpl.createForRPC(TestFlowLogic::class.java, increment), instant)
val usefulTX = TransactionType.General.Builder(null).apply {
addOutputState(state, DUMMY_NOTARY)
addCommand(Command(), freshKey.public)

View File

@ -7,7 +7,7 @@ import net.corda.core.crypto.containsAny
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.flows.SchedulableFlow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.utilities.DUMMY_NOTARY
@ -15,7 +15,6 @@ import net.corda.flows.FinalityFlow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.transaction
import net.corda.testing.node.MockNetwork
import org.junit.After
@ -68,6 +67,7 @@ class ScheduledFlowTests {
}
}
@SchedulableFlow
class ScheduledFlow(val stateRef: StateRef) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
@ -87,14 +87,6 @@ class ScheduledFlowTests {
}
}
object ScheduledFlowTestPlugin : CordaPluginRegistry() {
override val requiredFlows: Map<String, Set<String>> = mapOf(
InsertInitialStateFlow::class.java.name to setOf(Party::class.java.name),
ScheduledFlow::class.java.name to setOf(StateRef::class.java.name)
)
}
@Before
fun setup() {
net = MockNetwork(threadPerNode = true)
@ -103,8 +95,6 @@ class ScheduledFlowTests {
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type)))
nodeA = net.createNode(notaryNode.info.address, start = false)
nodeB = net.createNode(notaryNode.info.address, start = false)
nodeA.testPluginRegistries.add(ScheduledFlowTestPlugin)
nodeB.testPluginRegistries.add(ScheduledFlowTestPlugin)
net.startNodes()
}
@ -138,7 +128,7 @@ class ScheduledFlowTests {
}
@Test
fun `Run a whole batch of scheduled flows`() {
fun `run a whole batch of scheduled flows`() {
val N = 100
for (i in 0..N - 1) {
nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity))