Merge branch 'release/os/4.5' into os_4.5-feature_checkpoint_table_improvements-merge

This commit is contained in:
Kyriakos Tharrouniatis 2020-03-26 11:37:00 +00:00
commit 6baa775e23
13 changed files with 326 additions and 97 deletions
build.gradle
core-tests/src/test/kotlin/net/corda/coretests/flows
core/src/main/kotlin/net/corda/core
node/src

@ -187,7 +187,7 @@ buildscript {
// See https://github.com/corda/gradle-capsule-plugin // See https://github.com/corda/gradle-capsule-plugin
classpath "us.kirchmeier:gradle-capsule-plugin:1.0.4_r3" classpath "us.kirchmeier:gradle-capsule-plugin:1.0.4_r3"
classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-K8S-SHARED-CACHE-SNAPSHOT", changing: true classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-K8S-SHARED-CACHE-SNAPSHOT", changing: true
classpath group: "com.r3.dependx", name: "gradle-dependx", version: "0.1.12", changing: true classpath group: "com.r3.dependx", name: "gradle-dependx", version: "0.1.13", changing: true
classpath "com.bmuschko:gradle-docker-plugin:5.0.0" classpath "com.bmuschko:gradle-docker-plugin:5.0.0"
} }
} }

@ -20,6 +20,7 @@ import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.statemachine.StaffedFlowHospital import net.corda.node.services.statemachine.StaffedFlowHospital
import org.junit.Before import org.junit.Before
import java.sql.SQLTransientConnectionException import java.sql.SQLTransientConnectionException
@ -72,11 +73,12 @@ abstract class AbstractFlowExternalOperationTest {
@Suspendable @Suspendable
override fun call(): Any { override fun call(): Any {
log.info("Started my flow") log.info("Started my flow")
subFlow(PingPongFlow(party))
val result = testCode() val result = testCode()
val session = initiateFlow(party) val session = initiateFlow(party)
session.send("hi there") session.sendAndReceive<String>("hi there").unwrap { it }
log.info("ServiceHub value = $serviceHub") session.sendAndReceive<String>("hi there").unwrap { it }
session.receive<String>() subFlow(PingPongFlow(party))
log.info("Finished my flow") log.info("Finished my flow")
return result return result
} }
@ -92,8 +94,28 @@ abstract class AbstractFlowExternalOperationTest {
class FlowWithExternalOperationResponder(val session: FlowSession) : FlowLogic<Unit>() { class FlowWithExternalOperationResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable @Suspendable
override fun call() { override fun call() {
session.receive<String>() session.receive<String>().unwrap { it }
session.send("go away") session.send("go away")
session.receive<String>().unwrap { it }
session.send("go away")
}
}
@InitiatingFlow
class PingPongFlow(val party: Party): FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(party)
session.sendAndReceive<String>("ping pong").unwrap { it }
}
}
@InitiatedBy(PingPongFlow::class)
class PingPongResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
session.send("I got you bro")
} }
} }
@ -111,7 +133,7 @@ abstract class AbstractFlowExternalOperationTest {
fun createFuture(): CompletableFuture<Any> { fun createFuture(): CompletableFuture<Any> {
return CompletableFuture.supplyAsync(Supplier<Any> { return CompletableFuture.supplyAsync(Supplier<Any> {
log.info("Starting sleep inside of future") log.info("Starting sleep inside of future")
Thread.sleep(2000) Thread.sleep(1000)
log.info("Finished sleep inside of future") log.info("Finished sleep inside of future")
"Here is your return value" "Here is your return value"
}, executorService) }, executorService)

@ -177,12 +177,14 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
FlowWithExternalProcess(party) { FlowWithExternalProcess(party) {
@Suspendable @Suspendable
override fun testCode(): Any = override fun testCode(): Any {
await(ExternalAsyncOperation(serviceHub) { _, _ -> val e = createException()
return await(ExternalAsyncOperation(serviceHub) { _, _ ->
CompletableFuture<Any>().apply { CompletableFuture<Any>().apply {
completeExceptionally(createException()) completeExceptionally(e)
} }
}) })
}
private fun createException() = when (exceptionType) { private fun createException() = when (exceptionType) {
HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around") HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around")

@ -235,7 +235,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
FlowWithExternalProcess(party) { FlowWithExternalProcess(party) {
@Suspendable @Suspendable
override fun testCode(): Any = await(ExternalOperation(serviceHub) { _, _ -> throw createException() }) override fun testCode() {
val e = createException()
await(ExternalOperation(serviceHub) { _, _ -> throw e })
}
private fun createException() = when (exceptionType) { private fun createException() = when (exceptionType) {
HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around") HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around")

@ -380,10 +380,8 @@ abstract class FlowLogic<out T> {
@Suspendable @Suspendable
@Throws(FlowException::class) @Throws(FlowException::class)
open fun <R> subFlow(subLogic: FlowLogic<R>): R { open fun <R> subFlow(subLogic: FlowLogic<R>): R {
subLogic.stateMachine = stateMachine
maybeWireUpProgressTracking(subLogic)
logger.debug { "Calling subflow: $subLogic" } logger.debug { "Calling subflow: $subLogic" }
val result = stateMachine.subFlow(subLogic) val result = stateMachine.subFlow(this, subLogic)
logger.debug { "Subflow finished with result ${result.toString().abbreviate(300)}" } logger.debug { "Subflow finished with result ${result.toString().abbreviate(300)}" }
return result return result
} }
@ -540,18 +538,6 @@ abstract class FlowLogic<out T> {
_stateMachine = value _stateMachine = value
} }
private fun maybeWireUpProgressTracking(subLogic: FlowLogic<*>) {
val ours = progressTracker
val theirs = subLogic.progressTracker
if (ours != null && theirs != null && ours != theirs) {
if (ours.currentStep == ProgressTracker.UNSTARTED) {
logger.debug { "Initializing the progress tracker for flow: ${this::class.java.name}." }
ours.nextStep()
}
ours.setChildProgressTracker(ours.currentStep, theirs)
}
}
private fun enforceNoDuplicates(sessions: List<FlowSession>) { private fun enforceNoDuplicates(sessions: List<FlowSession>) {
require(sessions.size == sessions.toSet().size) { "A flow session can only appear once as argument." } require(sessions.size == sessions.toSet().size) { "A flow session can only appear once as argument." }
} }
@ -579,12 +565,7 @@ abstract class FlowLogic<out T> {
@Suspendable @Suspendable
fun <R : Any> await(operation: FlowExternalAsyncOperation<R>): R { fun <R : Any> await(operation: FlowExternalAsyncOperation<R>): R {
// Wraps the passed in [FlowExternalAsyncOperation] so its [CompletableFuture] can be converted into a [CordaFuture] // Wraps the passed in [FlowExternalAsyncOperation] so its [CompletableFuture] can be converted into a [CordaFuture]
val flowAsyncOperation = object : FlowAsyncOperation<R>, WrappedFlowExternalAsyncOperation<R> { val flowAsyncOperation = WrappedFlowExternalAsyncOperation(operation)
override val operation = operation
override fun execute(deduplicationId: String): CordaFuture<R> {
return this.operation.execute(deduplicationId).asCordaFuture()
}
}
val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation) val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation)
return stateMachine.suspend(request, false) return stateMachine.suspend(request, false)
} }
@ -598,18 +579,7 @@ abstract class FlowLogic<out T> {
*/ */
@Suspendable @Suspendable
fun <R : Any> await(operation: FlowExternalOperation<R>): R { fun <R : Any> await(operation: FlowExternalOperation<R>): R {
val flowAsyncOperation = object : FlowAsyncOperation<R>, WrappedFlowExternalOperation<R> { val flowAsyncOperation = WrappedFlowExternalOperation(serviceHub as ServiceHubCoreInternal, operation)
override val serviceHub = this@FlowLogic.serviceHub as ServiceHubCoreInternal
override val operation = operation
override fun execute(deduplicationId: String): CordaFuture<R> {
// Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation
// the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run]
return CompletableFuture.supplyAsync(
Supplier { this.operation.execute(deduplicationId) },
serviceHub.externalOperationExecutor
).asCordaFuture()
}
}
val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation) val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation)
return stateMachine.suspend(request, false) return stateMachine.suspend(request, false)
} }
@ -619,21 +589,32 @@ abstract class FlowLogic<out T> {
* [WrappedFlowExternalAsyncOperation] is added to allow jackson to properly reference the data stored within the wrapped * [WrappedFlowExternalAsyncOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalAsyncOperation]. * [FlowExternalAsyncOperation].
*/ */
private interface WrappedFlowExternalAsyncOperation<R : Any> { private class WrappedFlowExternalAsyncOperation<R : Any>(val operation: FlowExternalAsyncOperation<R>) : FlowAsyncOperation<R> {
val operation: FlowExternalAsyncOperation<R> override fun execute(deduplicationId: String): CordaFuture<R> {
return operation.execute(deduplicationId).asCordaFuture()
}
} }
/** /**
* [WrappedFlowExternalOperation] is added to allow jackson to properly reference the data stored within the wrapped * [WrappedFlowExternalOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalOperation]. * [FlowExternalOperation].
* *
* The reference to [ServiceHub] is is also needed by Kryo to properly keep a reference to [ServiceHub] so that * The reference to [ServiceHub] is also needed by Kryo to properly keep a reference to [ServiceHub] so that
* [FlowExternalOperation] can be run from the [ServiceHubCoreInternal.externalOperationExecutor] without causing errors when retrying a * [FlowExternalOperation] can be run from the [ServiceHubCoreInternal.externalOperationExecutor] without causing errors when retrying a
* flow. A [NullPointerException] is thrown if [FlowLogic.serviceHub] is accessed from [FlowLogic.await] when retrying a flow. * flow. A [NullPointerException] is thrown if [FlowLogic.serviceHub] is accessed from [FlowLogic.await] when retrying a flow.
*/ */
private interface WrappedFlowExternalOperation<R : Any> { private class WrappedFlowExternalOperation<R : Any>(
val serviceHub: ServiceHub val serviceHub: ServiceHubCoreInternal,
val operation: FlowExternalOperation<R> val operation: FlowExternalOperation<R>
) : FlowAsyncOperation<R> {
override fun execute(deduplicationId: String): CordaFuture<R> {
// Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation
// the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run]
return CompletableFuture.supplyAsync(
Supplier { this.operation.execute(deduplicationId) },
serviceHub.externalOperationExecutor
).asCordaFuture()
}
} }
/** /**

@ -25,7 +25,7 @@ interface FlowStateMachine<FLOWRETURN> {
fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>) fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>)
@Suspendable @Suspendable
fun <SUBFLOWRETURN> subFlow(subFlow: FlowLogic<SUBFLOWRETURN>): SUBFLOWRETURN fun <SUBFLOWRETURN> subFlow(currentFlow: FlowLogic<*>, subFlow: FlowLogic<SUBFLOWRETURN>): SUBFLOWRETURN
@Suspendable @Suspendable
fun flowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot? fun flowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot?

@ -315,7 +315,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
@Suspendable @Suspendable
override fun <R> subFlow(subFlow: FlowLogic<R>): R { override fun <R> subFlow(currentFlow: FlowLogic<*>, subFlow: FlowLogic<R>): R {
subFlow.stateMachine = this
maybeWireUpProgressTracking(currentFlow, subFlow)
checkpointIfSubflowIdempotent(subFlow.javaClass) checkpointIfSubflowIdempotent(subFlow.javaClass)
processEventImmediately( processEventImmediately(
Event.EnterSubFlow(subFlow.javaClass, Event.EnterSubFlow(subFlow.javaClass,
@ -338,6 +341,18 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
} }
private fun maybeWireUpProgressTracking(currentFlow: FlowLogic<*>, subFlow: FlowLogic<*>) {
val currentFlowProgressTracker = currentFlow.progressTracker
val subflowProgressTracker = subFlow.progressTracker
if (currentFlowProgressTracker != null && subflowProgressTracker != null && currentFlowProgressTracker != subflowProgressTracker) {
if (currentFlowProgressTracker.currentStep == ProgressTracker.UNSTARTED) {
logger.debug { "Initializing the progress tracker for flow: ${this::class.java.name}." }
currentFlowProgressTracker.nextStep()
}
currentFlowProgressTracker.setChildProgressTracker(currentFlowProgressTracker.currentStep, subflowProgressTracker)
}
}
private fun Throwable.isUnrecoverable(): Boolean = this is VirtualMachineError && this !is StackOverflowError private fun Throwable.isUnrecoverable(): Boolean = this is VirtualMachineError && this !is StackOverflowError
/** /**

@ -624,7 +624,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint, checkpoint = checkpoint,
pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(), pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false, isFlowResumed = false,
isTransactionTracked = false, isWaitingForFuture = false,
isAnyCheckpointPersisted = existingCheckpoint != null, isAnyCheckpointPersisted = existingCheckpoint != null,
isStartIdempotent = isStartIdempotent, isStartIdempotent = isStartIdempotent,
isRemoved = false, isRemoved = false,
@ -789,7 +789,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint, checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false, isFlowResumed = false,
isTransactionTracked = false, isWaitingForFuture = false,
isAnyCheckpointPersisted = isAnyCheckpointPersisted, isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent, isStartIdempotent = isStartIdempotent,
isRemoved = false, isRemoved = false,
@ -808,7 +808,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint, checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false, isFlowResumed = false,
isTransactionTracked = false, isWaitingForFuture = false,
isAnyCheckpointPersisted = isAnyCheckpointPersisted, isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent, isStartIdempotent = isStartIdempotent,
isRemoved = false, isRemoved = false,

@ -26,7 +26,7 @@ import java.time.Instant
* @param pendingDeduplicationHandlers the list of incomplete deduplication handlers. * @param pendingDeduplicationHandlers the list of incomplete deduplication handlers.
* @param isFlowResumed true if the control is returned (or being returned) to "user-space" flow code. This is used * @param isFlowResumed true if the control is returned (or being returned) to "user-space" flow code. This is used
* to make [Event.DoRemainingWork] idempotent. * to make [Event.DoRemainingWork] idempotent.
* @param isTransactionTracked true if a ledger transaction has been tracked as part of a * @param isWaitingForFuture true if the flow is waiting for the completion of a future triggered by one of the statemachine's actions
* [FlowIORequest.WaitForLedgerCommit]. This used is to make tracking idempotent. * [FlowIORequest.WaitForLedgerCommit]. This used is to make tracking idempotent.
* @param isAnyCheckpointPersisted true if at least a single checkpoint has been persisted. This is used to determine * @param isAnyCheckpointPersisted true if at least a single checkpoint has been persisted. This is used to determine
* whether we should DELETE the checkpoint at the end of the flow. * whether we should DELETE the checkpoint at the end of the flow.
@ -43,7 +43,7 @@ data class StateMachineState(
val flowLogic: FlowLogic<*>, val flowLogic: FlowLogic<*>,
val pendingDeduplicationHandlers: List<DeduplicationHandler>, val pendingDeduplicationHandlers: List<DeduplicationHandler>,
val isFlowResumed: Boolean, val isFlowResumed: Boolean,
val isTransactionTracked: Boolean, val isWaitingForFuture: Boolean,
val isAnyCheckpointPersisted: Boolean, val isAnyCheckpointPersisted: Boolean,
val isStartIdempotent: Boolean, val isStartIdempotent: Boolean,
val isRemoved: Boolean, val isRemoved: Boolean,

@ -95,9 +95,11 @@ class StartedFlowTransition(
} }
private fun waitForLedgerCommitTransition(flowIORequest: FlowIORequest.WaitForLedgerCommit): TransitionResult { private fun waitForLedgerCommitTransition(flowIORequest: FlowIORequest.WaitForLedgerCommit): TransitionResult {
return if (!startingState.isTransactionTracked) { // This ensures that the [WaitForLedgerCommit] request is not executed multiple times if extra
// [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up
return if (!startingState.isWaitingForFuture) {
TransitionResult( TransitionResult(
newState = startingState.copy(isTransactionTracked = true), newState = startingState.copy(isWaitingForFuture = true),
actions = listOf( actions = listOf(
Action.CreateTransaction, Action.CreateTransaction,
Action.TrackTransaction(flowIORequest.hash), Action.TrackTransaction(flowIORequest.hash),
@ -416,13 +418,20 @@ class StartedFlowTransition(
} }
private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult { private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult {
return builder { // This ensures that the [ExecuteAsyncOperation] request is not executed multiple times if extra
// [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up
return if (!startingState.isWaitingForFuture) {
builder {
// The `numberOfSuspends` is added to the deduplication ID in case an async // The `numberOfSuspends` is added to the deduplication ID in case an async
// operation is executed multiple times within the same flow. // operation is executed multiple times within the same flow.
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.checkpointState.numberOfSuspends.toString() val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.checkpointState.numberOfSuspends.toString()
actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation)) actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation))
currentState = currentState.copy(isWaitingForFuture = true)
FlowContinuation.ProcessEvents FlowContinuation.ProcessEvents
} }
} else {
TransitionResult(startingState)
}
} }
private fun executeForceCheckpoint(): TransitionResult { private fun executeForceCheckpoint(): TransitionResult {

@ -1,5 +1,6 @@
package net.corda.node.services.statemachine.transitions package net.corda.node.services.statemachine.transitions
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowIORequest
import net.corda.core.utilities.Try import net.corda.core.utilities.Try
@ -60,11 +61,8 @@ class TopLevelTransition(
private fun transactionCommittedTransition(event: Event.TransactionCommitted): TransitionResult { private fun transactionCommittedTransition(event: Event.TransactionCommitted): TransitionResult {
return builder { return builder {
val checkpoint = currentState.checkpoint val checkpoint = currentState.checkpoint
if (currentState.isTransactionTracked && if (isWaitingForLedgerCommit(currentState, checkpoint, event.transaction.id)) {
checkpoint.flowState is FlowState.Started && currentState = currentState.copy(isWaitingForFuture = false)
checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit &&
checkpoint.flowState.flowIORequest.hash == event.transaction.id) {
currentState = currentState.copy(isTransactionTracked = false)
if (isErrored()) { if (isErrored()) {
return@builder FlowContinuation.ProcessEvents return@builder FlowContinuation.ProcessEvents
} }
@ -76,6 +74,17 @@ class TopLevelTransition(
} }
} }
private fun isWaitingForLedgerCommit(
currentState: StateMachineState,
checkpoint: Checkpoint,
transactionId: SecureHash
): Boolean {
return currentState.isWaitingForFuture &&
checkpoint.flowState is FlowState.Started &&
checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit &&
checkpoint.flowState.flowIORequest.hash == transactionId
}
private fun softShutdownTransition(): TransitionResult { private fun softShutdownTransition(): TransitionResult {
val lastState = startingState.copy(isRemoved = true) val lastState = startingState.copy(isRemoved = true)
return TransitionResult( return TransitionResult(

@ -68,13 +68,13 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
fun resumeFlowLogic(result: Any?): FlowContinuation { fun resumeFlowLogic(result: Any?): FlowContinuation {
actions.add(Action.CreateTransaction) actions.add(Action.CreateTransaction)
currentState = currentState.copy(isFlowResumed = true) currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false)
return FlowContinuation.Resume(result) return FlowContinuation.Resume(result)
} }
fun resumeFlowLogic(result: Throwable): FlowContinuation { fun resumeFlowLogic(result: Throwable): FlowContinuation {
actions.add(Action.CreateTransaction) actions.add(Action.CreateTransaction)
currentState = currentState.copy(isFlowResumed = true) currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false)
return FlowContinuation.Throw(result) return FlowContinuation.Throw(result)
} }
} }

@ -80,7 +80,7 @@ class UniquenessProviderTests(
} }
/* /*
There are 6 types of transactions to test: There are 7 types of transaction to test:
A B C D E F G A B C D E F G
================== === === === === === === === ================== === === === === === === ===
@ -95,27 +95,91 @@ class UniquenessProviderTests(
/* Group A: only time window */ /* Group A: only time window */
@Test(timeout=300_000) @Test(timeout=300_000)
fun `commits transaction with valid time window`() { fun `rejects transaction before time window is valid`() {
val inputState1 = generateStateRef()
val firstTxId = SecureHash.randomSHA256() val firstTxId = SecureHash.randomSHA256()
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes)) val timeWindow = TimeWindow.between(
val result = uniquenessProvider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get() Clock.systemUTC().instant().plus(30.minutes),
assert(result is UniquenessProvider.Result.Success) Clock.systemUTC().instant().plus(60.minutes))
val result = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
assert(result is UniquenessProvider.Result.Failure)
val error = (result as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid
assertEquals(timeWindow, error.txTimeWindow)
// Idempotency: can re-notarise successfully later. // Once time window behaviour has changed, we should add an additional test case here to check
// that retry within time window still fails. We can't do that now because currently it will
// succeed and that will result in the past time window case succeeding too.
// Retry still fails after advancing past time window
testClock.advanceBy(90.minutes) testClock.advanceBy(90.minutes)
val result2 = uniquenessProvider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get() val result2 = uniquenessProvider.commit(
assert(result2 is UniquenessProvider.Result.Success) emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
assert(result2 is UniquenessProvider.Result.Failure)
val error2 = (result2 as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid
assertEquals(timeWindow, error2.txTimeWindow)
} }
@Test(timeout=300_000) @Test(timeout=300_000)
fun `rejects transaction with invalid time window`() { fun `commits transaction within time window`() {
val inputState1 = generateStateRef()
val firstTxId = SecureHash.randomSHA256() val firstTxId = SecureHash.randomSHA256()
val invalidTimeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().minus(30.minutes)) val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
val result = uniquenessProvider.commit(listOf(inputState1), firstTxId, identity, requestSignature, invalidTimeWindow).get() val result = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
assert(result is UniquenessProvider.Result.Success)
// Retry is successful whilst still within time window
testClock.advanceBy(10.minutes)
val result2 = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
assert(result2 is UniquenessProvider.Result.Success)
// Retry is successful after time window has expired
testClock.advanceBy(80.minutes)
val result3 = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
assert(result3 is UniquenessProvider.Result.Success)
}
@Test(timeout=300_000)
fun `rejects transaction after time window has expired`() {
val firstTxId = SecureHash.randomSHA256()
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().minus(30.minutes))
val result = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
assert(result is UniquenessProvider.Result.Failure)
val error = (result as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid val error = (result as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid
assertEquals(invalidTimeWindow, error.txTimeWindow) assertEquals(timeWindow, error.txTimeWindow)
// Retry still fails at a later time
testClock.advanceBy(10.minutes)
val result2 = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, timeWindow).get()
assert(result2 is UniquenessProvider.Result.Failure)
val error2 = (result2 as UniquenessProvider.Result.Failure).error as NotaryError.TimeWindowInvalid
assertEquals(timeWindow, error2.txTimeWindow)
}
@Test(timeout=300_000)
fun `time window only transactions are processed correctly when duplicate requests occur in succession`() {
val firstTxId = SecureHash.randomSHA256()
val secondTxId = SecureHash.randomSHA256()
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
val invalidTimeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().minus(30.minutes))
val validFuture1 = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, timeWindow)
val validFuture2 = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, timeWindow)
val invalidFuture1 = uniquenessProvider.commit(
emptyList(), secondTxId, identity, requestSignature, invalidTimeWindow)
val invalidFuture2 = uniquenessProvider.commit(
emptyList(), secondTxId, identity, requestSignature, invalidTimeWindow)
// Ensure that transactions are processed correctly and duplicates get the same responses to original
assert(validFuture1.get() is UniquenessProvider.Result.Success)
assert(validFuture2.get() is UniquenessProvider.Result.Success)
assert(invalidFuture1.get() is UniquenessProvider.Result.Failure)
assert(invalidFuture2.get() is UniquenessProvider.Result.Failure)
} }
/* Group B: only reference states */ /* Group B: only reference states */
@ -154,6 +218,52 @@ class UniquenessProviderTests(
assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type) assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type)
} }
@Test(timeout=300_000)
fun `commits retry transaction when reference states were spent since initial transaction`() {
val firstTxId = SecureHash.randomSHA256()
val referenceState = generateStateRef()
val result = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState))
.get()
assert(result is UniquenessProvider.Result.Success)
// Spend reference state
val secondTxId = SecureHash.randomSHA256()
val result2 = uniquenessProvider.commit(
listOf(referenceState), secondTxId, identity, requestSignature, references = emptyList())
.get()
assert(result2 is UniquenessProvider.Result.Success)
// Retry referencing the now spent state still succeeds
val result3 = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState))
.get()
assert(result3 is UniquenessProvider.Result.Success)
}
@Test(timeout=300_000)
fun `reference state only transactions are processed correctly when duplicate requests occur in succession`() {
val firstTxId = SecureHash.randomSHA256()
val secondTxId = SecureHash.randomSHA256()
val referenceState = generateStateRef()
val validFuture3 = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState))
val validFuture4 = uniquenessProvider.commit(
emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState))
val validFuture1 = uniquenessProvider.commit(
emptyList(), secondTxId, identity, requestSignature, references = listOf(referenceState))
val validFuture2 = uniquenessProvider.commit(
emptyList(), secondTxId, identity, requestSignature, references = listOf(referenceState))
// Ensure that transactions are processed correctly and duplicates get the same responses to original
assert(validFuture1.get() is UniquenessProvider.Result.Success)
assert(validFuture2.get() is UniquenessProvider.Result.Success)
assert(validFuture3.get() is UniquenessProvider.Result.Success)
assert(validFuture4.get() is UniquenessProvider.Result.Success)
}
/* Group C: reference states & time window */ /* Group C: reference states & time window */
@Test(timeout=300_000) @Test(timeout=300_000)
@ -262,6 +372,28 @@ class UniquenessProviderTests(
assertEquals(firstTxId.sha256(), conflictCause.hashOfTransactionId) assertEquals(firstTxId.sha256(), conflictCause.hashOfTransactionId)
} }
@Test(timeout=300_000)
fun `input state only transactions are processed correctly when duplicate requests occur in succession`() {
val firstTxId = SecureHash.randomSHA256()
val secondTxId = SecureHash.randomSHA256()
val inputState = generateStateRef()
val validFuture1 = uniquenessProvider.commit(
listOf(inputState), firstTxId, identity, requestSignature)
val validFuture2 = uniquenessProvider.commit(
listOf(inputState), firstTxId, identity, requestSignature)
val invalidFuture1 = uniquenessProvider.commit(
listOf(inputState), secondTxId, identity, requestSignature)
val invalidFuture2 = uniquenessProvider.commit(
listOf(inputState), secondTxId, identity, requestSignature)
// Ensure that transactions are processed correctly and duplicates get the same responses to original
assert(validFuture1.get() is UniquenessProvider.Result.Success)
assert(validFuture2.get() is UniquenessProvider.Result.Success)
assert(invalidFuture1.get() is UniquenessProvider.Result.Failure)
assert(invalidFuture2.get() is UniquenessProvider.Result.Failure)
}
/* Group E: input states & time window */ /* Group E: input states & time window */
@Test(timeout=300_000) @Test(timeout=300_000)
@ -346,6 +478,37 @@ class UniquenessProviderTests(
assert(result2 is UniquenessProvider.Result.Success) assert(result2 is UniquenessProvider.Result.Success)
} }
@Test(timeout=300_000)
fun `re-notarise after reference state is spent`() {
val firstTxId = SecureHash.randomSHA256()
val inputState = generateStateRef()
val referenceState = generateStateRef()
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
val result = uniquenessProvider.commit(
listOf(inputState), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState))
.get()
assert(result is UniquenessProvider.Result.Success)
// Spend the reference state.
val referenceSpend = uniquenessProvider.commit(
listOf(referenceState),
SecureHash.randomSHA256(),
identity,
requestSignature,
timeWindow,
emptyList()).get()
assert(referenceSpend is UniquenessProvider.Result.Success)
// Idempotency: can re-notarise successfully
testClock.advanceBy(90.minutes)
val result2 = uniquenessProvider.commit(
listOf(inputState), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState))
.get()
// Known failure - this should return success. Will be fixed in a future release.
assert(result2 is UniquenessProvider.Result.Failure)
}
@Test(timeout=300_000) @Test(timeout=300_000)
fun `rejects transaction with unused reference states and used input states`() { fun `rejects transaction with unused reference states and used input states`() {
val firstTxId = SecureHash.randomSHA256() val firstTxId = SecureHash.randomSHA256()
@ -387,6 +550,31 @@ class UniquenessProviderTests(
assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type) assertEquals(StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE, conflictCause.type)
} }
@Test(timeout=300_000)
fun `input and reference state transactions are processed correctly when duplicate requests occur in succession`() {
val firstTxId = SecureHash.randomSHA256()
val secondTxId = SecureHash.randomSHA256()
val referenceState = generateStateRef()
// Ensure batch contains duplicates
val validFuture1 = uniquenessProvider.commit(
emptyList(), secondTxId, identity, requestSignature, references = listOf(referenceState))
val validFuture2 = uniquenessProvider.commit(
emptyList(), secondTxId, identity, requestSignature, references = listOf(referenceState))
val validFuture3 = uniquenessProvider.commit(
listOf(referenceState), firstTxId, identity, requestSignature)
// Attempt to use the reference state after it has been consumed
val validFuture4 = uniquenessProvider.commit(
emptyList(), SecureHash.randomSHA256(), identity, requestSignature, references = listOf(referenceState))
// Ensure that transactions are processed correctly and duplicates get the same responses to original
assert(validFuture1.get() is UniquenessProvider.Result.Success)
assert(validFuture2.get() is UniquenessProvider.Result.Success)
assert(validFuture3.get() is UniquenessProvider.Result.Success)
assert(validFuture4.get() is UniquenessProvider.Result.Failure)
}
/* Group G: input, reference states and time window covered by previous tests. */ /* Group G: input, reference states and time window covered by previous tests. */
/* Transaction signing tests. */ /* Transaction signing tests. */