diff --git a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt index 782d3ee35f..eba602a47e 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt @@ -119,7 +119,7 @@ class ContractUpgradeFlowTest { return startRpcClient( rpcAddress = startRpcServer( rpcUser = user, - ops = CordaRPCOpsImpl(node.services, node.smm, node.database) + ops = CordaRPCOpsImpl(node.services, node.smm, node.database, node.services) ).get().broker.hostAndPort!!, username = user.username, password = user.password diff --git a/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt index bdc76e1d83..eec08b4c6b 100644 --- a/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt @@ -6,7 +6,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.finance.POUNDS import net.corda.finance.contracts.asset.Cash import net.corda.finance.issuedBy -import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.api.StartedNodeServices import net.corda.testing.* import net.corda.testing.node.MockNetwork import org.junit.After @@ -17,8 +17,8 @@ import kotlin.test.assertFailsWith class FinalityFlowTests { private lateinit var mockNet: MockNetwork - private lateinit var aliceServices: ServiceHubInternal - private lateinit var bobServices: ServiceHubInternal + private lateinit var aliceServices: StartedNodeServices + private lateinit var bobServices: StartedNodeServices private lateinit var alice: Party private lateinit var bob: Party private lateinit var notary: Party diff --git a/docs/source/example-code/src/test/kotlin/net/corda/docs/CustomVaultQueryTest.kt b/docs/source/example-code/src/test/kotlin/net/corda/docs/CustomVaultQueryTest.kt index 1ec43e7271..509cb65312 100644 --- a/docs/source/example-code/src/test/kotlin/net/corda/docs/CustomVaultQueryTest.kt +++ b/docs/source/example-code/src/test/kotlin/net/corda/docs/CustomVaultQueryTest.kt @@ -32,7 +32,7 @@ class CustomVaultQueryTest { nodeA = mockNet.createPartyNode() nodeB = mockNet.createPartyNode() nodeA.internals.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java) - nodeA.internals.installCordaService(CustomVaultQuery.Service::class.java) + nodeA.installCordaService(CustomVaultQuery.Service::class.java) notary = nodeA.services.getDefaultNotary() } diff --git a/docs/source/example-code/src/test/kotlin/net/corda/docs/WorkflowTransactionBuildTutorialTest.kt b/docs/source/example-code/src/test/kotlin/net/corda/docs/WorkflowTransactionBuildTutorialTest.kt index f7b99ead79..833f4c140d 100644 --- a/docs/source/example-code/src/test/kotlin/net/corda/docs/WorkflowTransactionBuildTutorialTest.kt +++ b/docs/source/example-code/src/test/kotlin/net/corda/docs/WorkflowTransactionBuildTutorialTest.kt @@ -9,7 +9,7 @@ import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.toFuture import net.corda.core.utilities.getOrThrow -import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.api.StartedNodeServices import net.corda.testing.* import net.corda.testing.node.MockNetwork import org.junit.After @@ -19,8 +19,8 @@ import kotlin.test.assertEquals class WorkflowTransactionBuildTutorialTest { lateinit var mockNet: MockNetwork - lateinit var aliceServices: ServiceHubInternal - lateinit var bobServices: ServiceHubInternal + lateinit var aliceServices: StartedNodeServices + lateinit var bobServices: StartedNodeServices lateinit var alice: Party lateinit var bob: Party diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 781d8f1663..148e630def 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -78,6 +78,7 @@ import java.security.cert.CertificateFactory import java.security.cert.X509Certificate import java.sql.Connection import java.time.Clock +import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit.SECONDS @@ -108,7 +109,7 @@ abstract class AbstractNode(config: NodeConfiguration, private class StartedNodeImpl( override val internals: N, - override val services: ServiceHubInternalImpl, + services: ServiceHubInternalImpl, override val info: NodeInfo, override val checkpointStorage: CheckpointStorage, override val smm: StateMachineManager, @@ -116,8 +117,11 @@ abstract class AbstractNode(config: NodeConfiguration, override val inNodeNetworkMapService: NetworkMapService, override val network: MessagingService, override val database: CordaPersistence, - override val rpcOps: CordaRPCOps) : StartedNode - + override val rpcOps: CordaRPCOps, + flowStarter: FlowStarter, + internal val schedulerService: NodeSchedulerService) : StartedNode { + override val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by services, FlowStarter by flowStarter {} + } // TODO: Persist this, as well as whether the node is registered. /** * Sequence number of changes sent to the network map service, when registering/de-registering this node. @@ -167,8 +171,8 @@ abstract class AbstractNode(config: NodeConfiguration, @Volatile private var _started: StartedNode? = null /** The implementation of the [CordaRPCOps] interface used by this node. */ - open fun makeRPCOps(): CordaRPCOps { - return CordaRPCOpsImpl(services, smm, database) + open fun makeRPCOps(flowStarter: FlowStarter): CordaRPCOps { + return CordaRPCOpsImpl(services, smm, database, flowStarter) } private fun saveOwnNodeInfo() { @@ -190,7 +194,8 @@ abstract class AbstractNode(config: NodeConfiguration, log.info("Generating nodeInfo ...") val schemaService = makeSchemaService() initialiseDatabasePersistence(schemaService) { - makeServices(schemaService) + val transactionStorage = makeTransactionStorage() + makeServices(schemaService, transactionStorage, StateLoaderImpl(transactionStorage)) saveOwnNodeInfo() } } @@ -202,7 +207,9 @@ abstract class AbstractNode(config: NodeConfiguration, val schemaService = makeSchemaService() // Do all of this in a database transaction so anything that might need a connection has one. val startedImpl = initialiseDatabasePersistence(schemaService) { - val tokenizableServices = makeServices(schemaService) + val transactionStorage = makeTransactionStorage() + val stateLoader = StateLoaderImpl(transactionStorage) + val tokenizableServices = makeServices(schemaService, transactionStorage, stateLoader) saveOwnNodeInfo() smm = StateMachineManager(services, checkpointStorage, @@ -210,9 +217,10 @@ abstract class AbstractNode(config: NodeConfiguration, database, busyNodeLatch, cordappLoader.appClassLoader) - + val flowStarter = FlowStarterImpl(serverThread, smm) + val schedulerService = NodeSchedulerService(platformClock, this@AbstractNode.database, flowStarter, stateLoader, unfinishedSchedules = busyNodeLatch, serverThread = serverThread) smm.tokenizableServices.addAll(tokenizableServices) - + smm.tokenizableServices.add(schedulerService) if (serverThread is ExecutorService) { runOnStop += { // We wait here, even though any in-flight messages should have been drained away because the @@ -221,20 +229,17 @@ abstract class AbstractNode(config: NodeConfiguration, MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS) } } - - makeVaultObservers() - - val rpcOps = makeRPCOps() + makeVaultObservers(schedulerService) + val rpcOps = makeRPCOps(flowStarter) startMessagingService(rpcOps) installCoreFlows() - - installCordaServices() + installCordaServices(flowStarter) registerCordappFlows() _services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader runOnStop += network::stop - StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, inNodeNetworkMapService, network, database, rpcOps) + StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, inNodeNetworkMapService, network, database, rpcOps, flowStarter, schedulerService) } // If we successfully loaded network data from database, we set this future to Unit. _nodeReadyFuture.captureLater(registerWithNetworkMapIfConfigured()) @@ -243,7 +248,7 @@ abstract class AbstractNode(config: NodeConfiguration, smm.start() // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } - services.schedulerService.start() + schedulerService.start() } _started = this } @@ -251,11 +256,11 @@ abstract class AbstractNode(config: NodeConfiguration, private class ServiceInstantiationException(cause: Throwable?) : CordaException("Service Instantiation Error", cause) - private fun installCordaServices() { + private fun installCordaServices(flowStarter: FlowStarter) { val loadedServices = cordappLoader.cordapps.flatMap { it.services } filterServicesToInstall(loadedServices).forEach { try { - installCordaService(it) + installCordaService(flowStarter, it) } catch (e: NoSuchMethodException) { log.error("${it.name}, as a Corda service, must have a constructor with a single parameter of type " + ServiceHub::class.java.name) @@ -288,7 +293,7 @@ abstract class AbstractNode(config: NodeConfiguration, /** * This customizes the ServiceHub for each CordaService that is initiating flows */ - private class AppServiceHubImpl(val serviceHub: ServiceHubInternal) : AppServiceHub, ServiceHub by serviceHub { + private class AppServiceHubImpl(private val serviceHub: ServiceHub, private val flowStarter: FlowStarter) : AppServiceHub, ServiceHub by serviceHub { lateinit var serviceInstance: T override fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle { val stateMachine = startFlowChecked(flow) @@ -308,34 +313,24 @@ abstract class AbstractNode(config: NodeConfiguration, val logicType = flow.javaClass require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" } val currentUser = FlowInitiator.Service(serviceInstance.javaClass.name) - return serviceHub.startFlow(flow, currentUser) + return flowStarter.startFlow(flow, currentUser) } override fun equals(other: Any?): Boolean { if (this === other) return true if (other !is AppServiceHubImpl<*>) return false - - if (serviceHub != other.serviceHub) return false - if (serviceInstance != other.serviceInstance) return false - - return true + return serviceHub == other.serviceHub + && flowStarter == other.flowStarter + && serviceInstance == other.serviceInstance } - override fun hashCode(): Int { - var result = serviceHub.hashCode() - result = 31 * result + serviceInstance.hashCode() - return result - } + override fun hashCode() = Objects.hash(serviceHub, flowStarter, serviceInstance) } - /** - * Use this method to install your Corda services in your tests. This is automatically done by the node when it - * starts up for all classes it finds which are annotated with [CordaService]. - */ - fun installCordaService(serviceClass: Class): T { + internal fun installCordaService(flowStarter: FlowStarter, serviceClass: Class): T { serviceClass.requireAnnotation() val service = try { - val serviceContext = AppServiceHubImpl(services) + val serviceContext = AppServiceHubImpl(services, flowStarter) if (isNotaryService(serviceClass)) { check(myNotaryIdentity != null) { "Trying to install a notary service but no notary identity specified" } val constructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java, PublicKey::class.java).apply { isAccessible = true } @@ -466,19 +461,18 @@ abstract class AbstractNode(config: NodeConfiguration, * Builds node internal, advertised, and plugin services. * Returns a list of tokenizable services to be added to the serialisation context. */ - private fun makeServices(schemaService: SchemaService): MutableList { + private fun makeServices(schemaService: SchemaService, transactionStorage: WritableTransactionStorage, stateLoader: StateLoader): MutableList { checkpointStorage = DBCheckpointStorage() - val transactionStorage = makeTransactionStorage() val metrics = MetricRegistry() attachments = NodeAttachmentService(metrics) val cordappProvider = CordappProviderImpl(cordappLoader, attachments) - _services = ServiceHubInternalImpl(schemaService, transactionStorage, StateLoaderImpl(transactionStorage), MonitoringService(metrics), cordappProvider) + _services = ServiceHubInternalImpl(schemaService, transactionStorage, stateLoader, MonitoringService(metrics), cordappProvider) legalIdentity = obtainIdentity(notaryConfig = null) network = makeMessagingService(legalIdentity) info = makeInfo(legalIdentity) val networkMapCache = services.networkMapCache val tokenizableServices = mutableListOf(attachments, network, services.vaultService, - services.keyManagementService, services.identityService, platformClock, services.schedulerService, + services.keyManagementService, services.identityService, platformClock, services.auditService, services.monitoringService, networkMapCache, services.schemaService, services.transactionVerifierService, services.validatedTransactions, services.contractUpgradeService, services, cordappProvider, this) @@ -488,9 +482,9 @@ abstract class AbstractNode(config: NodeConfiguration, protected open fun makeTransactionStorage(): WritableTransactionStorage = DBTransactionStorage() - private fun makeVaultObservers() { + private fun makeVaultObservers(schedulerService: SchedulerService) { VaultSoftLockManager.install(services.vaultService, smm) - ScheduledActivityObserver.install(services.vaultService, services.schedulerService) + ScheduledActivityObserver.install(services.vaultService, schedulerService) HibernateObserver.install(services.vaultService.rawUpdates, database.hibernateConfig) } @@ -788,7 +782,6 @@ abstract class AbstractNode(config: NodeConfiguration, // the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with // the identity key. But the infrastructure to make that easy isn't here yet. override val keyManagementService by lazy { makeKeyManagementService(identityService) } - override val schedulerService by lazy { NodeSchedulerService(this, unfinishedSchedules = busyNodeLatch, serverThread = serverThread) } override val identityService by lazy { val trustStore = KeyStoreWrapper(configuration.trustStoreFile, configuration.trustStorePassword) val caKeyStore = KeyStoreWrapper(configuration.nodeKeystore, configuration.keyStorePassword) @@ -808,10 +801,6 @@ abstract class AbstractNode(config: NodeConfiguration, return cordappServices.getInstance(type) ?: throw IllegalArgumentException("Corda service ${type.name} does not exist") } - override fun startFlow(logic: FlowLogic, flowInitiator: FlowInitiator, ourIdentity: Party?): FlowStateMachineImpl { - return serverThread.fetchFrom { smm.add(logic, flowInitiator, ourIdentity) } - } - override fun getFlowFactory(initiatingFlowClass: Class>): InitiatedFlowFactory<*>? { return flowFactories[initiatingFlowClass] } @@ -825,3 +814,9 @@ abstract class AbstractNode(config: NodeConfiguration, override fun jdbcSession(): Connection = database.createSession() } } + +internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager) : FlowStarter { + override fun startFlow(logic: FlowLogic, flowInitiator: FlowInitiator, ourIdentity: Party?): FlowStateMachineImpl { + return serverThread.fetchFrom { smm.add(logic, flowInitiator, ourIdentity) } + } +} diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index ee685466d3..af827b9e39 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -19,6 +19,7 @@ import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.Sort import net.corda.core.transactions.SignedTransaction import net.corda.node.services.FlowPermissions.Companion.startFlowPermission +import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.getRpcContext import net.corda.node.services.messaging.requirePermission @@ -37,7 +38,8 @@ import java.time.Instant class CordaRPCOpsImpl( private val services: ServiceHubInternal, private val smm: StateMachineManager, - private val database: CordaPersistence + private val database: CordaPersistence, + private val flowStarter: FlowStarter ) : CordaRPCOps { override fun networkMapSnapshot(): List { val (snapshot, updates) = networkMapFeed() @@ -150,7 +152,7 @@ class CordaRPCOpsImpl( rpcContext.requirePermission(startFlowPermission(logicType)) val currentUser = FlowInitiator.RPC(rpcContext.currentUser.username) // TODO RPC flows should have mapping user -> identity that should be resolved automatically on starting flow. - return services.invokeFlowAsync(logicType, currentUser, *args) + return flowStarter.invokeFlowAsync(logicType, currentUser, *args) } override fun attachmentExists(id: SecureHash): Boolean { diff --git a/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt b/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt index e0320d9c62..2d0645cef4 100644 --- a/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/StartedNode.kt @@ -7,9 +7,11 @@ import net.corda.core.flows.FlowLogic import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.NodeInfo import net.corda.core.node.StateLoader +import net.corda.core.node.services.CordaService import net.corda.core.node.services.TransactionStorage +import net.corda.core.serialization.SerializeAsToken import net.corda.node.services.api.CheckpointStorage -import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.api.StartedNodeServices import net.corda.node.services.messaging.MessagingService import net.corda.node.services.network.NetworkMapService import net.corda.node.services.persistence.NodeAttachmentService @@ -18,7 +20,7 @@ import net.corda.node.utilities.CordaPersistence interface StartedNode { val internals: N - val services: ServiceHubInternal + val services: StartedNodeServices val info: NodeInfo val checkpointStorage: CheckpointStorage val smm: StateMachineManager @@ -29,6 +31,11 @@ interface StartedNode { val rpcOps: CordaRPCOps fun dispose() = internals.stop() fun > registerInitiatedFlow(initiatedFlowClass: Class) = internals.registerInitiatedFlow(initiatedFlowClass) + /** + * Use this method to install your Corda services in your tests. This is automatically done by the node when it + * starts up for all classes it finds which are annotated with [CordaService]. + */ + fun installCordaService(serviceClass: Class) = internals.installCordaService(services, serviceClass) } class StateLoaderImpl(private val validatedTransactions: TransactionStorage) : StateLoader { diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 5fdb7993a0..36f9f1fada 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -84,7 +84,6 @@ interface ServiceHubInternal : ServiceHub { val monitoringService: MonitoringService val schemaService: SchemaService override val networkMapCache: NetworkMapCacheInternal - val schedulerService: SchedulerService val auditService: AuditService val rpcFlows: List>> val networkService: MessagingService @@ -109,6 +108,10 @@ interface ServiceHubInternal : ServiceHub { } } + fun getFlowFactory(initiatingFlowClass: Class>): InitiatedFlowFactory<*>? +} + +interface FlowStarter { /** * Starts an already constructed flow. Note that you must be on the server thread to call this method. [FlowInitiator] * defaults to [FlowInitiator.RPC] with username "Only For Testing". @@ -138,10 +141,9 @@ interface ServiceHubInternal : ServiceHub { val logic: FlowLogic = uncheckedCast(FlowLogicRefFactoryImpl.toFlowLogic(logicRef)) return startFlow(logic, flowInitiator, ourIdentity = null) } - - fun getFlowFactory(initiatingFlowClass: Class>): InitiatedFlowFactory<*>? } +interface StartedNodeServices : ServiceHubInternal, FlowStarter /** * Thread-safe storage of transactions. */ diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 26a8322b80..a0c208a53e 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -14,15 +14,17 @@ import net.corda.core.flows.FlowLogic import net.corda.core.internal.ThreadBox import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.until +import net.corda.core.node.StateLoader import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace import net.corda.node.internal.MutableClock +import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.SchedulerService -import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.utilities.AffinityExecutor +import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.NODE_DATABASE_PREFIX import net.corda.node.utilities.PersistentMap import org.apache.activemq.artemis.utils.ReusableLatch @@ -47,12 +49,14 @@ import javax.persistence.Entity * in the nodes, maybe we can consider multiple activities and whether the activities have been completed or not, * but that starts to sound a lot like off-ledger state. * - * @param services Core node services. * @param schedulerTimerExecutor The executor the scheduler blocks on waiting for the clock to advance to the next * activity. Only replace this for unit testing purposes. This is not the executor the [FlowLogic] is launched on. */ @ThreadSafe -class NodeSchedulerService(private val services: ServiceHubInternal, +class NodeSchedulerService(private val clock: Clock, + private val database: CordaPersistence, + private val flowStarter: FlowStarter, + private val stateLoader: StateLoader, private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(), private val unfinishedSchedules: ReusableLatch = ReusableLatch(), private val serverThread: AffinityExecutor) @@ -108,8 +112,8 @@ class NodeSchedulerService(private val services: ServiceHubInternal, toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, fromPersistentEntity = { //TODO null check will become obsolete after making DB/JPA columns not nullable - var txId = it.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") - var index = it.output.index ?: throw IllegalStateException("DB returned null SecureHash index") + val txId = it.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") + val index = it.output.index ?: throw IllegalStateException("DB returned null SecureHash index") Pair(StateRef(SecureHash.parse(txId), index), ScheduledStateRef(StateRef(SecureHash.parse(txId), index), it.scheduledAt)) }, @@ -172,7 +176,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal, mutex.locked { val previousState = scheduledStates[action.ref] scheduledStates[action.ref] = action - var previousEarliest = scheduledStatesQueue.peek() + val previousEarliest = scheduledStatesQueue.peek() scheduledStatesQueue.remove(previousState) scheduledStatesQueue.add(action) if (previousState == null) { @@ -223,7 +227,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal, log.trace { "Scheduling as next $scheduledState" } // This will block the scheduler single thread until the scheduled time (returns false) OR // the Future is cancelled due to rescheduling (returns true). - if (!awaitWithDeadline(services.clock, scheduledState.scheduledAt, ourRescheduledFuture)) { + if (!awaitWithDeadline(clock, scheduledState.scheduledAt, ourRescheduledFuture)) { log.trace { "Invoking as next $scheduledState" } onTimeReached(scheduledState) } else { @@ -237,11 +241,11 @@ class NodeSchedulerService(private val services: ServiceHubInternal, serverThread.execute { var flowName: String? = "(unknown)" try { - services.database.transaction { + database.transaction { val scheduledFlow = getScheduledFlow(scheduledState) if (scheduledFlow != null) { flowName = scheduledFlow.javaClass.name - val future = services.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture + val future = flowStarter.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture future.then { unfinishedSchedules.countDown() } @@ -265,9 +269,9 @@ class NodeSchedulerService(private val services: ServiceHubInternal, unfinishedSchedules.countDown() scheduledStates.remove(scheduledState.ref) scheduledStatesQueue.remove(scheduledState) - } else if (scheduledActivity.scheduledAt.isAfter(services.clock.instant())) { + } else if (scheduledActivity.scheduledAt.isAfter(clock.instant())) { log.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.") - var newState = ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt) + val newState = ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt) scheduledStates[scheduledState.ref] = newState scheduledStatesQueue.remove(scheduledState) scheduledStatesQueue.add(newState) @@ -286,7 +290,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal, } private fun getScheduledActivity(scheduledState: ScheduledStateRef): ScheduledActivity? { - val txState = services.loadState(scheduledState.ref) + val txState = stateLoader.loadState(scheduledState.ref) val state = txState.data as SchedulableState return try { // This can throw as running contract code. diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index 0020970296..ec69a89fe5 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -65,7 +65,7 @@ class CordaRPCOpsImplTest { mockNet = MockNetwork(cordappPackages = listOf("net.corda.finance.contracts.asset")) aliceNode = mockNet.createNode() notaryNode = mockNet.createNotaryNode(validating = false) - rpc = CordaRPCOpsImpl(aliceNode.services, aliceNode.smm, aliceNode.database) + rpc = CordaRPCOpsImpl(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services) CURRENT_RPC_CONTEXT.set(RpcContext(User("user", "pwd", permissions = setOf( startFlowPermission(), startFlowPermission() diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index d54bfa8754..2040fb562b 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -12,6 +12,7 @@ import net.corda.core.node.ServiceHub import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.days +import net.corda.node.internal.FlowStarterImpl import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.internal.cordapp.CordappProviderImpl import net.corda.node.services.api.VaultServiceInternal @@ -100,8 +101,8 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { override val cordappProvider = CordappProviderImpl(CordappLoader.createWithTestPackages(listOf("net.corda.testing.contracts")), attachments) } smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) - scheduler = NodeSchedulerService(services, schedulerGatedExecutor, serverThread = smmExecutor) val mockSMM = StateMachineManager(services, DBCheckpointStorage(), smmExecutor, database) + scheduler = NodeSchedulerService(testClock, database, FlowStarterImpl(smmExecutor, mockSMM), services.stateLoader, schedulerGatedExecutor, serverThread = smmExecutor) mockSMM.changes.subscribe { change -> if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) { smmHasRemovedAllFlows.countDown() diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt index ec8155a18e..29ade9ec4f 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/NotaryServiceTests.kt @@ -13,7 +13,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds -import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.api.StartedNodeServices import net.corda.testing.* import net.corda.testing.contracts.DummyContract import net.corda.testing.node.MockNetwork @@ -28,8 +28,8 @@ import kotlin.test.assertFailsWith class NotaryServiceTests { lateinit var mockNet: MockNetwork - lateinit var notaryServices: ServiceHubInternal - lateinit var aliceServices: ServiceHubInternal + lateinit var notaryServices: StartedNodeServices + lateinit var aliceServices: StartedNodeServices lateinit var notary: Party lateinit var alice: Party diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt index 3700848e47..6f3dd1ac66 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt @@ -13,7 +13,7 @@ import net.corda.core.node.ServiceHub import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow -import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.api.StartedNodeServices import net.corda.node.services.issueInvalidState import net.corda.testing.* import net.corda.testing.contracts.DummyContract @@ -28,8 +28,8 @@ import kotlin.test.assertFailsWith class ValidatingNotaryServiceTests { lateinit var mockNet: MockNetwork - lateinit var notaryServices: ServiceHubInternal - lateinit var aliceServices: ServiceHubInternal + lateinit var notaryServices: StartedNodeServices + lateinit var aliceServices: StartedNodeServices lateinit var notary: Party lateinit var alice: Party diff --git a/samples/irs-demo/src/test/kotlin/net/corda/irs/api/NodeInterestRatesTest.kt b/samples/irs-demo/src/test/kotlin/net/corda/irs/api/NodeInterestRatesTest.kt index d43192cdf8..3953c876b0 100644 --- a/samples/irs-demo/src/test/kotlin/net/corda/irs/api/NodeInterestRatesTest.kt +++ b/samples/irs-demo/src/test/kotlin/net/corda/irs/api/NodeInterestRatesTest.kt @@ -206,7 +206,7 @@ class NodeInterestRatesTest : TestDependencyInjectionBase() { internals.registerInitiatedFlow(NodeInterestRates.FixQueryHandler::class.java) internals.registerInitiatedFlow(NodeInterestRates.FixSignHandler::class.java) database.transaction { - internals.installCordaService(NodeInterestRates.Oracle::class.java).knownFixes = TEST_DATA + installCordaService(NodeInterestRates.Oracle::class.java).knownFixes = TEST_DATA } } val tx = makePartialTX() diff --git a/testing/node-driver/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt b/testing/node-driver/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt index 1188e1571e..3c4631376e 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt @@ -1,9 +1,7 @@ package net.corda.node.testing import com.codahale.metrics.MetricRegistry -import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowLogic -import net.corda.core.identity.Party import net.corda.core.node.NodeInfo import net.corda.core.node.StateLoader import net.corda.core.node.services.* @@ -17,7 +15,6 @@ import net.corda.node.serialization.NodeClock import net.corda.node.services.api.* import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.MessagingService -import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.utilities.CordaPersistence @@ -43,12 +40,11 @@ open class MockServiceHubInternal( override val validatedTransactions: WritableTransactionStorage = MockTransactionStorage(), override val stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage = MockStateMachineRecordedTransactionMappingStorage(), val mapCache: NetworkMapCacheInternal? = null, - val scheduler: SchedulerService? = null, val overrideClock: Clock? = NodeClock(), val customContractUpgradeService: ContractUpgradeService? = null, val customTransactionVerifierService: TransactionVerifierService? = InMemoryTransactionVerifierService(2), override val cordappProvider: CordappProviderInternal = CordappProviderImpl(CordappLoader.createDefault(Paths.get(".")), attachments), - protected val stateLoader: StateLoaderImpl = StateLoaderImpl(validatedTransactions) + val stateLoader: StateLoaderImpl = StateLoaderImpl(validatedTransactions) ) : ServiceHubInternal, StateLoader by stateLoader { override val transactionVerifierService: TransactionVerifierService get() = customTransactionVerifierService ?: throw UnsupportedOperationException() @@ -64,8 +60,6 @@ open class MockServiceHubInternal( get() = network ?: throw UnsupportedOperationException() override val networkMapCache: NetworkMapCacheInternal get() = mapCache ?: MockNetworkMapCache(this) - override val schedulerService: SchedulerService - get() = scheduler ?: throw UnsupportedOperationException() override val clock: Clock get() = overrideClock ?: throw UnsupportedOperationException() override val myInfo: NodeInfo @@ -79,11 +73,6 @@ open class MockServiceHubInternal( lateinit var smm: StateMachineManager override fun cordaService(type: Class): T = throw UnsupportedOperationException() - - override fun startFlow(logic: FlowLogic, flowInitiator: FlowInitiator, ourIdentity: Party?): FlowStateMachineImpl { - return smm.executor.fetchFrom { smm.add(logic, flowInitiator, ourIdentity) } - } - override fun getFlowFactory(initiatingFlowClass: Class>): InitiatedFlowFactory<*>? = null override fun jdbcSession(): Connection = database.createSession()