diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index e772ad8618..8ddb260ed0 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -58,6 +58,7 @@ interface ServiceHub { /** * Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow. + * Note that you must be on the server thread to call this method. * * @throws IllegalFlowLogicException or IllegalArgumentException if there are problems with the [logicType] or [args]. */ diff --git a/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt b/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt index 6ee80dddb0..5ee35635cc 100644 --- a/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt +++ b/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt @@ -1,9 +1,6 @@ package net.corda.flows import com.google.common.util.concurrent.ListenableFuture -import net.corda.testing.BOC -import net.corda.testing.BOC_KEY -import net.corda.flows.IssuerFlow.IssuanceRequester import net.corda.core.contracts.Amount import net.corda.core.contracts.DOLLARS import net.corda.core.contracts.PartyAndReference @@ -14,10 +11,8 @@ import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY_KEY -import net.corda.testing.MEGA_CORP -import net.corda.testing.MEGA_CORP_KEY -import net.corda.testing.initiateSingleShotFlow -import net.corda.testing.ledger +import net.corda.flows.IssuerFlow.IssuanceRequester +import net.corda.testing.* import net.corda.testing.node.MockNetwork import org.junit.Test import java.util.* @@ -62,7 +57,7 @@ class IssuerFlowTest { }.map { it.fsm } val issueRequest = IssuanceRequester(amount, issueToPartyAndRef.party, issueToPartyAndRef.reference, bankOfCordaNode.info.legalIdentity) - val issueRequestResultFuture = bankClientNode.smm.add(issueRequest).resultFuture + val issueRequestResultFuture = bankClientNode.services.startFlow(issueRequest).resultFuture return IssuerFlowTest.RunResult(issuerFuture, issueRequestResultFuture) } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 5371343682..901045648c 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -6,7 +6,6 @@ import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture import net.corda.core.* -import net.corda.core.contracts.Amount import net.corda.core.crypto.Party import net.corda.core.crypto.X509Utilities import net.corda.core.flows.FlowLogic @@ -17,12 +16,14 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.* import net.corda.core.node.services.* import net.corda.core.node.services.NetworkMapCache.MapChange -import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction -import net.corda.flows.* +import net.corda.flows.CashCommand +import net.corda.flows.CashFlow +import net.corda.flows.FinalityFlow +import net.corda.flows.sendRequest import net.corda.node.api.APIServer import net.corda.node.services.api.* import net.corda.node.services.config.NodeConfiguration @@ -123,7 +124,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val flowLogicRefFactory: FlowLogicRefFactory get() = flowLogicFactory - override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = smm.add(logic) + override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> { + return serverThread.fetchFrom { smm.add(logic) } + } override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) { require(markerClass !in flowFactories) { "${markerClass.java.name} has already been used to register a flow" } diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index ac32fbf048..6f22c5e81c 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -64,9 +64,7 @@ abstract class ServiceHubInternal : PluginServiceHub { } /** - * TODO: borrowing this method from service manager work in another branch. It's required to avoid circular dependency - * between SMM and the scheduler. That particular problem should also be resolved by the service manager work - * itself, at which point this method would not be needed (by the scheduler). + * Starts an already constructed flow. Note that you must be on the server thread to call this method. */ abstract fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index dfc7d22ece..ccfcae6953 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -381,8 +381,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, * Kicks off a brand new state machine of the given class. * The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is * restarted with checkpointed state machines in the storage service. + * + * Note that you must be on the [executor] thread. */ fun <T> add(logic: FlowLogic<T>): FlowStateMachine<T> { + executor.checkOnThread() // We swap out the parent transaction context as using this frequently leads to a deadlock as we wait // on the flow completion future inside that context. The problem is that any progress checkpoints are // unable to acquire the table lock and move forward till the calling transaction finishes. diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeProtocolTests.kt index 8eacae3048..8b4e5dbf23 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -59,7 +59,6 @@ import kotlin.test.assertTrue * We assume that Alice and Bob already found each other via some market, and have agreed the details already. */ class TwoPartyTradeFlowTests { - lateinit var net: MockNetwork lateinit var notaryNode: MockNetwork.MockNode lateinit var aliceNode: MockNetwork.MockNode @@ -418,7 +417,7 @@ class TwoPartyTradeFlowTests { Buyer(otherParty, notaryNode.info.notaryIdentity, 1000.DOLLARS, CommercialPaper.State::class.java) }.map { it.fsm } val seller = Seller(bobNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, ALICE_KEY) - val sellerResultFuture = aliceNode.smm.add(seller).resultFuture + val sellerResultFuture = aliceNode.services.startFlow(seller).resultFuture return RunResult(buyerFuture, sellerResultFuture, seller.fsm.id) } diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt index 85550fd72a..0309eaa766 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt @@ -79,7 +79,9 @@ open class MockServiceHubInternal( override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs) - override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = smm.add(logic) + override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> { + return smm.executor.fetchFrom { smm.add(logic) } + } override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) { flowFactories[markerClass.java] = flowFactory diff --git a/node/src/test/kotlin/net/corda/node/services/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/NodeSchedulerServiceTest.kt index f4b0546321..1ed483629e 100644 --- a/node/src/test/kotlin/net/corda/node/services/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/NodeSchedulerServiceTest.kt @@ -38,7 +38,6 @@ import java.util.concurrent.TimeUnit import kotlin.test.assertTrue class NodeSchedulerServiceTest : SingletonSerializeAsToken() { - val realClock: Clock = Clock.systemUTC() val stoppedClock = Clock.fixed(realClock.instant(), realClock.zone) val testClock = TestClock(stoppedClock) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index d7090810e9..d66c4f93b9 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -39,7 +39,6 @@ import kotlin.test.assertEquals import kotlin.test.assertTrue class StateMachineManagerTests { - private val net = MockNetwork(servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin()) private val sessionTransfers = ArrayList<SessionTransfer>() private lateinit var node1: MockNode @@ -69,7 +68,7 @@ class StateMachineManagerTests { @Test fun `newly added flow is preserved on restart`() { - node1.smm.add(NoOpFlow(nonTerminating = true)) + node1.services.startFlow(NoOpFlow(nonTerminating = true)) node1.acceptableLiveFiberCountOnStop = 1 val restoredFlow = node1.restartAndGetRestoredFlow<NoOpFlow>() assertThat(restoredFlow.flowStarted).isTrue() @@ -82,7 +81,7 @@ class StateMachineManagerTests { @Suspendable override fun call() = Unit } - node1.smm.add(flow) + node1.services.startFlow(flow) assertThat(flow.lazyTime).isNotNull() } @@ -90,7 +89,7 @@ class StateMachineManagerTests { fun `flow restarted just after receiving payload`() { node2.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) } val payload = random63BitValue() - node1.smm.add(SendFlow(payload, node2.info.legalIdentity)) + node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity)) // We push through just enough messages to get only the payload sent node2.pumpReceive() @@ -106,7 +105,7 @@ class StateMachineManagerTests { fun `flow added before network map does run after init`() { val node3 = net.createNode(node1.info.address) //create vanilla node val flow = NoOpFlow() - node3.smm.add(flow) + node3.services.startFlow(flow) assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet net.runNetwork() // Allow network map messages to flow assertEquals(true, flow.flowStarted) // Now we should have run the flow @@ -116,7 +115,7 @@ class StateMachineManagerTests { fun `flow added before network map will be init checkpointed`() { var node3 = net.createNode(node1.info.address) //create vanilla node val flow = NoOpFlow() - node3.smm.add(flow) + node3.services.startFlow(flow) assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet node3.disableDBCloseOnStop() node3.stop() @@ -141,7 +140,7 @@ class StateMachineManagerTests { fun `flow loaded from checkpoint will respond to messages from before start`() { val payload = random63BitValue() node1.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(payload, it) } - node2.smm.add(ReceiveThenSuspendFlow(node1.info.legalIdentity)) // Prepare checkpointed receive flow + node2.services.startFlow(ReceiveThenSuspendFlow(node1.info.legalIdentity)) // Prepare checkpointed receive flow // Make sure the add() has finished initial processing. node2.smm.executor.flush() node2.disableDBCloseOnStop() @@ -163,7 +162,7 @@ class StateMachineManagerTests { net.runNetwork() // Kick off first send and receive - node2.smm.add(PingPongFlow(node3.info.legalIdentity, payload)) + node2.services.startFlow(PingPongFlow(node3.info.legalIdentity, payload)) databaseTransaction(node2.database) { assertEquals(1, node2.checkpointStorage.checkpoints().size) } @@ -207,7 +206,7 @@ class StateMachineManagerTests { node2.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) } node3.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) } val payload = random63BitValue() - node1.smm.add(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity)) + node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity)) net.runNetwork() val node2Flow = node2.getSingleFlow<ReceiveThenSuspendFlow>().first val node3Flow = node3.getSingleFlow<ReceiveThenSuspendFlow>().first @@ -241,7 +240,7 @@ class StateMachineManagerTests { node2.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(node2Payload, it) } node3.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(node3Payload, it) } val multiReceiveFlow = ReceiveThenSuspendFlow(node2.info.legalIdentity, node3.info.legalIdentity) - node1.smm.add(multiReceiveFlow) + node1.services.startFlow(multiReceiveFlow) node1.acceptableLiveFiberCountOnStop = 1 net.runNetwork() assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(node2Payload) @@ -265,7 +264,7 @@ class StateMachineManagerTests { @Test fun `both sides do a send as their first IO request`() { node2.services.registerFlowInitiator(PingPongFlow::class) { PingPongFlow(it, 20L) } - node1.smm.add(PingPongFlow(node2.info.legalIdentity, 10L)) + node1.services.startFlow(PingPongFlow(node2.info.legalIdentity, 10L)) net.runNetwork() assertSessionTransfers( @@ -332,7 +331,7 @@ class StateMachineManagerTests { @Test fun `exception thrown on other side`() { node2.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { ExceptionFlow } - val future = node1.smm.add(ReceiveThenSuspendFlow(node2.info.legalIdentity)).resultFuture + val future = node1.services.startFlow(ReceiveThenSuspendFlow(node2.info.legalIdentity)).resultFuture net.runNetwork() assertThatThrownBy { future.getOrThrow() }.isInstanceOf(FlowSessionException::class.java) assertSessionTransfers( @@ -413,7 +412,6 @@ class StateMachineManagerTests { private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() { - @Transient var flowStarted = false @Suspendable @@ -427,7 +425,6 @@ class StateMachineManagerTests { private class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<Unit>() { - init { require(otherParties.isNotEmpty()) } @@ -438,7 +435,6 @@ class StateMachineManagerTests { private class ReceiveThenSuspendFlow(vararg val otherParties: Party) : FlowLogic<Unit>() { - init { require(otherParties.isNotEmpty()) } @@ -453,7 +449,6 @@ class StateMachineManagerTests { } private class PingPongFlow(val otherParty: Party, val payload: Long) : FlowLogic<Unit>() { - @Transient var receivedPayload: Long? = null @Transient var receivedPayload2: Long? = null