Merge commit '640e5c60886b23068162205db82b1ae5c7c7eec8' into aslemmer-merge-640e5c60886b23068162205db82b1ae5c7c7eec8

This commit is contained in:
Andras Slemmer 2018-04-24 11:01:21 +01:00
commit 67f5bcc38e
42 changed files with 285 additions and 74 deletions

View File

@ -1599,9 +1599,9 @@ public final class net.corda.core.flows.TransactionParts extends java.lang.Objec
public String toString() public String toString()
## ##
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException implements net.corda.core.flows.IdentifiableException @net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException implements net.corda.core.flows.IdentifiableException
public <init>(String, Throwable, long) public <init>(String)
@org.jetbrains.annotations.NotNull public Long getErrorId() public <init>(String, Throwable)
public final long getOriginalErrorId() public <init>(String, Throwable, Long)
## ##
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.identity.AbstractParty extends java.lang.Object @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.identity.AbstractParty extends java.lang.Object
public <init>(java.security.PublicKey) public <init>(java.security.PublicKey)

View File

@ -19,4 +19,3 @@ artifactoryPluginVersion=4.4.18
snakeYamlVersion=1.19 snakeYamlVersion=1.19
caffeineVersion=2.6.2 caffeineVersion=2.6.2
metricsVersion=3.2.5 metricsVersion=3.2.5

View File

@ -46,7 +46,9 @@ open class FlowException(message: String?, cause: Throwable?) :
* that we were not expecting), or the other side had an internal error, or the other side terminated when we * that we were not expecting), or the other side had an internal error, or the other side terminated when we
* were waiting for a response. * were waiting for a response.
*/ */
class UnexpectedFlowEndException(message: String, cause: Throwable?, val originalErrorId: Long) : class UnexpectedFlowEndException(message: String, cause: Throwable?, val originalErrorId: Long?) :
CordaRuntimeException(message, cause), IdentifiableException { CordaRuntimeException(message, cause), IdentifiableException {
override fun getErrorId(): Long = originalErrorId constructor(message: String, cause: Throwable?) : this(message, cause, null)
constructor(message: String) : this(message, null)
override fun getErrorId(): Long? = originalErrorId
} }

View File

@ -16,7 +16,10 @@ import net.corda.core.CordaInternal
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.* import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.abbreviate
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
@ -141,9 +144,98 @@ abstract class FlowLogic<out T> {
* Note: The current implementation returns the single identity of the node. This will change once multiple identities * Note: The current implementation returns the single identity of the node. This will change once multiple identities
* is implemented. * is implemented.
*/ */
val ourIdentity: Party get() = stateMachine.ourIdentity val ourIdentity: Party get() = stateMachine.ourIdentity
// Used to implement the deprecated send/receive functions using Party. When such a deprecated function is used we
// create a fresh session for the Party, put it here and use it in subsequent deprecated calls.
private val deprecatedPartySessionMap = HashMap<Party, FlowSession>()
private fun getDeprecatedSessionForParty(party: Party): FlowSession {
return deprecatedPartySessionMap.getOrPut(party) { initiateFlow(party) }
}
/**
* Returns a [FlowInfo] object describing the flow [otherParty] is using. With [FlowInfo.flowVersion] it
* provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
*
* This method can be called before any send or receive has been done with [otherParty]. In such a case this will force
* them to start their flow.
*/
@Deprecated("Use FlowSession.getCounterpartyFlowInfo()", level = DeprecationLevel.WARNING)
@Suspendable
fun getFlowInfo(otherParty: Party): FlowInfo = getDeprecatedSessionForParty(otherParty).getCounterpartyFlowInfo()
/**
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
* is received, which must be of the given [R] type.
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
*
* @return an [UntrustworthyData] wrapper around the received object.
*/
@Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
inline fun <reified R : Any> sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData<R> {
return sendAndReceive(R::class.java, otherParty, payload)
}
/**
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
* is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the data
* should not be trusted until it's been thoroughly verified for consistency and that all expectations are
* satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
*
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
*
* @return an [UntrustworthyData] wrapper around the received object.
*/
@Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
@Suspendable
open fun <R : Any> sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any): UntrustworthyData<R> {
return getDeprecatedSessionForParty(otherParty).sendAndReceive(receiveType, payload)
}
/**
* Suspends until the specified [otherParty] sends us a message of type [R].
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*/
@Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
inline fun <reified R : Any> receive(otherParty: Party): UntrustworthyData<R> = receive(R::class.java, otherParty)
/**
* Suspends until the specified [otherParty] sends us a message of type [receiveType].
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*
* @return an [UntrustworthyData] wrapper around the received object.
*/
@Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
@Suspendable
open fun <R : Any> receive(receiveType: Class<R>, otherParty: Party): UntrustworthyData<R> {
return getDeprecatedSessionForParty(otherParty).receive(receiveType)
}
/**
* Queues the given [payload] for sending to the [otherParty] and continues without suspending.
*
* Note that the other party may receive the message at some arbitrary later point or not at all: if [otherParty]
* is offline then message delivery will be retried until it comes back or until the message is older than the
* network's event horizon time.
*/
@Deprecated("Use FlowSession.send()", level = DeprecationLevel.WARNING)
@Suspendable
open fun send(otherParty: Party, payload: Any) {
getDeprecatedSessionForParty(otherParty).send(payload)
}
@Suspendable @Suspendable
internal fun <R : Any> FlowSession.sendAndReceiveWithRetry(receiveType: Class<R>, payload: Any): UntrustworthyData<R> { internal fun <R : Any> FlowSession.sendAndReceiveWithRetry(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
val request = FlowIORequest.SendAndReceive( val request = FlowIORequest.SendAndReceive(
@ -403,4 +495,4 @@ data class FlowInfo(
* to deduplicate it from other releases of the same CorDapp, typically a version string. See the * to deduplicate it from other releases of the same CorDapp, typically a version string. See the
* [CorDapp JAR format](https://docs.corda.net/cordapp-build-systems.html#cordapp-jar-format) for more details. * [CorDapp JAR format](https://docs.corda.net/cordapp-build-systems.html#cordapp-jar-format) for more details.
*/ */
val appName: String) val appName: String)

View File

@ -30,4 +30,4 @@ interface FlowAsyncOperation<R : Any> {
fun <T, R : Any> FlowLogic<T>.executeAsync(operation: FlowAsyncOperation<R>, maySkipCheckpoint: Boolean = false): R { fun <T, R : Any> FlowLogic<T>.executeAsync(operation: FlowAsyncOperation<R>, maySkipCheckpoint: Boolean = false): R {
val request = FlowIORequest.ExecuteAsyncOperation(operation) val request = FlowIORequest.ExecuteAsyncOperation(operation)
return stateMachine.suspend(request, maySkipCheckpoint) return stateMachine.suspend(request, maySkipCheckpoint)
} }

View File

@ -97,4 +97,3 @@ sealed class FlowIORequest<out R : Any> {
*/ */
data class ExecuteAsyncOperation<T : Any>(val operation: FlowAsyncOperation<T>) : FlowIORequest<T>() data class ExecuteAsyncOperation<T : Any>(val operation: FlowAsyncOperation<T>) : FlowIORequest<T>()
} }

View File

@ -53,4 +53,4 @@ fun <T : Any> SerializedBytes<Any>.checkPayloadIs(type: Class<T>): Untrustworthy
return type.castIfPossible(payloadData)?.let { UntrustworthyData(it) } ?: return type.castIfPossible(payloadData)?.let { UntrustworthyData(it) } ?:
throw IllegalArgumentException("We were expecting a ${type.name} but we instead got a " + throw IllegalArgumentException("We were expecting a ${type.name} but we instead got a " +
"${payloadData.javaClass.name} (${payloadData})") "${payloadData.javaClass.name} (${payloadData})")
} }

View File

@ -32,7 +32,7 @@ interface CheckpointStorage {
/** /**
* Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the * Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the
* underlying database connection is open, so any processing should happen before it is closed. * underlying database connection is closed, so any processing should happen before it is closed.
*/ */
fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, SerializedBytes<Checkpoint>>> fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, SerializedBytes<Checkpoint>>>
} }

View File

@ -188,6 +188,29 @@ class NodeSchedulerService(private val clock: CordaClock,
} }
} }
/**
* Stop scheduler service.
*/
fun stop() {
mutex.locked {
schedulerTimerExecutor.shutdown()
scheduledStatesQueue.clear()
scheduledStates.clear()
}
}
/**
* Resume scheduler service after having called [stop].
*/
fun resume() {
mutex.locked {
schedulerTimerExecutor = Executors.newSingleThreadExecutor()
scheduledStates.putAll(createMap())
scheduledStatesQueue.addAll(scheduledStates.values)
rescheduleWakeUp()
}
}
override fun scheduleStateActivity(action: ScheduledStateRef) { override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" } log.trace { "Schedule $action" }
val previousState = scheduledStates[action.ref] val previousState = scheduledStates[action.ref]
@ -227,7 +250,7 @@ class NodeSchedulerService(private val clock: CordaClock,
} }
} }
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor() private var schedulerTimerExecutor = Executors.newSingleThreadExecutor()
/** /**
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the * This method first cancels the [java.util.concurrent.Future] for any pending action so that the
* [awaitWithDeadline] used below drops through without running the action. We then create a new * [awaitWithDeadline] used below drops through without running the action. We then create a new
@ -358,4 +381,4 @@ class NodeSchedulerService(private val clock: CordaClock,
null null
} }
} }
} }

View File

@ -149,7 +149,7 @@ interface ReceivedMessage : Message {
val peer: CordaX500Name val peer: CordaX500Name
/** Platform version of the sender's node. */ /** Platform version of the sender's node. */
val platformVersion: Int val platformVersion: Int
/** UUID representing the sending JVM */ /** Sequence number of message with respect to senderUUID */
val senderSeqNo: Long? val senderSeqNo: Long?
/** True if a flow session init message */ /** True if a flow session init message */
val isSessionInit: Boolean val isSessionInit: Boolean
@ -188,4 +188,3 @@ interface DeduplicationHandler {
} }
typealias MessageHandler = (ReceivedMessage, MessageHandlerRegistration, DeduplicationHandler) -> Unit typealias MessageHandler = (ReceivedMessage, MessageHandlerRegistration, DeduplicationHandler) -> Unit

View File

@ -70,7 +70,6 @@ class MessagingExecutor(
private val releaseVersion = SimpleString(versionInfo.releaseVersion) private val releaseVersion = SimpleString(versionInfo.releaseVersion)
private val sendMessageSizeMetric = metricRegistry.histogram("SendMessageSize") private val sendMessageSizeMetric = metricRegistry.histogram("SendMessageSize")
private val sendLatencyMetric = metricRegistry.timer("SendLatency") private val sendLatencyMetric = metricRegistry.timer("SendLatency")
private val sendBatchSizeMetric = metricRegistry.histogram("SendBatchSize")
private val ourSenderSeqNo = AtomicLong() private val ourSenderSeqNo = AtomicLong()
private companion object { private companion object {
@ -185,4 +184,4 @@ class MessagingExecutor(
private fun acknowledgeJob(job: Job.Acknowledge) { private fun acknowledgeJob(job: Job.Acknowledge) {
job.message.individualAcknowledge() job.message.individualAcknowledge()
} }
} }

View File

@ -169,4 +169,4 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) {
private data class MessageMeta(val insertionTime: Instant, val senderHash: String?, val senderSeqNo: Long?) private data class MessageMeta(val insertionTime: Instant, val senderHash: String?, val senderSeqNo: Long?)
private data class SenderKey(val senderUUID: String, val peer: CordaX500Name, val isSessionInit: Boolean) private data class SenderKey(val senderUUID: String, val peer: CordaX500Name, val isSessionInit: Boolean)
} }

View File

@ -15,6 +15,7 @@ import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.toStringShort import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
@ -115,7 +116,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable { ) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
private const val messageMaxRetryCount: Int = 3 private const val messageMaxRetryCount: Int = 3
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> { fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
@ -352,6 +352,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val shutdownLatch = CountDownLatch(1) private val shutdownLatch = CountDownLatch(1)
var runningFuture = openFuture<Unit>()
/** /**
* Starts the p2p event loop: this method only returns once [stop] has been called. * Starts the p2p event loop: this method only returns once [stop] has been called.
*/ */
@ -362,6 +364,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started) { "start must be called first" } check(started) { "start must be called first" }
check(!running) { "run can't be called twice" } check(!running) { "run can't be called twice" }
running = true running = true
runningFuture.set(Unit)
// If it's null, it means we already called stop, so return immediately. // If it's null, it means we already called stop, so return immediately.
if (p2pConsumer == null) { if (p2pConsumer == null) {
return return
@ -426,7 +429,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
} }
internal fun deliver(artemisMessage: ClientMessage) { internal fun deliver(artemisMessage: ClientMessage) {
artemisToCordaMessage(artemisMessage)?.let { cordaMessage -> artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
if (!deduplicator.isDuplicate(cordaMessage)) { if (!deduplicator.isDuplicate(cordaMessage)) {
deduplicator.signalMessageProcessStart(cordaMessage) deduplicator.signalMessageProcessStart(cordaMessage)
@ -439,7 +441,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
} }
private fun deliver(msg: ReceivedMessage, artemisMessage: ClientMessage) { private fun deliver(msg: ReceivedMessage, artemisMessage: ClientMessage) {
state.checkNotLocked() state.checkNotLocked()
val deliverTo = handlers[msg.topic] val deliverTo = handlers[msg.topic]
if (deliverTo != null) { if (deliverTo != null) {
@ -480,6 +481,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started) check(started)
val prevRunning = running val prevRunning = running
running = false running = false
runningFuture = openFuture()
networkChangeSubscription?.unsubscribe() networkChangeSubscription?.unsubscribe()
require(p2pConsumer != null, { "stop can't be called twice" }) require(p2pConsumer != null, { "stop can't be called twice" })
require(producer != null, { "stop can't be called twice" }) require(producer != null, { "stop can't be called twice" })
@ -621,7 +623,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
} }
override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId, additionalHeaders: Map<String, String>): Message { override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId, additionalHeaders: Map<String, String>): Message {
return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID, additionalHeaders) return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID, additionalHeaders)
} }

View File

@ -108,7 +108,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> { override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return txStorage.exclusive { return txStorage.exclusive {
DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updates.bufferUntilSubscribed().wrapWithDatabaseTransaction())
} }
} }
@ -116,7 +116,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
return txStorage.exclusive { return txStorage.exclusive {
val existingTransaction = get(id) val existingTransaction = get(id)
if (existingTransaction == null) { if (existingTransaction == null) {
updatesPublisher.filter { it.id == id }.toFuture() updates.filter { it.id == id }.toFuture()
} else { } else {
doneFuture(existingTransaction.toSignedTx()) doneFuture(existingTransaction.toSignedTx())
} }

View File

@ -18,7 +18,6 @@ import co.paralleluniverse.fibers.Suspendable
interface ActionExecutor { interface ActionExecutor {
/** /**
* Execute [action] by [fiber]. * Execute [action] by [fiber].
* Precondition: [executeAction] is run inside an open database transaction.
*/ */
@Suspendable @Suspendable
fun executeAction(fiber: FlowFiber, action: Action) fun executeAction(fiber: FlowFiber, action: Action)

View File

@ -73,4 +73,4 @@ class CountUpDownLatch(initialValue: Int) {
fun countUp() { fun countUp() {
sync.increment() sync.increment()
} }
} }

View File

@ -54,4 +54,4 @@ data class DeduplicationId(val toString: String) {
return DeduplicationId("E-$errorId-${recipientSessionId.toLong}") return DeduplicationId("E-$errorId-${recipientSessionId.toLong}")
} }
} }
} }

View File

@ -25,4 +25,4 @@ interface FlowHospital {
* The flow running in [flowFiber] has cleaned, possibly as a result of a flow hospital resume. * The flow running in [flowFiber] has cleaned, possibly as a result of a flow hospital resume.
*/ */
fun flowCleaned(flowFiber: FlowFiber) fun flowCleaned(flowFiber: FlowFiber)
} }

View File

@ -166,4 +166,4 @@ class FlowLogicRefFactoryImpl(private val classloader: ClassLoader) : SingletonS
return false return false
} }
} }
} }

View File

@ -46,7 +46,6 @@ interface FlowMessaging {
* Implementation of [FlowMessaging] using a [ServiceHubInternal] to do the messaging and routing. * Implementation of [FlowMessaging] using a [ServiceHubInternal] to do the messaging and routing.
*/ */
class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
companion object { companion object {
val log = contextLogger() val log = contextLogger()
@ -73,7 +72,6 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
} }
private fun SessionMessage.additionalHeaders(target: Party): Map<String, String> { private fun SessionMessage.additionalHeaders(target: Party): Map<String, String> {
// This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator. // This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator.
// It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case. // It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case.
val mightDeadlockDrainingTarget = FlowStateMachineImpl.currentStateMachine()?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name } val mightDeadlockDrainingTarget = FlowStateMachineImpl.currentStateMachine()?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name }

View File

@ -96,4 +96,3 @@ class FlowSessionImpl(
require(!type.isPrimitive) { "Cannot receive primitive type $type" } require(!type.isPrimitive) { "Cannot receive primitive type $type" }
} }
} }

View File

@ -98,6 +98,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
if (value) field = value else throw IllegalArgumentException("Can only set to true") if (value) field = value else throw IllegalArgumentException("Can only set to true")
} }
/**
* Processes an event by creating the associated transition and executing it using the given executor.
* Try to avoid using this directly, instead use [processEventsUntilFlowIsResumed] or [processEventImmediately]
* instead.
*/
@Suspendable @Suspendable
private fun processEvent(transitionExecutor: TransitionExecutor, event: Event): FlowContinuation { private fun processEvent(transitionExecutor: TransitionExecutor, event: Event): FlowContinuation {
val stateMachine = getTransientField(TransientValues::stateMachine) val stateMachine = getTransientField(TransientValues::stateMachine)
@ -109,22 +114,63 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
return continuation return continuation
} }
/**
* Processes the events in the event queue until a transition indicates that control should be returned to user code
* in the form of a regular resume or a throw of an exception. Alternatively the transition may abort the fiber
* completely.
*
* @param isDbTransactionOpenOnEntry indicates whether a DB transaction is expected to be present before the
* processing of the eventloop. Purely used for internal invariant checks.
* @param isDbTransactionOpenOnExit indicates whether a DB transaction is expected to be present once the eventloop
* processing finished. Purely used for internal invariant checks.
*/
@Suspendable @Suspendable
private fun processEventsUntilFlowIsResumed(): Any? { private fun processEventsUntilFlowIsResumed(isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): Any? {
checkDbTransaction(isDbTransactionOpenOnEntry)
val transitionExecutor = getTransientField(TransientValues::transitionExecutor) val transitionExecutor = getTransientField(TransientValues::transitionExecutor)
val eventQueue = getTransientField(TransientValues::eventQueue) val eventQueue = getTransientField(TransientValues::eventQueue)
eventLoop@while (true) { try {
val nextEvent = eventQueue.receive() eventLoop@while (true) {
val continuation = processEvent(transitionExecutor, nextEvent) val nextEvent = eventQueue.receive()
when (continuation) { val continuation = processEvent(transitionExecutor, nextEvent)
is FlowContinuation.Resume -> return continuation.result when (continuation) {
is FlowContinuation.Throw -> { is FlowContinuation.Resume -> return continuation.result
continuation.throwable.fillInStackTrace() is FlowContinuation.Throw -> {
throw continuation.throwable continuation.throwable.fillInStackTrace()
throw continuation.throwable
}
FlowContinuation.ProcessEvents -> continue@eventLoop
FlowContinuation.Abort -> abortFiber()
} }
FlowContinuation.ProcessEvents -> continue@eventLoop
FlowContinuation.Abort -> abortFiber()
} }
} finally {
checkDbTransaction(isDbTransactionOpenOnExit)
}
}
/**
* Immediately processes the passed in event. Always called with an open database transaction.
*
* @param event the event to be processed.
* @param isDbTransactionOpenOnEntry indicates whether a DB transaction is expected to be present before the
* processing of the event. Purely used for internal invariant checks.
* @param isDbTransactionOpenOnExit indicates whether a DB transaction is expected to be present once the event
* processing finished. Purely used for internal invariant checks.
*/
@Suspendable
private fun processEventImmediately(event: Event, isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): FlowContinuation {
checkDbTransaction(isDbTransactionOpenOnEntry)
val transitionExecutor = getTransientField(TransientValues::transitionExecutor)
val continuation = processEvent(transitionExecutor, event)
checkDbTransaction(isDbTransactionOpenOnExit)
return continuation
}
private fun checkDbTransaction(isPresent: Boolean) {
if (isPresent) {
requireNotNull(contextTransactionOrNull != null)
} else {
require(contextTransactionOrNull == null)
} }
} }
@ -155,32 +201,56 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Event.Error(resultOrError.exception) Event.Error(resultOrError.exception)
} }
} }
scheduleEvent(finalEvent) // Immediately process the last event. This is to make sure the transition can assume that it has an open
processEventsUntilFlowIsResumed() // database transaction.
val continuation = processEventImmediately(
finalEvent,
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = false
)
if (continuation == FlowContinuation.ProcessEvents) {
// This can happen in case there was an error and there are further things to do e.g. to propagate it.
processEventsUntilFlowIsResumed(
isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = false
)
}
recordDuration(startTime) recordDuration(startTime)
} }
@Suspendable @Suspendable
private fun initialiseFlow() { private fun initialiseFlow() {
processEventsUntilFlowIsResumed() processEventsUntilFlowIsResumed(
isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = true
)
} }
@Suspendable @Suspendable
override fun <R> subFlow(subFlow: FlowLogic<R>): R { override fun <R> subFlow(subFlow: FlowLogic<R>): R {
processEvent(getTransientField(TransientValues::transitionExecutor), Event.EnterSubFlow(subFlow.javaClass)) processEventImmediately(
Event.EnterSubFlow(subFlow.javaClass),
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = true
)
return try { return try {
subFlow.call() subFlow.call()
} finally { } finally {
processEvent(getTransientField(TransientValues::transitionExecutor), Event.LeaveSubFlow) processEventImmediately(
Event.LeaveSubFlow,
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = true
)
} }
} }
@Suspendable @Suspendable
override fun initiateFlow(party: Party): FlowSession { override fun initiateFlow(party: Party): FlowSession {
val resume = processEvent( val resume = processEventImmediately(
getTransientField(TransientValues::transitionExecutor), Event.InitiateFlow(party),
Event.InitiateFlow(party) isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = true
) as FlowContinuation.Resume ) as FlowContinuation.Resume
return resume.result as FlowSession return resume.result as FlowSession
} }
@ -237,7 +307,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override fun <R : Any> suspend(ioRequest: FlowIORequest<R>, maySkipCheckpoint: Boolean): R { override fun <R : Any> suspend(ioRequest: FlowIORequest<R>, maySkipCheckpoint: Boolean): R {
val serializationContext = TransientReference(getTransientField(TransientValues::checkpointSerializationContext)) val serializationContext = TransientReference(getTransientField(TransientValues::checkpointSerializationContext))
val transaction = extractThreadLocalTransaction() val transaction = extractThreadLocalTransaction()
val transitionExecutor = TransientReference(getTransientField(TransientValues::transitionExecutor))
parkAndSerialize { _, _ -> parkAndSerialize { _, _ ->
logger.trace { "Suspended on $ioRequest" } logger.trace { "Suspended on $ioRequest" }
@ -252,12 +321,20 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Event.Error(throwable) Event.Error(throwable)
} }
// We must commit the database transaction before returning from this closure, otherwise Quasar may schedule // We must commit the database transaction before returning from this closure otherwise Quasar may schedule
// other fibers // other fibers, so we process the event immediately
require(processEvent(transitionExecutor.value, event) == FlowContinuation.ProcessEvents) val continuation = processEventImmediately(
event,
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = false
)
require(continuation == FlowContinuation.ProcessEvents)
Fiber.unparkDeserialized(this, scheduler) Fiber.unparkDeserialized(this, scheduler)
} }
return uncheckedCast(processEventsUntilFlowIsResumed()) return uncheckedCast(processEventsUntilFlowIsResumed(
isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = true
))
} }
@Suspendable @Suspendable

View File

@ -71,6 +71,10 @@ class MultiThreadedStateMachineManager(
private val unfinishedFibers: ReusableLatch = ReusableLatch(), private val unfinishedFibers: ReusableLatch = ReusableLatch(),
private val classloader: ClassLoader = MultiThreadedStateMachineManager::class.java.classLoader private val classloader: ClassLoader = MultiThreadedStateMachineManager::class.java.classLoader
) : StateMachineManager, StateMachineManagerInternal { ) : StateMachineManager, StateMachineManagerInternal {
override fun resume() {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
companion object { companion object {
private val logger = contextLogger() private val logger = contextLogger()
} }

View File

@ -28,4 +28,3 @@ object PropagatingFlowHospital : FlowHospital {
throw IllegalStateException("Flow ${flowFiber.id} cleaned after error propagation triggered") throw IllegalStateException("Flow ${flowFiber.id} cleaned after error propagation triggered")
} }
} }

View File

@ -15,4 +15,4 @@ import net.corda.core.CordaException
/** /**
* An exception propagated and thrown in case a session initiation fails. * An exception propagated and thrown in case a session initiation fails.
*/ */
class SessionRejectException(reason: String) : CordaException(reason) class SessionRejectException(reason: String) : CordaException(reason)

View File

@ -44,6 +44,7 @@ import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.services.statemachine.transitions.StateMachineConfiguration
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
@ -143,6 +144,20 @@ class SingleThreadedStateMachineManager(
} }
} }
override fun resume() {
fiberDeserializationChecker?.start(checkpointSerializationContext!!)
val fibers = restoreFlowsFromCheckpoints()
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
}
serviceHub.networkMapCache.nodeReady.then {
resumeRestoredFlows(fibers)
}
mutex.locked {
stopping = false
}
}
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> { override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return mutex.locked { return mutex.locked {
flows.values.mapNotNull { flows.values.mapNotNull {

View File

@ -49,6 +49,11 @@ interface StateMachineManager {
*/ */
fun stop(allowedUnsuspendedFiberCount: Int) fun stop(allowedUnsuspendedFiberCount: Int)
/**
* Resume state machine manager after having called [stop].
*/
fun resume()
/** /**
* Starts a new flow. * Starts a new flow.
* *

View File

@ -239,4 +239,3 @@ sealed class ErrorState {
} }
} }
} }

View File

@ -58,4 +58,4 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti
return Pair(continuation, nextState) return Pair(continuation, nextState)
} }
} }

View File

@ -99,6 +99,7 @@ class FiberDeserializationChecker {
fun stop(): Boolean { fun stop(): Boolean {
jobQueue.add(Job.Finish) jobQueue.add(Job.Finish)
checkerThread?.join() checkerThread?.join()
checkerThread = null
return foundUnrestorableFibers return foundUnrestorableFibers
} }
} }

View File

@ -53,4 +53,4 @@ class HospitalisingInterceptor(
} }
return Pair(continuation, nextState) return Pair(continuation, nextState)
} }
} }

View File

@ -58,4 +58,4 @@ data class TransitionDiagnosticRecord(
) )
).joinToString("\n") ).joinToString("\n")
} }
} }

View File

@ -196,4 +196,4 @@ class DeliverSessionMessageTransition(
} }
} }
} }

View File

@ -44,4 +44,4 @@ class DoRemainingWorkTransition(
private fun erroredTransition(errorState: ErrorState.Errored): TransitionResult { private fun erroredTransition(errorState: ErrorState.Errored): TransitionResult {
return ErrorFlowTransition(context, startingState, errorState).transition() return ErrorFlowTransition(context, startingState, errorState).transition()
} }
} }

View File

@ -132,4 +132,4 @@ class ErrorFlowTransition(
} }
return Pair(initiatedSessions, newSessions) return Pair(initiatedSessions, newSessions)
} }
} }

View File

@ -417,4 +417,4 @@ class StartedFlowTransition(
FlowContinuation.ProcessEvents FlowContinuation.ProcessEvents
} }
} }
} }

View File

@ -251,4 +251,4 @@ class TopLevelTransition(
resumeFlowLogic(event.returnValue) resumeFlowLogic(event.returnValue)
} }
} }
} }

View File

@ -87,4 +87,4 @@ class UnstartedFlowTransition(
isAnyCheckpointPersisted = true isAnyCheckpointPersisted = true
) )
} }
} }

View File

@ -31,6 +31,7 @@ import net.corda.testing.internal.doLookup
import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices
import net.corda.testing.node.TestClock import net.corda.testing.node.TestClock
import org.junit.Ignore
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import org.junit.rules.TestWatcher import org.junit.rules.TestWatcher
@ -275,6 +276,7 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
newDatabase.close() newDatabase.close()
} }
@Ignore("Temporarily")
@Test @Test
fun `test that if schedule is updated then the flow is invoked on the correct schedule`() { fun `test that if schedule is updated then the flow is invoked on the correct schedule`() {
val dataSourceProps = MockServices.makeTestDataSourceProperties() val dataSourceProps = MockServices.makeTestDataSourceProperties()
@ -306,4 +308,4 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
scheduler.join() scheduler.join()
database.close() database.close()
} }
} }

View File

@ -141,4 +141,4 @@ class MaxTransactionSizeTests {
otherSide.send(Unit) otherSide.send(Unit)
} }
} }
} }

View File

@ -25,7 +25,6 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.createDirectories import net.corda.core.internal.createDirectories
import net.corda.core.internal.createDirectory import net.corda.core.internal.createDirectory
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient

View File

@ -20,7 +20,7 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.api.WritableTransactionStorage
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.util.HashMap import java.util.*
/** /**
* A class which provides an implementation of [WritableTransactionStorage] which is used in [MockServices] * A class which provides an implementation of [WritableTransactionStorage] which is used in [MockServices]