CORDA-1622 Correct broken de-dup header logic on node restart for res… (#3352)

* CORDA-1622 Correct broken de-dup header logic on node restart for restored flows.

* Missed a volatile
This commit is contained in:
Rick Parker
2018-06-12 18:00:44 +01:00
committed by GitHub
parent 92889cef5c
commit 2f737cd6bc
2 changed files with 73 additions and 27 deletions

View File

@ -345,8 +345,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint, checkpoint = checkpoint,
initialDeduplicationHandler = null, initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true, isAnyCheckpointPersisted = true,
isStartIdempotent = false, isStartIdempotent = false
senderUUID = null
) )
} else { } else {
// Just flow initiation message // Just flow initiation message
@ -652,8 +651,7 @@ class SingleThreadedStateMachineManager(
checkpoint: Checkpoint, checkpoint: Checkpoint,
isAnyCheckpointPersisted: Boolean, isAnyCheckpointPersisted: Boolean,
isStartIdempotent: Boolean, isStartIdempotent: Boolean,
initialDeduplicationHandler: DeduplicationHandler?, initialDeduplicationHandler: DeduplicationHandler?
senderUUID: String? = ourSenderUUID
): Flow { ): Flow {
val flowState = checkpoint.flowState val flowState = checkpoint.flowState
val resultFuture = openFuture<Any?>() val resultFuture = openFuture<Any?>()
@ -669,7 +667,7 @@ class SingleThreadedStateMachineManager(
isStartIdempotent = isStartIdempotent, isStartIdempotent = isStartIdempotent,
isRemoved = false, isRemoved = false,
flowLogic = logic, flowLogic = logic,
senderUUID = senderUUID senderUUID = null
) )
val fiber = FlowStateMachineImpl(id, logic, scheduler) val fiber = FlowStateMachineImpl(id, logic, scheduler)
fiber.transientValues = TransientReference(createTransientValues(id, resultFuture)) fiber.transientValues = TransientReference(createTransientValues(id, resultFuture))
@ -688,7 +686,7 @@ class SingleThreadedStateMachineManager(
isStartIdempotent = isStartIdempotent, isStartIdempotent = isStartIdempotent,
isRemoved = false, isRemoved = false,
flowLogic = fiber.logic, flowLogic = fiber.logic,
senderUUID = senderUUID senderUUID = null
) )
fiber.transientValues = TransientReference(createTransientValues(id, resultFuture)) fiber.transientValues = TransientReference(createTransientValues(id, resultFuture))
fiber.transientState = TransientReference(state) fiber.transientState = TransientReference(state)

View File

@ -29,6 +29,7 @@ import org.junit.Before
import org.junit.Test import org.junit.Test
import java.sql.SQLException import java.sql.SQLException
import java.time.Duration import java.time.Duration
import java.util.*
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertNotNull import kotlin.test.assertNotNull
import kotlin.test.assertNull import kotlin.test.assertNull
@ -47,6 +48,7 @@ class RetryFlowMockTest {
RetryFlow.count = 0 RetryFlow.count = 0
SendAndRetryFlow.count = 0 SendAndRetryFlow.count = 0
RetryInsertFlow.count = 0 RetryInsertFlow.count = 0
KeepSendingFlow.count = 0
} }
private fun <T> StartedNode<MockNode>.startFlow(logic: FlowLogic<T>): CordaFuture<T> { private fun <T> StartedNode<MockNode>.startFlow(logic: FlowLogic<T>): CordaFuture<T> {
@ -74,7 +76,7 @@ class RetryFlowMockTest {
@Test @Test
fun `Retry does not set senderUUID`() { fun `Retry does not set senderUUID`() {
val messagesSent = mutableListOf<Message>() val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
val partyB = nodeB.info.legalIdentities.first() val partyB = nodeB.info.legalIdentities.first()
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) { nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
@ -88,6 +90,38 @@ class RetryFlowMockTest {
assertEquals(2, SendAndRetryFlow.count) assertEquals(2, SendAndRetryFlow.count)
} }
@Test
fun `Restart does not set senderUUID`() {
val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
val partyB = nodeB.info.legalIdentities.first()
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
messagesSent.add(message)
messagingService.send(message, target)
}
})
val count = 10000 // Lots of iterations so the flow keeps going long enough
nodeA.startFlow(KeepSendingFlow(count, partyB))
while (messagesSent.size < 1) {
Thread.sleep(10)
}
assertNotNull(messagesSent.first().senderUUID)
nodeA = mockNet.restartNode(nodeA)
// This is a bit racy because restarting the node actually starts it, so we need to make sure there's enough iterations we get here with flow still going.
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
messagesSent.add(message)
messagingService.send(message, target)
}
})
// Now short circuit the iterations so the flow finishes soon.
KeepSendingFlow.count = count - 2
while (nodeA.smm.allStateMachines.size > 0) {
Thread.sleep(10)
}
assertNull(messagesSent.last().senderUUID)
}
@Test @Test
fun `Retry duplicate insert`() { fun `Retry duplicate insert`() {
assertEquals(Unit, nodeA.startFlow(RetryInsertFlow(1)).get()) assertEquals(Unit, nodeA.startFlow(RetryInsertFlow(1)).get())
@ -158,6 +192,7 @@ class RetryCausingError : SQLException("deadlock")
class RetryFlow(private val i: Int) : FlowLogic<Unit>() { class RetryFlow(private val i: Int) : FlowLogic<Unit>() {
companion object { companion object {
@Volatile
var count = 0 var count = 0
} }
@ -174,29 +209,10 @@ class RetryFlow(private val i: Int) : FlowLogic<Unit>() {
} }
} }
class RetryAndSleepFlow(private val i: Int) : FlowLogic<Unit>() {
companion object {
var count = 0
}
@Suspendable
override fun call() {
logger.info("Hello $count")
if (count++ < i) {
if (i == Int.MAX_VALUE) {
throw LimitedRetryCausingError()
} else {
throw RetryCausingError()
}
} else {
sleep(Duration.ofDays(1))
}
}
}
@InitiatingFlow @InitiatingFlow
class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() { class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() {
companion object { companion object {
@Volatile
var count = 0 var count = 0
} }
@ -221,8 +237,40 @@ class ReceiveFlow2(private val other: FlowSession) : FlowLogic<Unit>() {
} }
} }
@InitiatingFlow
class KeepSendingFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() {
companion object {
@Volatile
var count = 0
}
@Suspendable
override fun call() {
val session = initiateFlow(other)
session.send(i.toString())
do {
logger.info("Sending... $count")
session.send("Boo")
} while (count++ < i)
}
}
@Suppress("unused")
@InitiatedBy(KeepSendingFlow::class)
class ReceiveFlow3(private val other: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
var count = other.receive<String>().unwrap { it.toInt() }
while (count-- > 0) {
val received = other.receive<String>().unwrap { it }
logger.info("Received... $received $count")
}
}
}
class RetryInsertFlow(private val i: Int) : FlowLogic<Unit>() { class RetryInsertFlow(private val i: Int) : FlowLogic<Unit>() {
companion object { companion object {
@Volatile
var count = 0 var count = 0
} }