Merge pull request #106 from corda/mike-thread-affinity-for-smm

Additional thread affinity for the StateMachineManager.
This commit is contained in:
Mike Hearn 2017-01-05 11:03:35 +01:00 committed by GitHub
commit 875efbfa11
9 changed files with 30 additions and 35 deletions

View File

@ -58,6 +58,7 @@ interface ServiceHub {
/**
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow.
* Note that you must be on the server thread to call this method.
*
* @throws IllegalFlowLogicException or IllegalArgumentException if there are problems with the [logicType] or [args].
*/

View File

@ -1,9 +1,6 @@
package net.corda.flows
import com.google.common.util.concurrent.ListenableFuture
import net.corda.testing.BOC
import net.corda.testing.BOC_KEY
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.core.contracts.Amount
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.PartyAndReference
@ -14,10 +11,8 @@ import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.testing.MEGA_CORP
import net.corda.testing.MEGA_CORP_KEY
import net.corda.testing.initiateSingleShotFlow
import net.corda.testing.ledger
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.testing.*
import net.corda.testing.node.MockNetwork
import org.junit.Test
import java.util.*
@ -62,7 +57,7 @@ class IssuerFlowTest {
}.map { it.fsm }
val issueRequest = IssuanceRequester(amount, issueToPartyAndRef.party, issueToPartyAndRef.reference, bankOfCordaNode.info.legalIdentity)
val issueRequestResultFuture = bankClientNode.smm.add(issueRequest).resultFuture
val issueRequestResultFuture = bankClientNode.services.startFlow(issueRequest).resultFuture
return IssuerFlowTest.RunResult(issuerFuture, issueRequestResultFuture)
}

View File

@ -6,7 +6,6 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.*
import net.corda.core.contracts.Amount
import net.corda.core.crypto.Party
import net.corda.core.crypto.X509Utilities
import net.corda.core.flows.FlowLogic
@ -17,12 +16,14 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.*
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.FinalityFlow
import net.corda.flows.sendRequest
import net.corda.node.api.APIServer
import net.corda.node.services.api.*
import net.corda.node.services.config.NodeConfiguration
@ -123,7 +124,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val flowLogicRefFactory: FlowLogicRefFactory get() = flowLogicFactory
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = smm.add(logic)
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> {
return serverThread.fetchFrom { smm.add(logic) }
}
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
require(markerClass !in flowFactories) { "${markerClass.java.name} has already been used to register a flow" }

View File

@ -64,9 +64,7 @@ abstract class ServiceHubInternal : PluginServiceHub {
}
/**
* TODO: borrowing this method from service manager work in another branch. It's required to avoid circular dependency
* between SMM and the scheduler. That particular problem should also be resolved by the service manager work
* itself, at which point this method would not be needed (by the scheduler).
* Starts an already constructed flow. Note that you must be on the server thread to call this method.
*/
abstract fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T>

View File

@ -381,8 +381,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* Kicks off a brand new state machine of the given class.
* The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is
* restarted with checkpointed state machines in the storage service.
*
* Note that you must be on the [executor] thread.
*/
fun <T> add(logic: FlowLogic<T>): FlowStateMachine<T> {
executor.checkOnThread()
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait
// on the flow completion future inside that context. The problem is that any progress checkpoints are
// unable to acquire the table lock and move forward till the calling transaction finishes.

View File

@ -59,7 +59,6 @@ import kotlin.test.assertTrue
* We assume that Alice and Bob already found each other via some market, and have agreed the details already.
*/
class TwoPartyTradeFlowTests {
lateinit var net: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var aliceNode: MockNetwork.MockNode
@ -418,7 +417,7 @@ class TwoPartyTradeFlowTests {
Buyer(otherParty, notaryNode.info.notaryIdentity, 1000.DOLLARS, CommercialPaper.State::class.java)
}.map { it.fsm }
val seller = Seller(bobNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, ALICE_KEY)
val sellerResultFuture = aliceNode.smm.add(seller).resultFuture
val sellerResultFuture = aliceNode.services.startFlow(seller).resultFuture
return RunResult(buyerFuture, sellerResultFuture, seller.fsm.id)
}

View File

@ -79,7 +79,9 @@ open class MockServiceHubInternal(
override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = smm.add(logic)
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> {
return smm.executor.fetchFrom { smm.add(logic) }
}
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
flowFactories[markerClass.java] = flowFactory

View File

@ -38,7 +38,6 @@ import java.util.concurrent.TimeUnit
import kotlin.test.assertTrue
class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
val realClock: Clock = Clock.systemUTC()
val stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
val testClock = TestClock(stoppedClock)

View File

@ -39,7 +39,6 @@ import kotlin.test.assertEquals
import kotlin.test.assertTrue
class StateMachineManagerTests {
private val net = MockNetwork(servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin())
private val sessionTransfers = ArrayList<SessionTransfer>()
private lateinit var node1: MockNode
@ -69,7 +68,7 @@ class StateMachineManagerTests {
@Test
fun `newly added flow is preserved on restart`() {
node1.smm.add(NoOpFlow(nonTerminating = true))
node1.services.startFlow(NoOpFlow(nonTerminating = true))
node1.acceptableLiveFiberCountOnStop = 1
val restoredFlow = node1.restartAndGetRestoredFlow<NoOpFlow>()
assertThat(restoredFlow.flowStarted).isTrue()
@ -82,7 +81,7 @@ class StateMachineManagerTests {
@Suspendable
override fun call() = Unit
}
node1.smm.add(flow)
node1.services.startFlow(flow)
assertThat(flow.lazyTime).isNotNull()
}
@ -90,7 +89,7 @@ class StateMachineManagerTests {
fun `flow restarted just after receiving payload`() {
node2.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) }
val payload = random63BitValue()
node1.smm.add(SendFlow(payload, node2.info.legalIdentity))
node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity))
// We push through just enough messages to get only the payload sent
node2.pumpReceive()
@ -106,7 +105,7 @@ class StateMachineManagerTests {
fun `flow added before network map does run after init`() {
val node3 = net.createNode(node1.info.address) //create vanilla node
val flow = NoOpFlow()
node3.smm.add(flow)
node3.services.startFlow(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
net.runNetwork() // Allow network map messages to flow
assertEquals(true, flow.flowStarted) // Now we should have run the flow
@ -116,7 +115,7 @@ class StateMachineManagerTests {
fun `flow added before network map will be init checkpointed`() {
var node3 = net.createNode(node1.info.address) //create vanilla node
val flow = NoOpFlow()
node3.smm.add(flow)
node3.services.startFlow(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
node3.disableDBCloseOnStop()
node3.stop()
@ -141,7 +140,7 @@ class StateMachineManagerTests {
fun `flow loaded from checkpoint will respond to messages from before start`() {
val payload = random63BitValue()
node1.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(payload, it) }
node2.smm.add(ReceiveThenSuspendFlow(node1.info.legalIdentity)) // Prepare checkpointed receive flow
node2.services.startFlow(ReceiveThenSuspendFlow(node1.info.legalIdentity)) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
node2.disableDBCloseOnStop()
@ -163,7 +162,7 @@ class StateMachineManagerTests {
net.runNetwork()
// Kick off first send and receive
node2.smm.add(PingPongFlow(node3.info.legalIdentity, payload))
node2.services.startFlow(PingPongFlow(node3.info.legalIdentity, payload))
databaseTransaction(node2.database) {
assertEquals(1, node2.checkpointStorage.checkpoints().size)
}
@ -207,7 +206,7 @@ class StateMachineManagerTests {
node2.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) }
node3.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) }
val payload = random63BitValue()
node1.smm.add(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity))
node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity))
net.runNetwork()
val node2Flow = node2.getSingleFlow<ReceiveThenSuspendFlow>().first
val node3Flow = node3.getSingleFlow<ReceiveThenSuspendFlow>().first
@ -241,7 +240,7 @@ class StateMachineManagerTests {
node2.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(node2Payload, it) }
node3.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(node3Payload, it) }
val multiReceiveFlow = ReceiveThenSuspendFlow(node2.info.legalIdentity, node3.info.legalIdentity)
node1.smm.add(multiReceiveFlow)
node1.services.startFlow(multiReceiveFlow)
node1.acceptableLiveFiberCountOnStop = 1
net.runNetwork()
assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(node2Payload)
@ -265,7 +264,7 @@ class StateMachineManagerTests {
@Test
fun `both sides do a send as their first IO request`() {
node2.services.registerFlowInitiator(PingPongFlow::class) { PingPongFlow(it, 20L) }
node1.smm.add(PingPongFlow(node2.info.legalIdentity, 10L))
node1.services.startFlow(PingPongFlow(node2.info.legalIdentity, 10L))
net.runNetwork()
assertSessionTransfers(
@ -332,7 +331,7 @@ class StateMachineManagerTests {
@Test
fun `exception thrown on other side`() {
node2.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { ExceptionFlow }
val future = node1.smm.add(ReceiveThenSuspendFlow(node2.info.legalIdentity)).resultFuture
val future = node1.services.startFlow(ReceiveThenSuspendFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(FlowSessionException::class.java)
assertSessionTransfers(
@ -413,7 +412,6 @@ class StateMachineManagerTests {
private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
@Transient var flowStarted = false
@Suspendable
@ -427,7 +425,6 @@ class StateMachineManagerTests {
private class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
}
@ -438,7 +435,6 @@ class StateMachineManagerTests {
private class ReceiveThenSuspendFlow(vararg val otherParties: Party) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
}
@ -453,7 +449,6 @@ class StateMachineManagerTests {
}
private class PingPongFlow(val otherParty: Party, val payload: Long) : FlowLogic<Unit>() {
@Transient var receivedPayload: Long? = null
@Transient var receivedPayload2: Long? = null