mirror of
https://github.com/corda/corda.git
synced 2025-02-22 18:12:53 +00:00
Merge pull request #6969 from corda/nandor-os-4.5-os-4.6-20211011
NAAS-300 Forward merge notary ETA fix
This commit is contained in:
commit
22f81b161d
@ -17,9 +17,14 @@ interface IdempotentFlow
|
|||||||
* next available notary cluster member.
|
* next available notary cluster member.
|
||||||
*
|
*
|
||||||
* Note that any sub-flows called by a [TimedFlow] are assumed to be [IdempotentFlow] and will NOT have checkpoints
|
* Note that any sub-flows called by a [TimedFlow] are assumed to be [IdempotentFlow] and will NOT have checkpoints
|
||||||
* persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow].
|
* persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow]. An implication of this is that
|
||||||
|
* idempotent flows must not only return the same final result of the flow, but if a flow returns multiple messages
|
||||||
|
* the full set of messages must be returned on subsequent attempts in the same order as the first flow.
|
||||||
|
*
|
||||||
|
* An example of this would be if a notary returns an ETA message at any point, then any subsequent retries of the
|
||||||
|
* flow must also send such a message before returning the actual notarisation result.
|
||||||
*/
|
*/
|
||||||
// TODO: allow specifying retry settings per flow
|
// TODO: allow specifying retry settings per flow
|
||||||
interface TimedFlow : IdempotentFlow {
|
interface TimedFlow : IdempotentFlow {
|
||||||
val isTimeoutEnabled: Boolean
|
val isTimeoutEnabled: Boolean
|
||||||
}
|
}
|
||||||
|
@ -58,8 +58,17 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
|
|||||||
verifyTransaction(requestPayload)
|
verifyTransaction(requestPayload)
|
||||||
|
|
||||||
val eta = service.getEstimatedWaitTime(tx.inputs.size + tx.references.size)
|
val eta = service.getEstimatedWaitTime(tx.inputs.size + tx.references.size)
|
||||||
if (eta > etaThreshold && counterpartyCanHandleBackPressure()) {
|
if (counterpartyCanHandleBackPressure()) {
|
||||||
otherSideSession.send(WaitTimeUpdate(eta))
|
if (eta > etaThreshold) {
|
||||||
|
otherSideSession.send(WaitTimeUpdate(eta))
|
||||||
|
} else if (stateMachine.ourSenderUUID == null) {
|
||||||
|
// This implies we are handling a flow retry. As we may have already responded
|
||||||
|
// with an ETA message on a previous attempt, we must ensure a new unique
|
||||||
|
// message id is used to prevent the actual response from being de-duplicated
|
||||||
|
// by the client. This sleep forces an increment of the suspend component of
|
||||||
|
// the message id. See Jira NAAS-295 for full details.
|
||||||
|
sleep(Duration.ZERO)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
service.commitInputStates(
|
service.commitInputStates(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user