diff --git a/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt b/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt index ddc570bb52..9e7bd9002d 100644 --- a/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt @@ -17,9 +17,14 @@ interface IdempotentFlow * next available notary cluster member. * * Note that any sub-flows called by a [TimedFlow] are assumed to be [IdempotentFlow] and will NOT have checkpoints - * persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow]. + * persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow]. An implication of this is that + * idempotent flows must not only return the same final result of the flow, but if a flow returns multiple messages + * the full set of messages must be returned on subsequent attempts in the same order as the first flow. + * + * An example of this would be if a notary returns an ETA message at any point, then any subsequent retries of the + * flow must also send such a message before returning the actual notarisation result. */ // TODO: allow specifying retry settings per flow interface TimedFlow : IdempotentFlow { val isTimeoutEnabled: Boolean -} \ No newline at end of file +} diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt index e105d945a7..e4217c93bd 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt @@ -58,8 +58,17 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: verifyTransaction(requestPayload) val eta = service.getEstimatedWaitTime(tx.inputs.size + tx.references.size) - if (eta > etaThreshold && counterpartyCanHandleBackPressure()) { - otherSideSession.send(WaitTimeUpdate(eta)) + if (counterpartyCanHandleBackPressure()) { + if (eta > etaThreshold) { + otherSideSession.send(WaitTimeUpdate(eta)) + } else if (stateMachine.ourSenderUUID == null) { + // This implies we are handling a flow retry. As we may have already responded + // with an ETA message on a previous attempt, we must ensure a new unique + // message id is used to prevent the actual response from being de-duplicated + // by the client. This sleep forces an increment of the suspend component of + // the message id. See Jira NAAS-295 for full details. + sleep(Duration.ZERO) + } } service.commitInputStates(