CORDA-2683: Requests to start unknown flows can be deleted using killFlow (#4903)

Normally, these requests are left unacknowledged in the MQ broker to allow recovery via installing the missing CorDapp. If this is not applicable then the request be dropped (and the other side informed of the error).
This commit is contained in:
Shams Asari 2019-03-22 14:46:02 +00:00 committed by GitHub
parent 8ac4b33b08
commit acd2f8bce4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 9 additions and 3 deletions

View File

@ -136,7 +136,7 @@ internal class CordaRPCOpsImpl(
return snapshot
}
override fun killFlow(id: StateMachineRunId) = smm.killFlow(id)
override fun killFlow(id: StateMachineRunId): Boolean = if (smm.killFlow(id)) true else smm.flowHospital.dropSessionInit(id.uuid)
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
val (allStateMachines, changes) = smm.track()

View File

@ -55,6 +55,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
mutex.locked {
if (outcome != Outcome.UNTREATABLE) {
treatableSessionInits[id] = InternalSessionInitRecord(sessionMessage, event, record)
log.warn("$sender has sent a flow request for an unknown flow ${sessionMessage.initiatorFlowClassName}. Install the missing " +
"CorDapp this flow belongs to and restart.")
log.warn("If you know it's safe to ignore this flow request then it can be deleted permanently using the killFlow RPC and " +
"the UUID $id (from the node shell you can run 'flow kill $id'). BE VERY CAUTIOUS OF THIS SECOND APPROACH AS THE " +
"REQUEST MAY CONTAIN A NOTARISED TRANSACTION THAT NEEDS TO BE RECORDED IN YOUR VAULT.")
}
recordsPublisher.onNext(record)
}
@ -78,12 +83,13 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
* to send back the relevant session error to the initiator party and acknowledge its receipt from the message broker
* so that it never gets redelivered.
*/
fun dropSessionInit(id: UUID) {
fun dropSessionInit(id: UUID): Boolean {
val (sessionMessage, event, publicRecord) = mutex.locked {
requireNotNull(treatableSessionInits.remove(id)) { "$id does not refer to any session init message" }
treatableSessionInits.remove(id) ?: return false
}
log.info("Errored session-init permanently dropped: $publicRecord")
sendBackError(publicRecord.error, sessionMessage, publicRecord.sender, event)
return true
}
/**