CORDA-3809 Expose client side unique RPC ID for flow starts (#6307)

Introducing a new flow start method (`startFlowDynamicWithClientId`) passing in a `clientId`.

Once `startFlowDynamicWithClientId` gets called, the `clientId` gets injected into `InvocationContext` and also pushed to the logging context.

If a new flow starts with this method, then a < `clientId` to flow > pair is kept on node side, even after the flow's lifetime. If `startFlowDynamicWithClientId` is called again with the same `clientId` then the node identifies that this `clientId` refers to an existing < `clientId` to flow > pair and returns back to the rpc client a `FlowStateMachineHandle` future, created out of that pair.

`FlowStateMachineHandle` interface was introduced as a thinner `FlowStateMachine`. All `FlowStateMachine` properties used by call sites are moved into this new interface along with `clientId` and then `FlowStateMachine` extends it.

Introducing an acknowledgement method (`removeClientId`). Calling this method removes the < `clientId` to flow > pair on the node side and frees resources.
This commit is contained in:
Kyriakos Tharrouniatis 2020-07-16 10:52:08 +01:00 committed by GitHub
parent db13e3beb9
commit 22d92d5ef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 903 additions and 144 deletions

View File

@ -6,7 +6,7 @@ import com.natpryce.hamkrest.Matcher
import com.natpryce.hamkrest.equalTo
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.startFlow
@ -16,7 +16,7 @@ import net.corda.testing.node.internal.TestStartedNode
interface WithFinality : WithMockNet {
//region Operations
fun TestStartedNode.finalise(stx: SignedTransaction, vararg recipients: Party): FlowStateMachine<SignedTransaction> {
fun TestStartedNode.finalise(stx: SignedTransaction, vararg recipients: Party): FlowStateMachineHandle<SignedTransaction> {
return startFlowAndRunNetwork(FinalityInvoker(stx, recipients.toSet(), emptySet()))
}

View File

@ -6,7 +6,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.testing.core.makeUnique
@ -48,12 +48,12 @@ interface WithMockNet {
/**
* Start a flow
*/
fun <T> TestStartedNode.startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = services.startFlow(logic)
fun <T> TestStartedNode.startFlow(logic: FlowLogic<T>): FlowStateMachineHandle<T> = services.startFlow(logic)
/**
* Start a flow and run the network immediately afterwards
*/
fun <T> TestStartedNode.startFlowAndRunNetwork(logic: FlowLogic<T>): FlowStateMachine<T> =
fun <T> TestStartedNode.startFlowAndRunNetwork(logic: FlowLogic<T>): FlowStateMachineHandle<T> =
startFlow(logic).andRunNetwork()
fun TestStartedNode.createConfidentialIdentity(party: Party) =

View File

@ -24,7 +24,8 @@ data class InvocationContext(
val actor: Actor?,
val externalTrace: Trace? = null,
val impersonatedActor: Actor? = null,
val arguments: List<Any?> = emptyList()
val arguments: List<Any?>? = emptyList(), // 'arguments' is nullable so that a - >= 4.6 version - RPC client can be backwards compatible against - < 4.6 version - nodes
val clientId: String? = null
) {
constructor(
@ -49,8 +50,9 @@ data class InvocationContext(
actor: Actor? = null,
externalTrace: Trace? = null,
impersonatedActor: Actor? = null,
arguments: List<Any?> = emptyList()
) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor, arguments)
arguments: List<Any?> = emptyList(),
clientId: String? = null
) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor, arguments, clientId)
/**
* Creates an [InvocationContext] with [InvocationOrigin.RPC] origin.
@ -113,7 +115,8 @@ data class InvocationContext(
actor = actor,
externalTrace = externalTrace,
impersonatedActor = impersonatedActor,
arguments = arguments
arguments = arguments,
clientId = clientId
)
}
}

View File

@ -11,10 +11,19 @@ import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SerializedBytes
import org.slf4j.Logger
@DeleteForDJVM
@DoNotImplement
interface FlowStateMachineHandle<FLOWRETURN> {
val logic: FlowLogic<FLOWRETURN>?
val id: StateMachineRunId
val resultFuture: CordaFuture<FLOWRETURN>
val clientId: String?
}
/** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */
@DeleteForDJVM
@DoNotImplement
interface FlowStateMachine<FLOWRETURN> {
interface FlowStateMachine<FLOWRETURN> : FlowStateMachineHandle<FLOWRETURN> {
@Suspendable
fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN
@ -38,14 +47,11 @@ interface FlowStateMachine<FLOWRETURN> {
fun updateTimedFlowTimeout(timeoutSeconds: Long)
val logic: FlowLogic<FLOWRETURN>
val serviceHub: ServiceHub
val logger: Logger
val id: StateMachineRunId
val resultFuture: CordaFuture<FLOWRETURN>
val context: InvocationContext
val ourIdentity: Party
val ourSenderUUID: String?
val creationTime: Long
val isKilled: Boolean
}
}

View File

@ -264,6 +264,16 @@ interface CordaRPCOps : RPCOps {
@RPCReturnsObservables
fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T>
/**
* Start the given flow with the given arguments and a [clientId]. [logicType] must be annotated
* with [net.corda.core.flows.StartableByRPC]. The flow's result/ exception will be available for the client to
* re-connect and retrieve them even after the flow's lifetime, by re-calling [startFlowDynamicWithClientId] with the same
* [clientId]. Upon calling [removeClientId], the node's resources holding the result/ exception will be freed
* and the result/ exception will no longer be available.
*/
@RPCReturnsObservables
fun <T> startFlowDynamicWithClientId(clientId: String, logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandleWithClientId<T>
/**
* Start the given flow with the given arguments, returning an [Observable] with a single observation of the
* result of running the flow. [logicType] must be annotated with [net.corda.core.flows.StartableByRPC].
@ -278,6 +288,15 @@ interface CordaRPCOps : RPCOps {
*/
fun killFlow(id: StateMachineRunId): Boolean
/**
* Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed.
*
* See [startFlowDynamicWithClientId] for more information.
*
* @return whether the mapping was removed.
*/
fun removeClientId(clientId: String): Boolean
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */
fun nodeInfo(): NodeInfo
@ -542,6 +561,79 @@ inline fun <T, A, B, C, D, E, F, reified R : FlowLogic<T>> CordaRPCOps.startFlow
arg5: F
): FlowHandle<T> = startFlowDynamic(R::class.java, arg0, arg1, arg2, arg3, arg4, arg5)
/**
* Extension function for type safe invocation of flows from Kotlin, with [clientId].
*/
@Suppress("unused")
inline fun <T, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithClientId(
clientId: String,
@Suppress("unused_parameter")
flowConstructor: () -> R
): FlowHandleWithClientId<T> = startFlowDynamicWithClientId(clientId, R::class.java)
@Suppress("unused")
inline fun <T, A, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithClientId(
clientId: String,
@Suppress("unused_parameter")
flowConstructor: (A) -> R,
arg0: A
): FlowHandleWithClientId<T> = startFlowDynamicWithClientId(clientId, R::class.java, arg0)
@Suppress("unused")
inline fun <T, A, B, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithClientId(
clientId: String,
@Suppress("unused_parameter")
flowConstructor: (A, B) -> R,
arg0: A,
arg1: B
): FlowHandleWithClientId<T> = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1)
@Suppress("unused")
inline fun <T, A, B, C, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithClientId(
clientId: String,
@Suppress("unused_parameter")
flowConstructor: (A, B, C) -> R,
arg0: A,
arg1: B,
arg2: C
): FlowHandleWithClientId<T> = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1, arg2)
@Suppress("unused")
inline fun <T, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithClientId(
clientId: String,
@Suppress("unused_parameter")
flowConstructor: (A, B, C, D) -> R,
arg0: A,
arg1: B,
arg2: C,
arg3: D
): FlowHandleWithClientId<T> = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1, arg2, arg3)
@Suppress("unused")
inline fun <T, A, B, C, D, E, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithClientId(
clientId: String,
@Suppress("unused_parameter")
flowConstructor: (A, B, C, D, E) -> R,
arg0: A,
arg1: B,
arg2: C,
arg3: D,
arg4: E
): FlowHandleWithClientId<T> = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1, arg2, arg3, arg4)
@Suppress("unused")
inline fun <T, A, B, C, D, E, F, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithClientId(
clientId: String,
@Suppress("unused_parameter")
flowConstructor: (A, B, C, D, E, F) -> R,
arg0: A,
arg1: B,
arg2: C,
arg3: D,
arg4: E,
arg5: F
): FlowHandleWithClientId<T> = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1, arg2, arg3, arg4, arg5)
/**
* Extension function for type safe invocation of flows from Kotlin, with progress tracking enabled.
*/

View File

@ -28,6 +28,14 @@ interface FlowHandle<A> : AutoCloseable {
override fun close()
}
interface FlowHandleWithClientId<A> : FlowHandle<A> {
/**
* The [clientId] with which the client has started the flow.
*/
val clientId: String
}
/**
* [FlowProgressHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value.
*/
@ -66,6 +74,18 @@ data class FlowHandleImpl<A>(
}
}
@CordaSerializable
data class FlowHandleWithClientIdImpl<A>(
override val id: StateMachineRunId,
override val returnValue: CordaFuture<A>,
override val clientId: String) : FlowHandleWithClientId<A> {
// Remember to add @Throws to FlowHandle.close() if this throws an exception.
override fun close() {
returnValue.cancel(false)
}
}
@CordaSerializable
data class FlowProgressHandleImpl<A> @JvmOverloads constructor(
override val id: StateMachineRunId,

View File

@ -9,23 +9,11 @@
<ID>ClassNaming:BuyerFlow.kt$BuyerFlow$STARTING_BUY : Step</ID>
<ID>ClassNaming:CompositeMemberCompositeSchemaToClassCarpenterTests.kt$I_</ID>
<ID>ClassNaming:CordaServiceTest.kt$CordaServiceTest.DummyServiceFlow.Companion$TEST_STEP : Step</ID>
<ID>ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$AWAITING_REQUEST : Step</ID>
<ID>ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$SENDING_TOP_UP_ISSUE_REQUEST : Step</ID>
<ID>ClassNaming:DeserializeNeedingCarpentryTests.kt$DeserializeNeedingCarpentryTests$outer</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$FINALISING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$GENERATING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$SIGNING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$VERIFYING_TRANSACTION : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$EXTRACTING_VAULT_STATES : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$ID_OTHER_NODES : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$OTHER_TX_COMPONENTS : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SENDING_AND_RECEIVING_DATA : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SIGS_GATHERING : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_BUILDING : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_SIGNING : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_VERIFICATION : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$VERIFYING_SIGS : Step</ID>
<ID>ClassNaming:FlowCookbook.kt$ResponderFlow.Companion$RECEIVING_AND_SENDING_DATA : Step</ID>
<ID>ClassNaming:FlowFrameworkTests.kt$ExceptionFlow$START_STEP : Step</ID>
<ID>ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$RECEIVED_STEP : Step</ID>
<ID>ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$START_STEP : Step</ID>
@ -178,7 +166,6 @@
<ID>ComplexMethod:RPCServer.kt$RPCServer$private fun clientArtemisMessageHandler(artemisMessage: ClientMessage)</ID>
<ID>ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$ private tailrec fun establishConnectionWithRetry( retryInterval: Duration, roundRobinIndex: Int = 0, retries: Int = -1 ): CordaRPCConnection?</ID>
<ID>ComplexMethod:RemoteTypeCarpenter.kt$SchemaBuildingRemoteTypeCarpenter$override fun carpent(typeInformation: RemoteTypeInformation): Type</ID>
<ID>ComplexMethod:RpcReconnectTests.kt$RpcReconnectTests$ @Test(timeout=300_000) fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`()</ID>
<ID>ComplexMethod:SchemaMigration.kt$SchemaMigration$ private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean</ID>
<ID>ComplexMethod:SchemaMigration.kt$SchemaMigration$private fun doRunMigration( run: Boolean, check: Boolean, existingCheckpoints: Boolean? = null )</ID>
<ID>ComplexMethod:SendTransactionFlow.kt$DataVendingFlow$@Suspendable override fun call(): Void?</ID>
@ -310,7 +297,6 @@
<ID>ForbiddenComment:CordappProviderImplTests.kt$CordappProviderImplTests.Companion$// TODO: Cordapp name should differ from the JAR name</ID>
<ID>ForbiddenComment:CoreFlowHandlers.kt$NotaryChangeHandler$// TODO: Right now all nodes will automatically approve the notary change. We need to figure out if stricter controls are necessary.</ID>
<ID>ForbiddenComment:CrossCashTest.kt$CrossCashState$// TODO: Alternative: We may possibly reduce the complexity of the search even further using some form of</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: Check if non-ECC keys satisfy params (i.e. approved/valid RSA modulus size).</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: We currently use SHA256(seed) when retrying, but BIP32 just skips a counter (i) that results to an invalid key.</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: change the val name to a more descriptive one as it's now confusing and looks like a Key type.</ID>
<ID>ForbiddenComment:Crypto.kt$Crypto$// TODO: change val name to SPHINCS256_SHA512. This will break backwards compatibility.</ID>
@ -435,7 +421,6 @@
<ID>ForbiddenComment:RatesFixFlow.kt$RatesFixFlow.FixQueryFlow$// TODO: add deadline to receive</ID>
<ID>ForbiddenComment:ResolveTransactionsFlow.kt$ResolveTransactionsFlow$// TODO: This could be done in parallel with other fetches for extra speed.</ID>
<ID>ForbiddenComment:ResolveTransactionsFlowTest.kt$ResolveTransactionsFlowTest$// TODO: this operation should not require an explicit transaction</ID>
<ID>ForbiddenComment:RestrictedEntityManager.kt$RestrictedEntityManager$// TODO: Figure out which other methods on EntityManager need to be blocked?</ID>
<ID>ForbiddenComment:ScheduledActivityObserver.kt$ScheduledActivityObserver.Companion$// TODO: Beware we are calling dynamically loaded contract code inside here.</ID>
<ID>ForbiddenComment:ScheduledFlowIntegrationTests.kt$ScheduledFlowIntegrationTests$// TODO: the queries below are not atomic so we need to allow enough time for the scheduler to finish. Would be better to query scheduler.</ID>
<ID>ForbiddenComment:SendTransactionFlow.kt$DataVendingFlow$// Security TODO: Check for abnormally large or malformed data requests</ID>
@ -622,7 +607,6 @@
<ID>FunctionNaming:VersionExtractorTest.kt$VersionExtractorTest$@Test(timeout=300_000) fun version_header_extraction_present()</ID>
<ID>LargeClass:AbstractNode.kt$AbstractNode&lt;S&gt; : SingletonSerializeAsToken</ID>
<ID>LargeClass:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager : StateMachineManagerStateMachineManagerInternal</ID>
<ID>LongMethod:FlowCookbook.kt$InitiatorFlow$@Suppress("RemoveExplicitTypeArguments") @Suspendable override fun call()</ID>
<ID>LongMethod:HibernateQueryCriteriaParser.kt$HibernateQueryCriteriaParser$override fun parseCriteria(criteria: CommonQueryCriteria): Collection&lt;Predicate&gt;</ID>
<ID>LongParameterList:AMQPSerializer.kt$AMQPSerializer$(obj: Any, data: Data, type: Type, output: SerializationOutput, context: SerializationContext, debugIndent: Int = 0)</ID>
<ID>LongParameterList:AbstractCashSelection.kt$AbstractCashSelection$(connection: Connection, amount: Amount&lt;Currency&gt;, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set&lt;AbstractParty&gt;, withIssuerRefs: Set&lt;OpaqueBytes&gt;, withResultSet: (ResultSet) -&gt; Boolean)</ID>
@ -657,6 +641,9 @@
<ID>LongParameterList:CordaRPCOps.kt$( @Suppress("UNUSED_PARAMETER") flowConstructor: (A, B, C, D, E, F) -&gt; R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E, arg5: F )</ID>
<ID>LongParameterList:CordaRPCOps.kt$( @Suppress("unused_parameter") flowConstructor: (A, B, C, D, E) -&gt; R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E )</ID>
<ID>LongParameterList:CordaRPCOps.kt$( @Suppress("unused_parameter") flowConstructor: (A, B, C, D, E, F) -&gt; R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E, arg5: F )</ID>
<ID>LongParameterList:CordaRPCOps.kt$( clientId: String, @Suppress("unused_parameter") flowConstructor: (A, B, C, D) -&gt; R, arg0: A, arg1: B, arg2: C, arg3: D )</ID>
<ID>LongParameterList:CordaRPCOps.kt$( clientId: String, @Suppress("unused_parameter") flowConstructor: (A, B, C, D, E) -&gt; R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E )</ID>
<ID>LongParameterList:CordaRPCOps.kt$( clientId: String, @Suppress("unused_parameter") flowConstructor: (A, B, C, D, E, F) -&gt; R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E, arg5: F )</ID>
<ID>LongParameterList:Driver.kt$DriverParameters$( isDebug: Boolean, driverDirectory: Path, portAllocation: PortAllocation, debugPortAllocation: PortAllocation, systemProperties: Map&lt;String, String&gt;, useTestClock: Boolean, startNodesInProcess: Boolean, waitForAllNodesToFinish: Boolean, notarySpecs: List&lt;NotarySpec&gt;, extraCordappPackagesToScan: List&lt;String&gt;, jmxPolicy: JmxPolicy, networkParameters: NetworkParameters )</ID>
<ID>LongParameterList:Driver.kt$DriverParameters$( isDebug: Boolean, driverDirectory: Path, portAllocation: PortAllocation, debugPortAllocation: PortAllocation, systemProperties: Map&lt;String, String&gt;, useTestClock: Boolean, startNodesInProcess: Boolean, waitForAllNodesToFinish: Boolean, notarySpecs: List&lt;NotarySpec&gt;, extraCordappPackagesToScan: List&lt;String&gt;, jmxPolicy: JmxPolicy, networkParameters: NetworkParameters, cordappsForAllNodes: Set&lt;TestCordapp&gt;? )</ID>
<ID>LongParameterList:DriverDSL.kt$DriverDSL$( defaultParameters: NodeParameters = NodeParameters(), providedName: CordaX500Name? = defaultParameters.providedName, rpcUsers: List&lt;User&gt; = defaultParameters.rpcUsers, verifierType: VerifierType = defaultParameters.verifierType, customOverrides: Map&lt;String, Any?&gt; = defaultParameters.customOverrides, startInSameProcess: Boolean? = defaultParameters.startInSameProcess, maximumHeapSize: String = defaultParameters.maximumHeapSize )</ID>
@ -753,7 +740,6 @@
<ID>MagicNumber:AttachmentDemo.kt$10009</ID>
<ID>MagicNumber:AttachmentDemo.kt$10010</ID>
<ID>MagicNumber:AttachmentTrustTable.kt$AttachmentTrustTable$3</ID>
<ID>MagicNumber:AttachmentsClassLoader.kt$AttachmentsClassLoader$4</ID>
<ID>MagicNumber:AzureSmbVolume.kt$AzureSmbVolume$5000</ID>
<ID>MagicNumber:BFTSmart.kt$BFTSmart.Client$100</ID>
<ID>MagicNumber:BFTSmart.kt$BFTSmart.Replica.&lt;no name provided&gt;$20000</ID>
@ -775,12 +761,6 @@
<ID>MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$16</ID>
<ID>MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$30.0</ID>
<ID>MagicNumber:ClassCarpenter.kt$ClassCarpenterImpl$3</ID>
<ID>MagicNumber:ClientRpcExample.kt$ClientRpcExample$3</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$0.7</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$0.8</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$1000</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$10000</ID>
<ID>MagicNumber:ClientRpcTutorial.kt$2000</ID>
<ID>MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$10</ID>
<ID>MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$30</ID>
<ID>MagicNumber:CompositeSignature.kt$CompositeSignature$1024</ID>
@ -846,11 +826,6 @@
<ID>MagicNumber:ExchangeRateModel.kt$1.18</ID>
<ID>MagicNumber:ExchangeRateModel.kt$1.31</ID>
<ID>MagicNumber:FixingFlow.kt$FixingFlow.Fixer.&lt;no name provided&gt;$30</ID>
<ID>MagicNumber:FlowCookbook.kt$InitiatorFlow$30</ID>
<ID>MagicNumber:FlowCookbook.kt$InitiatorFlow$45</ID>
<ID>MagicNumber:FlowCookbook.kt$InitiatorFlow$777</ID>
<ID>MagicNumber:FlowCookbook.kt$ResponderFlow$99</ID>
<ID>MagicNumber:FlowCookbook.kt$ResponderFlow.&lt;no name provided&gt;$777</ID>
<ID>MagicNumber:FlowLogic.kt$FlowLogic$300</ID>
<ID>MagicNumber:FlowLogic.kt$FlowLogic.Companion$5</ID>
<ID>MagicNumber:FlowMonitor.kt$FlowMonitor$1000</ID>
@ -864,7 +839,6 @@
<ID>MagicNumber:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$10</ID>
<ID>MagicNumber:HttpUtils.kt$HttpUtils$5</ID>
<ID>MagicNumber:HttpUtils.kt$HttpUtils$60</ID>
<ID>MagicNumber:IOUFlowResponder.kt$IOUFlowResponder.&lt;no name provided&gt;$100</ID>
<ID>MagicNumber:IRS.kt$RatePaymentEvent$360.0</ID>
<ID>MagicNumber:IRS.kt$RatePaymentEvent$4</ID>
<ID>MagicNumber:IRS.kt$RatePaymentEvent$8</ID>
@ -1026,7 +1000,6 @@
<ID>MagicNumber:NodeWebServer.kt$NodeWebServer$100</ID>
<ID>MagicNumber:NodeWebServer.kt$NodeWebServer$32768</ID>
<ID>MagicNumber:NodeWebServer.kt$NodeWebServer$40</ID>
<ID>MagicNumber:NonValidatingNotaryFlow.kt$NonValidatingNotaryFlow$4</ID>
<ID>MagicNumber:Notarise.kt$10</ID>
<ID>MagicNumber:Notarise.kt$10003</ID>
<ID>MagicNumber:NullKeys.kt$NullKeys$32</ID>
@ -1143,7 +1116,6 @@
<ID>MagicNumber:StandaloneShell.kt$StandaloneShell$7</ID>
<ID>MagicNumber:StateRevisionFlow.kt$StateRevisionFlow.Requester$30</ID>
<ID>MagicNumber:Structures.kt$PrivacySalt$32</ID>
<ID>MagicNumber:TargetVersionDependentRules.kt$StateContractValidationEnforcementRule$4</ID>
<ID>MagicNumber:TestNodeInfoBuilder.kt$TestNodeInfoBuilder$1234</ID>
<ID>MagicNumber:TestUtils.kt$10000</ID>
<ID>MagicNumber:TestUtils.kt$30000</ID>
@ -1154,7 +1126,6 @@
<ID>MagicNumber:TraderDemoClientApi.kt$TraderDemoClientApi$3</ID>
<ID>MagicNumber:TransactionBuilder.kt$TransactionBuilder$4</ID>
<ID>MagicNumber:TransactionDSLInterpreter.kt$TransactionDSL$30</ID>
<ID>MagicNumber:TransactionUtils.kt$4</ID>
<ID>MagicNumber:TransactionVerificationException.kt$TransactionVerificationException.ConstraintPropagationRejection$3</ID>
<ID>MagicNumber:TransactionViewer.kt$TransactionViewer$15.0</ID>
<ID>MagicNumber:TransactionViewer.kt$TransactionViewer$20.0</ID>
@ -1179,8 +1150,6 @@
<ID>MagicNumber:WebServer.kt$100.0</ID>
<ID>MagicNumber:WebServer.kt$WebServer$500</ID>
<ID>MagicNumber:WireTransaction.kt$WireTransaction$4</ID>
<ID>MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitCompletionFlow$60</ID>
<ID>MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitTradeApprovalFlow$60</ID>
<ID>MagicNumber:X509Utilities.kt$X509Utilities$3650</ID>
<ID>MagicNumber:errorAndTerminate.kt$10</ID>
<ID>MatchingDeclarationName:AMQPSerializerFactories.kt$net.corda.serialization.internal.amqp.AMQPSerializerFactories.kt</ID>
@ -1225,7 +1194,6 @@
<ID>MatchingDeclarationName:TestConstants.kt$net.corda.testing.core.TestConstants.kt</ID>
<ID>MatchingDeclarationName:TestUtils.kt$net.corda.testing.core.TestUtils.kt</ID>
<ID>MatchingDeclarationName:TransactionTypes.kt$net.corda.explorer.model.TransactionTypes.kt</ID>
<ID>MatchingDeclarationName:TutorialFlowStateMachines.kt$net.corda.docs.kotlin.tutorial.flowstatemachines.TutorialFlowStateMachines.kt</ID>
<ID>MatchingDeclarationName:Utils.kt$io.cryptoblk.core.Utils.kt</ID>
<ID>MatchingDeclarationName:VirtualCordapps.kt$net.corda.node.internal.cordapp.VirtualCordapps.kt</ID>
<ID>ModifierOrder:NodeNamedCache.kt$DefaultNamedCacheFactory$open protected</ID>
@ -1298,7 +1266,6 @@
<ID>SpreadOperator:ConfigUtilities.kt$(*pairs)</ID>
<ID>SpreadOperator:Configuration.kt$Configuration.Validation.Error$(*(containingPath.toList() + this.containingPath).toTypedArray())</ID>
<ID>SpreadOperator:ContractJarTestUtils.kt$ContractJarTestUtils$(jarName, *contractNames.map{ "${it.replace(".", "/")}.class" }.toTypedArray())</ID>
<ID>SpreadOperator:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$(logicType, context(), *args)</ID>
<ID>SpreadOperator:CordaX500Name.kt$CordaX500Name.Companion$(*Locale.getISOCountries(), unspecifiedCountry)</ID>
<ID>SpreadOperator:CustomCordapp.kt$CustomCordapp$(*classes.map { it.name }.toTypedArray())</ID>
<ID>SpreadOperator:CustomCordapp.kt$CustomCordapp$(*packages.map { it.replace('.', '/') }.toTypedArray())</ID>
@ -1320,10 +1287,6 @@
<ID>SpreadOperator:FlowOverrideTests.kt$FlowOverrideTests$(*nodeBClasses.toTypedArray())</ID>
<ID>SpreadOperator:FlowTestsUtils.kt$(*allSessions)</ID>
<ID>SpreadOperator:FlowTestsUtils.kt$(session, *sessions)</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourInputStates.toTypedArray())</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray())</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirInputStates.toTypedArray())</ID>
<ID>SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray())</ID>
<ID>SpreadOperator:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$(OpaqueBytes(request.encoded), "Platform-Version" to "${versionInfo.platformVersion}", "Client-Version" to versionInfo.releaseVersion, "Private-Network-Map" to (config.pnm?.toString() ?: ""), *(config.csrToken?.let { arrayOf(CENM_SUBMISSION_TOKEN to it) } ?: arrayOf()))</ID>
<ID>SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray())</ID>
<ID>SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray())</ID>
@ -1574,7 +1537,6 @@
<ID>TooGenericExceptionCaught:NotaryUtils.kt$e: Exception</ID>
<ID>TooGenericExceptionCaught:ObjectDiffer.kt$ObjectDiffer$throwable: Exception</ID>
<ID>TooGenericExceptionCaught:P2PMessagingClient.kt$P2PMessagingClient$e: Exception</ID>
<ID>TooGenericExceptionCaught:PersistentIdentityMigrationNewTableTest.kt$PersistentIdentityMigrationNewTableTest$e: Exception</ID>
<ID>TooGenericExceptionCaught:PersistentUniquenessProvider.kt$PersistentUniquenessProvider$e: Exception</ID>
<ID>TooGenericExceptionCaught:ProfileController.kt$ProfileController$e: Exception</ID>
<ID>TooGenericExceptionCaught:PropertyValidationTest.kt$PropertyValidationTest$e: Exception</ID>
@ -1742,8 +1704,6 @@
<ID>UnusedImports:Amount.kt$import net.corda.core.crypto.CompositeKey</ID>
<ID>UnusedImports:Amount.kt$import net.corda.core.identity.Party</ID>
<ID>UnusedImports:DummyLinearStateSchemaV1.kt$import net.corda.core.contracts.ContractState</ID>
<ID>UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.core.internal.packageName</ID>
<ID>UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.finance.schemas.CashSchemaV1</ID>
<ID>UnusedImports:InternalTestUtils.kt$import java.nio.file.Files</ID>
<ID>UnusedImports:InternalTestUtils.kt$import net.corda.nodeapi.internal.loadDevCaTrustStore</ID>
<ID>UnusedImports:NetworkMap.kt$import net.corda.core.node.NodeInfo</ID>
@ -2012,8 +1972,6 @@
<ID>WildcardImport:CordaModule.kt$import net.corda.core.identity.*</ID>
<ID>WildcardImport:CordaModule.kt$import net.corda.core.transactions.*</ID>
<ID>WildcardImport:CordaRPCOps.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import org.assertj.core.api.Assertions.*</ID>
<ID>WildcardImport:CordaServiceTest.kt$import kotlin.test.*</ID>
<ID>WildcardImport:CordaViewModel.kt$import tornadofx.*</ID>
<ID>WildcardImport:Cordapp.kt$import net.corda.core.cordapp.Cordapp.Info.*</ID>
@ -2031,10 +1989,6 @@
<ID>WildcardImport:CryptoSignUtils.kt$import net.corda.core.crypto.*</ID>
<ID>WildcardImport:CryptoUtilsTest.kt$import kotlin.test.*</ID>
<ID>WildcardImport:CustomCordapp.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:CustomVaultQuery.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:CustomVaultQuery.kt$import net.corda.core.utilities.*</ID>
<ID>WildcardImport:CustomVaultQueryTest.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CustomVaultQueryTest.kt$import net.corda.finance.*</ID>
<ID>WildcardImport:DBNetworkParametersStorage.kt$import javax.persistence.*</ID>
<ID>WildcardImport:DBRunnerExtension.kt$import org.junit.jupiter.api.extension.*</ID>
<ID>WildcardImport:DBTransactionStorage.kt$import javax.persistence.*</ID>
@ -2057,7 +2011,6 @@
<ID>WildcardImport:DeserializeSimpleTypesTests.kt$import net.corda.serialization.internal.amqp.testutils.*</ID>
<ID>WildcardImport:DigitalSignatureWithCert.kt$import java.security.cert.*</ID>
<ID>WildcardImport:DistributedServiceTests.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:DoRemainingWorkTransition.kt$import net.corda.node.services.statemachine.*</ID>
<ID>WildcardImport:DockerInstantiator.kt$import com.github.dockerjava.api.model.*</ID>
<ID>WildcardImport:DriverDSLImpl.kt$import net.corda.testing.driver.*</ID>
<ID>WildcardImport:DummyContract.kt$import net.corda.core.contracts.*</ID>
@ -2076,8 +2029,6 @@
<ID>WildcardImport:EvolutionSerializerFactoryTests.kt$import kotlin.test.*</ID>
<ID>WildcardImport:EvolutionSerializerFactoryTests.kt$import net.corda.serialization.internal.amqp.testutils.*</ID>
<ID>WildcardImport:Explorer.kt$import tornadofx.*</ID>
<ID>WildcardImport:FiberDeserializationCheckingInterceptor.kt$import net.corda.node.services.statemachine.*</ID>
<ID>WildcardImport:FinalityFlowMigration.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FinalityFlowTests.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:FinalityFlowTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FinalityHandlerTest.kt$import net.corda.node.services.statemachine.StaffedFlowHospital.*</ID>
@ -2089,11 +2040,7 @@
<ID>WildcardImport:FlowCheckpointCordapp.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:FlowCookbook.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:FlowCookbook.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowFrameworkPersistenceTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowFrameworkTests.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowFrameworkTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowFrameworkTripartyTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowLogicRefFactoryImpl.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowMatchers.kt$import net.corda.coretesting.internal.matchers.*</ID>
@ -2101,10 +2048,7 @@
<ID>WildcardImport:FlowRetryTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStackSnapshotTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStateMachine.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:FlowsDrainingModeContentionTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FxTransactionBuildTutorialTest.kt$import net.corda.finance.*</ID>
<ID>WildcardImport:GenericsTests.kt$import net.corda.serialization.internal.amqp.testutils.*</ID>
<ID>WildcardImport:Gui.kt$import tornadofx.*</ID>
<ID>WildcardImport:GuiUtilities.kt$import tornadofx.*</ID>
@ -2121,8 +2065,6 @@
<ID>WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.EqualityComparisonOperator.*</ID>
<ID>WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.LikenessOperator.*</ID>
<ID>WildcardImport:HibernateStatistics.kt$import org.hibernate.stat.*</ID>
<ID>WildcardImport:IOUContract.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:IOUFlowResponder.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:IRS.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:IRS.kt$import net.corda.finance.contracts.*</ID>
<ID>WildcardImport:IRSState.kt$import net.corda.core.contracts.*</ID>
@ -2169,7 +2111,6 @@
<ID>WildcardImport:JarSignatureCollectorTest.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:KeyStoreUtilities.kt$import java.security.*</ID>
<ID>WildcardImport:KeyStoreUtilities.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:KotlinIntegrationTestingTutorial.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:Kryo.kt$import com.esotericsoftware.kryo.*</ID>
<ID>WildcardImport:Kryo.kt$import net.corda.core.transactions.*</ID>
<ID>WildcardImport:KryoStreamsTest.kt$import java.io.*</ID>
@ -2420,8 +2361,6 @@
<ID>WildcardImport:SignedTransaction.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:SignedTransaction.kt$import net.corda.core.crypto.*</ID>
<ID>WildcardImport:SimpleMQClient.kt$import org.apache.activemq.artemis.api.core.client.*</ID>
<ID>WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.node.services.statemachine.interceptors.*</ID>
<ID>WildcardImport:SpringDriver.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.node.services.vault.*</ID>
@ -2473,8 +2412,6 @@
<ID>WildcardImport:TransactionViewer.kt$import net.corda.client.jfx.utils.*</ID>
<ID>WildcardImport:TransactionViewer.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:TransactionViewer.kt$import tornadofx.*</ID>
<ID>WildcardImport:TutorialContract.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:TutorialTestDSL.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:TwoPartyDealFlow.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.flows.*</ID>
@ -2502,7 +2439,6 @@
<ID>WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:VaultFiller.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:VaultFlowTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.*</ID>
<ID>WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.QueryCriteria.*</ID>

View File

@ -0,0 +1,77 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.junit.Before
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import kotlin.test.assertTrue
class FlowWithClientIdTest {
@Before
fun reset() {
ResultFlow.hook = null
}
@Test(timeout=300_000)
fun `start flow with client id`() {
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode().getOrThrow()
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds))
assertEquals(clientId, flowHandle.clientId)
}
}
@Test(timeout=300_000)
fun `remove client id`() {
val clientId = UUID.randomUUID().toString()
var counter = 0
ResultFlow.hook = { counter++ }
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode().getOrThrow()
val flowHandle0 = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
flowHandle0.returnValue.getOrThrow(20.seconds)
val removed = nodeA.rpc.removeClientId(clientId)
val flowHandle1 = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
flowHandle1.returnValue.getOrThrow(20.seconds)
assertTrue(removed)
assertNotEquals(flowHandle0.id, flowHandle1.id)
assertEquals(flowHandle0.clientId, flowHandle1.clientId)
assertEquals(2, counter) // this asserts that 2 different flows were spawned indeed
}
}
}
@StartableByRPC
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
companion object {
var hook: (() -> Unit)? = null
var suspendableHook: FlowLogic<Unit>? = null
}
@Suspendable
override fun call(): A {
hook?.invoke()
suspendableHook?.let { subFlow(it) }
return result
}
}

View File

@ -28,7 +28,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.AttachmentTrustCalculator
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.NODE_INFO_DIRECTORY
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.NetworkParametersStorage
@ -331,7 +331,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val checkpointStorage = DBCheckpointStorage(DBCheckpointPerformanceRecorder(services.monitoringService.metrics), platformClock)
@Suppress("LeakingThis")
val smm = makeStateMachineManager()
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory)
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory, DBCheckpointStorage.MAX_CLIENT_ID_LENGTH)
private val schedulerService = NodeSchedulerService(
platformClock,
database,
@ -1268,13 +1268,22 @@ internal fun logVendorString(database: CordaPersistence, log: Logger) {
}
// TODO Move this into its own file
class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter {
override fun <T> startFlow(event: ExternalEvent.ExternalStartFlowEvent<T>): CordaFuture<FlowStateMachine<T>> {
smm.deliverExternalEvent(event)
class FlowStarterImpl(
private val smm: StateMachineManager,
private val flowLogicRefFactory: FlowLogicRefFactory,
private val maxClientIdLength: Int
) : FlowStarter {
override fun <T> startFlow(event: ExternalEvent.ExternalStartFlowEvent<T>): CordaFuture<out FlowStateMachineHandle<T>> {
val clientId = event.context.clientId
if (clientId != null && clientId.length > maxClientIdLength) {
throw IllegalArgumentException("clientId cannot be longer than $maxClientIdLength characters")
} else {
smm.deliverExternalEvent(event)
}
return event.future
}
override fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext): CordaFuture<FlowStateMachine<T>> {
override fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext): CordaFuture<out FlowStateMachineHandle<T>> {
val startFlowEvent = object : ExternalEvent.ExternalStartFlowEvent<T>, DeduplicationHandler {
override fun insideDatabaseTransaction() {}
@ -1291,12 +1300,12 @@ class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogi
override val context: InvocationContext
get() = context
override fun wireUpFuture(flowFuture: CordaFuture<FlowStateMachine<T>>) {
override fun wireUpFuture(flowFuture: CordaFuture<out FlowStateMachineHandle<T>>) {
_future.captureLater(flowFuture)
}
private val _future = openFuture<FlowStateMachine<T>>()
override val future: CordaFuture<FlowStateMachine<T>>
private val _future = openFuture<FlowStateMachineHandle<T>>()
override val future: CordaFuture<FlowStateMachineHandle<T>>
get() = _future
}
return startFlow(startFlowEvent)
@ -1305,7 +1314,7 @@ class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogi
override fun <T> invokeFlowAsync(
logicType: Class<out FlowLogic<T>>,
context: InvocationContext,
vararg args: Any?): CordaFuture<FlowStateMachine<T>> {
vararg args: Any?): CordaFuture<out FlowStateMachineHandle<T>> {
val logicRef = flowLogicRefFactory.createForRPC(logicType, *args)
val logic: FlowLogic<T> = uncheckedCast(flowLogicRefFactory.toFlowLogic(logicRef))
return startFlow(logic, context)

View File

@ -3,7 +3,7 @@ package net.corda.node.internal
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByService
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowHandleImpl
@ -78,7 +78,7 @@ internal class AppServiceHubImpl<T : SerializeAsToken>(private val serviceHub: S
return FlowProgressHandleImpl(
id = stateMachine.id,
returnValue = stateMachine.resultFuture,
progress = stateMachine.logic.track()?.updates ?: Observable.empty()
progress = stateMachine.logic?.track()?.updates ?: Observable.empty()
)
}
@ -95,7 +95,7 @@ internal class AppServiceHubImpl<T : SerializeAsToken>(private val serviceHub: S
}
}
private fun <T> startFlowChecked(flow: FlowLogic<T>): FlowStateMachine<T> {
private fun <T> startFlowChecked(flow: FlowLogic<T>): FlowStateMachineHandle<T> {
val logicType = flow.javaClass
require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" }
// TODO check service permissions

View File

@ -19,7 +19,7 @@ import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.AttachmentTrustInfo
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.STRUCTURAL_STEP_PREFIX
import net.corda.core.internal.messaging.InternalCordaRPCOps
@ -27,6 +27,8 @@ import net.corda.core.internal.sign
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowHandleImpl
import net.corda.core.messaging.FlowHandleWithClientId
import net.corda.core.messaging.FlowHandleWithClientIdImpl
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.messaging.ParametersUpdateInfo
@ -170,6 +172,8 @@ internal class CordaRPCOpsImpl(
override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id)
override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId)
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
val (allStateMachines, changes) = smm.track()
@ -236,27 +240,33 @@ internal class CordaRPCOpsImpl(
}
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
val stateMachine = startFlow(logicType, args)
val stateMachine = startFlow(logicType, context(), args)
return FlowProgressHandleImpl(
id = stateMachine.id,
returnValue = stateMachine.resultFuture,
progress = stateMachine.logic.track()?.updates?.filter { !it.startsWith(STRUCTURAL_STEP_PREFIX) } ?: Observable.empty(),
stepsTreeIndexFeed = stateMachine.logic.trackStepsTreeIndex(),
stepsTreeFeed = stateMachine.logic.trackStepsTree()
progress = stateMachine.logic?.track()?.updates?.filter { !it.startsWith(STRUCTURAL_STEP_PREFIX) } ?: Observable.empty(),
stepsTreeIndexFeed = stateMachine.logic?.trackStepsTreeIndex(),
stepsTreeFeed = stateMachine.logic?.trackStepsTree()
)
}
override fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> {
val stateMachine = startFlow(logicType, args)
val stateMachine = startFlow(logicType, context(), args)
return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture)
}
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, args: Array<out Any?>): FlowStateMachine<T> {
override fun <T> startFlowDynamicWithClientId(clientId: String, logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandleWithClientId<T> {
val stateMachine = startFlow(logicType, context().withClientId(clientId), args)
return FlowHandleWithClientIdImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture, clientId = stateMachine.clientId!!)
}
@Suppress("SpreadOperator")
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, context: InvocationContext, args: Array<out Any?>): FlowStateMachineHandle<T> {
if (!logicType.isAnnotationPresent(StartableByRPC::class.java)) throw NonRpcFlowException(logicType)
if (isFlowsDrainingModeEnabled()) {
throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.")
}
return flowStarter.invokeFlowAsync(logicType, context(), *args).getOrThrow()
return flowStarter.invokeFlowAsync(logicType, context, *args).getOrThrow()
}
override fun attachmentExists(id: SecureHash): Boolean {
@ -464,4 +474,6 @@ internal class CordaRPCOpsImpl(
private inline fun <reified TARGET> Class<*>.checkIsA() {
require(TARGET::class.java.isAssignableFrom(this)) { "$name is not a ${TARGET::class.java.name}" }
}
private fun InvocationContext.withClientId(clientId: String) = copy(clientId = clientId)
}

View File

@ -215,13 +215,13 @@ interface FlowStarter {
* just synthesizes an [ExternalEvent.ExternalStartFlowEvent] and calls the method below.
* @param context indicates who started the flow, see: [InvocationContext].
*/
fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext): CordaFuture<FlowStateMachine<T>>
fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext): CordaFuture<out FlowStateMachineHandle<T>>
/**
* Starts a flow as described by an [ExternalEvent.ExternalStartFlowEvent]. If a transient error
* occurs during invocation, it will re-attempt to start the flow.
*/
fun <T> startFlow(event: ExternalEvent.ExternalStartFlowEvent<T>): CordaFuture<FlowStateMachine<T>>
fun <T> startFlow(event: ExternalEvent.ExternalStartFlowEvent<T>): CordaFuture<out FlowStateMachineHandle<T>>
/**
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow.
@ -232,9 +232,10 @@ interface FlowStarter {
* [logicType] or [args].
*/
fun <T> invokeFlowAsync(
logicType: Class<out FlowLogic<T>>,
context: InvocationContext,
vararg args: Any?): CordaFuture<FlowStateMachine<T>>
logicType: Class<out FlowLogic<T>>,
context: InvocationContext,
vararg args: Any?
): CordaFuture<out FlowStateMachineHandle<T>>
}
interface StartedNodeServices : ServiceHubInternal, FlowStarter

View File

@ -258,7 +258,7 @@ class NodeSchedulerService(private val clock: CordaClock,
return "${javaClass.simpleName}($scheduledState)"
}
override fun wireUpFuture(flowFuture: CordaFuture<FlowStateMachine<Any?>>) {
override fun wireUpFuture(flowFuture: CordaFuture<out FlowStateMachineHandle<Any?>>) {
_future.captureLater(flowFuture)
val future = _future.flatMap { it.resultFuture }
future.then {
@ -266,8 +266,8 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
private val _future = openFuture<FlowStateMachine<Any?>>()
override val future: CordaFuture<FlowStateMachine<Any?>>
private val _future = openFuture<FlowStateMachineHandle<Any?>>()
override val future: CordaFuture<FlowStateMachineHandle<Any?>>
get() = _future
}

View File

@ -13,6 +13,12 @@ internal fun InvocationContext.pushToLoggingContext() {
origin.pushToLoggingContext()
externalTrace?.pushToLoggingContext("external_")
impersonatedActor?.pushToLoggingContext("impersonating_")
clientId?.let {
MDC.getMDCAdapter().apply {
put("client_id", it)
}
}
}
internal fun Trace.pushToLoggingContext(prefix: String = "") {

View File

@ -55,6 +55,7 @@ class DBCheckpointStorage(
private const val MAX_EXC_TYPE_LENGTH = 256
private const val MAX_FLOW_NAME_LENGTH = 128
private const val MAX_PROGRESS_STEP_LENGTH = 256
const val MAX_CLIENT_ID_LENGTH = 512
private val RUNNABLE_CHECKPOINTS = setOf(FlowStatus.RUNNABLE, FlowStatus.HOSPITALIZED)
@ -513,8 +514,7 @@ class DBCheckpointStorage(
// Truncate the flow name to fit into the database column
// Flow names are unlikely to be this long
flowName = flowInfo.flowClass.name.take(MAX_FLOW_NAME_LENGTH),
// will come from the context
userSuppliedIdentifier = null,
userSuppliedIdentifier = context.clientId,
startType = context.getStartedType(),
initialParameters = context.getFlowParameters().storageSerialize().bytes,
launchingCordapp = (flowInfo.subFlowVersion as? SubFlowVersion.CorDappFlow)?.corDappName ?: "Core flow",
@ -627,10 +627,14 @@ class DBCheckpointStorage(
}
}
@Suppress("MagicNumber")
private fun InvocationContext.getFlowParameters(): List<Any?> {
// Only RPC flows have parameters which are found in index 1
return if (arguments.isNotEmpty()) {
uncheckedCast<Any?, Array<Any?>>(arguments[1]).toList()
// Only RPC flows have parameters which are found in index 1 or index 2 (if called with client id)
return if (arguments!!.isNotEmpty()) {
arguments!!.run {
check(size == 2 || size == 3) { "Unexpected argument number provided in rpc call" }
uncheckedCast<Any?, Array<Any?>>(last()).toList()
}
} else {
emptyList()
}

View File

@ -28,6 +28,8 @@ import java.security.SecureRandom
class Flow<A>(val fiber: FlowStateMachineImpl<A>, val resultFuture: OpenFuture<Any?>)
class NonResidentFlow(val runId: StateMachineRunId, val checkpoint: Checkpoint) {
val resultFuture: OpenFuture<Any?> = openFuture()
val externalEvents = mutableListOf<Event.DeliverSessionMessage>()
fun addExternalEvent(message: Event.DeliverSessionMessage) {
@ -62,13 +64,12 @@ class FlowCreator(
}
else -> nonResidentFlow.checkpoint
}
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint)
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, nonResidentFlow.resultFuture)
}
fun createFlowFromCheckpoint(runId: StateMachineRunId, oldCheckpoint: Checkpoint): Flow<*>? {
fun createFlowFromCheckpoint(runId: StateMachineRunId, oldCheckpoint: Checkpoint, resultFuture: OpenFuture<Any?> = openFuture()): Flow<*>? {
val checkpoint = oldCheckpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
val fiber = checkpoint.getFiberFromCheckpoint(runId) ?: return null
val resultFuture = openFuture<Any?>()
fiber.transientValues = TransientReference(createTransientValues(runId, resultFuture))
fiber.logic.stateMachine = fiber
verifyFlowLogicIsSuspendable(fiber.logic)

View File

@ -132,6 +132,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override val context: InvocationContext get() = transientState!!.value.checkpoint.checkpointState.invocationContext
override val ourIdentity: Party get() = transientState!!.value.checkpoint.checkpointState.ourIdentity
override val isKilled: Boolean get() = transientState!!.value.isKilled
override val clientId: String?
get() = transientState!!.value.checkpoint.checkpointState.invocationContext.clientId
internal val softLockedStates = mutableSetOf<StateRef>()

View File

@ -13,12 +13,17 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.mapError
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.mapNotNull
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationContext
@ -75,12 +80,21 @@ internal class SingleThreadedStateMachineManager(
) : StateMachineManager, StateMachineManagerInternal {
companion object {
private val logger = contextLogger()
@VisibleForTesting
var beforeClientIDCheck: (() -> Unit)? = null
@VisibleForTesting
var onClientIDNotFound: (() -> Unit)? = null
@VisibleForTesting
var onCallingStartFlowInternal: (() -> Unit)? = null
@VisibleForTesting
var onStartFlowInternalThrewAndAboutToRemove: (() -> Unit)? = null
}
private val innerState = StateMachineInnerStateImpl()
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
private val scheduledFutureExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder().setNameFormat("flow-scheduled-future-thread").setDaemon(true).build()
ThreadFactoryBuilder().setNameFormat("flow-scheduled-future-thread").setDaemon(true).build()
)
// How many Fibers are running (this includes suspended flows). If zero and stopping is true, then we are halted.
private val liveFibers = ReusableLatch()
@ -158,6 +172,25 @@ internal class SingleThreadedStateMachineManager(
}
}
}
// at the moment we have RUNNABLE, HOSPITALIZED and PAUSED flows
// - RESULTED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3692
// - FAILED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3681
// - Incompatible checkpoints need to be handled upon implementing CORDA-3897
for (flow in fibers) {
flow.fiber.clientId?.let {
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(doneFuture(flow.fiber))
}
}
for (pausedFlow in pausedFlows) {
pausedFlow.value.checkpoint.checkpointState.invocationContext.clientId?.let {
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(
doneClientIdFuture(pausedFlow.key, pausedFlow.value.resultFuture, it)
)
}
}
return serviceHub.networkMapCache.nodeReady.map {
logger.info("Node ready, info: ${serviceHub.myInfo}")
resumeRestoredFlows(fibers)
@ -229,21 +262,62 @@ internal class SingleThreadedStateMachineManager(
}
}
@Suppress("ComplexMethod")
private fun <A> startFlow(
flowId: StateMachineRunId,
flowLogic: FlowLogic<A>,
context: InvocationContext,
ourIdentity: Party?,
deduplicationHandler: DeduplicationHandler?
): CordaFuture<FlowStateMachine<A>> {
return startFlowInternal(
): CordaFuture<out FlowStateMachineHandle<A>> {
beforeClientIDCheck?.invoke()
var newFuture: OpenFuture<FlowStateMachineHandle<A>>? = null
val clientId = context.clientId
if (clientId != null) {
var existingFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>? = null
innerState.withLock {
clientIdsToFlowIds.compute(clientId) { _, existingStatus ->
if (existingStatus != null) {
existingFuture = when (existingStatus) {
is FlowWithClientIdStatus.Active -> existingStatus.flowStateMachineFuture
// This below dummy future ('doneFuture(5)') will be populated from DB upon implementing CORDA-3692 and CORDA-3681 - for now just return a dummy future
is FlowWithClientIdStatus.Removed -> doneClientIdFuture(existingStatus.flowId, doneFuture(@Suppress("MagicNumber")5), clientId)
}
existingStatus
} else {
newFuture = openFuture()
FlowWithClientIdStatus.Active(newFuture!!)
}
}
}
if (existingFuture != null) return uncheckedCast(existingFuture)
onClientIDNotFound?.invoke()
}
return try {
startFlowInternal(
flowId,
invocationContext = context,
flowLogic = flowLogic,
flowStart = FlowStart.Explicit,
ourIdentity = ourIdentity ?: ourFirstIdentity,
deduplicationHandler = deduplicationHandler
)
).also {
newFuture?.captureLater(uncheckedCast(it))
}
} catch (t: Throwable) {
onStartFlowInternalThrewAndAboutToRemove?.invoke()
innerState.withLock {
clientIdsToFlowIds.remove(clientId)
newFuture?.setException(t)
}
// Throwing the exception plain here is the same as to return an exceptionally completed future since the caller calls
// getOrThrow() on the returned future at [CordaRPCOpsImpl.startFlow].
throw t
}
}
override fun killFlow(id: StateMachineRunId): Boolean {
@ -591,6 +665,7 @@ internal class SingleThreadedStateMachineManager(
ourIdentity: Party,
deduplicationHandler: DeduplicationHandler?
): CordaFuture<FlowStateMachine<A>> {
onCallingStartFlowInternal?.invoke()
val existingFlow = innerState.withLock { flows[flowId] }
val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState?.value?.isAnyCheckpointPersisted == true) {
@ -736,6 +811,7 @@ internal class SingleThreadedStateMachineManager(
require(lastState.isRemoved) { "Flow must be in removable state before removal" }
require(lastState.checkpoint.checkpointState.subFlowStack.size == 1) { "Checkpointed stack must be empty" }
require(flow.fiber.id !in sessionToFlow.values) { "Flow fibre must not be needed by an existing session" }
flow.fiber.clientId?.let { setClientIdAsSucceeded(it, flow.fiber.id) }
flow.resultFuture.set(removalReason.flowReturnValue)
lastState.flowLogic.progressTracker?.currentStep = ProgressTracker.DONE
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Success(removalReason.flowReturnValue)))
@ -747,13 +823,14 @@ internal class SingleThreadedStateMachineManager(
lastState: StateMachineState
) {
drainFlowEventQueue(flow)
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
flow.fiber.clientId?.let { setClientIdAsFailed(it, flow.fiber.id) }
val flowError = removalReason.flowErrors[0] // TODO what to do with several?
val exception = flowError.exception
(exception as? FlowException)?.originalErrorId = flowError.errorId
flow.resultFuture.setException(exception)
lastState.flowLogic.progressTracker?.endWithError(exception)
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Failure<Nothing>(exception)))
}
@ -781,4 +858,55 @@ internal class SingleThreadedStateMachineManager(
}
}
}
private fun StateMachineInnerState.setClientIdAsSucceeded(clientId: String, id: StateMachineRunId) {
setClientIdAsRemoved(clientId, id, true)
}
private fun StateMachineInnerState.setClientIdAsFailed(clientId: String, id: StateMachineRunId) {
setClientIdAsRemoved(clientId, id, false)
}
private fun StateMachineInnerState.setClientIdAsRemoved(
clientId: String,
id: StateMachineRunId,
succeeded: Boolean
) {
clientIdsToFlowIds.compute(clientId) { _, existingStatus ->
require(existingStatus != null && existingStatus is FlowWithClientIdStatus.Active)
FlowWithClientIdStatus.Removed(id, succeeded)
}
}
/**
* The flow out of which a [doneFuture] will be produced should be a started flow,
* i.e. it should not exist in [mutex.content.startedFutures].
*/
private fun doneClientIdFuture(
id: StateMachineRunId,
resultFuture: CordaFuture<Any?>,
clientId: String
): CordaFuture<FlowStateMachineHandle<Any?>> =
doneFuture(object : FlowStateMachineHandle<Any?> {
override val logic: Nothing? = null
override val id: StateMachineRunId = id
override val resultFuture: CordaFuture<Any?> = resultFuture
override val clientId: String? = clientId
}
)
override fun removeClientId(clientId: String): Boolean {
var removed = false
innerState.withLock {
clientIdsToFlowIds.compute(clientId) { _, existingStatus ->
if (existingStatus != null && existingStatus is FlowWithClientIdStatus.Removed) {
removed = true
null
} else { // don't remove
existingStatus
}
}
}
return removed
}
}

View File

@ -17,6 +17,7 @@ internal interface StateMachineInnerState {
val changesPublisher: PublishSubject<Change>
/** Flows scheduled to be retried if not finished within the specified timeout period. */
val timedFlows: MutableMap<StateMachineRunId, ScheduledTimeout>
val clientIdsToFlowIds: MutableMap<String, FlowWithClientIdStatus>
fun <R> withMutex(block: StateMachineInnerState.() -> R): R
}
@ -30,6 +31,7 @@ internal class StateMachineInnerStateImpl : StateMachineInnerState {
override val pausedFlows = HashMap<StateMachineRunId, NonResidentFlow>()
override val startedFutures = HashMap<StateMachineRunId, OpenFuture<Unit>>()
override val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
override val clientIdsToFlowIds = HashMap<String, FlowWithClientIdStatus>()
override fun <R> withMutex(block: StateMachineInnerState.() -> R): R = lock.withLock { block(this) }
}

View File

@ -5,6 +5,7 @@ import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.messaging.DataFeed
import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler
@ -98,6 +99,13 @@ interface StateMachineManager {
* Returns a snapshot of all [FlowStateMachineImpl]s currently managed.
*/
fun snapshot(): Set<FlowStateMachineImpl<*>>
/**
* Removes a flow's [clientId] to result/ exception mapping.
*
* @return whether the mapping was removed.
*/
fun removeClientId(clientId: String): Boolean
}
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call
@ -139,11 +147,11 @@ interface ExternalEvent {
/**
* A callback for the state machine to pass back the [CordaFuture] associated with the flow start to the submitter.
*/
fun wireUpFuture(flowFuture: CordaFuture<FlowStateMachine<T>>)
fun wireUpFuture(flowFuture: CordaFuture<out FlowStateMachineHandle<T>>)
/**
* The future representing the flow start, passed back from the state machine to the submitter of this event.
*/
val future: CordaFuture<FlowStateMachine<T>>
val future: CordaFuture<out FlowStateMachineHandle<T>>
}
}

View File

@ -1,12 +1,15 @@
package net.corda.node.services.statemachine
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
@ -109,8 +112,8 @@ data class Checkpoint(
listOf(topLevelSubFlow),
numberOfSuspends = 0
),
errorState = ErrorState.Clean,
flowState = FlowState.Unstarted(flowStart, frozenFlowLogic)
flowState = FlowState.Unstarted(flowStart, frozenFlowLogic),
errorState = ErrorState.Clean
)
}
}
@ -380,3 +383,8 @@ sealed class SubFlowVersion {
data class CoreFlow(override val platformVersion: Int) : SubFlowVersion()
data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion()
}
sealed class FlowWithClientIdStatus {
data class Active(val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>) : FlowWithClientIdStatus()
data class Removed(val flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus()
}

View File

@ -178,7 +178,7 @@ class TopLevelTransition(
private fun suspendTransition(event: Event.Suspend): TransitionResult {
return builder {
val newCheckpoint = currentState.checkpoint.run {
val newCheckpointState = if (checkpointState.invocationContext.arguments.isNotEmpty()) {
val newCheckpointState = if (checkpointState.invocationContext.arguments!!.isNotEmpty()) {
checkpointState.copy(
invocationContext = checkpointState.invocationContext.copy(arguments = emptyList()),
numberOfSuspends = checkpointState.numberOfSuspends + 1

View File

@ -0,0 +1,417 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.concurrent.Semaphore
import net.corda.core.flows.FlowLogic
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.startFlowWithClientId
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.lang.IllegalStateException
import java.sql.SQLTransientConnectionException
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class FlowClientIdTests {
private lateinit var mockNet: InternalMockNetwork
private lateinit var aliceNode: TestStartedNode
@Before
fun setUpMockNet() {
mockNet = InternalMockNetwork(
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP, FINANCE_CONTRACTS_CORDAPP),
servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin()
)
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
}
@After
fun cleanUp() {
mockNet.stopNodes()
ResultFlow.hook = null
ResultFlow.suspendableHook = null
SingleThreadedStateMachineManager.beforeClientIDCheck = null
SingleThreadedStateMachineManager.onClientIDNotFound = null
SingleThreadedStateMachineManager.onCallingStartFlowInternal = null
SingleThreadedStateMachineManager.onStartFlowInternalThrewAndAboutToRemove = null
}
@Test(timeout=300_000)
fun `no new flow starts if the client id provided pre exists`() {
var counter = 0
ResultFlow.hook = { counter++ }
val clientId = UUID.randomUUID().toString()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
Assert.assertEquals(1, counter)
}
@Test(timeout=300_000)
fun `flow's result is retrievable after flow's lifetime, when flow is started with a client id - different parameters are ignored`() {
val clientId = UUID.randomUUID().toString()
val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
val clientId0 = handle0.clientId
val flowId0 = handle0.id
val result0 = handle0.resultFuture.getOrThrow()
val handle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10))
val clientId1 = handle1.clientId
val flowId1 = handle1.id
val result1 = handle1.resultFuture.getOrThrow()
Assert.assertEquals(clientId0, clientId1)
Assert.assertEquals(flowId0, flowId1)
Assert.assertEquals(result0, result1)
}
@Test(timeout=300_000)
fun `flow's result is available if reconnect after flow had retried from previous checkpoint, when flow is started with a client id`() {
var firstRun = true
ResultFlow.hook = {
if (firstRun) {
firstRun = false
throw SQLTransientConnectionException("connection is not available")
}
}
val clientId = UUID.randomUUID().toString()
val result0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
val result1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
Assert.assertEquals(result0, result1)
}
@Test(timeout=300_000)
fun `flow's result is available if reconnect during flow's retrying from previous checkpoint, when flow is started with a client id`() {
var firstRun = true
val waitForSecondRequest = Semaphore(0)
val waitUntilFlowHasRetried = Semaphore(0)
ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
@Suspendable
override fun call() {
if (firstRun) {
firstRun = false
throw SQLTransientConnectionException("connection is not available")
} else {
waitUntilFlowHasRetried.release()
waitForSecondRequest.acquire()
}
}
}
var result1 = 0
val clientId = UUID.randomUUID().toString()
val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
waitUntilFlowHasRetried.acquire()
val t = thread { result1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() }
Thread.sleep(1000)
waitForSecondRequest.release()
val result0 = handle0.resultFuture.getOrThrow()
t.join()
Assert.assertEquals(result0, result1)
}
@Ignore // this is to be unignored upon implementing CORDA-3681
@Test(timeout=300_000)
fun `flow's exception is available after flow's lifetime if flow is started with a client id`() {
ResultFlow.hook = { throw IllegalStateException() }
val clientId = UUID.randomUUID().toString()
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
}
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
}
}
@Test(timeout=300_000)
fun `flow's client id mapping gets removed upon request`() {
val clientId = UUID.randomUUID().toString()
var counter = 0
ResultFlow.hook = { counter++ }
val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle0.resultFuture.getOrThrow(20.seconds)
val removed = aliceNode.smm.removeClientId(clientId)
// On new request with clientId, after the same clientId was removed, a brand new flow will start with that clientId
val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle1.resultFuture.getOrThrow(20.seconds)
assertTrue(removed)
Assert.assertNotEquals(flowHandle0.id, flowHandle1.id)
Assert.assertEquals(flowHandle0.clientId, flowHandle1.clientId)
Assert.assertEquals(2, counter)
}
@Test(timeout=300_000)
fun `flow's client id mapping can only get removed once the flow gets removed`() {
val clientId = UUID.randomUUID().toString()
var tries = 0
val maxTries = 10
var failedRemovals = 0
val semaphore = Semaphore(0)
ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
@Suspendable
override fun call() {
semaphore.acquire()
}
}
val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
var removed = false
while (!removed) {
removed = aliceNode.smm.removeClientId(clientId)
if (!removed) ++failedRemovals
++tries
if (tries >= maxTries) {
semaphore.release()
flowHandle0.resultFuture.getOrThrow(20.seconds)
}
}
assertTrue(removed)
Assert.assertEquals(maxTries, failedRemovals)
}
@Test(timeout=300_000)
fun `only one flow starts upon concurrent requests with the same client id`() {
val requests = 2
val counter = AtomicInteger(0)
val resultsCounter = AtomicInteger(0)
ResultFlow.hook = { counter.incrementAndGet() }
//(aliceNode.smm as SingleThreadedStateMachineManager).concurrentRequests = true
val clientId = UUID.randomUUID().toString()
val threads = arrayOfNulls<Thread>(requests)
for (i in 0 until requests) {
threads[i] = Thread {
val result = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
resultsCounter.addAndGet(result)
}
}
val beforeCount = AtomicInteger(0)
SingleThreadedStateMachineManager.beforeClientIDCheck = {
beforeCount.incrementAndGet()
}
val clientIdNotFound = Semaphore(0)
val waitUntilClientIdNotFound = Semaphore(0)
SingleThreadedStateMachineManager.onClientIDNotFound = {
// Only the first request should reach this point
waitUntilClientIdNotFound.release()
clientIdNotFound.acquire()
}
for (i in 0 until requests) {
threads[i]!!.start()
}
waitUntilClientIdNotFound.acquire()
for (i in 0 until requests) {
clientIdNotFound.release()
}
for (thread in threads) {
thread!!.join()
}
Assert.assertEquals(1, counter.get())
Assert.assertEquals(2, beforeCount.get())
Assert.assertEquals(10, resultsCounter.get())
}
@Test(timeout=300_000)
fun `on node start -running- flows with client id are hook-able`() {
val clientId = UUID.randomUUID().toString()
var noSecondFlowWasSpawned = 0
var firstRun = true
var firstFiber: Fiber<out Any?>? = null
val flowIsRunning = Semaphore(0)
val waitUntilFlowIsRunning = Semaphore(0)
ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
@Suspendable
override fun call() {
if (firstRun) {
firstFiber = Fiber.currentFiber()
firstRun = false
}
waitUntilFlowIsRunning.release()
try {
flowIsRunning.acquire() // make flow wait here to impersonate a running flow
} catch (e: InterruptedException) {
flowIsRunning.release()
throw e
}
noSecondFlowWasSpawned++
}
}
val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
waitUntilFlowIsRunning.acquire()
aliceNode.internals.acceptableLiveFiberCountOnStop = 1
val aliceNode = mockNet.restartNode(aliceNode)
// Blow up the first fiber running our flow as it is leaked here, on normal node shutdown that fiber should be gone
firstFiber!!.interrupt()
waitUntilFlowIsRunning.acquire()
// Re-hook a running flow
val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowIsRunning.release()
Assert.assertEquals(flowHandle0.id, flowHandle1.id)
Assert.assertEquals(clientId, flowHandle1.clientId)
Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds))
Assert.assertEquals(1, noSecondFlowWasSpawned)
}
// @Test(timeout=300_000)
// fun `on node restart -paused- flows with client id are hook-able`() {
// val clientId = UUID.randomUUID().toString()
// var noSecondFlowWasSpawned = 0
// var firstRun = true
// var firstFiber: Fiber<out Any?>? = null
// val flowIsRunning = Semaphore(0)
// val waitUntilFlowIsRunning = Semaphore(0)
//
// ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
// @Suspendable
// override fun call() {
// if (firstRun) {
// firstFiber = Fiber.currentFiber()
// firstRun = false
// }
//
// waitUntilFlowIsRunning.release()
// try {
// flowIsRunning.acquire() // make flow wait here to impersonate a running flow
// } catch (e: InterruptedException) {
// flowIsRunning.release()
// throw e
// }
//
// noSecondFlowWasSpawned++
// }
// }
//
// val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
// waitUntilFlowIsRunning.acquire()
// aliceNode.internals.acceptableLiveFiberCountOnStop = 1
// // Pause the flow on node restart
// val aliceNode = mockNet.restartNode(aliceNode,
// InternalMockNodeParameters(
// configOverrides = {
// doReturn(StateMachineManager.StartMode.Safe).whenever(it).smmStartMode
// }
// ))
// // Blow up the first fiber running our flow as it is leaked here, on normal node shutdown that fiber should be gone
// firstFiber!!.interrupt()
//
// // Re-hook a paused flow
// val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
//
// Assert.assertEquals(flowHandle0.id, flowHandle1.id)
// Assert.assertEquals(clientId, flowHandle1.clientId)
// aliceNode.smm.unPauseFlow(flowHandle1.id)
// Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds))
// Assert.assertEquals(1, noSecondFlowWasSpawned)
// }
@Test(timeout=300_000)
fun `On 'startFlowInternal' throwing, subsequent request with same client id does not get de-duplicated and starts a new flow`() {
val clientId = UUID.randomUUID().toString()
var firstRequest = true
SingleThreadedStateMachineManager.onCallingStartFlowInternal = {
if (firstRequest) {
firstRequest = false
throw IllegalStateException("Yet another one")
}
}
var counter = 0
ResultFlow.hook = { counter++ }
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
}
val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle1.resultFuture.getOrThrow(20.seconds)
assertEquals(clientId, flowHandle1.clientId)
assertEquals(1, counter)
}
// @Test(timeout=300_000)
// fun `On 'startFlowInternal' throwing, subsequent request with same client hits the time window in which the previous request was about to remove the client id mapping`() {
// val clientId = UUID.randomUUID().toString()
// var firstRequest = true
// SingleThreadedStateMachineManager.onCallingStartFlowInternal = {
// if (firstRequest) {
// firstRequest = false
// throw IllegalStateException("Yet another one")
// }
// }
//
// val wait = Semaphore(0)
// val waitForFirstRequest = Semaphore(0)
// SingleThreadedStateMachineManager.onStartFlowInternalThrewAndAboutToRemove = {
// waitForFirstRequest.release()
// wait.acquire()
// Thread.sleep(10000)
// }
// var counter = 0
// ResultFlow.hook = { counter++ }
//
// thread {
// assertFailsWith<IllegalStateException> {
// aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
// }
// }
//
// waitForFirstRequest.acquire()
// wait.release()
// assertFailsWith<IllegalStateException> {
// // the subsequent request will not hang on a never ending future, because the previous request ,upon failing, will also complete the future exceptionally
// aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
// }
//
// assertEquals(0, counter)
// }
}
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
companion object {
var hook: (() -> Unit)? = null
var suspendableHook: FlowLogic<Unit>? = null
}
@Suspendable
override fun call(): A {
hook?.invoke()
suspendableHook?.let { subFlow(it) }
return result
}
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.CordaRuntimeException
import net.corda.core.context.InvocationContext
import net.corda.core.contracts.BelongsToContract
import net.corda.core.contracts.LinearState
@ -46,12 +47,14 @@ import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.time.Instant
import java.util.UUID
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.function.Supplier
import kotlin.reflect.jvm.jvmName
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
@ -87,9 +90,11 @@ class FlowMetadataRecordingTest {
metadata = metadataFromHook
}
val clientId = UUID.randomUUID().toString()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(
::MyFlow,
it.proxy.startFlowDynamicWithClientId(
clientId,
MyFlow::class.java,
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
@ -101,7 +106,7 @@ class FlowMetadataRecordingTest {
assertEquals(flowId!!.uuid.toString(), it.flowId)
assertEquals(MyFlow::class.java.name, it.flowName)
// Should be changed when [userSuppliedIdentifier] gets filled in future changes
assertNull(it.userSuppliedIdentifier)
assertEquals(clientId, it.userSuppliedIdentifier)
assertEquals(DBCheckpointStorage.StartReason.RPC, it.startType)
assertEquals(
listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject),
@ -192,7 +197,7 @@ class FlowMetadataRecordingTest {
assertEquals(
listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject),
uncheckedCast<Any?, Array<Any?>>(context!!.arguments[1]).toList()
uncheckedCast<Any?, Array<Any?>>(context!!.arguments!![1]).toList()
)
assertEquals(
listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject),
@ -393,6 +398,19 @@ class FlowMetadataRecordingTest {
}
}
@Test(timeout = 300_000)
fun `assert that flow started with longer client id than MAX_CLIENT_ID_LENGTH fails`() {
val clientId = "1".repeat(513) // DBCheckpointStorage.MAX_CLIENT_ID_LENGTH == 512
driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val rpc = CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).proxy
assertFailsWith<CordaRuntimeException>("clientId cannot be longer than ${DBCheckpointStorage.MAX_CLIENT_ID_LENGTH} characters") {
rpc.startFlowDynamicWithClientId(clientId, EmptyFlow::class.java).returnValue.getOrThrow()
}
}
}
@InitiatingFlow
@StartableByRPC
@StartableByService
@ -553,4 +571,11 @@ class FlowMetadataRecordingTest {
return ScheduledActivity(logicRef, Instant.now())
}
}
@StartableByRPC
class EmptyFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
}
}
}

View File

@ -11,7 +11,6 @@ import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.KilledFlowException
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.messaging.MessageRecipients
import net.corda.core.utilities.UntrustworthyData
@ -148,7 +147,7 @@ class RetryFlowMockTest {
// Make sure we have seen an update from the hospital, and thus the flow went there.
val alice = TestIdentity(CordaX500Name.parse("L=London,O=Alice Ltd,OU=Trade,C=GB")).party
val records = nodeA.smm.flowHospital.track().updates.toBlocking().toIterable().iterator()
val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
val flow = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
override val destination: Destination get() = alice
override val counterparty: Party get() = alice

View File

@ -2,23 +2,23 @@ package net.corda.coretesting.internal.matchers.flow
import com.natpryce.hamkrest.Matcher
import com.natpryce.hamkrest.equalTo
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.coretesting.internal.matchers.*
/**
* Matches a Flow that succeeds with a result matched by the given matcher
*/
fun <T> willReturn(): Matcher<FlowStateMachine<T>> = net.corda.coretesting.internal.matchers.future.willReturn<T>()
.extrude(FlowStateMachine<T>::resultFuture)
fun <T> willReturn(): Matcher<FlowStateMachineHandle<T>> = net.corda.coretesting.internal.matchers.future.willReturn<T>()
.extrude(FlowStateMachineHandle<T>::resultFuture)
.redescribe { "is a flow that will return" }
fun <T> willReturn(expected: T): Matcher<FlowStateMachine<T>> = willReturn(equalTo(expected))
fun <T> willReturn(expected: T): Matcher<FlowStateMachineHandle<T>> = willReturn(equalTo(expected))
/**
* Matches a Flow that succeeds with a result matched by the given matcher
*/
fun <T> willReturn(successMatcher: Matcher<T>) = net.corda.coretesting.internal.matchers.future.willReturn(successMatcher)
.extrude(FlowStateMachine<out T>::resultFuture)
.extrude(FlowStateMachineHandle<out T>::resultFuture)
.redescribe { "is a flow that will return with a value that ${successMatcher.description}" }
/**
@ -26,7 +26,7 @@ fun <T> willReturn(successMatcher: Matcher<T>) = net.corda.coretesting.internal.
*/
inline fun <reified E: Exception> willThrow(failureMatcher: Matcher<E>) =
net.corda.coretesting.internal.matchers.future.willThrow(failureMatcher)
.extrude(FlowStateMachine<*>::resultFuture)
.extrude(FlowStateMachineHandle<*>::resultFuture)
.redescribe { "is a flow that will fail, throwing an exception that ${failureMatcher.description}" }
/**
@ -34,5 +34,5 @@ inline fun <reified E: Exception> willThrow(failureMatcher: Matcher<E>) =
*/
inline fun <reified E: Exception> willThrow() =
net.corda.coretesting.internal.matchers.future.willThrow<E>()
.extrude(FlowStateMachine<*>::resultFuture)
.extrude(FlowStateMachineHandle<*>::resultFuture)
.redescribe { "is a flow that will fail with an exception of type ${E::class.java.simpleName}" }

View File

@ -7,7 +7,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.div
@ -256,7 +256,10 @@ class NodeListenProcessDeathException(hostAndPort: NetworkHostAndPort, listenPro
""".trimIndent()
)
fun <T> StartedNodeServices.startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = startFlow(logic, newContext()).getOrThrow()
fun <T> StartedNodeServices.startFlow(logic: FlowLogic<T>): FlowStateMachineHandle<T> = startFlow(logic, newContext()).getOrThrow()
fun <T> StartedNodeServices.startFlowWithClientId(clientId: String, logic: FlowLogic<T>): FlowStateMachineHandle<T> =
startFlow(logic, newContext().copy(clientId = clientId)).getOrThrow()
fun StartedNodeServices.newContext(): InvocationContext = testContext(myInfo.chooseIdentity().name)