Additional thread affinity for the StateMachineManager.

Check that the SMM.add method is being called on the SMM thread and throw if not. Make ServiceHubInternal.startFlow() do a blocking call onto the server thread. Update unit tests.

This resolves an issue whereby the scheduler was starting flows outside of the server thread, which isn't intended.
This commit is contained in:
Mike Hearn 2017-01-03 18:20:51 +01:00
parent 53903c6521
commit 1a53834a60
9 changed files with 30 additions and 35 deletions

View File

@ -58,6 +58,7 @@ interface ServiceHub {
/**
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow.
* Note that you must be on the server thread to call this method.
*
* @throws IllegalFlowLogicException or IllegalArgumentException if there are problems with the [logicType] or [args].
*/

View File

@ -1,9 +1,6 @@
package net.corda.flows
import com.google.common.util.concurrent.ListenableFuture
import net.corda.testing.BOC
import net.corda.testing.BOC_KEY
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.core.contracts.Amount
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.PartyAndReference
@ -14,10 +11,8 @@ import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.testing.MEGA_CORP
import net.corda.testing.MEGA_CORP_KEY
import net.corda.testing.initiateSingleShotFlow
import net.corda.testing.ledger
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.testing.*
import net.corda.testing.node.MockNetwork
import org.junit.Test
import java.util.*
@ -62,7 +57,7 @@ class IssuerFlowTest {
}.map { it.fsm }
val issueRequest = IssuanceRequester(amount, issueToPartyAndRef.party, issueToPartyAndRef.reference, bankOfCordaNode.info.legalIdentity)
val issueRequestResultFuture = bankClientNode.smm.add(issueRequest).resultFuture
val issueRequestResultFuture = bankClientNode.services.startFlow(issueRequest).resultFuture
return IssuerFlowTest.RunResult(issuerFuture, issueRequestResultFuture)
}

View File

@ -6,7 +6,6 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.*
import net.corda.core.contracts.Amount
import net.corda.core.crypto.Party
import net.corda.core.crypto.X509Utilities
import net.corda.core.flows.FlowLogic
@ -17,12 +16,14 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.*
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.FinalityFlow
import net.corda.flows.sendRequest
import net.corda.node.api.APIServer
import net.corda.node.services.api.*
import net.corda.node.services.config.NodeConfiguration
@ -123,7 +124,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val flowLogicRefFactory: FlowLogicRefFactory get() = flowLogicFactory
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = smm.add(logic)
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> {
return serverThread.fetchFrom { smm.add(logic) }
}
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
require(markerClass !in flowFactories) { "${markerClass.java.name} has already been used to register a flow" }

View File

@ -64,9 +64,7 @@ abstract class ServiceHubInternal : PluginServiceHub {
}
/**
* TODO: borrowing this method from service manager work in another branch. It's required to avoid circular dependency
* between SMM and the scheduler. That particular problem should also be resolved by the service manager work
* itself, at which point this method would not be needed (by the scheduler).
* Starts an already constructed flow. Note that you must be on the server thread to call this method.
*/
abstract fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T>

View File

@ -381,8 +381,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* Kicks off a brand new state machine of the given class.
* The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is
* restarted with checkpointed state machines in the storage service.
*
* Note that you must be on the [executor] thread.
*/
fun <T> add(logic: FlowLogic<T>): FlowStateMachine<T> {
executor.checkOnThread()
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait
// on the flow completion future inside that context. The problem is that any progress checkpoints are
// unable to acquire the table lock and move forward till the calling transaction finishes.

View File

@ -59,7 +59,6 @@ import kotlin.test.assertTrue
* We assume that Alice and Bob already found each other via some market, and have agreed the details already.
*/
class TwoPartyTradeFlowTests {
lateinit var net: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var aliceNode: MockNetwork.MockNode
@ -418,7 +417,7 @@ class TwoPartyTradeFlowTests {
Buyer(otherParty, notaryNode.info.notaryIdentity, 1000.DOLLARS, CommercialPaper.State::class.java)
}.map { it.fsm }
val seller = Seller(bobNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, ALICE_KEY)
val sellerResultFuture = aliceNode.smm.add(seller).resultFuture
val sellerResultFuture = aliceNode.services.startFlow(seller).resultFuture
return RunResult(buyerFuture, sellerResultFuture, seller.fsm.id)
}

View File

@ -79,7 +79,9 @@ open class MockServiceHubInternal(
override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = smm.add(logic)
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> {
return smm.executor.fetchFrom { smm.add(logic) }
}
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
flowFactories[markerClass.java] = flowFactory

View File

@ -38,7 +38,6 @@ import java.util.concurrent.TimeUnit
import kotlin.test.assertTrue
class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
val realClock: Clock = Clock.systemUTC()
val stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
val testClock = TestClock(stoppedClock)

View File

@ -39,7 +39,6 @@ import kotlin.test.assertEquals
import kotlin.test.assertTrue
class StateMachineManagerTests {
private val net = MockNetwork(servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin())
private val sessionTransfers = ArrayList<SessionTransfer>()
private lateinit var node1: MockNode
@ -69,7 +68,7 @@ class StateMachineManagerTests {
@Test
fun `newly added flow is preserved on restart`() {
node1.smm.add(NoOpFlow(nonTerminating = true))
node1.services.startFlow(NoOpFlow(nonTerminating = true))
node1.acceptableLiveFiberCountOnStop = 1
val restoredFlow = node1.restartAndGetRestoredFlow<NoOpFlow>()
assertThat(restoredFlow.flowStarted).isTrue()
@ -82,7 +81,7 @@ class StateMachineManagerTests {
@Suspendable
override fun call() = Unit
}
node1.smm.add(flow)
node1.services.startFlow(flow)
assertThat(flow.lazyTime).isNotNull()
}
@ -90,7 +89,7 @@ class StateMachineManagerTests {
fun `flow restarted just after receiving payload`() {
node2.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) }
val payload = random63BitValue()
node1.smm.add(SendFlow(payload, node2.info.legalIdentity))
node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity))
// We push through just enough messages to get only the payload sent
node2.pumpReceive()
@ -106,7 +105,7 @@ class StateMachineManagerTests {
fun `flow added before network map does run after init`() {
val node3 = net.createNode(node1.info.address) //create vanilla node
val flow = NoOpFlow()
node3.smm.add(flow)
node3.services.startFlow(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
net.runNetwork() // Allow network map messages to flow
assertEquals(true, flow.flowStarted) // Now we should have run the flow
@ -116,7 +115,7 @@ class StateMachineManagerTests {
fun `flow added before network map will be init checkpointed`() {
var node3 = net.createNode(node1.info.address) //create vanilla node
val flow = NoOpFlow()
node3.smm.add(flow)
node3.services.startFlow(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
node3.disableDBCloseOnStop()
node3.stop()
@ -141,7 +140,7 @@ class StateMachineManagerTests {
fun `flow loaded from checkpoint will respond to messages from before start`() {
val payload = random63BitValue()
node1.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(payload, it) }
node2.smm.add(ReceiveThenSuspendFlow(node1.info.legalIdentity)) // Prepare checkpointed receive flow
node2.services.startFlow(ReceiveThenSuspendFlow(node1.info.legalIdentity)) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
node2.disableDBCloseOnStop()
@ -163,7 +162,7 @@ class StateMachineManagerTests {
net.runNetwork()
// Kick off first send and receive
node2.smm.add(PingPongFlow(node3.info.legalIdentity, payload))
node2.services.startFlow(PingPongFlow(node3.info.legalIdentity, payload))
databaseTransaction(node2.database) {
assertEquals(1, node2.checkpointStorage.checkpoints().size)
}
@ -207,7 +206,7 @@ class StateMachineManagerTests {
node2.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) }
node3.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) }
val payload = random63BitValue()
node1.smm.add(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity))
node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity))
net.runNetwork()
val node2Flow = node2.getSingleFlow<ReceiveThenSuspendFlow>().first
val node3Flow = node3.getSingleFlow<ReceiveThenSuspendFlow>().first
@ -241,7 +240,7 @@ class StateMachineManagerTests {
node2.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(node2Payload, it) }
node3.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(node3Payload, it) }
val multiReceiveFlow = ReceiveThenSuspendFlow(node2.info.legalIdentity, node3.info.legalIdentity)
node1.smm.add(multiReceiveFlow)
node1.services.startFlow(multiReceiveFlow)
node1.acceptableLiveFiberCountOnStop = 1
net.runNetwork()
assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(node2Payload)
@ -265,7 +264,7 @@ class StateMachineManagerTests {
@Test
fun `both sides do a send as their first IO request`() {
node2.services.registerFlowInitiator(PingPongFlow::class) { PingPongFlow(it, 20L) }
node1.smm.add(PingPongFlow(node2.info.legalIdentity, 10L))
node1.services.startFlow(PingPongFlow(node2.info.legalIdentity, 10L))
net.runNetwork()
assertSessionTransfers(
@ -332,7 +331,7 @@ class StateMachineManagerTests {
@Test
fun `exception thrown on other side`() {
node2.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { ExceptionFlow }
val future = node1.smm.add(ReceiveThenSuspendFlow(node2.info.legalIdentity)).resultFuture
val future = node1.services.startFlow(ReceiveThenSuspendFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(FlowSessionException::class.java)
assertSessionTransfers(
@ -413,7 +412,6 @@ class StateMachineManagerTests {
private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
@Transient var flowStarted = false
@Suspendable
@ -427,7 +425,6 @@ class StateMachineManagerTests {
private class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
}
@ -438,7 +435,6 @@ class StateMachineManagerTests {
private class ReceiveThenSuspendFlow(vararg val otherParties: Party) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
}
@ -453,7 +449,6 @@ class StateMachineManagerTests {
}
private class PingPongFlow(val otherParty: Party, val payload: Long) : FlowLogic<Unit>() {
@Transient var receivedPayload: Long? = null
@Transient var receivedPayload2: Long? = null