CORDA-3677 FlowExternalOperation serialising reference to FlowLogic (#6094)

* Stop capturing 'FlowLogic' references in flowAsyncOperation;

Creating concrete classes removes the implicit reference to FlowLogic (as this) being included in the anonymous object

* Modify test code so that lambdas no longer get implicit references to their enclosing 'FlowLogic'
This commit is contained in:
Kyriakos Tharrouniatis 2020-03-26 09:23:38 +00:00 committed by GitHub
parent b73a498062
commit caf152f175
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 27 deletions

View File

@ -177,12 +177,14 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any =
await(ExternalAsyncOperation(serviceHub) { _, _ ->
override fun testCode(): Any {
val e = createException()
return await(ExternalAsyncOperation(serviceHub) { _, _ ->
CompletableFuture<Any>().apply {
completeExceptionally(createException())
completeExceptionally(e)
}
})
}
private fun createException() = when (exceptionType) {
HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around")

View File

@ -235,7 +235,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any = await(ExternalOperation(serviceHub) { _, _ -> throw createException() })
override fun testCode() {
val e = createException()
await(ExternalOperation(serviceHub) { _, _ -> throw e })
}
private fun createException() = when (exceptionType) {
HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around")

View File

@ -565,12 +565,7 @@ abstract class FlowLogic<out T> {
@Suspendable
fun <R : Any> await(operation: FlowExternalAsyncOperation<R>): R {
// Wraps the passed in [FlowExternalAsyncOperation] so its [CompletableFuture] can be converted into a [CordaFuture]
val flowAsyncOperation = object : FlowAsyncOperation<R>, WrappedFlowExternalAsyncOperation<R> {
override val operation = operation
override fun execute(deduplicationId: String): CordaFuture<R> {
return this.operation.execute(deduplicationId).asCordaFuture()
}
}
val flowAsyncOperation = WrappedFlowExternalAsyncOperation(operation)
val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation)
return stateMachine.suspend(request, false)
}
@ -584,18 +579,7 @@ abstract class FlowLogic<out T> {
*/
@Suspendable
fun <R : Any> await(operation: FlowExternalOperation<R>): R {
val flowAsyncOperation = object : FlowAsyncOperation<R>, WrappedFlowExternalOperation<R> {
override val serviceHub = this@FlowLogic.serviceHub as ServiceHubCoreInternal
override val operation = operation
override fun execute(deduplicationId: String): CordaFuture<R> {
// Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation
// the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run]
return CompletableFuture.supplyAsync(
Supplier { this.operation.execute(deduplicationId) },
serviceHub.externalOperationExecutor
).asCordaFuture()
}
}
val flowAsyncOperation = WrappedFlowExternalOperation(serviceHub as ServiceHubCoreInternal, operation)
val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation)
return stateMachine.suspend(request, false)
}
@ -605,21 +589,32 @@ abstract class FlowLogic<out T> {
* [WrappedFlowExternalAsyncOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalAsyncOperation].
*/
private interface WrappedFlowExternalAsyncOperation<R : Any> {
val operation: FlowExternalAsyncOperation<R>
private class WrappedFlowExternalAsyncOperation<R : Any>(val operation: FlowExternalAsyncOperation<R>) : FlowAsyncOperation<R> {
override fun execute(deduplicationId: String): CordaFuture<R> {
return operation.execute(deduplicationId).asCordaFuture()
}
}
/**
* [WrappedFlowExternalOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalOperation].
*
* The reference to [ServiceHub] is is also needed by Kryo to properly keep a reference to [ServiceHub] so that
* The reference to [ServiceHub] is also needed by Kryo to properly keep a reference to [ServiceHub] so that
* [FlowExternalOperation] can be run from the [ServiceHubCoreInternal.externalOperationExecutor] without causing errors when retrying a
* flow. A [NullPointerException] is thrown if [FlowLogic.serviceHub] is accessed from [FlowLogic.await] when retrying a flow.
*/
private interface WrappedFlowExternalOperation<R : Any> {
val serviceHub: ServiceHub
private class WrappedFlowExternalOperation<R : Any>(
val serviceHub: ServiceHubCoreInternal,
val operation: FlowExternalOperation<R>
) : FlowAsyncOperation<R> {
override fun execute(deduplicationId: String): CordaFuture<R> {
// Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation
// the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run]
return CompletableFuture.supplyAsync(
Supplier { this.operation.execute(deduplicationId) },
serviceHub.externalOperationExecutor
).asCordaFuture()
}
}
/**