Fix race in IntegrationTestingTutorial. (#594)

This commit is contained in:
Andrzej Cichocki 2017-04-27 09:15:12 +01:00 committed by GitHub
parent 3fcb773a31
commit b3894fa38a
4 changed files with 11 additions and 22 deletions

View File

@ -275,11 +275,7 @@ class TransientProperty<out T>(private val initializer: () -> T) {
@Transient private var v: T? = null @Transient private var v: T? = null
@Synchronized @Synchronized
operator fun getValue(thisRef: Any?, property: KProperty<*>): T { operator fun getValue(thisRef: Any?, property: KProperty<*>) = v ?: initializer().also { v = it }
if (v == null)
v = initializer()
return v!!
}
} }
/** /**

View File

@ -65,7 +65,7 @@ object LogHelper {
/** /**
* May fail to restore the original level due to unavoidable race if called by multiple threads. * May fail to restore the original level due to unavoidable race if called by multiple threads.
*/ */
inline fun <T> withLevel(logName: String, levelName: String, block: () -> T) = let { inline fun <T> withLevel(logName: String, levelName: String, block: () -> T) = run {
val level = Level.valueOf(levelName) val level = Level.valueOf(levelName)
val oldLevel = LogManager.getLogger(logName).level val oldLevel = LogManager.getLogger(logName).level
Configurator.setLevel(logName, level) Configurator.setLevel(logName, level)

View File

@ -60,7 +60,7 @@ class IntegrationTestingTutorial {
// START 4 // START 4
val issueRef = OpaqueBytes.of(0) val issueRef = OpaqueBytes.of(0)
val futures = Stack<ListenableFuture<*>>() val futures = Stack<ListenableFuture<*>>()
for (i in 1 .. 10) { (1..10).map { i ->
thread { thread {
futures.push(aliceProxy.startFlow(::CashIssueFlow, futures.push(aliceProxy.startFlow(::CashIssueFlow,
i.DOLLARS, i.DOLLARS,
@ -69,14 +69,12 @@ class IntegrationTestingTutorial {
notary.nodeInfo.notaryIdentity notary.nodeInfo.notaryIdentity
).returnValue) ).returnValue)
} }
} }.forEach(Thread::join) // Ensure the stack of futures is populated.
while (!futures.empty()) { futures.forEach { it.getOrThrow() }
futures.pop().getOrThrow()
}
bobVaultUpdates.expectEvents { bobVaultUpdates.expectEvents {
parallel( parallel(
(1 .. 10).map { i -> (1..10).map { i ->
expect( expect(
match = { update: Vault.Update -> match = { update: Vault.Update ->
(update.produced.first().state.data as Cash.State).amount.quantity == i * 100L (update.produced.first().state.data as Cash.State).amount.quantity == i * 100L
@ -90,13 +88,13 @@ class IntegrationTestingTutorial {
// END 4 // END 4
// START 5 // START 5
for (i in 1 .. 10) { for (i in 1..10) {
bobProxy.startFlow(::CashPaymentFlow, i.DOLLARS, alice.nodeInfo.legalIdentity).returnValue.getOrThrow() bobProxy.startFlow(::CashPaymentFlow, i.DOLLARS, alice.nodeInfo.legalIdentity).returnValue.getOrThrow()
} }
aliceVaultUpdates.expectEvents { aliceVaultUpdates.expectEvents {
sequence( sequence(
(1 .. 10).map { i -> (1..10).map { i ->
expect { update: Vault.Update -> expect { update: Vault.Update ->
println("Alice got vault update of $update") println("Alice got vault update of $update")
assertEquals((update.produced.first().state.data as Cash.State).amount.quantity, i * 100L) assertEquals((update.produced.first().state.data as Cash.State).amount.quantity, i * 100L)

View File

@ -87,13 +87,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>() @Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
/** This future will complete when the call method returns. */ /** This future will complete when the call method returns. */
override val resultFuture: ListenableFuture<R> get() { override val resultFuture: ListenableFuture<R>
return _resultFuture ?: run { get() = _resultFuture ?: SettableFuture.create<R>().also { _resultFuture = it }
val f = SettableFuture.create<R>()
_resultFuture = f
return f
}
}
// This state IS serialised, as we need it to know what the fiber is waiting for. // This state IS serialised, as we need it to know what the fiber is waiting for.
internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSession>() internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSession>()
@ -456,7 +451,7 @@ private data class FlowHandleImpl<A>(
} }
@CordaSerializable @CordaSerializable
private data class FlowProgressHandleImpl<A> ( private data class FlowProgressHandleImpl<A>(
override val id: StateMachineRunId, override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>, override val returnValue: ListenableFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> { override val progress: Observable<String>) : FlowProgressHandle<A> {