mirror of
https://github.com/corda/corda.git
synced 2025-01-12 07:52:38 +00:00
Merge pull request #768 from corda/aslemmer-merge-10c559a3f3dc1cc8055e3204cd289468fbf3e644
CORDA-1334: Merge up to 10c559a3f3
This commit is contained in:
commit
e3c792c93d
@ -1599,9 +1599,9 @@ public final class net.corda.core.flows.TransactionParts extends java.lang.Objec
|
||||
public String toString()
|
||||
##
|
||||
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException implements net.corda.core.flows.IdentifiableException
|
||||
public <init>(String, Throwable, long)
|
||||
@org.jetbrains.annotations.NotNull public Long getErrorId()
|
||||
public final long getOriginalErrorId()
|
||||
public <init>(String)
|
||||
public <init>(String, Throwable)
|
||||
public <init>(String, Throwable, Long)
|
||||
##
|
||||
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.identity.AbstractParty extends java.lang.Object
|
||||
public <init>(java.security.PublicKey)
|
||||
|
@ -398,7 +398,7 @@ class RPCStabilityTests {
|
||||
servers[response]!!.shutdown()
|
||||
servers.remove(response)
|
||||
|
||||
//failover will take some time
|
||||
// Failover will take some time.
|
||||
while (true) {
|
||||
try {
|
||||
response = client.serverId()
|
||||
|
@ -10,9 +10,9 @@
|
||||
|
||||
package net.corda.client.rpc
|
||||
|
||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||
import net.corda.client.rpc.internal.KryoClientSerializationScheme
|
||||
import net.corda.client.rpc.internal.RPCClient
|
||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||
import net.corda.core.context.Actor
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
@ -103,7 +103,8 @@ interface CordaRPCClientConfiguration {
|
||||
* [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
|
||||
* [RPCException] and previously returned observables will call onError().
|
||||
*
|
||||
* If the client was created using a list of hosts, automatic failover will occur(the servers have to be started in HA mode)
|
||||
* If the client was created using a list of hosts, automatic failover will occur (the servers have to be started in
|
||||
* HA mode).
|
||||
*
|
||||
* @param hostAndPort The network address to connect to.
|
||||
* @param configuration An optional configuration used to tweak client behaviour.
|
||||
|
@ -80,6 +80,12 @@ import kotlin.reflect.jvm.javaMethod
|
||||
* unsubscribing from the [Observable], or if the [Observable] is garbage collected the client will eventually
|
||||
* automatically signal the server. This is done using a cache that holds weak references to the [UnicastSubject]s.
|
||||
* The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor].
|
||||
*
|
||||
* The client will attempt to failover in case the server become unreachable. Depending on the [ServerLocataor] instance
|
||||
* passed in the constructor, failover is either handle at Artemis level or client level. If only one transport
|
||||
* was used to create the [ServerLocator], failover is handled by Artemis (retrying based on [CordaRPCClientConfiguration].
|
||||
* If a list of transport configurations was used, failover is handled locally. Artemis is able to do it, however the
|
||||
* brokers on server side need to be configured in HA mode and the [ServerLocator] needs to be created with HA as well.
|
||||
*/
|
||||
class RPCClientProxyHandler(
|
||||
private val rpcConfiguration: CordaRPCClientConfiguration,
|
||||
@ -185,7 +191,7 @@ class RPCClientProxyHandler(
|
||||
private val deduplicationSequenceNumber = AtomicLong(0)
|
||||
|
||||
private val sendingEnabled = AtomicBoolean(true)
|
||||
// used to interrupt failover thread (i.e. client is closed while failing over)
|
||||
// Used to interrupt failover thread (i.e. client is closed while failing over).
|
||||
private var haFailoverThread: Thread? = null
|
||||
|
||||
/**
|
||||
@ -427,7 +433,7 @@ class RPCClientProxyHandler(
|
||||
}
|
||||
|
||||
private fun attemptReconnect() {
|
||||
var reconnectAttempts = rpcConfiguration.maxReconnectAttempts * serverLocator.staticTransportConfigurations.size
|
||||
var reconnectAttempts = rpcConfiguration.maxReconnectAttempts.times(serverLocator.staticTransportConfigurations.size)
|
||||
var retryInterval = rpcConfiguration.connectionRetryInterval
|
||||
val maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval
|
||||
|
||||
@ -456,7 +462,7 @@ class RPCClientProxyHandler(
|
||||
continue
|
||||
}
|
||||
|
||||
log.debug("Connected successfully using ${transport.params}")
|
||||
log.debug("Connected successfully after $reconnectAttempts attempts using ${transport.params}.")
|
||||
log.info("RPC server available.")
|
||||
sessionFactory!!.addFailoverListener(this::haFailoverHandler)
|
||||
initSessions()
|
||||
@ -495,7 +501,7 @@ class RPCClientProxyHandler(
|
||||
haFailoverThread = Thread.currentThread()
|
||||
attemptReconnect()
|
||||
}
|
||||
/* Other events are not considered as reconnection is not done by Artemis */
|
||||
// Other events are not considered as reconnection is not done by Artemis.
|
||||
}
|
||||
|
||||
private fun failoverHandler(event: FailoverEventType) {
|
||||
|
@ -19,4 +19,3 @@ artifactoryPluginVersion=4.4.18
|
||||
snakeYamlVersion=1.19
|
||||
caffeineVersion=2.6.2
|
||||
metricsVersion=3.2.5
|
||||
|
||||
|
@ -46,7 +46,9 @@ open class FlowException(message: String?, cause: Throwable?) :
|
||||
* that we were not expecting), or the other side had an internal error, or the other side terminated when we
|
||||
* were waiting for a response.
|
||||
*/
|
||||
class UnexpectedFlowEndException(message: String, cause: Throwable?, val originalErrorId: Long) :
|
||||
class UnexpectedFlowEndException(message: String, cause: Throwable?, val originalErrorId: Long?) :
|
||||
CordaRuntimeException(message, cause), IdentifiableException {
|
||||
override fun getErrorId(): Long = originalErrorId
|
||||
constructor(message: String, cause: Throwable?) : this(message, cause, null)
|
||||
constructor(message: String) : this(message, null)
|
||||
override fun getErrorId(): Long? = originalErrorId
|
||||
}
|
||||
|
@ -16,7 +16,10 @@ import net.corda.core.CordaInternal
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.abbreviate
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
@ -141,9 +144,98 @@ abstract class FlowLogic<out T> {
|
||||
* Note: The current implementation returns the single identity of the node. This will change once multiple identities
|
||||
* is implemented.
|
||||
*/
|
||||
|
||||
val ourIdentity: Party get() = stateMachine.ourIdentity
|
||||
|
||||
// Used to implement the deprecated send/receive functions using Party. When such a deprecated function is used we
|
||||
// create a fresh session for the Party, put it here and use it in subsequent deprecated calls.
|
||||
private val deprecatedPartySessionMap = HashMap<Party, FlowSession>()
|
||||
private fun getDeprecatedSessionForParty(party: Party): FlowSession {
|
||||
return deprecatedPartySessionMap.getOrPut(party) { initiateFlow(party) }
|
||||
}
|
||||
/**
|
||||
* Returns a [FlowInfo] object describing the flow [otherParty] is using. With [FlowInfo.flowVersion] it
|
||||
* provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
|
||||
*
|
||||
* This method can be called before any send or receive has been done with [otherParty]. In such a case this will force
|
||||
* them to start their flow.
|
||||
*/
|
||||
@Deprecated("Use FlowSession.getCounterpartyFlowInfo()", level = DeprecationLevel.WARNING)
|
||||
@Suspendable
|
||||
fun getFlowInfo(otherParty: Party): FlowInfo = getDeprecatedSessionForParty(otherParty).getCounterpartyFlowInfo()
|
||||
|
||||
/**
|
||||
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
|
||||
* is received, which must be of the given [R] type.
|
||||
*
|
||||
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
|
||||
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
|
||||
* corrupted data in order to exploit your code.
|
||||
*
|
||||
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
|
||||
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
|
||||
*
|
||||
* @return an [UntrustworthyData] wrapper around the received object.
|
||||
*/
|
||||
@Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
|
||||
inline fun <reified R : Any> sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData<R> {
|
||||
return sendAndReceive(R::class.java, otherParty, payload)
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
|
||||
* is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the data
|
||||
* should not be trusted until it's been thoroughly verified for consistency and that all expectations are
|
||||
* satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
|
||||
*
|
||||
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
|
||||
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
|
||||
*
|
||||
* @return an [UntrustworthyData] wrapper around the received object.
|
||||
*/
|
||||
@Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
|
||||
@Suspendable
|
||||
open fun <R : Any> sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any): UntrustworthyData<R> {
|
||||
return getDeprecatedSessionForParty(otherParty).sendAndReceive(receiveType, payload)
|
||||
}
|
||||
|
||||
/**
|
||||
* Suspends until the specified [otherParty] sends us a message of type [R].
|
||||
*
|
||||
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
|
||||
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
|
||||
* corrupted data in order to exploit your code.
|
||||
*/
|
||||
@Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
|
||||
inline fun <reified R : Any> receive(otherParty: Party): UntrustworthyData<R> = receive(R::class.java, otherParty)
|
||||
|
||||
/**
|
||||
* Suspends until the specified [otherParty] sends us a message of type [receiveType].
|
||||
*
|
||||
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
|
||||
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
|
||||
* corrupted data in order to exploit your code.
|
||||
*
|
||||
* @return an [UntrustworthyData] wrapper around the received object.
|
||||
*/
|
||||
@Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
|
||||
@Suspendable
|
||||
open fun <R : Any> receive(receiveType: Class<R>, otherParty: Party): UntrustworthyData<R> {
|
||||
return getDeprecatedSessionForParty(otherParty).receive(receiveType)
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues the given [payload] for sending to the [otherParty] and continues without suspending.
|
||||
*
|
||||
* Note that the other party may receive the message at some arbitrary later point or not at all: if [otherParty]
|
||||
* is offline then message delivery will be retried until it comes back or until the message is older than the
|
||||
* network's event horizon time.
|
||||
*/
|
||||
@Deprecated("Use FlowSession.send()", level = DeprecationLevel.WARNING)
|
||||
@Suspendable
|
||||
open fun send(otherParty: Party, payload: Any) {
|
||||
getDeprecatedSessionForParty(otherParty).send(payload)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
internal fun <R : Any> FlowSession.sendAndReceiveWithRetry(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
|
||||
val request = FlowIORequest.SendAndReceive(
|
||||
|
@ -97,4 +97,3 @@ sealed class FlowIORequest<out R : Any> {
|
||||
*/
|
||||
data class ExecuteAsyncOperation<T : Any>(val operation: FlowAsyncOperation<T>) : FlowIORequest<T>()
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@ interface CheckpointStorage {
|
||||
|
||||
/**
|
||||
* Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the
|
||||
* underlying database connection is open, so any processing should happen before it is closed.
|
||||
* underlying database connection is closed, so any processing should happen before it is closed.
|
||||
*/
|
||||
fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, SerializedBytes<Checkpoint>>>
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ interface ReceivedMessage : Message {
|
||||
val peer: CordaX500Name
|
||||
/** Platform version of the sender's node. */
|
||||
val platformVersion: Int
|
||||
/** UUID representing the sending JVM */
|
||||
/** Sequence number of message with respect to senderUUID */
|
||||
val senderSeqNo: Long?
|
||||
/** True if a flow session init message */
|
||||
val isSessionInit: Boolean
|
||||
@ -188,4 +188,3 @@ interface DeduplicationHandler {
|
||||
}
|
||||
|
||||
typealias MessageHandler = (ReceivedMessage, MessageHandlerRegistration, DeduplicationHandler) -> Unit
|
||||
|
||||
|
@ -70,7 +70,6 @@ class MessagingExecutor(
|
||||
private val releaseVersion = SimpleString(versionInfo.releaseVersion)
|
||||
private val sendMessageSizeMetric = metricRegistry.histogram("SendMessageSize")
|
||||
private val sendLatencyMetric = metricRegistry.timer("SendLatency")
|
||||
private val sendBatchSizeMetric = metricRegistry.histogram("SendBatchSize")
|
||||
private val ourSenderSeqNo = AtomicLong()
|
||||
|
||||
private companion object {
|
||||
|
@ -115,7 +115,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
|
||||
private const val messageMaxRetryCount: Int = 3
|
||||
|
||||
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
|
||||
@ -426,7 +425,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
internal fun deliver(artemisMessage: ClientMessage) {
|
||||
|
||||
artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
|
||||
if (!deduplicator.isDuplicate(cordaMessage)) {
|
||||
deduplicator.signalMessageProcessStart(cordaMessage)
|
||||
@ -439,7 +437,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
private fun deliver(msg: ReceivedMessage, artemisMessage: ClientMessage) {
|
||||
|
||||
state.checkNotLocked()
|
||||
val deliverTo = handlers[msg.topic]
|
||||
if (deliverTo != null) {
|
||||
@ -621,7 +618,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId, additionalHeaders: Map<String, String>): Message {
|
||||
|
||||
return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID, additionalHeaders)
|
||||
}
|
||||
|
||||
|
@ -108,7 +108,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
|
||||
|
||||
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
|
||||
return txStorage.exclusive {
|
||||
DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updates.bufferUntilSubscribed().wrapWithDatabaseTransaction())
|
||||
}
|
||||
}
|
||||
|
||||
@ -116,7 +116,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
|
||||
return txStorage.exclusive {
|
||||
val existingTransaction = get(id)
|
||||
if (existingTransaction == null) {
|
||||
updatesPublisher.filter { it.id == id }.toFuture()
|
||||
updates.filter { it.id == id }.toFuture()
|
||||
} else {
|
||||
doneFuture(existingTransaction.toSignedTx())
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
interface ActionExecutor {
|
||||
/**
|
||||
* Execute [action] by [fiber].
|
||||
* Precondition: [executeAction] is run inside an open database transaction.
|
||||
*/
|
||||
@Suspendable
|
||||
fun executeAction(fiber: FlowFiber, action: Action)
|
||||
|
@ -46,7 +46,6 @@ interface FlowMessaging {
|
||||
* Implementation of [FlowMessaging] using a [ServiceHubInternal] to do the messaging and routing.
|
||||
*/
|
||||
class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
|
||||
|
||||
companion object {
|
||||
val log = contextLogger()
|
||||
|
||||
@ -73,7 +72,6 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
|
||||
}
|
||||
|
||||
private fun SessionMessage.additionalHeaders(target: Party): Map<String, String> {
|
||||
|
||||
// This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator.
|
||||
// It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case.
|
||||
val mightDeadlockDrainingTarget = FlowStateMachineImpl.currentStateMachine()?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name }
|
||||
|
@ -96,4 +96,3 @@ class FlowSessionImpl(
|
||||
require(!type.isPrimitive) { "Cannot receive primitive type $type" }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -98,6 +98,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
if (value) field = value else throw IllegalArgumentException("Can only set to true")
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes an event by creating the associated transition and executing it using the given executor.
|
||||
* Try to avoid using this directly, instead use [processEventsUntilFlowIsResumed] or [processEventImmediately]
|
||||
* instead.
|
||||
*/
|
||||
@Suspendable
|
||||
private fun processEvent(transitionExecutor: TransitionExecutor, event: Event): FlowContinuation {
|
||||
val stateMachine = getTransientField(TransientValues::stateMachine)
|
||||
@ -109,10 +114,22 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
return continuation
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes the events in the event queue until a transition indicates that control should be returned to user code
|
||||
* in the form of a regular resume or a throw of an exception. Alternatively the transition may abort the fiber
|
||||
* completely.
|
||||
*
|
||||
* @param isDbTransactionOpenOnEntry indicates whether a DB transaction is expected to be present before the
|
||||
* processing of the eventloop. Purely used for internal invariant checks.
|
||||
* @param isDbTransactionOpenOnExit indicates whether a DB transaction is expected to be present once the eventloop
|
||||
* processing finished. Purely used for internal invariant checks.
|
||||
*/
|
||||
@Suspendable
|
||||
private fun processEventsUntilFlowIsResumed(): Any? {
|
||||
private fun processEventsUntilFlowIsResumed(isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): Any? {
|
||||
checkDbTransaction(isDbTransactionOpenOnEntry)
|
||||
val transitionExecutor = getTransientField(TransientValues::transitionExecutor)
|
||||
val eventQueue = getTransientField(TransientValues::eventQueue)
|
||||
try {
|
||||
eventLoop@while (true) {
|
||||
val nextEvent = eventQueue.receive()
|
||||
val continuation = processEvent(transitionExecutor, nextEvent)
|
||||
@ -126,6 +143,35 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
FlowContinuation.Abort -> abortFiber()
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
checkDbTransaction(isDbTransactionOpenOnExit)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Immediately processes the passed in event. Always called with an open database transaction.
|
||||
*
|
||||
* @param event the event to be processed.
|
||||
* @param isDbTransactionOpenOnEntry indicates whether a DB transaction is expected to be present before the
|
||||
* processing of the event. Purely used for internal invariant checks.
|
||||
* @param isDbTransactionOpenOnExit indicates whether a DB transaction is expected to be present once the event
|
||||
* processing finished. Purely used for internal invariant checks.
|
||||
*/
|
||||
@Suspendable
|
||||
private fun processEventImmediately(event: Event, isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): FlowContinuation {
|
||||
checkDbTransaction(isDbTransactionOpenOnEntry)
|
||||
val transitionExecutor = getTransientField(TransientValues::transitionExecutor)
|
||||
val continuation = processEvent(transitionExecutor, event)
|
||||
checkDbTransaction(isDbTransactionOpenOnExit)
|
||||
return continuation
|
||||
}
|
||||
|
||||
private fun checkDbTransaction(isPresent: Boolean) {
|
||||
if (isPresent) {
|
||||
requireNotNull(contextTransactionOrNull != null)
|
||||
} else {
|
||||
require(contextTransactionOrNull == null)
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -155,32 +201,56 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
Event.Error(resultOrError.exception)
|
||||
}
|
||||
}
|
||||
scheduleEvent(finalEvent)
|
||||
processEventsUntilFlowIsResumed()
|
||||
// Immediately process the last event. This is to make sure the transition can assume that it has an open
|
||||
// database transaction.
|
||||
val continuation = processEventImmediately(
|
||||
finalEvent,
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = false
|
||||
)
|
||||
if (continuation == FlowContinuation.ProcessEvents) {
|
||||
// This can happen in case there was an error and there are further things to do e.g. to propagate it.
|
||||
processEventsUntilFlowIsResumed(
|
||||
isDbTransactionOpenOnEntry = false,
|
||||
isDbTransactionOpenOnExit = false
|
||||
)
|
||||
}
|
||||
|
||||
recordDuration(startTime)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun initialiseFlow() {
|
||||
processEventsUntilFlowIsResumed()
|
||||
processEventsUntilFlowIsResumed(
|
||||
isDbTransactionOpenOnEntry = false,
|
||||
isDbTransactionOpenOnExit = true
|
||||
)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun <R> subFlow(subFlow: FlowLogic<R>): R {
|
||||
processEvent(getTransientField(TransientValues::transitionExecutor), Event.EnterSubFlow(subFlow.javaClass))
|
||||
processEventImmediately(
|
||||
Event.EnterSubFlow(subFlow.javaClass),
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = true
|
||||
)
|
||||
return try {
|
||||
subFlow.call()
|
||||
} finally {
|
||||
processEvent(getTransientField(TransientValues::transitionExecutor), Event.LeaveSubFlow)
|
||||
processEventImmediately(
|
||||
Event.LeaveSubFlow,
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = true
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun initiateFlow(party: Party): FlowSession {
|
||||
val resume = processEvent(
|
||||
getTransientField(TransientValues::transitionExecutor),
|
||||
Event.InitiateFlow(party)
|
||||
val resume = processEventImmediately(
|
||||
Event.InitiateFlow(party),
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = true
|
||||
) as FlowContinuation.Resume
|
||||
return resume.result as FlowSession
|
||||
}
|
||||
@ -237,7 +307,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
override fun <R : Any> suspend(ioRequest: FlowIORequest<R>, maySkipCheckpoint: Boolean): R {
|
||||
val serializationContext = TransientReference(getTransientField(TransientValues::checkpointSerializationContext))
|
||||
val transaction = extractThreadLocalTransaction()
|
||||
val transitionExecutor = TransientReference(getTransientField(TransientValues::transitionExecutor))
|
||||
parkAndSerialize { _, _ ->
|
||||
logger.trace { "Suspended on $ioRequest" }
|
||||
|
||||
@ -252,12 +321,20 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
Event.Error(throwable)
|
||||
}
|
||||
|
||||
// We must commit the database transaction before returning from this closure, otherwise Quasar may schedule
|
||||
// other fibers
|
||||
require(processEvent(transitionExecutor.value, event) == FlowContinuation.ProcessEvents)
|
||||
// We must commit the database transaction before returning from this closure otherwise Quasar may schedule
|
||||
// other fibers, so we process the event immediately
|
||||
val continuation = processEventImmediately(
|
||||
event,
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = false
|
||||
)
|
||||
require(continuation == FlowContinuation.ProcessEvents)
|
||||
Fiber.unparkDeserialized(this, scheduler)
|
||||
}
|
||||
return uncheckedCast(processEventsUntilFlowIsResumed())
|
||||
return uncheckedCast(processEventsUntilFlowIsResumed(
|
||||
isDbTransactionOpenOnEntry = false,
|
||||
isDbTransactionOpenOnExit = true
|
||||
))
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
|
@ -28,4 +28,3 @@ object PropagatingFlowHospital : FlowHospital {
|
||||
throw IllegalStateException("Flow ${flowFiber.id} cleaned after error propagation triggered")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,7 @@ import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.messaging.ReceivedMessage
|
||||
import net.corda.node.services.statemachine.interceptors.*
|
||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||
import net.corda.node.services.statemachine.transitions.StateMachineConfiguration
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
|
||||
|
@ -239,4 +239,3 @@ sealed class ErrorState {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,6 +31,7 @@ import net.corda.testing.internal.doLookup
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.TestClock
|
||||
import org.junit.Ignore
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TestWatcher
|
||||
@ -275,6 +276,7 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
|
||||
newDatabase.close()
|
||||
}
|
||||
|
||||
@Ignore("Temporarily")
|
||||
@Test
|
||||
fun `test that if schedule is updated then the flow is invoked on the correct schedule`() {
|
||||
val dataSourceProps = MockServices.makeTestDataSourceProperties()
|
||||
|
@ -25,7 +25,6 @@ import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.createDirectory
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
|
@ -20,7 +20,7 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.util.HashMap
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* A class which provides an implementation of [WritableTransactionStorage] which is used in [MockServices]
|
||||
|
Loading…
Reference in New Issue
Block a user