From 22d92d5ef0fbd6c42b570b19546e30547fa55561 Mon Sep 17 00:00:00 2001 From: Kyriakos Tharrouniatis Date: Thu, 16 Jul 2020 10:52:08 +0100 Subject: [PATCH] CORDA-3809 Expose client side unique RPC ID for flow starts (#6307) Introducing a new flow start method (`startFlowDynamicWithClientId`) passing in a `clientId`. Once `startFlowDynamicWithClientId` gets called, the `clientId` gets injected into `InvocationContext` and also pushed to the logging context. If a new flow starts with this method, then a < `clientId` to flow > pair is kept on node side, even after the flow's lifetime. If `startFlowDynamicWithClientId` is called again with the same `clientId` then the node identifies that this `clientId` refers to an existing < `clientId` to flow > pair and returns back to the rpc client a `FlowStateMachineHandle` future, created out of that pair. `FlowStateMachineHandle` interface was introduced as a thinner `FlowStateMachine`. All `FlowStateMachine` properties used by call sites are moved into this new interface along with `clientId` and then `FlowStateMachine` extends it. Introducing an acknowledgement method (`removeClientId`). Calling this method removes the < `clientId` to flow > pair on the node side and frees resources. --- .../net/corda/coretests/flows/WithFinality.kt | 4 +- .../net/corda/coretests/flows/WithMockNet.kt | 6 +- .../corda/core/context/InvocationContext.kt | 11 +- .../corda/core/internal/FlowStateMachine.kt | 16 +- .../net/corda/core/messaging/CordaRPCOps.kt | 92 ++++ .../net/corda/core/messaging/FlowHandle.kt | 20 + detekt-baseline.xml | 70 +-- .../corda/node/flows/FlowWithClientIdTest.kt | 77 ++++ .../net/corda/node/internal/AbstractNode.kt | 29 +- .../corda/node/internal/AppServiceHubImpl.kt | 6 +- .../corda/node/internal/CordaRPCOpsImpl.kt | 28 +- .../node/services/api/ServiceHubInternal.kt | 11 +- .../services/events/NodeSchedulerService.kt | 6 +- .../logging/ContextualLoggingUtils.kt | 6 + .../persistence/DBCheckpointStorage.kt | 14 +- .../node/services/statemachine/FlowCreator.kt | 7 +- .../statemachine/FlowStateMachineImpl.kt | 2 + .../SingleThreadedStateMachineManager.kt | 140 +++++- .../statemachine/StateMachineInnerState.kt | 2 + .../statemachine/StateMachineManager.kt | 12 +- .../statemachine/StateMachineState.kt | 12 +- .../transitions/TopLevelTransition.kt | 2 +- .../statemachine/FlowClientIdTests.kt | 417 ++++++++++++++++++ .../statemachine/FlowMetadataRecordingTest.kt | 33 +- .../statemachine/RetryFlowMockTest.kt | 3 +- .../internal/matchers/flow/FlowMatchers.kt | 14 +- .../node/internal/InternalTestUtils.kt | 7 +- 27 files changed, 903 insertions(+), 144 deletions(-) create mode 100644 node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt create mode 100644 node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/WithFinality.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/WithFinality.kt index 5e1daa8a09..9ed9b04679 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/WithFinality.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/WithFinality.kt @@ -6,7 +6,7 @@ import com.natpryce.hamkrest.Matcher import com.natpryce.hamkrest.equalTo import net.corda.core.flows.* import net.corda.core.identity.Party -import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.startFlow @@ -16,7 +16,7 @@ import net.corda.testing.node.internal.TestStartedNode interface WithFinality : WithMockNet { //region Operations - fun TestStartedNode.finalise(stx: SignedTransaction, vararg recipients: Party): FlowStateMachine { + fun TestStartedNode.finalise(stx: SignedTransaction, vararg recipients: Party): FlowStateMachineHandle { return startFlowAndRunNetwork(FinalityInvoker(stx, recipients.toSet(), emptySet())) } diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/WithMockNet.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/WithMockNet.kt index 8069b6d807..4a4574112e 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/WithMockNet.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/WithMockNet.kt @@ -6,7 +6,7 @@ import net.corda.core.flows.FlowLogic import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate -import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.testing.core.makeUnique @@ -48,12 +48,12 @@ interface WithMockNet { /** * Start a flow */ - fun TestStartedNode.startFlow(logic: FlowLogic): FlowStateMachine = services.startFlow(logic) + fun TestStartedNode.startFlow(logic: FlowLogic): FlowStateMachineHandle = services.startFlow(logic) /** * Start a flow and run the network immediately afterwards */ - fun TestStartedNode.startFlowAndRunNetwork(logic: FlowLogic): FlowStateMachine = + fun TestStartedNode.startFlowAndRunNetwork(logic: FlowLogic): FlowStateMachineHandle = startFlow(logic).andRunNetwork() fun TestStartedNode.createConfidentialIdentity(party: Party) = diff --git a/core/src/main/kotlin/net/corda/core/context/InvocationContext.kt b/core/src/main/kotlin/net/corda/core/context/InvocationContext.kt index ef90810b05..06e9210801 100644 --- a/core/src/main/kotlin/net/corda/core/context/InvocationContext.kt +++ b/core/src/main/kotlin/net/corda/core/context/InvocationContext.kt @@ -24,7 +24,8 @@ data class InvocationContext( val actor: Actor?, val externalTrace: Trace? = null, val impersonatedActor: Actor? = null, - val arguments: List = emptyList() + val arguments: List? = emptyList(), // 'arguments' is nullable so that a - >= 4.6 version - RPC client can be backwards compatible against - < 4.6 version - nodes + val clientId: String? = null ) { constructor( @@ -49,8 +50,9 @@ data class InvocationContext( actor: Actor? = null, externalTrace: Trace? = null, impersonatedActor: Actor? = null, - arguments: List = emptyList() - ) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor, arguments) + arguments: List = emptyList(), + clientId: String? = null + ) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor, arguments, clientId) /** * Creates an [InvocationContext] with [InvocationOrigin.RPC] origin. @@ -113,7 +115,8 @@ data class InvocationContext( actor = actor, externalTrace = externalTrace, impersonatedActor = impersonatedActor, - arguments = arguments + arguments = arguments, + clientId = clientId ) } } diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt index c8a96da1cd..42db120f36 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt @@ -11,10 +11,19 @@ import net.corda.core.node.ServiceHub import net.corda.core.serialization.SerializedBytes import org.slf4j.Logger +@DeleteForDJVM +@DoNotImplement +interface FlowStateMachineHandle { + val logic: FlowLogic? + val id: StateMachineRunId + val resultFuture: CordaFuture + val clientId: String? +} + /** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */ @DeleteForDJVM @DoNotImplement -interface FlowStateMachine { +interface FlowStateMachine : FlowStateMachineHandle { @Suspendable fun suspend(ioRequest: FlowIORequest, maySkipCheckpoint: Boolean): SUSPENDRETURN @@ -38,14 +47,11 @@ interface FlowStateMachine { fun updateTimedFlowTimeout(timeoutSeconds: Long) - val logic: FlowLogic val serviceHub: ServiceHub val logger: Logger - val id: StateMachineRunId - val resultFuture: CordaFuture val context: InvocationContext val ourIdentity: Party val ourSenderUUID: String? val creationTime: Long val isKilled: Boolean -} +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 6098b0c707..0d1a586c33 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -264,6 +264,16 @@ interface CordaRPCOps : RPCOps { @RPCReturnsObservables fun startFlowDynamic(logicType: Class>, vararg args: Any?): FlowHandle + /** + * Start the given flow with the given arguments and a [clientId]. [logicType] must be annotated + * with [net.corda.core.flows.StartableByRPC]. The flow's result/ exception will be available for the client to + * re-connect and retrieve them even after the flow's lifetime, by re-calling [startFlowDynamicWithClientId] with the same + * [clientId]. Upon calling [removeClientId], the node's resources holding the result/ exception will be freed + * and the result/ exception will no longer be available. + */ + @RPCReturnsObservables + fun startFlowDynamicWithClientId(clientId: String, logicType: Class>, vararg args: Any?): FlowHandleWithClientId + /** * Start the given flow with the given arguments, returning an [Observable] with a single observation of the * result of running the flow. [logicType] must be annotated with [net.corda.core.flows.StartableByRPC]. @@ -278,6 +288,15 @@ interface CordaRPCOps : RPCOps { */ fun killFlow(id: StateMachineRunId): Boolean + /** + * Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed. + * + * See [startFlowDynamicWithClientId] for more information. + * + * @return whether the mapping was removed. + */ + fun removeClientId(clientId: String): Boolean + /** Returns Node's NodeInfo, assuming this will not change while the node is running. */ fun nodeInfo(): NodeInfo @@ -542,6 +561,79 @@ inline fun > CordaRPCOps.startFlow arg5: F ): FlowHandle = startFlowDynamic(R::class.java, arg0, arg1, arg2, arg3, arg4, arg5) +/** + * Extension function for type safe invocation of flows from Kotlin, with [clientId]. + */ +@Suppress("unused") +inline fun > CordaRPCOps.startFlowWithClientId( + clientId: String, + @Suppress("unused_parameter") + flowConstructor: () -> R +): FlowHandleWithClientId = startFlowDynamicWithClientId(clientId, R::class.java) + +@Suppress("unused") +inline fun > CordaRPCOps.startFlowWithClientId( + clientId: String, + @Suppress("unused_parameter") + flowConstructor: (A) -> R, + arg0: A +): FlowHandleWithClientId = startFlowDynamicWithClientId(clientId, R::class.java, arg0) + +@Suppress("unused") +inline fun > CordaRPCOps.startFlowWithClientId( + clientId: String, + @Suppress("unused_parameter") + flowConstructor: (A, B) -> R, + arg0: A, + arg1: B +): FlowHandleWithClientId = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1) + +@Suppress("unused") +inline fun > CordaRPCOps.startFlowWithClientId( + clientId: String, + @Suppress("unused_parameter") + flowConstructor: (A, B, C) -> R, + arg0: A, + arg1: B, + arg2: C +): FlowHandleWithClientId = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1, arg2) + +@Suppress("unused") +inline fun > CordaRPCOps.startFlowWithClientId( + clientId: String, + @Suppress("unused_parameter") + flowConstructor: (A, B, C, D) -> R, + arg0: A, + arg1: B, + arg2: C, + arg3: D +): FlowHandleWithClientId = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1, arg2, arg3) + +@Suppress("unused") +inline fun > CordaRPCOps.startFlowWithClientId( + clientId: String, + @Suppress("unused_parameter") + flowConstructor: (A, B, C, D, E) -> R, + arg0: A, + arg1: B, + arg2: C, + arg3: D, + arg4: E +): FlowHandleWithClientId = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1, arg2, arg3, arg4) + +@Suppress("unused") +inline fun > CordaRPCOps.startFlowWithClientId( + clientId: String, + @Suppress("unused_parameter") + flowConstructor: (A, B, C, D, E, F) -> R, + arg0: A, + arg1: B, + arg2: C, + arg3: D, + arg4: E, + arg5: F +): FlowHandleWithClientId = startFlowDynamicWithClientId(clientId, R::class.java, arg0, arg1, arg2, arg3, arg4, arg5) + /** * Extension function for type safe invocation of flows from Kotlin, with progress tracking enabled. */ diff --git a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt index 4d540d69c8..88bff4fe6d 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt @@ -28,6 +28,14 @@ interface FlowHandle : AutoCloseable { override fun close() } +interface FlowHandleWithClientId : FlowHandle { + + /** + * The [clientId] with which the client has started the flow. + */ + val clientId: String +} + /** * [FlowProgressHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value. */ @@ -66,6 +74,18 @@ data class FlowHandleImpl( } } +@CordaSerializable +data class FlowHandleWithClientIdImpl( + override val id: StateMachineRunId, + override val returnValue: CordaFuture, + override val clientId: String) : FlowHandleWithClientId { + + // Remember to add @Throws to FlowHandle.close() if this throws an exception. + override fun close() { + returnValue.cancel(false) + } +} + @CordaSerializable data class FlowProgressHandleImpl @JvmOverloads constructor( override val id: StateMachineRunId, diff --git a/detekt-baseline.xml b/detekt-baseline.xml index 8e72535cd4..e57ef1360b 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -9,23 +9,11 @@ ClassNaming:BuyerFlow.kt$BuyerFlow$STARTING_BUY : Step ClassNaming:CompositeMemberCompositeSchemaToClassCarpenterTests.kt$I_ ClassNaming:CordaServiceTest.kt$CordaServiceTest.DummyServiceFlow.Companion$TEST_STEP : Step - ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$AWAITING_REQUEST : Step - ClassNaming:CustomVaultQuery.kt$TopupIssuerFlow.TopupIssuer.Companion$SENDING_TOP_UP_ISSUE_REQUEST : Step ClassNaming:DeserializeNeedingCarpentryTests.kt$DeserializeNeedingCarpentryTests$outer ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$FINALISING_TRANSACTION : Step ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$GENERATING_TRANSACTION : Step ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$SIGNING_TRANSACTION : Step ClassNaming:FlowCheckpointCordapp.kt$SendMessageFlow.Companion$VERIFYING_TRANSACTION : Step - ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$EXTRACTING_VAULT_STATES : Step - ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$ID_OTHER_NODES : Step - ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$OTHER_TX_COMPONENTS : Step - ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SENDING_AND_RECEIVING_DATA : Step - ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$SIGS_GATHERING : Step - ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_BUILDING : Step - ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_SIGNING : Step - ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$TX_VERIFICATION : Step - ClassNaming:FlowCookbook.kt$InitiatorFlow.Companion$VERIFYING_SIGS : Step - ClassNaming:FlowCookbook.kt$ResponderFlow.Companion$RECEIVING_AND_SENDING_DATA : Step ClassNaming:FlowFrameworkTests.kt$ExceptionFlow$START_STEP : Step ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$RECEIVED_STEP : Step ClassNaming:FlowFrameworkTests.kt$InitiatedReceiveFlow$START_STEP : Step @@ -178,7 +166,6 @@ ComplexMethod:RPCServer.kt$RPCServer$private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) ComplexMethod:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$ private tailrec fun establishConnectionWithRetry( retryInterval: Duration, roundRobinIndex: Int = 0, retries: Int = -1 ): CordaRPCConnection? ComplexMethod:RemoteTypeCarpenter.kt$SchemaBuildingRemoteTypeCarpenter$override fun carpent(typeInformation: RemoteTypeInformation): Type - ComplexMethod:RpcReconnectTests.kt$RpcReconnectTests$ @Test(timeout=300_000) fun `test that the RPC client is able to reconnect and proceed after node failure, restart, or connection reset`() ComplexMethod:SchemaMigration.kt$SchemaMigration$ private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean ComplexMethod:SchemaMigration.kt$SchemaMigration$private fun doRunMigration( run: Boolean, check: Boolean, existingCheckpoints: Boolean? = null ) ComplexMethod:SendTransactionFlow.kt$DataVendingFlow$@Suspendable override fun call(): Void? @@ -310,7 +297,6 @@ ForbiddenComment:CordappProviderImplTests.kt$CordappProviderImplTests.Companion$// TODO: Cordapp name should differ from the JAR name ForbiddenComment:CoreFlowHandlers.kt$NotaryChangeHandler$// TODO: Right now all nodes will automatically approve the notary change. We need to figure out if stricter controls are necessary. ForbiddenComment:CrossCashTest.kt$CrossCashState$// TODO: Alternative: We may possibly reduce the complexity of the search even further using some form of - ForbiddenComment:Crypto.kt$Crypto$// TODO: Check if non-ECC keys satisfy params (i.e. approved/valid RSA modulus size). ForbiddenComment:Crypto.kt$Crypto$// TODO: We currently use SHA256(seed) when retrying, but BIP32 just skips a counter (i) that results to an invalid key. ForbiddenComment:Crypto.kt$Crypto$// TODO: change the val name to a more descriptive one as it's now confusing and looks like a Key type. ForbiddenComment:Crypto.kt$Crypto$// TODO: change val name to SPHINCS256_SHA512. This will break backwards compatibility. @@ -435,7 +421,6 @@ ForbiddenComment:RatesFixFlow.kt$RatesFixFlow.FixQueryFlow$// TODO: add deadline to receive ForbiddenComment:ResolveTransactionsFlow.kt$ResolveTransactionsFlow$// TODO: This could be done in parallel with other fetches for extra speed. ForbiddenComment:ResolveTransactionsFlowTest.kt$ResolveTransactionsFlowTest$// TODO: this operation should not require an explicit transaction - ForbiddenComment:RestrictedEntityManager.kt$RestrictedEntityManager$// TODO: Figure out which other methods on EntityManager need to be blocked? ForbiddenComment:ScheduledActivityObserver.kt$ScheduledActivityObserver.Companion$// TODO: Beware we are calling dynamically loaded contract code inside here. ForbiddenComment:ScheduledFlowIntegrationTests.kt$ScheduledFlowIntegrationTests$// TODO: the queries below are not atomic so we need to allow enough time for the scheduler to finish. Would be better to query scheduler. ForbiddenComment:SendTransactionFlow.kt$DataVendingFlow$// Security TODO: Check for abnormally large or malformed data requests @@ -622,7 +607,6 @@ FunctionNaming:VersionExtractorTest.kt$VersionExtractorTest$@Test(timeout=300_000) fun version_header_extraction_present() LargeClass:AbstractNode.kt$AbstractNode<S> : SingletonSerializeAsToken LargeClass:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager : StateMachineManagerStateMachineManagerInternal - LongMethod:FlowCookbook.kt$InitiatorFlow$@Suppress("RemoveExplicitTypeArguments") @Suspendable override fun call() LongMethod:HibernateQueryCriteriaParser.kt$HibernateQueryCriteriaParser$override fun parseCriteria(criteria: CommonQueryCriteria): Collection<Predicate> LongParameterList:AMQPSerializer.kt$AMQPSerializer$(obj: Any, data: Data, type: Type, output: SerializationOutput, context: SerializationContext, debugIndent: Int = 0) LongParameterList:AbstractCashSelection.kt$AbstractCashSelection$(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, withResultSet: (ResultSet) -> Boolean) @@ -657,6 +641,9 @@ LongParameterList:CordaRPCOps.kt$( @Suppress("UNUSED_PARAMETER") flowConstructor: (A, B, C, D, E, F) -> R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E, arg5: F ) LongParameterList:CordaRPCOps.kt$( @Suppress("unused_parameter") flowConstructor: (A, B, C, D, E) -> R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E ) LongParameterList:CordaRPCOps.kt$( @Suppress("unused_parameter") flowConstructor: (A, B, C, D, E, F) -> R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E, arg5: F ) + LongParameterList:CordaRPCOps.kt$( clientId: String, @Suppress("unused_parameter") flowConstructor: (A, B, C, D) -> R, arg0: A, arg1: B, arg2: C, arg3: D ) + LongParameterList:CordaRPCOps.kt$( clientId: String, @Suppress("unused_parameter") flowConstructor: (A, B, C, D, E) -> R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E ) + LongParameterList:CordaRPCOps.kt$( clientId: String, @Suppress("unused_parameter") flowConstructor: (A, B, C, D, E, F) -> R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E, arg5: F ) LongParameterList:Driver.kt$DriverParameters$( isDebug: Boolean, driverDirectory: Path, portAllocation: PortAllocation, debugPortAllocation: PortAllocation, systemProperties: Map<String, String>, useTestClock: Boolean, startNodesInProcess: Boolean, waitForAllNodesToFinish: Boolean, notarySpecs: List<NotarySpec>, extraCordappPackagesToScan: List<String>, jmxPolicy: JmxPolicy, networkParameters: NetworkParameters ) LongParameterList:Driver.kt$DriverParameters$( isDebug: Boolean, driverDirectory: Path, portAllocation: PortAllocation, debugPortAllocation: PortAllocation, systemProperties: Map<String, String>, useTestClock: Boolean, startNodesInProcess: Boolean, waitForAllNodesToFinish: Boolean, notarySpecs: List<NotarySpec>, extraCordappPackagesToScan: List<String>, jmxPolicy: JmxPolicy, networkParameters: NetworkParameters, cordappsForAllNodes: Set<TestCordapp>? ) LongParameterList:DriverDSL.kt$DriverDSL$( defaultParameters: NodeParameters = NodeParameters(), providedName: CordaX500Name? = defaultParameters.providedName, rpcUsers: List<User> = defaultParameters.rpcUsers, verifierType: VerifierType = defaultParameters.verifierType, customOverrides: Map<String, Any?> = defaultParameters.customOverrides, startInSameProcess: Boolean? = defaultParameters.startInSameProcess, maximumHeapSize: String = defaultParameters.maximumHeapSize ) @@ -753,7 +740,6 @@ MagicNumber:AttachmentDemo.kt$10009 MagicNumber:AttachmentDemo.kt$10010 MagicNumber:AttachmentTrustTable.kt$AttachmentTrustTable$3 - MagicNumber:AttachmentsClassLoader.kt$AttachmentsClassLoader$4 MagicNumber:AzureSmbVolume.kt$AzureSmbVolume$5000 MagicNumber:BFTSmart.kt$BFTSmart.Client$100 MagicNumber:BFTSmart.kt$BFTSmart.Replica.<no name provided>$20000 @@ -775,12 +761,6 @@ MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$16 MagicNumber:CashViewer.kt$CashViewer.StateRowGraphic$30.0 MagicNumber:ClassCarpenter.kt$ClassCarpenterImpl$3 - MagicNumber:ClientRpcExample.kt$ClientRpcExample$3 - MagicNumber:ClientRpcTutorial.kt$0.7 - MagicNumber:ClientRpcTutorial.kt$0.8 - MagicNumber:ClientRpcTutorial.kt$1000 - MagicNumber:ClientRpcTutorial.kt$10000 - MagicNumber:ClientRpcTutorial.kt$2000 MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$10 MagicNumber:CommercialPaperIssueFlow.kt$CommercialPaperIssueFlow$30 MagicNumber:CompositeSignature.kt$CompositeSignature$1024 @@ -846,11 +826,6 @@ MagicNumber:ExchangeRateModel.kt$1.18 MagicNumber:ExchangeRateModel.kt$1.31 MagicNumber:FixingFlow.kt$FixingFlow.Fixer.<no name provided>$30 - MagicNumber:FlowCookbook.kt$InitiatorFlow$30 - MagicNumber:FlowCookbook.kt$InitiatorFlow$45 - MagicNumber:FlowCookbook.kt$InitiatorFlow$777 - MagicNumber:FlowCookbook.kt$ResponderFlow$99 - MagicNumber:FlowCookbook.kt$ResponderFlow.<no name provided>$777 MagicNumber:FlowLogic.kt$FlowLogic$300 MagicNumber:FlowLogic.kt$FlowLogic.Companion$5 MagicNumber:FlowMonitor.kt$FlowMonitor$1000 @@ -864,7 +839,6 @@ MagicNumber:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$10 MagicNumber:HttpUtils.kt$HttpUtils$5 MagicNumber:HttpUtils.kt$HttpUtils$60 - MagicNumber:IOUFlowResponder.kt$IOUFlowResponder.<no name provided>$100 MagicNumber:IRS.kt$RatePaymentEvent$360.0 MagicNumber:IRS.kt$RatePaymentEvent$4 MagicNumber:IRS.kt$RatePaymentEvent$8 @@ -1026,7 +1000,6 @@ MagicNumber:NodeWebServer.kt$NodeWebServer$100 MagicNumber:NodeWebServer.kt$NodeWebServer$32768 MagicNumber:NodeWebServer.kt$NodeWebServer$40 - MagicNumber:NonValidatingNotaryFlow.kt$NonValidatingNotaryFlow$4 MagicNumber:Notarise.kt$10 MagicNumber:Notarise.kt$10003 MagicNumber:NullKeys.kt$NullKeys$32 @@ -1143,7 +1116,6 @@ MagicNumber:StandaloneShell.kt$StandaloneShell$7 MagicNumber:StateRevisionFlow.kt$StateRevisionFlow.Requester$30 MagicNumber:Structures.kt$PrivacySalt$32 - MagicNumber:TargetVersionDependentRules.kt$StateContractValidationEnforcementRule$4 MagicNumber:TestNodeInfoBuilder.kt$TestNodeInfoBuilder$1234 MagicNumber:TestUtils.kt$10000 MagicNumber:TestUtils.kt$30000 @@ -1154,7 +1126,6 @@ MagicNumber:TraderDemoClientApi.kt$TraderDemoClientApi$3 MagicNumber:TransactionBuilder.kt$TransactionBuilder$4 MagicNumber:TransactionDSLInterpreter.kt$TransactionDSL$30 - MagicNumber:TransactionUtils.kt$4 MagicNumber:TransactionVerificationException.kt$TransactionVerificationException.ConstraintPropagationRejection$3 MagicNumber:TransactionViewer.kt$TransactionViewer$15.0 MagicNumber:TransactionViewer.kt$TransactionViewer$20.0 @@ -1179,8 +1150,6 @@ MagicNumber:WebServer.kt$100.0 MagicNumber:WebServer.kt$WebServer$500 MagicNumber:WireTransaction.kt$WireTransaction$4 - MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitCompletionFlow$60 - MagicNumber:WorkflowTransactionBuildTutorial.kt$SubmitTradeApprovalFlow$60 MagicNumber:X509Utilities.kt$X509Utilities$3650 MagicNumber:errorAndTerminate.kt$10 MatchingDeclarationName:AMQPSerializerFactories.kt$net.corda.serialization.internal.amqp.AMQPSerializerFactories.kt @@ -1225,7 +1194,6 @@ MatchingDeclarationName:TestConstants.kt$net.corda.testing.core.TestConstants.kt MatchingDeclarationName:TestUtils.kt$net.corda.testing.core.TestUtils.kt MatchingDeclarationName:TransactionTypes.kt$net.corda.explorer.model.TransactionTypes.kt - MatchingDeclarationName:TutorialFlowStateMachines.kt$net.corda.docs.kotlin.tutorial.flowstatemachines.TutorialFlowStateMachines.kt MatchingDeclarationName:Utils.kt$io.cryptoblk.core.Utils.kt MatchingDeclarationName:VirtualCordapps.kt$net.corda.node.internal.cordapp.VirtualCordapps.kt ModifierOrder:NodeNamedCache.kt$DefaultNamedCacheFactory$open protected @@ -1298,7 +1266,6 @@ SpreadOperator:ConfigUtilities.kt$(*pairs) SpreadOperator:Configuration.kt$Configuration.Validation.Error$(*(containingPath.toList() + this.containingPath).toTypedArray()) SpreadOperator:ContractJarTestUtils.kt$ContractJarTestUtils$(jarName, *contractNames.map{ "${it.replace(".", "/")}.class" }.toTypedArray()) - SpreadOperator:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$(logicType, context(), *args) SpreadOperator:CordaX500Name.kt$CordaX500Name.Companion$(*Locale.getISOCountries(), unspecifiedCountry) SpreadOperator:CustomCordapp.kt$CustomCordapp$(*classes.map { it.name }.toTypedArray()) SpreadOperator:CustomCordapp.kt$CustomCordapp$(*packages.map { it.replace('.', '/') }.toTypedArray()) @@ -1320,10 +1287,6 @@ SpreadOperator:FlowOverrideTests.kt$FlowOverrideTests$(*nodeBClasses.toTypedArray()) SpreadOperator:FlowTestsUtils.kt$(*allSessions) SpreadOperator:FlowTestsUtils.kt$(session, *sessions) - SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourInputStates.toTypedArray()) - SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*ourOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray()) - SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirInputStates.toTypedArray()) - SpreadOperator:FxTransactionBuildTutorial.kt$ForeignExchangeFlow$(*theirOutputState.map { StateAndContract(it, Cash.PROGRAM_ID) }.toTypedArray()) SpreadOperator:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$(OpaqueBytes(request.encoded), "Platform-Version" to "${versionInfo.platformVersion}", "Client-Version" to versionInfo.releaseVersion, "Private-Network-Map" to (config.pnm?.toString() ?: ""), *(config.csrToken?.let { arrayOf(CENM_SUBMISSION_TOKEN to it) } ?: arrayOf())) SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray()) SpreadOperator:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$(*leftPredicates.toTypedArray(), *rightPredicates.toTypedArray()) @@ -1574,7 +1537,6 @@ TooGenericExceptionCaught:NotaryUtils.kt$e: Exception TooGenericExceptionCaught:ObjectDiffer.kt$ObjectDiffer$throwable: Exception TooGenericExceptionCaught:P2PMessagingClient.kt$P2PMessagingClient$e: Exception - TooGenericExceptionCaught:PersistentIdentityMigrationNewTableTest.kt$PersistentIdentityMigrationNewTableTest$e: Exception TooGenericExceptionCaught:PersistentUniquenessProvider.kt$PersistentUniquenessProvider$e: Exception TooGenericExceptionCaught:ProfileController.kt$ProfileController$e: Exception TooGenericExceptionCaught:PropertyValidationTest.kt$PropertyValidationTest$e: Exception @@ -1742,8 +1704,6 @@ UnusedImports:Amount.kt$import net.corda.core.crypto.CompositeKey UnusedImports:Amount.kt$import net.corda.core.identity.Party UnusedImports:DummyLinearStateSchemaV1.kt$import net.corda.core.contracts.ContractState - UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.core.internal.packageName - UnusedImports:FlowsExecutionModeRpcTest.kt$import net.corda.finance.schemas.CashSchemaV1 UnusedImports:InternalTestUtils.kt$import java.nio.file.Files UnusedImports:InternalTestUtils.kt$import net.corda.nodeapi.internal.loadDevCaTrustStore UnusedImports:NetworkMap.kt$import net.corda.core.node.NodeInfo @@ -2012,8 +1972,6 @@ WildcardImport:CordaModule.kt$import net.corda.core.identity.* WildcardImport:CordaModule.kt$import net.corda.core.transactions.* WildcardImport:CordaRPCOps.kt$import net.corda.core.node.services.vault.* - WildcardImport:CordaRPCOpsImplTest.kt$import net.corda.core.messaging.* - WildcardImport:CordaRPCOpsImplTest.kt$import org.assertj.core.api.Assertions.* WildcardImport:CordaServiceTest.kt$import kotlin.test.* WildcardImport:CordaViewModel.kt$import tornadofx.* WildcardImport:Cordapp.kt$import net.corda.core.cordapp.Cordapp.Info.* @@ -2031,10 +1989,6 @@ WildcardImport:CryptoSignUtils.kt$import net.corda.core.crypto.* WildcardImport:CryptoUtilsTest.kt$import kotlin.test.* WildcardImport:CustomCordapp.kt$import net.corda.core.internal.* - WildcardImport:CustomVaultQuery.kt$import net.corda.core.flows.* - WildcardImport:CustomVaultQuery.kt$import net.corda.core.utilities.* - WildcardImport:CustomVaultQueryTest.kt$import net.corda.core.node.services.vault.* - WildcardImport:CustomVaultQueryTest.kt$import net.corda.finance.* WildcardImport:DBNetworkParametersStorage.kt$import javax.persistence.* WildcardImport:DBRunnerExtension.kt$import org.junit.jupiter.api.extension.* WildcardImport:DBTransactionStorage.kt$import javax.persistence.* @@ -2057,7 +2011,6 @@ WildcardImport:DeserializeSimpleTypesTests.kt$import net.corda.serialization.internal.amqp.testutils.* WildcardImport:DigitalSignatureWithCert.kt$import java.security.cert.* WildcardImport:DistributedServiceTests.kt$import net.corda.testing.core.* - WildcardImport:DoRemainingWorkTransition.kt$import net.corda.node.services.statemachine.* WildcardImport:DockerInstantiator.kt$import com.github.dockerjava.api.model.* WildcardImport:DriverDSLImpl.kt$import net.corda.testing.driver.* WildcardImport:DummyContract.kt$import net.corda.core.contracts.* @@ -2076,8 +2029,6 @@ WildcardImport:EvolutionSerializerFactoryTests.kt$import kotlin.test.* WildcardImport:EvolutionSerializerFactoryTests.kt$import net.corda.serialization.internal.amqp.testutils.* WildcardImport:Explorer.kt$import tornadofx.* - WildcardImport:FiberDeserializationCheckingInterceptor.kt$import net.corda.node.services.statemachine.* - WildcardImport:FinalityFlowMigration.kt$import net.corda.core.flows.* WildcardImport:FinalityFlowTests.kt$import net.corda.testing.core.* WildcardImport:FinalityFlowTests.kt$import net.corda.testing.node.internal.* WildcardImport:FinalityHandlerTest.kt$import net.corda.node.services.statemachine.StaffedFlowHospital.* @@ -2089,11 +2040,7 @@ WildcardImport:FlowCheckpointCordapp.kt$import net.corda.core.flows.* WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.flows.* WildcardImport:FlowCheckpointVersionNodeStartupCheckTest.kt$import net.corda.core.internal.* - WildcardImport:FlowCookbook.kt$import net.corda.core.contracts.* - WildcardImport:FlowCookbook.kt$import net.corda.core.flows.* WildcardImport:FlowFrameworkPersistenceTests.kt$import net.corda.testing.node.internal.* - WildcardImport:FlowFrameworkTests.kt$import net.corda.core.flows.* - WildcardImport:FlowFrameworkTests.kt$import net.corda.testing.node.internal.* WildcardImport:FlowFrameworkTripartyTests.kt$import net.corda.testing.node.internal.* WildcardImport:FlowLogicRefFactoryImpl.kt$import net.corda.core.flows.* WildcardImport:FlowMatchers.kt$import net.corda.coretesting.internal.matchers.* @@ -2101,10 +2048,7 @@ WildcardImport:FlowRetryTest.kt$import net.corda.core.flows.* WildcardImport:FlowStackSnapshotTest.kt$import net.corda.core.flows.* WildcardImport:FlowStateMachine.kt$import net.corda.core.flows.* - WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.flows.* - WildcardImport:FlowStateMachineImpl.kt$import net.corda.core.internal.* WildcardImport:FlowsDrainingModeContentionTest.kt$import net.corda.core.flows.* - WildcardImport:FxTransactionBuildTutorialTest.kt$import net.corda.finance.* WildcardImport:GenericsTests.kt$import net.corda.serialization.internal.amqp.testutils.* WildcardImport:Gui.kt$import tornadofx.* WildcardImport:GuiUtilities.kt$import tornadofx.* @@ -2121,8 +2065,6 @@ WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.EqualityComparisonOperator.* WildcardImport:HibernateQueryCriteriaParser.kt$import net.corda.core.node.services.vault.LikenessOperator.* WildcardImport:HibernateStatistics.kt$import org.hibernate.stat.* - WildcardImport:IOUContract.kt$import net.corda.core.contracts.* - WildcardImport:IOUFlowResponder.kt$import net.corda.core.flows.* WildcardImport:IRS.kt$import net.corda.core.contracts.* WildcardImport:IRS.kt$import net.corda.finance.contracts.* WildcardImport:IRSState.kt$import net.corda.core.contracts.* @@ -2169,7 +2111,6 @@ WildcardImport:JarSignatureCollectorTest.kt$import net.corda.core.internal.* WildcardImport:KeyStoreUtilities.kt$import java.security.* WildcardImport:KeyStoreUtilities.kt$import net.corda.core.internal.* - WildcardImport:KotlinIntegrationTestingTutorial.kt$import net.corda.testing.core.* WildcardImport:Kryo.kt$import com.esotericsoftware.kryo.* WildcardImport:Kryo.kt$import net.corda.core.transactions.* WildcardImport:KryoStreamsTest.kt$import java.io.* @@ -2420,8 +2361,6 @@ WildcardImport:SignedTransaction.kt$import net.corda.core.contracts.* WildcardImport:SignedTransaction.kt$import net.corda.core.crypto.* WildcardImport:SimpleMQClient.kt$import org.apache.activemq.artemis.api.core.client.* - WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.core.internal.* - WildcardImport:SingleThreadedStateMachineManager.kt$import net.corda.node.services.statemachine.interceptors.* WildcardImport:SpringDriver.kt$import net.corda.testing.node.internal.* WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.messaging.* WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.node.services.vault.* @@ -2473,8 +2412,6 @@ WildcardImport:TransactionViewer.kt$import net.corda.client.jfx.utils.* WildcardImport:TransactionViewer.kt$import net.corda.core.contracts.* WildcardImport:TransactionViewer.kt$import tornadofx.* - WildcardImport:TutorialContract.kt$import net.corda.core.contracts.* - WildcardImport:TutorialTestDSL.kt$import net.corda.testing.core.* WildcardImport:TwoPartyDealFlow.kt$import net.corda.core.flows.* WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.contracts.* WildcardImport:TwoPartyTradeFlow.kt$import net.corda.core.flows.* @@ -2502,7 +2439,6 @@ WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.core.flows.* WildcardImport:ValidatingNotaryServiceTests.kt$import net.corda.testing.node.internal.* WildcardImport:VaultFiller.kt$import net.corda.core.contracts.* - WildcardImport:VaultFlowTest.kt$import net.corda.core.flows.* WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.* WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.* WildcardImport:VaultQueryExceptionsTests.kt$import net.corda.core.node.services.vault.QueryCriteria.* diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt new file mode 100644 index 0000000000..2f8fc40935 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt @@ -0,0 +1,77 @@ +package net.corda.node.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.startFlowWithClientId +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import org.junit.Before +import org.junit.Test +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertNotEquals +import kotlin.test.assertTrue + +class FlowWithClientIdTest { + + @Before + fun reset() { + ResultFlow.hook = null + } + + @Test(timeout=300_000) + fun `start flow with client id`() { + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode().getOrThrow() + val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + + assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds)) + assertEquals(clientId, flowHandle.clientId) + } + } + + @Test(timeout=300_000) + fun `remove client id`() { + val clientId = UUID.randomUUID().toString() + var counter = 0 + ResultFlow.hook = { counter++ } + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode().getOrThrow() + + val flowHandle0 = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + flowHandle0.returnValue.getOrThrow(20.seconds) + + val removed = nodeA.rpc.removeClientId(clientId) + + val flowHandle1 = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + flowHandle1.returnValue.getOrThrow(20.seconds) + + assertTrue(removed) + assertNotEquals(flowHandle0.id, flowHandle1.id) + assertEquals(flowHandle0.clientId, flowHandle1.clientId) + assertEquals(2, counter) // this asserts that 2 different flows were spawned indeed + } + + } + +} + +@StartableByRPC +internal class ResultFlow(private val result: A): FlowLogic() { + companion object { + var hook: (() -> Unit)? = null + var suspendableHook: FlowLogic? = null + } + + @Suspendable + override fun call(): A { + hook?.invoke() + suspendableHook?.let { subFlow(it) } + return result + } +} + diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 74dcee8c01..00862192c1 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -28,7 +28,7 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.AttachmentTrustCalculator -import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.NODE_INFO_DIRECTORY import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.NetworkParametersStorage @@ -331,7 +331,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val checkpointStorage = DBCheckpointStorage(DBCheckpointPerformanceRecorder(services.monitoringService.metrics), platformClock) @Suppress("LeakingThis") val smm = makeStateMachineManager() - val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory) + val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory, DBCheckpointStorage.MAX_CLIENT_ID_LENGTH) private val schedulerService = NodeSchedulerService( platformClock, database, @@ -1268,13 +1268,22 @@ internal fun logVendorString(database: CordaPersistence, log: Logger) { } // TODO Move this into its own file -class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter { - override fun startFlow(event: ExternalEvent.ExternalStartFlowEvent): CordaFuture> { - smm.deliverExternalEvent(event) +class FlowStarterImpl( + private val smm: StateMachineManager, + private val flowLogicRefFactory: FlowLogicRefFactory, + private val maxClientIdLength: Int +) : FlowStarter { + override fun startFlow(event: ExternalEvent.ExternalStartFlowEvent): CordaFuture> { + val clientId = event.context.clientId + if (clientId != null && clientId.length > maxClientIdLength) { + throw IllegalArgumentException("clientId cannot be longer than $maxClientIdLength characters") + } else { + smm.deliverExternalEvent(event) + } return event.future } - override fun startFlow(logic: FlowLogic, context: InvocationContext): CordaFuture> { + override fun startFlow(logic: FlowLogic, context: InvocationContext): CordaFuture> { val startFlowEvent = object : ExternalEvent.ExternalStartFlowEvent, DeduplicationHandler { override fun insideDatabaseTransaction() {} @@ -1291,12 +1300,12 @@ class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogi override val context: InvocationContext get() = context - override fun wireUpFuture(flowFuture: CordaFuture>) { + override fun wireUpFuture(flowFuture: CordaFuture>) { _future.captureLater(flowFuture) } - private val _future = openFuture>() - override val future: CordaFuture> + private val _future = openFuture>() + override val future: CordaFuture> get() = _future } return startFlow(startFlowEvent) @@ -1305,7 +1314,7 @@ class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogi override fun invokeFlowAsync( logicType: Class>, context: InvocationContext, - vararg args: Any?): CordaFuture> { + vararg args: Any?): CordaFuture> { val logicRef = flowLogicRefFactory.createForRPC(logicType, *args) val logic: FlowLogic = uncheckedCast(flowLogicRefFactory.toFlowLogic(logicRef)) return startFlow(logic, context) diff --git a/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt b/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt index 9c90173cb2..922316045b 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt @@ -3,7 +3,7 @@ package net.corda.node.internal import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByService -import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.concurrent.doneFuture import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowHandleImpl @@ -78,7 +78,7 @@ internal class AppServiceHubImpl(private val serviceHub: S return FlowProgressHandleImpl( id = stateMachine.id, returnValue = stateMachine.resultFuture, - progress = stateMachine.logic.track()?.updates ?: Observable.empty() + progress = stateMachine.logic?.track()?.updates ?: Observable.empty() ) } @@ -95,7 +95,7 @@ internal class AppServiceHubImpl(private val serviceHub: S } } - private fun startFlowChecked(flow: FlowLogic): FlowStateMachine { + private fun startFlowChecked(flow: FlowLogic): FlowStateMachineHandle { val logicType = flow.javaClass require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" } // TODO check service permissions diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 6d058aaf37..66493ac033 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -19,7 +19,7 @@ import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.AttachmentTrustInfo -import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.RPC_UPLOADER import net.corda.core.internal.STRUCTURAL_STEP_PREFIX import net.corda.core.internal.messaging.InternalCordaRPCOps @@ -27,6 +27,8 @@ import net.corda.core.internal.sign import net.corda.core.messaging.DataFeed import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowHandleImpl +import net.corda.core.messaging.FlowHandleWithClientId +import net.corda.core.messaging.FlowHandleWithClientIdImpl import net.corda.core.messaging.FlowProgressHandle import net.corda.core.messaging.FlowProgressHandleImpl import net.corda.core.messaging.ParametersUpdateInfo @@ -170,6 +172,8 @@ internal class CordaRPCOpsImpl( override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id) + override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId) + override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { val (allStateMachines, changes) = smm.track() @@ -236,27 +240,33 @@ internal class CordaRPCOpsImpl( } override fun startTrackedFlowDynamic(logicType: Class>, vararg args: Any?): FlowProgressHandle { - val stateMachine = startFlow(logicType, args) + val stateMachine = startFlow(logicType, context(), args) return FlowProgressHandleImpl( id = stateMachine.id, returnValue = stateMachine.resultFuture, - progress = stateMachine.logic.track()?.updates?.filter { !it.startsWith(STRUCTURAL_STEP_PREFIX) } ?: Observable.empty(), - stepsTreeIndexFeed = stateMachine.logic.trackStepsTreeIndex(), - stepsTreeFeed = stateMachine.logic.trackStepsTree() + progress = stateMachine.logic?.track()?.updates?.filter { !it.startsWith(STRUCTURAL_STEP_PREFIX) } ?: Observable.empty(), + stepsTreeIndexFeed = stateMachine.logic?.trackStepsTreeIndex(), + stepsTreeFeed = stateMachine.logic?.trackStepsTree() ) } override fun startFlowDynamic(logicType: Class>, vararg args: Any?): FlowHandle { - val stateMachine = startFlow(logicType, args) + val stateMachine = startFlow(logicType, context(), args) return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture) } - private fun startFlow(logicType: Class>, args: Array): FlowStateMachine { + override fun startFlowDynamicWithClientId(clientId: String, logicType: Class>, vararg args: Any?): FlowHandleWithClientId { + val stateMachine = startFlow(logicType, context().withClientId(clientId), args) + return FlowHandleWithClientIdImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture, clientId = stateMachine.clientId!!) + } + + @Suppress("SpreadOperator") + private fun startFlow(logicType: Class>, context: InvocationContext, args: Array): FlowStateMachineHandle { if (!logicType.isAnnotationPresent(StartableByRPC::class.java)) throw NonRpcFlowException(logicType) if (isFlowsDrainingModeEnabled()) { throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.") } - return flowStarter.invokeFlowAsync(logicType, context(), *args).getOrThrow() + return flowStarter.invokeFlowAsync(logicType, context, *args).getOrThrow() } override fun attachmentExists(id: SecureHash): Boolean { @@ -464,4 +474,6 @@ internal class CordaRPCOpsImpl( private inline fun Class<*>.checkIsA() { require(TARGET::class.java.isAssignableFrom(this)) { "$name is not a ${TARGET::class.java.name}" } } + + private fun InvocationContext.withClientId(clientId: String) = copy(clientId = clientId) } diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 6fa3ed5869..273a95dfaa 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -215,13 +215,13 @@ interface FlowStarter { * just synthesizes an [ExternalEvent.ExternalStartFlowEvent] and calls the method below. * @param context indicates who started the flow, see: [InvocationContext]. */ - fun startFlow(logic: FlowLogic, context: InvocationContext): CordaFuture> + fun startFlow(logic: FlowLogic, context: InvocationContext): CordaFuture> /** * Starts a flow as described by an [ExternalEvent.ExternalStartFlowEvent]. If a transient error * occurs during invocation, it will re-attempt to start the flow. */ - fun startFlow(event: ExternalEvent.ExternalStartFlowEvent): CordaFuture> + fun startFlow(event: ExternalEvent.ExternalStartFlowEvent): CordaFuture> /** * Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow. @@ -232,9 +232,10 @@ interface FlowStarter { * [logicType] or [args]. */ fun invokeFlowAsync( - logicType: Class>, - context: InvocationContext, - vararg args: Any?): CordaFuture> + logicType: Class>, + context: InvocationContext, + vararg args: Any? + ): CordaFuture> } interface StartedNodeServices : ServiceHubInternal, FlowStarter diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 57c3254cfc..ff341af5d2 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -258,7 +258,7 @@ class NodeSchedulerService(private val clock: CordaClock, return "${javaClass.simpleName}($scheduledState)" } - override fun wireUpFuture(flowFuture: CordaFuture>) { + override fun wireUpFuture(flowFuture: CordaFuture>) { _future.captureLater(flowFuture) val future = _future.flatMap { it.resultFuture } future.then { @@ -266,8 +266,8 @@ class NodeSchedulerService(private val clock: CordaClock, } } - private val _future = openFuture>() - override val future: CordaFuture> + private val _future = openFuture>() + override val future: CordaFuture> get() = _future } diff --git a/node/src/main/kotlin/net/corda/node/services/logging/ContextualLoggingUtils.kt b/node/src/main/kotlin/net/corda/node/services/logging/ContextualLoggingUtils.kt index 2e2211b695..03a4f6fac9 100644 --- a/node/src/main/kotlin/net/corda/node/services/logging/ContextualLoggingUtils.kt +++ b/node/src/main/kotlin/net/corda/node/services/logging/ContextualLoggingUtils.kt @@ -13,6 +13,12 @@ internal fun InvocationContext.pushToLoggingContext() { origin.pushToLoggingContext() externalTrace?.pushToLoggingContext("external_") impersonatedActor?.pushToLoggingContext("impersonating_") + + clientId?.let { + MDC.getMDCAdapter().apply { + put("client_id", it) + } + } } internal fun Trace.pushToLoggingContext(prefix: String = "") { diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 38cbf1d833..a5be0edef5 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -55,6 +55,7 @@ class DBCheckpointStorage( private const val MAX_EXC_TYPE_LENGTH = 256 private const val MAX_FLOW_NAME_LENGTH = 128 private const val MAX_PROGRESS_STEP_LENGTH = 256 + const val MAX_CLIENT_ID_LENGTH = 512 private val RUNNABLE_CHECKPOINTS = setOf(FlowStatus.RUNNABLE, FlowStatus.HOSPITALIZED) @@ -513,8 +514,7 @@ class DBCheckpointStorage( // Truncate the flow name to fit into the database column // Flow names are unlikely to be this long flowName = flowInfo.flowClass.name.take(MAX_FLOW_NAME_LENGTH), - // will come from the context - userSuppliedIdentifier = null, + userSuppliedIdentifier = context.clientId, startType = context.getStartedType(), initialParameters = context.getFlowParameters().storageSerialize().bytes, launchingCordapp = (flowInfo.subFlowVersion as? SubFlowVersion.CorDappFlow)?.corDappName ?: "Core flow", @@ -627,10 +627,14 @@ class DBCheckpointStorage( } } + @Suppress("MagicNumber") private fun InvocationContext.getFlowParameters(): List { - // Only RPC flows have parameters which are found in index 1 - return if (arguments.isNotEmpty()) { - uncheckedCast>(arguments[1]).toList() + // Only RPC flows have parameters which are found in index 1 or index 2 (if called with client id) + return if (arguments!!.isNotEmpty()) { + arguments!!.run { + check(size == 2 || size == 3) { "Unexpected argument number provided in rpc call" } + uncheckedCast>(last()).toList() + } } else { emptyList() } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt index be8026b73f..ab0121d8f6 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt @@ -28,6 +28,8 @@ import java.security.SecureRandom class Flow(val fiber: FlowStateMachineImpl, val resultFuture: OpenFuture) class NonResidentFlow(val runId: StateMachineRunId, val checkpoint: Checkpoint) { + val resultFuture: OpenFuture = openFuture() + val externalEvents = mutableListOf() fun addExternalEvent(message: Event.DeliverSessionMessage) { @@ -62,13 +64,12 @@ class FlowCreator( } else -> nonResidentFlow.checkpoint } - return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint) + return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, nonResidentFlow.resultFuture) } - fun createFlowFromCheckpoint(runId: StateMachineRunId, oldCheckpoint: Checkpoint): Flow<*>? { + fun createFlowFromCheckpoint(runId: StateMachineRunId, oldCheckpoint: Checkpoint, resultFuture: OpenFuture = openFuture()): Flow<*>? { val checkpoint = oldCheckpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE) val fiber = checkpoint.getFiberFromCheckpoint(runId) ?: return null - val resultFuture = openFuture() fiber.transientValues = TransientReference(createTransientValues(runId, resultFuture)) fiber.logic.stateMachine = fiber verifyFlowLogicIsSuspendable(fiber.logic) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 5277d89638..68ffae6251 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -132,6 +132,8 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, override val context: InvocationContext get() = transientState!!.value.checkpoint.checkpointState.invocationContext override val ourIdentity: Party get() = transientState!!.value.checkpoint.checkpointState.ourIdentity override val isKilled: Boolean get() = transientState!!.value.isKilled + override val clientId: String? + get() = transientState!!.value.checkpoint.checkpointState.invocationContext.clientId internal val softLockedStates = mutableSetOf() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 2914aecd5d..4bbd292c77 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -13,12 +13,17 @@ import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.FlowStateMachineHandle +import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.castIfPossible +import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.mapError import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.mapNotNull +import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.DataFeed import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.CheckpointSerializationContext @@ -75,12 +80,21 @@ internal class SingleThreadedStateMachineManager( ) : StateMachineManager, StateMachineManagerInternal { companion object { private val logger = contextLogger() + + @VisibleForTesting + var beforeClientIDCheck: (() -> Unit)? = null + @VisibleForTesting + var onClientIDNotFound: (() -> Unit)? = null + @VisibleForTesting + var onCallingStartFlowInternal: (() -> Unit)? = null + @VisibleForTesting + var onStartFlowInternalThrewAndAboutToRemove: (() -> Unit)? = null } private val innerState = StateMachineInnerStateImpl() private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor) private val scheduledFutureExecutor = Executors.newSingleThreadScheduledExecutor( - ThreadFactoryBuilder().setNameFormat("flow-scheduled-future-thread").setDaemon(true).build() + ThreadFactoryBuilder().setNameFormat("flow-scheduled-future-thread").setDaemon(true).build() ) // How many Fibers are running (this includes suspended flows). If zero and stopping is true, then we are halted. private val liveFibers = ReusableLatch() @@ -158,6 +172,25 @@ internal class SingleThreadedStateMachineManager( } } } + + // at the moment we have RUNNABLE, HOSPITALIZED and PAUSED flows + // - RESULTED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3692 + // - FAILED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3681 + // - Incompatible checkpoints need to be handled upon implementing CORDA-3897 + for (flow in fibers) { + flow.fiber.clientId?.let { + innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(doneFuture(flow.fiber)) + } + } + + for (pausedFlow in pausedFlows) { + pausedFlow.value.checkpoint.checkpointState.invocationContext.clientId?.let { + innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active( + doneClientIdFuture(pausedFlow.key, pausedFlow.value.resultFuture, it) + ) + } + } + return serviceHub.networkMapCache.nodeReady.map { logger.info("Node ready, info: ${serviceHub.myInfo}") resumeRestoredFlows(fibers) @@ -229,21 +262,62 @@ internal class SingleThreadedStateMachineManager( } } + @Suppress("ComplexMethod") private fun startFlow( flowId: StateMachineRunId, flowLogic: FlowLogic, context: InvocationContext, ourIdentity: Party?, deduplicationHandler: DeduplicationHandler? - ): CordaFuture> { - return startFlowInternal( + ): CordaFuture> { + beforeClientIDCheck?.invoke() + + var newFuture: OpenFuture>? = null + + val clientId = context.clientId + if (clientId != null) { + var existingFuture: CordaFuture>? = null + innerState.withLock { + clientIdsToFlowIds.compute(clientId) { _, existingStatus -> + if (existingStatus != null) { + existingFuture = when (existingStatus) { + is FlowWithClientIdStatus.Active -> existingStatus.flowStateMachineFuture + // This below dummy future ('doneFuture(5)') will be populated from DB upon implementing CORDA-3692 and CORDA-3681 - for now just return a dummy future + is FlowWithClientIdStatus.Removed -> doneClientIdFuture(existingStatus.flowId, doneFuture(@Suppress("MagicNumber")5), clientId) + } + existingStatus + } else { + newFuture = openFuture() + FlowWithClientIdStatus.Active(newFuture!!) + } + } + } + if (existingFuture != null) return uncheckedCast(existingFuture) + + onClientIDNotFound?.invoke() + } + + return try { + startFlowInternal( flowId, invocationContext = context, flowLogic = flowLogic, flowStart = FlowStart.Explicit, ourIdentity = ourIdentity ?: ourFirstIdentity, deduplicationHandler = deduplicationHandler - ) + ).also { + newFuture?.captureLater(uncheckedCast(it)) + } + } catch (t: Throwable) { + onStartFlowInternalThrewAndAboutToRemove?.invoke() + innerState.withLock { + clientIdsToFlowIds.remove(clientId) + newFuture?.setException(t) + } + // Throwing the exception plain here is the same as to return an exceptionally completed future since the caller calls + // getOrThrow() on the returned future at [CordaRPCOpsImpl.startFlow]. + throw t + } } override fun killFlow(id: StateMachineRunId): Boolean { @@ -591,6 +665,7 @@ internal class SingleThreadedStateMachineManager( ourIdentity: Party, deduplicationHandler: DeduplicationHandler? ): CordaFuture> { + onCallingStartFlowInternal?.invoke() val existingFlow = innerState.withLock { flows[flowId] } val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState?.value?.isAnyCheckpointPersisted == true) { @@ -736,6 +811,7 @@ internal class SingleThreadedStateMachineManager( require(lastState.isRemoved) { "Flow must be in removable state before removal" } require(lastState.checkpoint.checkpointState.subFlowStack.size == 1) { "Checkpointed stack must be empty" } require(flow.fiber.id !in sessionToFlow.values) { "Flow fibre must not be needed by an existing session" } + flow.fiber.clientId?.let { setClientIdAsSucceeded(it, flow.fiber.id) } flow.resultFuture.set(removalReason.flowReturnValue) lastState.flowLogic.progressTracker?.currentStep = ProgressTracker.DONE changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Success(removalReason.flowReturnValue))) @@ -747,13 +823,14 @@ internal class SingleThreadedStateMachineManager( lastState: StateMachineState ) { drainFlowEventQueue(flow) + // Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition]) + startedFutures.remove(flow.fiber.id)?.set(Unit) + flow.fiber.clientId?.let { setClientIdAsFailed(it, flow.fiber.id) } val flowError = removalReason.flowErrors[0] // TODO what to do with several? val exception = flowError.exception (exception as? FlowException)?.originalErrorId = flowError.errorId flow.resultFuture.setException(exception) lastState.flowLogic.progressTracker?.endWithError(exception) - // Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition]) - startedFutures.remove(flow.fiber.id)?.set(Unit) changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Failure(exception))) } @@ -781,4 +858,55 @@ internal class SingleThreadedStateMachineManager( } } } + + private fun StateMachineInnerState.setClientIdAsSucceeded(clientId: String, id: StateMachineRunId) { + setClientIdAsRemoved(clientId, id, true) + } + + private fun StateMachineInnerState.setClientIdAsFailed(clientId: String, id: StateMachineRunId) { + setClientIdAsRemoved(clientId, id, false) + } + + private fun StateMachineInnerState.setClientIdAsRemoved( + clientId: String, + id: StateMachineRunId, + succeeded: Boolean + ) { + clientIdsToFlowIds.compute(clientId) { _, existingStatus -> + require(existingStatus != null && existingStatus is FlowWithClientIdStatus.Active) + FlowWithClientIdStatus.Removed(id, succeeded) + } + } + + /** + * The flow out of which a [doneFuture] will be produced should be a started flow, + * i.e. it should not exist in [mutex.content.startedFutures]. + */ + private fun doneClientIdFuture( + id: StateMachineRunId, + resultFuture: CordaFuture, + clientId: String + ): CordaFuture> = + doneFuture(object : FlowStateMachineHandle { + override val logic: Nothing? = null + override val id: StateMachineRunId = id + override val resultFuture: CordaFuture = resultFuture + override val clientId: String? = clientId + } + ) + + override fun removeClientId(clientId: String): Boolean { + var removed = false + innerState.withLock { + clientIdsToFlowIds.compute(clientId) { _, existingStatus -> + if (existingStatus != null && existingStatus is FlowWithClientIdStatus.Removed) { + removed = true + null + } else { // don't remove + existingStatus + } + } + } + return removed + } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineInnerState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineInnerState.kt index 0252e21e80..66017480ca 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineInnerState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineInnerState.kt @@ -17,6 +17,7 @@ internal interface StateMachineInnerState { val changesPublisher: PublishSubject /** Flows scheduled to be retried if not finished within the specified timeout period. */ val timedFlows: MutableMap + val clientIdsToFlowIds: MutableMap fun withMutex(block: StateMachineInnerState.() -> R): R } @@ -30,6 +31,7 @@ internal class StateMachineInnerStateImpl : StateMachineInnerState { override val pausedFlows = HashMap() override val startedFutures = HashMap>() override val timedFlows = HashMap() + override val clientIdsToFlowIds = HashMap() override fun withMutex(block: StateMachineInnerState.() -> R): R = lock.withLock { block(this) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 66a5a60797..4cd7702582 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -5,6 +5,7 @@ import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.messaging.DataFeed import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler @@ -98,6 +99,13 @@ interface StateMachineManager { * Returns a snapshot of all [FlowStateMachineImpl]s currently managed. */ fun snapshot(): Set> + + /** + * Removes a flow's [clientId] to result/ exception mapping. + * + * @return whether the mapping was removed. + */ + fun removeClientId(clientId: String): Boolean } // These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call @@ -139,11 +147,11 @@ interface ExternalEvent { /** * A callback for the state machine to pass back the [CordaFuture] associated with the flow start to the submitter. */ - fun wireUpFuture(flowFuture: CordaFuture>) + fun wireUpFuture(flowFuture: CordaFuture>) /** * The future representing the flow start, passed back from the state machine to the submitter of this event. */ - val future: CordaFuture> + val future: CordaFuture> } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 58a072fc99..83e5bd44c0 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -1,12 +1,15 @@ package net.corda.node.services.statemachine +import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.crypto.SecureHash import net.corda.core.flows.Destination import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes @@ -109,8 +112,8 @@ data class Checkpoint( listOf(topLevelSubFlow), numberOfSuspends = 0 ), - errorState = ErrorState.Clean, - flowState = FlowState.Unstarted(flowStart, frozenFlowLogic) + flowState = FlowState.Unstarted(flowStart, frozenFlowLogic), + errorState = ErrorState.Clean ) } } @@ -380,3 +383,8 @@ sealed class SubFlowVersion { data class CoreFlow(override val platformVersion: Int) : SubFlowVersion() data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion() } + +sealed class FlowWithClientIdStatus { + data class Active(val flowStateMachineFuture: CordaFuture>) : FlowWithClientIdStatus() + data class Removed(val flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus() +} diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 1b7d79dfec..9d1c2c0b59 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -178,7 +178,7 @@ class TopLevelTransition( private fun suspendTransition(event: Event.Suspend): TransitionResult { return builder { val newCheckpoint = currentState.checkpoint.run { - val newCheckpointState = if (checkpointState.invocationContext.arguments.isNotEmpty()) { + val newCheckpointState = if (checkpointState.invocationContext.arguments!!.isNotEmpty()) { checkpointState.copy( invocationContext = checkpointState.invocationContext.copy(arguments = emptyList()), numberOfSuspends = checkpointState.numberOfSuspends + 1 diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt new file mode 100644 index 0000000000..b600525b6c --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt @@ -0,0 +1,417 @@ +package net.corda.node.services.statemachine + +import co.paralleluniverse.fibers.Fiber +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.concurrent.Semaphore +import net.corda.core.flows.FlowLogic +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP +import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.TestStartedNode +import net.corda.testing.node.internal.startFlowWithClientId +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.Ignore +import org.junit.Test +import java.lang.IllegalStateException +import java.sql.SQLTransientConnectionException +import java.util.UUID +import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +class FlowClientIdTests { + + private lateinit var mockNet: InternalMockNetwork + private lateinit var aliceNode: TestStartedNode + + @Before + fun setUpMockNet() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP, FINANCE_CONTRACTS_CORDAPP), + servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin() + ) + + aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) + + } + + @After + fun cleanUp() { + mockNet.stopNodes() + ResultFlow.hook = null + ResultFlow.suspendableHook = null + SingleThreadedStateMachineManager.beforeClientIDCheck = null + SingleThreadedStateMachineManager.onClientIDNotFound = null + SingleThreadedStateMachineManager.onCallingStartFlowInternal = null + SingleThreadedStateMachineManager.onStartFlowInternalThrewAndAboutToRemove = null + } + + @Test(timeout=300_000) + fun `no new flow starts if the client id provided pre exists`() { + var counter = 0 + ResultFlow.hook = { counter++ } + val clientId = UUID.randomUUID().toString() + aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() + aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() + Assert.assertEquals(1, counter) + } + + @Test(timeout=300_000) + fun `flow's result is retrievable after flow's lifetime, when flow is started with a client id - different parameters are ignored`() { + val clientId = UUID.randomUUID().toString() + val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + val clientId0 = handle0.clientId + val flowId0 = handle0.id + val result0 = handle0.resultFuture.getOrThrow() + + val handle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)) + val clientId1 = handle1.clientId + val flowId1 = handle1.id + val result1 = handle1.resultFuture.getOrThrow() + + Assert.assertEquals(clientId0, clientId1) + Assert.assertEquals(flowId0, flowId1) + Assert.assertEquals(result0, result1) + } + + @Test(timeout=300_000) + fun `flow's result is available if reconnect after flow had retried from previous checkpoint, when flow is started with a client id`() { + var firstRun = true + ResultFlow.hook = { + if (firstRun) { + firstRun = false + throw SQLTransientConnectionException("connection is not available") + } + } + + val clientId = UUID.randomUUID().toString() + val result0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() + val result1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() + Assert.assertEquals(result0, result1) + } + + @Test(timeout=300_000) + fun `flow's result is available if reconnect during flow's retrying from previous checkpoint, when flow is started with a client id`() { + var firstRun = true + val waitForSecondRequest = Semaphore(0) + val waitUntilFlowHasRetried = Semaphore(0) + ResultFlow.suspendableHook = object : FlowLogic() { + @Suspendable + override fun call() { + if (firstRun) { + firstRun = false + throw SQLTransientConnectionException("connection is not available") + } else { + waitUntilFlowHasRetried.release() + waitForSecondRequest.acquire() + } + } + } + + var result1 = 0 + val clientId = UUID.randomUUID().toString() + val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + waitUntilFlowHasRetried.acquire() + val t = thread { result1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() } + + Thread.sleep(1000) + waitForSecondRequest.release() + val result0 = handle0.resultFuture.getOrThrow() + t.join() + Assert.assertEquals(result0, result1) + } + + @Ignore // this is to be unignored upon implementing CORDA-3681 + @Test(timeout=300_000) + fun `flow's exception is available after flow's lifetime if flow is started with a client id`() { + ResultFlow.hook = { throw IllegalStateException() } + val clientId = UUID.randomUUID().toString() + + assertFailsWith { + aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() + } + + assertFailsWith { + aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() + } + } + + @Test(timeout=300_000) + fun `flow's client id mapping gets removed upon request`() { + val clientId = UUID.randomUUID().toString() + var counter = 0 + ResultFlow.hook = { counter++ } + val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + flowHandle0.resultFuture.getOrThrow(20.seconds) + val removed = aliceNode.smm.removeClientId(clientId) + // On new request with clientId, after the same clientId was removed, a brand new flow will start with that clientId + val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + flowHandle1.resultFuture.getOrThrow(20.seconds) + + assertTrue(removed) + Assert.assertNotEquals(flowHandle0.id, flowHandle1.id) + Assert.assertEquals(flowHandle0.clientId, flowHandle1.clientId) + Assert.assertEquals(2, counter) + } + + @Test(timeout=300_000) + fun `flow's client id mapping can only get removed once the flow gets removed`() { + val clientId = UUID.randomUUID().toString() + var tries = 0 + val maxTries = 10 + var failedRemovals = 0 + val semaphore = Semaphore(0) + ResultFlow.suspendableHook = object : FlowLogic() { + @Suspendable + override fun call() { + semaphore.acquire() + } + } + val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + + var removed = false + while (!removed) { + removed = aliceNode.smm.removeClientId(clientId) + if (!removed) ++failedRemovals + ++tries + if (tries >= maxTries) { + semaphore.release() + flowHandle0.resultFuture.getOrThrow(20.seconds) + } + } + + assertTrue(removed) + Assert.assertEquals(maxTries, failedRemovals) + } + + @Test(timeout=300_000) + fun `only one flow starts upon concurrent requests with the same client id`() { + val requests = 2 + val counter = AtomicInteger(0) + val resultsCounter = AtomicInteger(0) + ResultFlow.hook = { counter.incrementAndGet() } + //(aliceNode.smm as SingleThreadedStateMachineManager).concurrentRequests = true + + val clientId = UUID.randomUUID().toString() + val threads = arrayOfNulls(requests) + for (i in 0 until requests) { + threads[i] = Thread { + val result = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() + resultsCounter.addAndGet(result) + } + } + + val beforeCount = AtomicInteger(0) + SingleThreadedStateMachineManager.beforeClientIDCheck = { + beforeCount.incrementAndGet() + } + + val clientIdNotFound = Semaphore(0) + val waitUntilClientIdNotFound = Semaphore(0) + SingleThreadedStateMachineManager.onClientIDNotFound = { + // Only the first request should reach this point + waitUntilClientIdNotFound.release() + clientIdNotFound.acquire() + } + + for (i in 0 until requests) { + threads[i]!!.start() + } + + waitUntilClientIdNotFound.acquire() + for (i in 0 until requests) { + clientIdNotFound.release() + } + + for (thread in threads) { + thread!!.join() + } + Assert.assertEquals(1, counter.get()) + Assert.assertEquals(2, beforeCount.get()) + Assert.assertEquals(10, resultsCounter.get()) + } + + + @Test(timeout=300_000) + fun `on node start -running- flows with client id are hook-able`() { + val clientId = UUID.randomUUID().toString() + var noSecondFlowWasSpawned = 0 + var firstRun = true + var firstFiber: Fiber? = null + val flowIsRunning = Semaphore(0) + val waitUntilFlowIsRunning = Semaphore(0) + + ResultFlow.suspendableHook = object : FlowLogic() { + @Suspendable + override fun call() { + if (firstRun) { + firstFiber = Fiber.currentFiber() + firstRun = false + } + + waitUntilFlowIsRunning.release() + try { + flowIsRunning.acquire() // make flow wait here to impersonate a running flow + } catch (e: InterruptedException) { + flowIsRunning.release() + throw e + } + + noSecondFlowWasSpawned++ + } + } + + val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + waitUntilFlowIsRunning.acquire() + aliceNode.internals.acceptableLiveFiberCountOnStop = 1 + val aliceNode = mockNet.restartNode(aliceNode) + // Blow up the first fiber running our flow as it is leaked here, on normal node shutdown that fiber should be gone + firstFiber!!.interrupt() + + waitUntilFlowIsRunning.acquire() + // Re-hook a running flow + val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + flowIsRunning.release() + + Assert.assertEquals(flowHandle0.id, flowHandle1.id) + Assert.assertEquals(clientId, flowHandle1.clientId) + Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds)) + Assert.assertEquals(1, noSecondFlowWasSpawned) + } + +// @Test(timeout=300_000) +// fun `on node restart -paused- flows with client id are hook-able`() { +// val clientId = UUID.randomUUID().toString() +// var noSecondFlowWasSpawned = 0 +// var firstRun = true +// var firstFiber: Fiber? = null +// val flowIsRunning = Semaphore(0) +// val waitUntilFlowIsRunning = Semaphore(0) +// +// ResultFlow.suspendableHook = object : FlowLogic() { +// @Suspendable +// override fun call() { +// if (firstRun) { +// firstFiber = Fiber.currentFiber() +// firstRun = false +// } +// +// waitUntilFlowIsRunning.release() +// try { +// flowIsRunning.acquire() // make flow wait here to impersonate a running flow +// } catch (e: InterruptedException) { +// flowIsRunning.release() +// throw e +// } +// +// noSecondFlowWasSpawned++ +// } +// } +// +// val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) +// waitUntilFlowIsRunning.acquire() +// aliceNode.internals.acceptableLiveFiberCountOnStop = 1 +// // Pause the flow on node restart +// val aliceNode = mockNet.restartNode(aliceNode, +// InternalMockNodeParameters( +// configOverrides = { +// doReturn(StateMachineManager.StartMode.Safe).whenever(it).smmStartMode +// } +// )) +// // Blow up the first fiber running our flow as it is leaked here, on normal node shutdown that fiber should be gone +// firstFiber!!.interrupt() +// +// // Re-hook a paused flow +// val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) +// +// Assert.assertEquals(flowHandle0.id, flowHandle1.id) +// Assert.assertEquals(clientId, flowHandle1.clientId) +// aliceNode.smm.unPauseFlow(flowHandle1.id) +// Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds)) +// Assert.assertEquals(1, noSecondFlowWasSpawned) +// } + + @Test(timeout=300_000) + fun `On 'startFlowInternal' throwing, subsequent request with same client id does not get de-duplicated and starts a new flow`() { + val clientId = UUID.randomUUID().toString() + var firstRequest = true + SingleThreadedStateMachineManager.onCallingStartFlowInternal = { + if (firstRequest) { + firstRequest = false + throw IllegalStateException("Yet another one") + } + } + var counter = 0 + ResultFlow.hook = { counter++ } + + assertFailsWith { + aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + } + + val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + flowHandle1.resultFuture.getOrThrow(20.seconds) + + assertEquals(clientId, flowHandle1.clientId) + assertEquals(1, counter) + } + +// @Test(timeout=300_000) +// fun `On 'startFlowInternal' throwing, subsequent request with same client hits the time window in which the previous request was about to remove the client id mapping`() { +// val clientId = UUID.randomUUID().toString() +// var firstRequest = true +// SingleThreadedStateMachineManager.onCallingStartFlowInternal = { +// if (firstRequest) { +// firstRequest = false +// throw IllegalStateException("Yet another one") +// } +// } +// +// val wait = Semaphore(0) +// val waitForFirstRequest = Semaphore(0) +// SingleThreadedStateMachineManager.onStartFlowInternalThrewAndAboutToRemove = { +// waitForFirstRequest.release() +// wait.acquire() +// Thread.sleep(10000) +// } +// var counter = 0 +// ResultFlow.hook = { counter++ } +// +// thread { +// assertFailsWith { +// aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) +// } +// } +// +// waitForFirstRequest.acquire() +// wait.release() +// assertFailsWith { +// // the subsequent request will not hang on a never ending future, because the previous request ,upon failing, will also complete the future exceptionally +// aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) +// } +// +// assertEquals(0, counter) +// } +} + +internal class ResultFlow(private val result: A): FlowLogic() { + companion object { + var hook: (() -> Unit)? = null + var suspendableHook: FlowLogic? = null + } + + @Suspendable + override fun call(): A { + hook?.invoke() + suspendableHook?.let { subFlow(it) } + return result + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt index ddac3afba8..a3b36ee391 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt @@ -2,6 +2,7 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import net.corda.client.rpc.CordaRPCClient +import net.corda.core.CordaRuntimeException import net.corda.core.context.InvocationContext import net.corda.core.contracts.BelongsToContract import net.corda.core.contracts.LinearState @@ -46,12 +47,14 @@ import org.junit.Before import org.junit.Ignore import org.junit.Test import java.time.Instant +import java.util.UUID import java.util.concurrent.CompletableFuture import java.util.concurrent.Executors import java.util.concurrent.Semaphore import java.util.function.Supplier import kotlin.reflect.jvm.jvmName import kotlin.test.assertEquals +import kotlin.test.assertFailsWith import kotlin.test.assertNotNull import kotlin.test.assertNull import kotlin.test.assertTrue @@ -87,9 +90,11 @@ class FlowMetadataRecordingTest { metadata = metadataFromHook } + val clientId = UUID.randomUUID().toString() CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { - it.proxy.startFlow( - ::MyFlow, + it.proxy.startFlowDynamicWithClientId( + clientId, + MyFlow::class.java, nodeBHandle.nodeInfo.singleIdentity(), string, someObject @@ -101,7 +106,7 @@ class FlowMetadataRecordingTest { assertEquals(flowId!!.uuid.toString(), it.flowId) assertEquals(MyFlow::class.java.name, it.flowName) // Should be changed when [userSuppliedIdentifier] gets filled in future changes - assertNull(it.userSuppliedIdentifier) + assertEquals(clientId, it.userSuppliedIdentifier) assertEquals(DBCheckpointStorage.StartReason.RPC, it.startType) assertEquals( listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject), @@ -192,7 +197,7 @@ class FlowMetadataRecordingTest { assertEquals( listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject), - uncheckedCast>(context!!.arguments[1]).toList() + uncheckedCast>(context!!.arguments!![1]).toList() ) assertEquals( listOf(nodeBHandle.nodeInfo.singleIdentity(), string, someObject), @@ -393,6 +398,19 @@ class FlowMetadataRecordingTest { } } + @Test(timeout = 300_000) + fun `assert that flow started with longer client id than MAX_CLIENT_ID_LENGTH fails`() { + val clientId = "1".repeat(513) // DBCheckpointStorage.MAX_CLIENT_ID_LENGTH == 512 + driver(DriverParameters(startNodesInProcess = true)) { + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val rpc = CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).proxy + + assertFailsWith("clientId cannot be longer than ${DBCheckpointStorage.MAX_CLIENT_ID_LENGTH} characters") { + rpc.startFlowDynamicWithClientId(clientId, EmptyFlow::class.java).returnValue.getOrThrow() + } + } + } + @InitiatingFlow @StartableByRPC @StartableByService @@ -553,4 +571,11 @@ class FlowMetadataRecordingTest { return ScheduledActivity(logicRef, Instant.now()) } } + + @StartableByRPC + class EmptyFlow : FlowLogic() { + @Suspendable + override fun call() { + } + } } \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 3f5c249424..3f0d51612d 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -11,7 +11,6 @@ import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.KilledFlowException import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party -import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.concurrent.flatMap import net.corda.core.messaging.MessageRecipients import net.corda.core.utilities.UntrustworthyData @@ -148,7 +147,7 @@ class RetryFlowMockTest { // Make sure we have seen an update from the hospital, and thus the flow went there. val alice = TestIdentity(CordaX500Name.parse("L=London,O=Alice Ltd,OU=Trade,C=GB")).party val records = nodeA.smm.flowHospital.track().updates.toBlocking().toIterable().iterator() - val flow: FlowStateMachine = nodeA.services.startFlow(FinalityHandler(object : FlowSession() { + val flow = nodeA.services.startFlow(FinalityHandler(object : FlowSession() { override val destination: Destination get() = alice override val counterparty: Party get() = alice diff --git a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/matchers/flow/FlowMatchers.kt b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/matchers/flow/FlowMatchers.kt index 42b27f10c9..abb1ee9ab9 100644 --- a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/matchers/flow/FlowMatchers.kt +++ b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/matchers/flow/FlowMatchers.kt @@ -2,23 +2,23 @@ package net.corda.coretesting.internal.matchers.flow import com.natpryce.hamkrest.Matcher import com.natpryce.hamkrest.equalTo -import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.FlowStateMachineHandle import net.corda.coretesting.internal.matchers.* /** * Matches a Flow that succeeds with a result matched by the given matcher */ -fun willReturn(): Matcher> = net.corda.coretesting.internal.matchers.future.willReturn() - .extrude(FlowStateMachine::resultFuture) +fun willReturn(): Matcher> = net.corda.coretesting.internal.matchers.future.willReturn() + .extrude(FlowStateMachineHandle::resultFuture) .redescribe { "is a flow that will return" } -fun willReturn(expected: T): Matcher> = willReturn(equalTo(expected)) +fun willReturn(expected: T): Matcher> = willReturn(equalTo(expected)) /** * Matches a Flow that succeeds with a result matched by the given matcher */ fun willReturn(successMatcher: Matcher) = net.corda.coretesting.internal.matchers.future.willReturn(successMatcher) - .extrude(FlowStateMachine::resultFuture) + .extrude(FlowStateMachineHandle::resultFuture) .redescribe { "is a flow that will return with a value that ${successMatcher.description}" } /** @@ -26,7 +26,7 @@ fun willReturn(successMatcher: Matcher) = net.corda.coretesting.internal. */ inline fun willThrow(failureMatcher: Matcher) = net.corda.coretesting.internal.matchers.future.willThrow(failureMatcher) - .extrude(FlowStateMachine<*>::resultFuture) + .extrude(FlowStateMachineHandle<*>::resultFuture) .redescribe { "is a flow that will fail, throwing an exception that ${failureMatcher.description}" } /** @@ -34,5 +34,5 @@ inline fun willThrow(failureMatcher: Matcher) = */ inline fun willThrow() = net.corda.coretesting.internal.matchers.future.willThrow() - .extrude(FlowStateMachine<*>::resultFuture) + .extrude(FlowStateMachineHandle<*>::resultFuture) .redescribe { "is a flow that will fail with an exception of type ${E::class.java.simpleName}" } \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt index 67bfb8e1b8..d009c9a286 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt @@ -7,7 +7,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowLogic import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.div @@ -256,7 +256,10 @@ class NodeListenProcessDeathException(hostAndPort: NetworkHostAndPort, listenPro """.trimIndent() ) -fun StartedNodeServices.startFlow(logic: FlowLogic): FlowStateMachine = startFlow(logic, newContext()).getOrThrow() +fun StartedNodeServices.startFlow(logic: FlowLogic): FlowStateMachineHandle = startFlow(logic, newContext()).getOrThrow() + +fun StartedNodeServices.startFlowWithClientId(clientId: String, logic: FlowLogic): FlowStateMachineHandle = + startFlow(logic, newContext().copy(clientId = clientId)).getOrThrow() fun StartedNodeServices.newContext(): InvocationContext = testContext(myInfo.chooseIdentity().name)