CORDA-891 Refactoring for #2273 (#2306)

* Make FlowLogicRefFactoryImpl a class.

* Replace instanceof with polymorphism.

* Fix out-of-scope spelling error.
This commit is contained in:
Andrzej Cichocki 2018-01-02 13:11:43 +00:00 committed by GitHub
parent a9f9bf2c0b
commit fe3c2b3983
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 69 additions and 59 deletions

View File

@ -11,6 +11,8 @@ import net.corda.core.serialization.CordaSerializable
@DoNotImplement
interface FlowLogicRefFactory {
fun create(flowClass: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef
fun createForRPC(flowClass: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef
fun toFlowLogic(ref: FlowLogicRef): FlowLogic<*>
}
@CordaSerializable

View File

@ -104,7 +104,7 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
abstract class AbstractNode(val configuration: NodeConfiguration,
val platformClock: Clock,
val platformClock: CordaClock,
protected val versionInfo: VersionInfo,
protected val cordappLoader: CordappLoader,
private val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
@ -217,14 +217,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache)
val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database)
val flowStarter = FlowStarterImpl(serverThread, smm)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
val flowStarter = FlowStarterImpl(serverThread, smm, flowLogicRefFactory)
val schedulerService = NodeSchedulerService(
platformClock,
database,
flowStarter,
transactionStorage,
unfinishedSchedules = busyNodeLatch,
serverThread = serverThread)
serverThread = serverThread,
flowLogicRefFactory = flowLogicRefFactory)
if (serverThread is ExecutorService) {
runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the
@ -233,7 +235,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
}
}
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService)
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory)
val rpcOps = makeRPCOps(flowStarter, database, smm)
startMessagingService(rpcOps)
installCoreFlows()
@ -241,7 +243,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
tokenizableServices = nodeServices + cordaServices + schedulerService
registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
startShell(rpcOps)
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
@ -558,10 +559,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
protected open fun makeTransactionStorage(database: CordaPersistence): WritableTransactionStorage = DBTransactionStorage()
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager, schemaService: SchemaService) {
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager, schemaService: SchemaService, flowLogicRefFactory: FlowLogicRefFactory) {
VaultSoftLockManager.install(services.vaultService, smm)
ScheduledActivityObserver.install(services.vaultService, schedulerService)
ScheduledActivityObserver.install(services.vaultService, schedulerService, flowLogicRefFactory)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService)
}
@ -820,10 +820,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager) : FlowStarter {
internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter {
override fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext): CordaFuture<FlowStateMachine<T>> {
return serverThread.fetchFrom { smm.startFlow(logic, context) }
}
override fun <T> invokeFlowAsync(
logicType: Class<out FlowLogic<T>>,
context: InvocationContext,
vararg args: Any?): CordaFuture<FlowStateMachine<T>> {
val logicRef = flowLogicRefFactory.createForRPC(logicType, *args)
val logic: FlowLogic<T> = uncheckedCast(flowLogicRefFactory.toFlowLogic(logicRef))
return startFlow(logic, context)
}
}
class ConfigurationException(message: String) : CordaException(message)

View File

@ -22,10 +22,15 @@ abstract class CordaClock : Clock(), SerializeAsToken {
override fun getZone(): ZoneId = delegateClock.zone
@Deprecated("Do not use this. Instead seek to use ZonedDateTime methods.", level = DeprecationLevel.ERROR)
override fun withZone(zone: ZoneId) = throw UnsupportedOperationException("Tokenized clock does not support withZone()")
/** This is an observer on the mutation count of this [Clock], which reflects the occurrence of mutations. */
abstract val mutations: Observable<Long>
}
@ThreadSafe
class SimpleClock(override val delegateClock: Clock) : CordaClock()
class SimpleClock(override val delegateClock: Clock) : CordaClock() {
override val mutations: Observable<Long> = Observable.never()
}
/**
* An abstract class with helper methods for a type of Clock that might have it's concept of "now" adjusted externally.
@ -38,8 +43,7 @@ abstract class MutableClock(private var _delegateClock: Clock) : CordaClock() {
_delegateClock = clock
}
private val _version = AtomicLong(0L)
/** This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations. */
val mutations: Observable<Long> by lazy {
override val mutations: Observable<Long> by lazy {
Observable.create { subscriber: Subscriber<in Long> ->
if (!subscriber.isUnsubscribed) {
mutationObservers.add(subscriber)

View File

@ -2,7 +2,6 @@ package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.AuthServiceId
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.uncheckedCast
@ -67,7 +66,7 @@ open class Node(configuration: NodeConfiguration,
exitProcess(1)
}
private fun createClock(configuration: NodeConfiguration): Clock {
private fun createClock(configuration: NodeConfiguration): CordaClock {
return (if (configuration.useTestClock) ::DemoClock else ::SimpleClock)(Clock.systemUTC())
}

View File

@ -6,7 +6,6 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.NodeInfo
@ -21,7 +20,6 @@ import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.internal.cordapp.CordappProviderInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -137,11 +135,7 @@ interface FlowStarter {
fun <T> invokeFlowAsync(
logicType: Class<out FlowLogic<T>>,
context: InvocationContext,
vararg args: Any?): CordaFuture<FlowStateMachine<T>> {
val logicRef = FlowLogicRefFactoryImpl.createForRPC(logicType, *args)
val logic: FlowLogic<T> = uncheckedCast(FlowLogicRefFactoryImpl.toFlowLogic(logicRef))
return startFlow(logic, context)
}
vararg args: Any?): CordaFuture<FlowStateMachine<T>>
}
interface StartedNodeServices : ServiceHubInternal, FlowStarter

View File

@ -10,6 +10,7 @@ import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.flatMap
@ -19,10 +20,10 @@ import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.node.internal.CordaClock
import net.corda.node.internal.MutableClock
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -55,13 +56,14 @@ import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture
* activity. Only replace this for unit testing purposes. This is not the executor the [FlowLogic] is launched on.
*/
@ThreadSafe
class NodeSchedulerService(private val clock: Clock,
class NodeSchedulerService(private val clock: CordaClock,
private val database: CordaPersistence,
private val flowStarter: FlowStarter,
private val stateLoader: StateLoader,
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
private val serverThread: AffinityExecutor)
private val serverThread: AffinityExecutor,
private val flowLogicRefFactory: FlowLogicRefFactory)
: SchedulerService, SingletonSerializeAsToken() {
companion object {
@ -78,16 +80,12 @@ class NodeSchedulerService(private val clock: Clock,
@Suspendable
@VisibleForTesting
// We specify full classpath on SettableFuture to differentiate it from the Quasar class of the same name
fun awaitWithDeadline(clock: Clock, deadline: Instant, future: Future<*> = GuavaSettableFuture.create<Any>()): Boolean {
fun awaitWithDeadline(clock: CordaClock, deadline: Instant, future: Future<*> = GuavaSettableFuture.create<Any>()): Boolean {
var nanos: Long
do {
val originalFutureCompleted = makeStrandFriendlySettableFuture(future)
val subscription = if (clock is MutableClock) {
clock.mutations.first().subscribe {
originalFutureCompleted.set(false)
}
} else {
null
val subscription = clock.mutations.first().subscribe {
originalFutureCompleted.set(false)
}
nanos = (clock.instant() until deadline).toNanos()
if (nanos > 0) {
@ -102,7 +100,7 @@ class NodeSchedulerService(private val clock: Clock,
// No need to take action as will fall out of the loop due to future.isDone
}
}
subscription?.unsubscribe()
subscription.unsubscribe()
originalFutureCompleted.cancel(false)
} while (nanos > 0 && !future.isDone)
return future.isDone
@ -279,7 +277,7 @@ class NodeSchedulerService(private val clock: Clock,
scheduledStatesQueue.remove(scheduledState)
scheduledStatesQueue.add(newState)
} else {
val flowLogic = FlowLogicRefFactoryImpl.toFlowLogic(scheduledActivity.logicRef)
val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
log.trace { "Scheduler starting FlowLogic $flowLogic" }
scheduledFlow = flowLogic
scheduledStates.remove(scheduledState.ref)
@ -297,7 +295,7 @@ class NodeSchedulerService(private val clock: Clock,
val state = txState.data as SchedulableState
return try {
// This can throw as running contract code.
state.nextScheduledActivity(scheduledState.ref, FlowLogicRefFactoryImpl)
state.nextScheduledActivity(scheduledState.ref, flowLogicRefFactory)
} catch (e: Exception) {
log.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
null

View File

@ -4,19 +4,19 @@ import net.corda.core.contracts.ContractState
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateAndRef
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.node.services.VaultService
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
/**
* This observes the vault and schedules and unschedules activities appropriately based on state production and
* consumption.
*/
class ScheduledActivityObserver private constructor(private val schedulerService: SchedulerService) {
class ScheduledActivityObserver private constructor(private val schedulerService: SchedulerService, private val FlowLogicRefFactory: FlowLogicRefFactory) {
companion object {
@JvmStatic
fun install(vaultService: VaultService, schedulerService: SchedulerService) {
val observer = ScheduledActivityObserver(schedulerService)
fun install(vaultService: VaultService, schedulerService: SchedulerService, flowLogicRefFactory: FlowLogicRefFactory) {
val observer = ScheduledActivityObserver(schedulerService, flowLogicRefFactory)
vaultService.rawUpdates.subscribe { (consumed, produced) ->
consumed.forEach { schedulerService.unscheduleStateActivity(it.ref) }
produced.forEach { observer.scheduleStateActivity(it) }
@ -32,7 +32,7 @@ class ScheduledActivityObserver private constructor(private val schedulerService
private fun scheduleStateActivity(produced: StateAndRef<ContractState>) {
val producedState = produced.state.data
if (producedState is SchedulableState) {
val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, FlowLogicRefFactoryImpl)?.scheduledAt } ?: return
val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, FlowLogicRefFactory)?.scheduledAt } ?: return
schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt))
}
}

View File

@ -31,10 +31,8 @@ data class FlowLogicRefImpl internal constructor(val flowLogicClassName: String,
* measure we might want the ability for the node admin to blacklist a flow such that it moves immediately to the "Flow Hospital"
* in response to a potential malicious use or buggy update to an app etc.
*/
object FlowLogicRefFactoryImpl : SingletonSerializeAsToken(), FlowLogicRefFactory {
// TODO: Replace with a per app classloader/cordapp provider/cordapp loader - this will do for now
var classloader: ClassLoader = javaClass.classLoader
// TODO: Replace with a per app classloader/cordapp provider/cordapp loader - this will do for now
class FlowLogicRefFactoryImpl(private val classloader: ClassLoader) : SingletonSerializeAsToken(), FlowLogicRefFactory {
override fun create(flowClass: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef {
if (!flowClass.isAnnotationPresent(SchedulableFlow::class.java)) {
throw IllegalFlowLogicException(flowClass, "because it's not a schedulable flow")
@ -42,7 +40,7 @@ object FlowLogicRefFactoryImpl : SingletonSerializeAsToken(), FlowLogicRefFactor
return createForRPC(flowClass, *args)
}
fun createForRPC(flowClass: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef {
override fun createForRPC(flowClass: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef {
// TODO: This is used via RPC but it's probably better if we pass in argument names and values explicitly
// to avoid requiring only a single constructor.
val argTypes = args.map { it?.javaClass }
@ -81,7 +79,7 @@ object FlowLogicRefFactoryImpl : SingletonSerializeAsToken(), FlowLogicRefFactor
return FlowLogicRefImpl(type.name, args)
}
fun toFlowLogic(ref: FlowLogicRef): FlowLogic<*> {
override fun toFlowLogic(ref: FlowLogicRef): FlowLogic<*> {
if (ref !is FlowLogicRefImpl) throw IllegalFlowLogicException(ref.javaClass, "FlowLogicRef was not created via correct FlowLogicRefFactory interface")
val klass = Class.forName(ref.flowLogicClassName, true, classloader).asSubclass(FlowLogic::class.java)
return createConstructor(klass, ref.args)()

View File

@ -48,13 +48,15 @@ public class FlowLogicRefFromJavaTest {
}
}
private final FlowLogicRefFactoryImpl flowLogicRefFactory = new FlowLogicRefFactoryImpl(FlowLogicRefFactoryImpl.class.getClassLoader());
@Test
public void test() {
FlowLogicRefFactoryImpl.INSTANCE.createForRPC(JavaFlowLogic.class, new ParamType1(1), new ParamType2("Hello Jack"));
flowLogicRefFactory.createForRPC(JavaFlowLogic.class, new ParamType1(1), new ParamType2("Hello Jack"));
}
@Test
public void testNoArg() {
FlowLogicRefFactoryImpl.INSTANCE.createForRPC(JavaNoArgFlowLogic.class);
flowLogicRefFactory.createForRPC(JavaNoArgFlowLogic.class);
}
}

View File

@ -34,47 +34,48 @@ class FlowLogicRefTest {
override fun call() = Unit
}
private val flowLogicRefFactory = FlowLogicRefFactoryImpl(FlowLogicRefFactoryImpl::class.java.classLoader)
@Test
fun `create kotlin no arg`() {
FlowLogicRefFactoryImpl.createForRPC(KotlinNoArgFlowLogic::class.java)
flowLogicRefFactory.createForRPC(KotlinNoArgFlowLogic::class.java)
}
@Test
fun `create kotlin`() {
val args = mapOf(Pair("A", ParamType1(1)), Pair("b", ParamType2("Hello Jack")))
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args)
flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, args)
}
@Test
fun `create primary`() {
FlowLogicRefFactoryImpl.createForRPC(KotlinFlowLogic::class.java, ParamType1(1), ParamType2("Hello Jack"))
flowLogicRefFactory.createForRPC(KotlinFlowLogic::class.java, ParamType1(1), ParamType2("Hello Jack"))
}
@Test
fun `create kotlin void`() {
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, emptyMap())
flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, emptyMap())
}
@Test
fun `create kotlin non primary`() {
val args = mapOf(Pair("C", ParamType2("Hello Jack")))
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args)
flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, args)
}
@Test
fun `create java primitive no registration required`() {
val args = mapOf(Pair("primitive", "A string"))
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args)
flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, args)
}
@Test
fun `create kotlin primitive no registration required`() {
val args = mapOf(Pair("kotlinType", 3))
FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args)
flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, args)
}
@Test(expected = IllegalFlowLogicException::class)
fun `create for non-schedulable flow logic`() {
FlowLogicRefFactoryImpl.create(NonSchedulableFlow::class.java)
flowLogicRefFactory.create(NonSchedulableFlow::class.java)
}
}

View File

@ -63,6 +63,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private val flowLogicRefFactory = FlowLogicRefFactoryImpl(FlowLogicRefFactoryImpl::class.java.classLoader)
private val realClock: Clock = Clock.systemUTC()
private val stoppedClock: Clock = Clock.fixed(realClock.instant(), realClock.zone)
private val testClock = TestClock(stoppedClock)
@ -121,7 +122,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
}
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
mockSMM = StateMachineManagerImpl(services, DBCheckpointStorage(), smmExecutor, database)
scheduler = NodeSchedulerService(testClock, database, FlowStarterImpl(smmExecutor, mockSMM), validatedTransactions, schedulerGatedExecutor, serverThread = smmExecutor)
scheduler = NodeSchedulerService(testClock, database, FlowStarterImpl(smmExecutor, mockSMM, flowLogicRefFactory), validatedTransactions, schedulerGatedExecutor, serverThread = smmExecutor, flowLogicRefFactory = flowLogicRefFactory)
mockSMM.changes.subscribe { change ->
if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) {
smmHasRemovedAllFlows.countDown()
@ -304,7 +305,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
database.transaction {
apply {
val freshKey = kms.freshKey()
val state = TestState(FlowLogicRefFactoryImpl.createForRPC(TestFlowLogic::class.java, increment), instant, DUMMY_IDENTITY_1.party)
val state = TestState(flowLogicRefFactory.createForRPC(TestFlowLogic::class.java, increment), instant, DUMMY_IDENTITY_1.party)
val builder = TransactionBuilder(null).apply {
addOutputState(state, DummyContract.PROGRAM_ID, DUMMY_NOTARY)
addCommand(Command(), freshKey)

View File

@ -8,6 +8,8 @@ import com.google.common.util.concurrent.SettableFuture
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.hours
import net.corda.core.utilities.minutes
import net.corda.node.internal.CordaClock
import net.corda.node.internal.SimpleClock
import net.corda.node.services.events.NodeSchedulerService
import net.corda.testing.node.TestClock
import org.junit.After
@ -25,13 +27,13 @@ import kotlin.test.fail
class ClockUtilsTest {
lateinit var realClock: Clock
lateinit var stoppedClock: Clock
lateinit var stoppedClock: CordaClock
lateinit var executor: ExecutorService
@Before
fun setup() {
realClock = Clock.systemUTC()
stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
stoppedClock = SimpleClock(Clock.fixed(realClock.instant(), realClock.zone))
executor = Executors.newSingleThreadExecutor()
}