node: Fix race in Clock.doInterruptibly by not relying on version counter

This commit is contained in:
Andras Slemmer
2016-09-05 17:41:33 +01:00
parent ac35673074
commit fc7000c152

View File

@ -7,6 +7,8 @@ import co.paralleluniverse.strands.Strand
import co.paralleluniverse.strands.SuspendableRunnable import co.paralleluniverse.strands.SuspendableRunnable
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.then import com.r3corda.core.then
import com.r3corda.node.log
import org.slf4j.LoggerFactory
import rx.Observable import rx.Observable
import rx.Subscriber import rx.Subscriber
import rx.Subscription import rx.Subscription
@ -82,19 +84,20 @@ abstract class MutableClock : Clock() {
@Suppress("UNUSED_VALUE") // This is here due to the compiler thinking version is not used @Suppress("UNUSED_VALUE") // This is here due to the compiler thinking version is not used
@Suspendable @Suspendable
private fun Clock.doInterruptibly(runnable: SuspendableRunnable) { private fun Clock.doInterruptibly(runnable: SuspendableRunnable) {
var version = 0L
var subscription: Subscription? = null var subscription: Subscription? = null
var interruptedByMutation = false
try { try {
if (this is MutableClock) { if (this is MutableClock) {
version = this.mutationCount
val strand = Strand.currentStrand() val strand = Strand.currentStrand()
subscription = this.mutations.subscribe { strand.interrupt() } subscription = this.mutations.subscribe {
interruptedByMutation = true
strand.interrupt()
}
} }
runnable.run() runnable.run()
} catch(e: InterruptedException) { } catch(e: InterruptedException) {
// If clock has not mutated, then re-throw // If clock has not mutated, then re-throw
val newVersion = if (this is MutableClock) this.mutationCount else version if (!interruptedByMutation) {
if (newVersion == version) {
throw e throw e
} }
} finally { } finally {