From f859d809c7d281e10dcb218d2c82c37c5b2e7be1 Mon Sep 17 00:00:00 2001
From: Ramzi El-Yafi <ramzi.el-yafi@r3.com>
Date: Mon, 4 Oct 2021 17:37:22 +0100
Subject: [PATCH] NAAS-295 Fix notary flow retries after ETA message sent
 (#6965)

---
 .../net/corda/core/internal/IdempotentFlow.kt       |  9 +++++++--
 .../corda/core/internal/notary/NotaryServiceFlow.kt | 13 +++++++++++--
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt b/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt
index ddc570bb52..9e7bd9002d 100644
--- a/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/IdempotentFlow.kt
@@ -17,9 +17,14 @@ interface IdempotentFlow
  * next available notary cluster member.
  *
  * Note that any sub-flows called by a [TimedFlow] are assumed to be [IdempotentFlow] and will NOT have checkpoints
- * persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow].
+ * persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow]. An implication of this is that
+ * idempotent flows must not only return the same final result of the flow, but if a flow returns multiple messages
+ * the full set of messages must be returned on subsequent attempts in the same order as the first flow.
+ *
+ * An example of this would be if a notary returns an ETA message at any point, then any subsequent retries of the
+ * flow must also send such a message before returning the actual notarisation result.
  */
 // TODO: allow specifying retry settings per flow
 interface TimedFlow : IdempotentFlow {
     val isTimeoutEnabled: Boolean
-}
\ No newline at end of file
+}
diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt
index 307b675046..987b36d206 100644
--- a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt
@@ -59,8 +59,17 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
             verifyTransaction(requestPayload)
 
             val eta = service.getEstimatedWaitTime(tx.inputs.size + tx.references.size)
-            if (eta > etaThreshold && counterpartyCanHandleBackPressure()) {
-                otherSideSession.send(WaitTimeUpdate(eta))
+            if (counterpartyCanHandleBackPressure()) {
+                if (eta > etaThreshold) {
+                    otherSideSession.send(WaitTimeUpdate(eta))
+                } else if (stateMachine.ourSenderUUID == null) {
+                    // This implies we are handling a flow retry. As we may have already responded
+                    // with an ETA message on a previous attempt, we must ensure a new unique
+                    // message id is used to prevent the actual response from being de-duplicated
+                    // by the client. This sleep forces an increment of the suspend component of
+                    // the message id. See Jira NAAS-295 for full details.
+                    sleep(Duration.ZERO)
+                }
             }
 
             service.commitInputStates(