diff --git a/build.gradle b/build.gradle index a2b2fd4348..7e44d8b37f 100644 --- a/build.gradle +++ b/build.gradle @@ -187,7 +187,7 @@ buildscript { // See https://github.com/corda/gradle-capsule-plugin classpath "us.kirchmeier:gradle-capsule-plugin:1.0.4_r3" classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-K8S-SHARED-CACHE-SNAPSHOT", changing: true - classpath group: "com.r3.dependx", name: "gradle-dependx", version: "0.1.12", changing: true + classpath group: "com.r3.dependx", name: "gradle-dependx", version: "0.1.13", changing: true classpath "com.bmuschko:gradle-docker-plugin:5.0.0" } } diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt index 735db12c64..559369c442 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt @@ -20,6 +20,7 @@ import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap import net.corda.node.services.statemachine.StaffedFlowHospital import org.junit.Before import java.sql.SQLTransientConnectionException @@ -72,11 +73,12 @@ abstract class AbstractFlowExternalOperationTest { @Suspendable override fun call(): Any { log.info("Started my flow") + subFlow(PingPongFlow(party)) val result = testCode() val session = initiateFlow(party) - session.send("hi there") - log.info("ServiceHub value = $serviceHub") - session.receive() + session.sendAndReceive("hi there").unwrap { it } + session.sendAndReceive("hi there").unwrap { it } + subFlow(PingPongFlow(party)) log.info("Finished my flow") return result } @@ -92,8 +94,28 @@ abstract class AbstractFlowExternalOperationTest { class FlowWithExternalOperationResponder(val session: FlowSession) : FlowLogic() { @Suspendable override fun call() { - session.receive() + session.receive().unwrap { it } session.send("go away") + session.receive().unwrap { it } + session.send("go away") + } + } + + @InitiatingFlow + class PingPongFlow(val party: Party): FlowLogic() { + @Suspendable + override fun call() { + val session = initiateFlow(party) + session.sendAndReceive("ping pong").unwrap { it } + } + } + + @InitiatedBy(PingPongFlow::class) + class PingPongResponder(val session: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + session.receive().unwrap { it } + session.send("I got you bro") } } @@ -111,7 +133,7 @@ abstract class AbstractFlowExternalOperationTest { fun createFuture(): CompletableFuture { return CompletableFuture.supplyAsync(Supplier { log.info("Starting sleep inside of future") - Thread.sleep(2000) + Thread.sleep(1000) log.info("Finished sleep inside of future") "Here is your return value" }, executorService) diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt index cec2568214..cbf1892e51 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt @@ -177,12 +177,14 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() { FlowWithExternalProcess(party) { @Suspendable - override fun testCode(): Any = - await(ExternalAsyncOperation(serviceHub) { _, _ -> + override fun testCode(): Any { + val e = createException() + return await(ExternalAsyncOperation(serviceHub) { _, _ -> CompletableFuture().apply { - completeExceptionally(createException()) + completeExceptionally(e) } }) + } private fun createException() = when (exceptionType) { HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around") diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt index 1b78e95732..5fe3809684 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt @@ -235,7 +235,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() { FlowWithExternalProcess(party) { @Suspendable - override fun testCode(): Any = await(ExternalOperation(serviceHub) { _, _ -> throw createException() }) + override fun testCode() { + val e = createException() + await(ExternalOperation(serviceHub) { _, _ -> throw e }) + } private fun createException() = when (exceptionType) { HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around") diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 808c68fbe7..8e4abdb05a 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -380,10 +380,8 @@ abstract class FlowLogic { @Suspendable @Throws(FlowException::class) open fun subFlow(subLogic: FlowLogic): R { - subLogic.stateMachine = stateMachine - maybeWireUpProgressTracking(subLogic) logger.debug { "Calling subflow: $subLogic" } - val result = stateMachine.subFlow(subLogic) + val result = stateMachine.subFlow(this, subLogic) logger.debug { "Subflow finished with result ${result.toString().abbreviate(300)}" } return result } @@ -540,18 +538,6 @@ abstract class FlowLogic { _stateMachine = value } - private fun maybeWireUpProgressTracking(subLogic: FlowLogic<*>) { - val ours = progressTracker - val theirs = subLogic.progressTracker - if (ours != null && theirs != null && ours != theirs) { - if (ours.currentStep == ProgressTracker.UNSTARTED) { - logger.debug { "Initializing the progress tracker for flow: ${this::class.java.name}." } - ours.nextStep() - } - ours.setChildProgressTracker(ours.currentStep, theirs) - } - } - private fun enforceNoDuplicates(sessions: List) { require(sessions.size == sessions.toSet().size) { "A flow session can only appear once as argument." } } @@ -579,12 +565,7 @@ abstract class FlowLogic { @Suspendable fun await(operation: FlowExternalAsyncOperation): R { // Wraps the passed in [FlowExternalAsyncOperation] so its [CompletableFuture] can be converted into a [CordaFuture] - val flowAsyncOperation = object : FlowAsyncOperation, WrappedFlowExternalAsyncOperation { - override val operation = operation - override fun execute(deduplicationId: String): CordaFuture { - return this.operation.execute(deduplicationId).asCordaFuture() - } - } + val flowAsyncOperation = WrappedFlowExternalAsyncOperation(operation) val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation) return stateMachine.suspend(request, false) } @@ -598,18 +579,7 @@ abstract class FlowLogic { */ @Suspendable fun await(operation: FlowExternalOperation): R { - val flowAsyncOperation = object : FlowAsyncOperation, WrappedFlowExternalOperation { - override val serviceHub = this@FlowLogic.serviceHub as ServiceHubCoreInternal - override val operation = operation - override fun execute(deduplicationId: String): CordaFuture { - // Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation - // the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run] - return CompletableFuture.supplyAsync( - Supplier { this.operation.execute(deduplicationId) }, - serviceHub.externalOperationExecutor - ).asCordaFuture() - } - } + val flowAsyncOperation = WrappedFlowExternalOperation(serviceHub as ServiceHubCoreInternal, operation) val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation) return stateMachine.suspend(request, false) } @@ -619,21 +589,32 @@ abstract class FlowLogic { * [WrappedFlowExternalAsyncOperation] is added to allow jackson to properly reference the data stored within the wrapped * [FlowExternalAsyncOperation]. */ -private interface WrappedFlowExternalAsyncOperation { - val operation: FlowExternalAsyncOperation +private class WrappedFlowExternalAsyncOperation(val operation: FlowExternalAsyncOperation) : FlowAsyncOperation { + override fun execute(deduplicationId: String): CordaFuture { + return operation.execute(deduplicationId).asCordaFuture() + } } /** * [WrappedFlowExternalOperation] is added to allow jackson to properly reference the data stored within the wrapped * [FlowExternalOperation]. * - * The reference to [ServiceHub] is is also needed by Kryo to properly keep a reference to [ServiceHub] so that + * The reference to [ServiceHub] is also needed by Kryo to properly keep a reference to [ServiceHub] so that * [FlowExternalOperation] can be run from the [ServiceHubCoreInternal.externalOperationExecutor] without causing errors when retrying a * flow. A [NullPointerException] is thrown if [FlowLogic.serviceHub] is accessed from [FlowLogic.await] when retrying a flow. */ -private interface WrappedFlowExternalOperation { - val serviceHub: ServiceHub +private class WrappedFlowExternalOperation( + val serviceHub: ServiceHubCoreInternal, val operation: FlowExternalOperation +) : FlowAsyncOperation { + override fun execute(deduplicationId: String): CordaFuture { + // Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation + // the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run] + return CompletableFuture.supplyAsync( + Supplier { this.operation.execute(deduplicationId) }, + serviceHub.externalOperationExecutor + ).asCordaFuture() + } } /** diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt index d32062eac2..c057efa31e 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt @@ -25,7 +25,7 @@ interface FlowStateMachine { fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map) @Suspendable - fun subFlow(subFlow: FlowLogic): SUBFLOWRETURN + fun subFlow(currentFlow: FlowLogic<*>, subFlow: FlowLogic): SUBFLOWRETURN @Suspendable fun flowStackSnapshot(flowClass: Class>): FlowStackSnapshot? diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 12b578ec37..9b2244ebae 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -315,7 +315,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable - override fun subFlow(subFlow: FlowLogic): R { + override fun subFlow(currentFlow: FlowLogic<*>, subFlow: FlowLogic): R { + subFlow.stateMachine = this + maybeWireUpProgressTracking(currentFlow, subFlow) + checkpointIfSubflowIdempotent(subFlow.javaClass) processEventImmediately( Event.EnterSubFlow(subFlow.javaClass, @@ -338,6 +341,18 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } } + private fun maybeWireUpProgressTracking(currentFlow: FlowLogic<*>, subFlow: FlowLogic<*>) { + val currentFlowProgressTracker = currentFlow.progressTracker + val subflowProgressTracker = subFlow.progressTracker + if (currentFlowProgressTracker != null && subflowProgressTracker != null && currentFlowProgressTracker != subflowProgressTracker) { + if (currentFlowProgressTracker.currentStep == ProgressTracker.UNSTARTED) { + logger.debug { "Initializing the progress tracker for flow: ${this::class.java.name}." } + currentFlowProgressTracker.nextStep() + } + currentFlowProgressTracker.setChildProgressTracker(currentFlowProgressTracker.currentStep, subflowProgressTracker) + } + } + private fun Throwable.isUnrecoverable(): Boolean = this is VirtualMachineError && this !is StackOverflowError /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index bc18d72b6b..4d9b9ddffb 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -624,7 +624,7 @@ class SingleThreadedStateMachineManager( checkpoint = checkpoint, pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, - isTransactionTracked = false, + isWaitingForFuture = false, isAnyCheckpointPersisted = existingCheckpoint != null, isStartIdempotent = isStartIdempotent, isRemoved = false, @@ -789,7 +789,7 @@ class SingleThreadedStateMachineManager( checkpoint = checkpoint, pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, - isTransactionTracked = false, + isWaitingForFuture = false, isAnyCheckpointPersisted = isAnyCheckpointPersisted, isStartIdempotent = isStartIdempotent, isRemoved = false, @@ -808,7 +808,7 @@ class SingleThreadedStateMachineManager( checkpoint = checkpoint, pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, - isTransactionTracked = false, + isWaitingForFuture = false, isAnyCheckpointPersisted = isAnyCheckpointPersisted, isStartIdempotent = isStartIdempotent, isRemoved = false, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 4e4a1414a4..842758e599 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -26,7 +26,7 @@ import java.time.Instant * @param pendingDeduplicationHandlers the list of incomplete deduplication handlers. * @param isFlowResumed true if the control is returned (or being returned) to "user-space" flow code. This is used * to make [Event.DoRemainingWork] idempotent. - * @param isTransactionTracked true if a ledger transaction has been tracked as part of a + * @param isWaitingForFuture true if the flow is waiting for the completion of a future triggered by one of the statemachine's actions * [FlowIORequest.WaitForLedgerCommit]. This used is to make tracking idempotent. * @param isAnyCheckpointPersisted true if at least a single checkpoint has been persisted. This is used to determine * whether we should DELETE the checkpoint at the end of the flow. @@ -39,15 +39,15 @@ import java.time.Instant // TODO perhaps add a read-only environment to the state machine for things that don't change over time? // TODO evaluate persistent datastructure libraries to replace the inefficient copying we currently do. data class StateMachineState( - val checkpoint: Checkpoint, - val flowLogic: FlowLogic<*>, - val pendingDeduplicationHandlers: List, - val isFlowResumed: Boolean, - val isTransactionTracked: Boolean, - val isAnyCheckpointPersisted: Boolean, - val isStartIdempotent: Boolean, - val isRemoved: Boolean, - val senderUUID: String? + val checkpoint: Checkpoint, + val flowLogic: FlowLogic<*>, + val pendingDeduplicationHandlers: List, + val isFlowResumed: Boolean, + val isWaitingForFuture: Boolean, + val isAnyCheckpointPersisted: Boolean, + val isStartIdempotent: Boolean, + val isRemoved: Boolean, + val senderUUID: String? ) /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 7252fca784..cd1681bdf5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -95,14 +95,16 @@ class StartedFlowTransition( } private fun waitForLedgerCommitTransition(flowIORequest: FlowIORequest.WaitForLedgerCommit): TransitionResult { - return if (!startingState.isTransactionTracked) { + // This ensures that the [WaitForLedgerCommit] request is not executed multiple times if extra + // [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up + return if (!startingState.isWaitingForFuture) { TransitionResult( - newState = startingState.copy(isTransactionTracked = true), - actions = listOf( - Action.CreateTransaction, - Action.TrackTransaction(flowIORequest.hash), - Action.CommitTransaction - ) + newState = startingState.copy(isWaitingForFuture = true), + actions = listOf( + Action.CreateTransaction, + Action.TrackTransaction(flowIORequest.hash), + Action.CommitTransaction + ) ) } else { TransitionResult(startingState) @@ -416,12 +418,19 @@ class StartedFlowTransition( } private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult { - return builder { - // The `numberOfSuspends` is added to the deduplication ID in case an async - // operation is executed multiple times within the same flow. - val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.checkpointState.numberOfSuspends.toString() - actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation)) - FlowContinuation.ProcessEvents + // This ensures that the [ExecuteAsyncOperation] request is not executed multiple times if extra + // [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up + return if (!startingState.isWaitingForFuture) { + builder { + // The `numberOfSuspends` is added to the deduplication ID in case an async + // operation is executed multiple times within the same flow. + val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.checkpointState.numberOfSuspends.toString() + actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation)) + currentState = currentState.copy(isWaitingForFuture = true) + FlowContinuation.ProcessEvents + } + } else { + TransitionResult(startingState) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index ca92f11883..31ebaf2fcb 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -1,5 +1,6 @@ package net.corda.node.services.statemachine.transitions +import net.corda.core.crypto.SecureHash import net.corda.core.flows.InitiatingFlow import net.corda.core.internal.FlowIORequest import net.corda.core.utilities.Try @@ -60,11 +61,8 @@ class TopLevelTransition( private fun transactionCommittedTransition(event: Event.TransactionCommitted): TransitionResult { return builder { val checkpoint = currentState.checkpoint - if (currentState.isTransactionTracked && - checkpoint.flowState is FlowState.Started && - checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit && - checkpoint.flowState.flowIORequest.hash == event.transaction.id) { - currentState = currentState.copy(isTransactionTracked = false) + if (isWaitingForLedgerCommit(currentState, checkpoint, event.transaction.id)) { + currentState = currentState.copy(isWaitingForFuture = false) if (isErrored()) { return@builder FlowContinuation.ProcessEvents } @@ -76,6 +74,17 @@ class TopLevelTransition( } } + private fun isWaitingForLedgerCommit( + currentState: StateMachineState, + checkpoint: Checkpoint, + transactionId: SecureHash + ): Boolean { + return currentState.isWaitingForFuture && + checkpoint.flowState is FlowState.Started && + checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit && + checkpoint.flowState.flowIORequest.hash == transactionId + } + private fun softShutdownTransition(): TransitionResult { val lastState = startingState.copy(isRemoved = true) return TransitionResult( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt index bd5b317e32..09336fa96d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt @@ -68,13 +68,13 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi fun resumeFlowLogic(result: Any?): FlowContinuation { actions.add(Action.CreateTransaction) - currentState = currentState.copy(isFlowResumed = true) + currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false) return FlowContinuation.Resume(result) } fun resumeFlowLogic(result: Throwable): FlowContinuation { actions.add(Action.CreateTransaction) - currentState = currentState.copy(isFlowResumed = true) + currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false) return FlowContinuation.Throw(result) } } diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt index b33b093467..07679e0f77 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt @@ -80,7 +80,7 @@ class UniquenessProviderTests( } /* - There are 6 types of transactions to test: + There are 7 types of transaction to test: A B C D E F G ================== === === === === === === === @@ -95,27 +95,91 @@ class UniquenessProviderTests( /* Group A: only time window */ @Test(timeout=300_000) - fun `commits transaction with valid time window`() { - val inputState1 = generateStateRef() + fun `rejects transaction before time window is valid`() { val firstTxId = SecureHash.randomSHA256() - val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes)) - val result = uniquenessProvider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get() - assert(result is UniquenessProvider.Result.Success) + val timeWindow = TimeWindow.between( + Clock.systemUTC().instant().plus(30.minutes), + Clock.systemUTC().instant().plus(60.minutes)) + val result = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, timeWindow).get() + assert(result is UniquenessProvider.Result.Failure) + val error = (result as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid + assertEquals(timeWindow, error.txTimeWindow) - // Idempotency: can re-notarise successfully later. + // Once time window behaviour has changed, we should add an additional test case here to check + // that retry within time window still fails. We can't do that now because currently it will + // succeed and that will result in the past time window case succeeding too. + + // Retry still fails after advancing past time window testClock.advanceBy(90.minutes) - val result2 = uniquenessProvider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get() - assert(result2 is UniquenessProvider.Result.Success) + val result2 = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, timeWindow).get() + assert(result2 is UniquenessProvider.Result.Failure) + val error2 = (result2 as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid + assertEquals(timeWindow, error2.txTimeWindow) } @Test(timeout=300_000) - fun `rejects transaction with invalid time window`() { - val inputState1 = generateStateRef() + fun `commits transaction within time window`() { val firstTxId = SecureHash.randomSHA256() - val invalidTimeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().minus(30.minutes)) - val result = uniquenessProvider.commit(listOf(inputState1), firstTxId, identity, requestSignature, invalidTimeWindow).get() + val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes)) + val result = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, timeWindow).get() + assert(result is UniquenessProvider.Result.Success) + + // Retry is successful whilst still within time window + testClock.advanceBy(10.minutes) + val result2 = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, timeWindow).get() + assert(result2 is UniquenessProvider.Result.Success) + + // Retry is successful after time window has expired + testClock.advanceBy(80.minutes) + val result3 = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, timeWindow).get() + assert(result3 is UniquenessProvider.Result.Success) + } + + @Test(timeout=300_000) + fun `rejects transaction after time window has expired`() { + val firstTxId = SecureHash.randomSHA256() + val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().minus(30.minutes)) + val result = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, timeWindow).get() + assert(result is UniquenessProvider.Result.Failure) val error = (result as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid - assertEquals(invalidTimeWindow, error.txTimeWindow) + assertEquals(timeWindow, error.txTimeWindow) + + // Retry still fails at a later time + testClock.advanceBy(10.minutes) + val result2 = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, timeWindow).get() + assert(result2 is UniquenessProvider.Result.Failure) + val error2 = (result2 as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid + assertEquals(timeWindow, error2.txTimeWindow) + } + + @Test(timeout=300_000) + fun `time window only transactions are processed correctly when duplicate requests occur in succession`() { + val firstTxId = SecureHash.randomSHA256() + val secondTxId = SecureHash.randomSHA256() + val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes)) + val invalidTimeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().minus(30.minutes)) + + val validFuture1 = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, timeWindow) + val validFuture2 = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, timeWindow) + val invalidFuture1 = uniquenessProvider.commit( + emptyList(), secondTxId, identity, requestSignature, invalidTimeWindow) + val invalidFuture2 = uniquenessProvider.commit( + emptyList(), secondTxId, identity, requestSignature, invalidTimeWindow) + + // Ensure that transactions are processed correctly and duplicates get the same responses to original + assert(validFuture1.get() is UniquenessProvider.Result.Success) + assert(validFuture2.get() is UniquenessProvider.Result.Success) + assert(invalidFuture1.get() is UniquenessProvider.Result.Failure) + assert(invalidFuture2.get() is UniquenessProvider.Result.Failure) } /* Group B: only reference states */ @@ -154,6 +218,52 @@ class UniquenessProviderTests( assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type) } + @Test(timeout=300_000) + fun `commits retry transaction when reference states were spent since initial transaction`() { + val firstTxId = SecureHash.randomSHA256() + val referenceState = generateStateRef() + + val result = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState)) + .get() + assert(result is UniquenessProvider.Result.Success) + + // Spend reference state + val secondTxId = SecureHash.randomSHA256() + val result2 = uniquenessProvider.commit( + listOf(referenceState), secondTxId, identity, requestSignature, references = emptyList()) + .get() + assert(result2 is UniquenessProvider.Result.Success) + + // Retry referencing the now spent state still succeeds + val result3 = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState)) + .get() + assert(result3 is UniquenessProvider.Result.Success) + } + + @Test(timeout=300_000) + fun `reference state only transactions are processed correctly when duplicate requests occur in succession`() { + val firstTxId = SecureHash.randomSHA256() + val secondTxId = SecureHash.randomSHA256() + val referenceState = generateStateRef() + + val validFuture3 = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState)) + val validFuture4 = uniquenessProvider.commit( + emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState)) + val validFuture1 = uniquenessProvider.commit( + emptyList(), secondTxId, identity, requestSignature, references = listOf(referenceState)) + val validFuture2 = uniquenessProvider.commit( + emptyList(), secondTxId, identity, requestSignature, references = listOf(referenceState)) + + // Ensure that transactions are processed correctly and duplicates get the same responses to original + assert(validFuture1.get() is UniquenessProvider.Result.Success) + assert(validFuture2.get() is UniquenessProvider.Result.Success) + assert(validFuture3.get() is UniquenessProvider.Result.Success) + assert(validFuture4.get() is UniquenessProvider.Result.Success) + } + /* Group C: reference states & time window */ @Test(timeout=300_000) @@ -262,6 +372,28 @@ class UniquenessProviderTests( assertEquals(firstTxId.sha256(), conflictCause.hashOfTransactionId) } + @Test(timeout=300_000) + fun `input state only transactions are processed correctly when duplicate requests occur in succession`() { + val firstTxId = SecureHash.randomSHA256() + val secondTxId = SecureHash.randomSHA256() + val inputState = generateStateRef() + + val validFuture1 = uniquenessProvider.commit( + listOf(inputState), firstTxId, identity, requestSignature) + val validFuture2 = uniquenessProvider.commit( + listOf(inputState), firstTxId, identity, requestSignature) + val invalidFuture1 = uniquenessProvider.commit( + listOf(inputState), secondTxId, identity, requestSignature) + val invalidFuture2 = uniquenessProvider.commit( + listOf(inputState), secondTxId, identity, requestSignature) + + // Ensure that transactions are processed correctly and duplicates get the same responses to original + assert(validFuture1.get() is UniquenessProvider.Result.Success) + assert(validFuture2.get() is UniquenessProvider.Result.Success) + assert(invalidFuture1.get() is UniquenessProvider.Result.Failure) + assert(invalidFuture2.get() is UniquenessProvider.Result.Failure) + } + /* Group E: input states & time window */ @Test(timeout=300_000) @@ -346,6 +478,37 @@ class UniquenessProviderTests( assert(result2 is UniquenessProvider.Result.Success) } + @Test(timeout=300_000) + fun `re-notarise after reference state is spent`() { + val firstTxId = SecureHash.randomSHA256() + val inputState = generateStateRef() + val referenceState = generateStateRef() + val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes)) + + val result = uniquenessProvider.commit( + listOf(inputState), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState)) + .get() + assert(result is UniquenessProvider.Result.Success) + + // Spend the reference state. + val referenceSpend = uniquenessProvider.commit( + listOf(referenceState), + SecureHash.randomSHA256(), + identity, + requestSignature, + timeWindow, + emptyList()).get() + assert(referenceSpend is UniquenessProvider.Result.Success) + + // Idempotency: can re-notarise successfully + testClock.advanceBy(90.minutes) + val result2 = uniquenessProvider.commit( + listOf(inputState), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState)) + .get() + // Known failure - this should return success. Will be fixed in a future release. + assert(result2 is UniquenessProvider.Result.Failure) + } + @Test(timeout=300_000) fun `rejects transaction with unused reference states and used input states`() { val firstTxId = SecureHash.randomSHA256() @@ -387,6 +550,31 @@ class UniquenessProviderTests( assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type) } + @Test(timeout=300_000) + fun `input and reference state transactions are processed correctly when duplicate requests occur in succession`() { + val firstTxId = SecureHash.randomSHA256() + val secondTxId = SecureHash.randomSHA256() + val referenceState = generateStateRef() + + // Ensure batch contains duplicates + val validFuture1 = uniquenessProvider.commit( + emptyList(), secondTxId, identity, requestSignature, references = listOf(referenceState)) + val validFuture2 = uniquenessProvider.commit( + emptyList(), secondTxId, identity, requestSignature, references = listOf(referenceState)) + val validFuture3 = uniquenessProvider.commit( + listOf(referenceState), firstTxId, identity, requestSignature) + + // Attempt to use the reference state after it has been consumed + val validFuture4 = uniquenessProvider.commit( + emptyList(), SecureHash.randomSHA256(), identity, requestSignature, references = listOf(referenceState)) + + // Ensure that transactions are processed correctly and duplicates get the same responses to original + assert(validFuture1.get() is UniquenessProvider.Result.Success) + assert(validFuture2.get() is UniquenessProvider.Result.Success) + assert(validFuture3.get() is UniquenessProvider.Result.Success) + assert(validFuture4.get() is UniquenessProvider.Result.Failure) + } + /* Group G: input, reference states and time window – covered by previous tests. */ /* Transaction signing tests. */