Improve the flow commit API.

Make FinalityFlow do more, and be used more consistently.

Add a new waitForLedgerCommit API, intended for use at the end of flows or at any other point where a flow wants to wait for a transaction to finalise (while the finalisation flow itself is being run by someone else).

Update the docs a bit.
Author: Mike Hearn
Date: 2017-01-06 10:42:04 +01:00
parent f8c1996a48
commit cc20a10225
23 changed files with 374 additions and 217 deletions
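
To make the new API concrete, here is a minimal sketch of the usage pattern the commit message describes. The flow class is hypothetical, and it assumes FlowLogic exposes the same waitForLedgerCommit signature that FlowStateMachine gains in this diff:

import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.transactions.SignedTransaction

// Hypothetical flow: the counterparty runs FinalityFlow; we suspend our fiber until
// the finalised transaction is committed to our ledger, then return it to the caller.
class WaitForCounterpartyCommit(private val txId: SecureHash) : FlowLogic<SignedTransaction>() {
    @Suspendable
    override fun call(): SignedTransaction = waitForLedgerCommit(txId)
}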

View File

@@ -15,7 +15,6 @@ import java.util.function.Function
import javax.annotation.concurrent.ThreadSafe
object DataVending {
class Plugin : CordaPluginRegistry() {
override val servicePlugins = listOf(Function(::Service))
}

View File

@@ -1,5 +1,6 @@
package net.corda.node.services.statemachine
import net.corda.core.crypto.SecureHash
import net.corda.node.services.statemachine.StateMachineManager.FlowSession
// TODO revisit when Kotlin 1.1 is released and data classes can extend other classes
@@ -7,14 +8,17 @@ interface FlowIORequest {
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
// don't have the original stack trace because it's in a suspended fiber.
val stackTraceInCaseOfProblems: StackSnapshot
}
interface SessionedFlowIORequest : FlowIORequest {
val session: FlowSession
}
interface SendRequest : FlowIORequest {
interface SendRequest : SessionedFlowIORequest {
val message: SessionMessage
}
interface ReceiveRequest<T : SessionMessage> : FlowIORequest {
interface ReceiveRequest<T : SessionMessage> : SessionedFlowIORequest {
val receiveType: Class<T>
}
@@ -36,4 +40,9 @@ data class SendOnly(override val session: FlowSession, override val message: Ses
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
}
data class WaitForLedgerCommit(val hash: SecureHash, val fiber: FlowStateMachineImpl<*>) : FlowIORequest {
@Transient
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
}
class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem")
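
Two changes stand out in this file: requests tied to a session now share the SessionedFlowIORequest marker, while the new WaitForLedgerCommit carries no session at all, only a hash and the fiber to park. A rough sketch of the dispatch this split enables (simplified from the StateMachineManager changes below, not the node's literal code):

import net.corda.node.services.statemachine.*

fun onSuspend(request: FlowIORequest) {
    // Only sessioned requests have a session whose waitingForResponse flag needs setting.
    if (request is SessionedFlowIORequest)
        request.session.waitingForResponse = request is ReceiveRequest<*>
    // A ledger-commit wait has no session; the fiber is parked against the hash instead.
    if (request is WaitForLedgerCommit) {
        // register request.fiber to be resumed once request.hash commits
    }
}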

View File

@@ -7,11 +7,13 @@ import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.random63BitValue
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.trace
import net.corda.node.services.api.ServiceHubInternal
@@ -72,7 +74,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
// This state IS serialised, as we need it to know what the fiber is waiting for.
internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSession>()
internal var waitingForLedgerCommitOf: SecureHash? = null
init {
logic.stateMachine = this
@@ -172,6 +176,16 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
@Suspendable
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction {
waitingForLedgerCommitOf = hash
logger.info("Waiting for transaction $hash to commit")
suspend(WaitForLedgerCommit(hash, sessionFlow.stateMachine as FlowStateMachineImpl<*>))
logger.info("Transaction $hash has committed to the ledger, resuming")
val stx = serviceHub.storageService.validatedTransactions.getTransaction(hash)
return stx ?: throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage")
}
private fun createSessionData(session: FlowSession, payload: Any): SessionData {
val sessionState = session.state
val peerSessionId = when (sessionState) {
@@ -266,10 +280,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable
private fun suspend(ioRequest: FlowIORequest) {
// we have to pass the Thread local Transaction across via a transient field as the Fiber Park swaps them out.
// We have to pass the thread local database transaction across via a transient field as the fiber park
// swaps them out.
txTrampoline = TransactionManager.currentOrNull()
StrandLocalTransactionManager.setThreadLocalTx(null)
ioRequest.session.waitingForResponse = (ioRequest is ReceiveRequest<*>)
if (ioRequest is SessionedFlowIORequest)
ioRequest.session.waitingForResponse = (ioRequest is ReceiveRequest<*>)
var exceptionDuringSuspend: Throwable? = null
parkAndSerialize { fiber, serializer ->
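
The rewritten comment above describes a subtle pattern: Quasar may resume a parked fiber on a different thread, so a value held in a thread local (here the database transaction) must be stashed in a field on the fiber before parking and reinstalled after resume. A generic, self-contained sketch of that trampoline, with made-up names throughout:

// Made-up names; this only illustrates the stash-and-reinstall pattern.
object ThreadLocalTx {
    private val tl = ThreadLocal<Any?>()
    fun get(): Any? = tl.get()
    fun set(tx: Any?) = tl.set(tx)
}

class FiberWithTrampoline {
    @Transient private var txTrampoline: Any? = null  // carried across the park, never serialised

    fun beforePark() {
        txTrampoline = ThreadLocalTx.get()  // stash the thread-local transaction on the fiber
        ThreadLocalTx.set(null)             // the parking thread must not keep hold of it
    }

    fun afterResume() {
        ThreadLocalTx.set(txTrampoline)     // reinstall on whichever thread resumed the fiber
        txTrampoline = null
    }
}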

View File

@@ -6,11 +6,13 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import co.paralleluniverse.strands.Strand
import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo
import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ListenableFuture
import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.commonName
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
@@ -62,7 +64,7 @@ import javax.annotation.concurrent.ThreadSafe
* TODO: Timeouts
* TODO: Surfacing of exceptions via an API and/or management UI
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
* TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
* TODO: Don't store all active flows in memory, load from the database on demand.
*/
@ThreadSafe
class StateMachineManager(val serviceHub: ServiceHubInternal,
@@ -89,15 +91,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
// property.
private val mutex = ThreadBox(object {
private class InnerState {
var started = false
val stateMachines = LinkedHashMap<FlowStateMachineImpl<*>, Checkpoint>()
val changesPublisher = PublishSubject.create<Change>()
val changesPublisher = PublishSubject.create<Change>()!!
val fibersWaitingForLedgerCommit = HashMultimap.create<SecureHash, FlowStateMachineImpl<*>>()!!
fun notifyChangeObservers(fiber: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) {
changesPublisher.bufferUntilDatabaseCommit().onNext(Change(fiber.logic, addOrRemove, fiber.id))
}
})
}
private val mutex = ThreadBox(InnerState())
// True if we're shutting down, so don't resume anything.
@Volatile private var stopping = false
@@ -152,9 +156,27 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
fun start() {
restoreFibersFromCheckpoints()
listenToLedgerTransactions()
serviceHub.networkMapCache.mapServiceRegistered.then(executor) { resumeRestoredFibers() }
}
private fun listenToLedgerTransactions() {
// Observe the stream of committed, validated transactions and resume fibers that are waiting for them.
serviceHub.storageService.validatedTransactions.updates.subscribe { stx ->
val hash = stx.id
val flows: Set<FlowStateMachineImpl<*>> = mutex.locked { fibersWaitingForLedgerCommit.removeAll(hash) }
if (flows.isNotEmpty()) {
executor.executeASAP {
for (flow in flows) {
logger.info("Resuming ${flow.id} because it was waiting for tx ${flow.waitingForLedgerCommitOf!!} which is now committed.")
flow.waitingForLedgerCommitOf = null
resumeFiber(flow)
}
}
}
}
}
private fun decrementLiveFibers() {
liveFibers.countDown()
}
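
listenToLedgerTransactions above is the wake-up half of the new API: committed transactions arrive on the validatedTransactions.updates stream, waiters for that hash are drained from the multimap atomically inside the lock, and the resumes run on the SMM executor. A simplified, self-contained model of the drain-then-resume step (stand-in Waiter type, plain synchronized in place of ThreadBox):

import com.google.common.collect.HashMultimap

class Waiter(val id: String) {
    fun resume() = println("resuming flow $id")
}

private val waiters = HashMultimap.create<String, Waiter>()

fun onTransactionCommitted(hash: String) {
    // Draining inside the lock guarantees each waiter is removed exactly once;
    // the resumes happen after the lock is released (in the node, on the executor).
    val toResume: Set<Waiter> = synchronized(waiters) { waiters.removeAll(hash) }
    toResume.forEach { it.resume() }
}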
@@ -217,8 +239,20 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun resumeRestoredFiber(fiber: FlowStateMachineImpl<*>) {
fiber.openSessions.values.forEach { openSessions[it.ourSessionId] = it }
val waitingForHash = fiber.waitingForLedgerCommitOf
if (fiber.openSessions.values.any { it.waitingForResponse }) {
fiber.logger.info("Restored, pending on receive")
} else if (waitingForHash != null) {
val stx = databaseTransaction(database) {
serviceHub.storageService.validatedTransactions.getTransaction(waitingForHash)
}
if (stx != null) {
fiber.logger.info("Resuming fiber as tx $waitingForHash has committed.")
resumeFiber(fiber)
} else {
fiber.logger.info("Restored, pending on ledger commit of $waitingForHash")
mutex.locked { fibersWaitingForLedgerCommit.put(waitingForHash, fiber) }
}
} else {
resumeFiber(fiber)
}
@@ -424,6 +458,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* Note that you must be on the [executor] thread.
*/
fun <T> add(logic: FlowLogic<T>): FlowStateMachine<T> {
// TODO: Check that logic has @Suspendable on its call method.
executor.checkOnThread()
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait
// on the flow completion future inside that context. The problem is that any progress checkpoints are
@@ -457,8 +492,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun resumeFiber(fiber: FlowStateMachineImpl<*>) {
// Avoid race condition when setting stopping to true and then checking liveFibers
incrementLiveFibers()
if (!stopping) executor.executeASAP {
fiber.resume(scheduler)
if (!stopping) {
executor.executeASAP {
fiber.resume(scheduler)
}
} else {
fiber.logger.debug("Not resuming as SMM is stopping.")
decrementLiveFibers()
@@ -466,6 +503,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
private fun processIORequest(ioRequest: FlowIORequest) {
executor.checkOnThread()
if (ioRequest is SendRequest) {
if (ioRequest.message is SessionInit) {
openSessions[ioRequest.session.ourSessionId] = ioRequest.session
@@ -475,6 +513,24 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
resumeFiber(ioRequest.session.fiber)
}
} else if (ioRequest is WaitForLedgerCommit) {
// Is it already committed?
val stx = databaseTransaction(database) {
serviceHub.storageService.validatedTransactions.getTransaction(ioRequest.hash)
}
if (stx != null) {
resumeFiber(ioRequest.fiber)
} else {
// No, then register to wait.
//
// We assume this code runs on the server thread, which is the only place transactions are committed
// currently. When we liberalise our threading somewhat, handling of wait requests will need to be
// reworked to make the wait atomic in another way. Otherwise there is a race between checking the
// database and updating the waiting list.
mutex.locked {
fibersWaitingForLedgerCommit[ioRequest.hash] += ioRequest.fiber
}
}
}
}
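
The threading caveat in that comment can be made concrete. Today the check-then-register is race-free only because both it and transaction commits run on the single server thread. If commits could happen concurrently, one option (a hypothetical sketch, not what the node does) is to perform the store lookup and the registration under the same lock the commit path takes before draining waiters, with the store write ordered before the drain:

import com.google.common.collect.HashMultimap
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins: String hashes, String transactions, Runnable fibers.
private val store = ConcurrentHashMap<String, String>()
private val waiting = HashMultimap.create<String, Runnable>()

fun registerOrResume(hash: String, fiber: Runnable) {
    synchronized(waiting) {
        // Either the commit's store write is already visible here, or the commit's
        // drain (which runs after that write, under this same lock) will see our entry.
        if (store.containsKey(hash)) fiber.run() else waiting.put(hash, fiber)
    }
}

fun commit(hash: String, stx: String) {
    store[hash] = stx  // must happen before the drain below
    val toResume = synchronized(waiting) { waiting.removeAll(hash) }
    toResume.forEach { it.run() }
}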

View File

@@ -195,13 +195,6 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
}
}
/**
* Generate a transaction that moves an amount of currency to the given pubkey.
*
* @param onlyFromParties if non-null, the asset states will be filtered to only include those issued by the set
* of given parties. This can be useful if the party you're trying to pay has expectations
* about which type of asset claims they are willing to accept.
*/
override fun generateSpend(tx: TransactionBuilder,
amount: Amount<Currency>,
to: CompositeKey,

View File

@@ -5,8 +5,10 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand.UncaughtExceptionHandler
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.DummyState
import net.corda.core.contracts.issuedBy
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
@@ -19,8 +21,11 @@ import net.corda.core.random63BitValue
import net.corda.core.rootCause
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.serialization.deserialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.FinalityFlow
import net.corda.flows.NotaryFlow
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.transactions.ValidatingNotaryService
@@ -483,9 +488,26 @@ class StateMachineManagerTests {
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello")
}
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(
networkMapNode: MockNode? = null): P {
disableDBCloseOnStop() //Handover DB to new node copy
@Test
fun `wait for transaction`() {
val ptx = TransactionBuilder(notary = notary1.info.notaryIdentity)
ptx.addOutputState(DummyState())
ptx.signWith(node1.services.legalIdentityKey)
val stx = ptx.toSignedTransaction()
val future1 = node2.services.startFlow(WaitingFlows.Waiter(stx.id)).resultFuture
val future2 = node1.services.startFlow(WaitingFlows.Committer(stx, node2.info.legalIdentity)).resultFuture
net.runNetwork()
future1.getOrThrow()
future2.getOrThrow()
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//region Helpers
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(networkMapNode: MockNode? = null): P {
disableDBCloseOnStop() // Handover DB to new node copy
stop()
val newNode = mockNet.createNode(networkMapNode?.info?.address, id, advertisedServices = *advertisedServices.toTypedArray())
newNode.acceptableLiveFiberCountOnStop = 1
@@ -611,4 +633,22 @@ class StateMachineManagerTests {
override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message
override fun hashCode(): Int = message?.hashCode() ?: 31
}
private object WaitingFlows {
class Waiter(private val hash: SecureHash) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
waitForLedgerCommit(hash)
}
}
class Committer(private val stx: SignedTransaction, private val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(FinalityFlow(stx, setOf(otherParty)))
}
}
}
//endregion Helpers
}