mirror of
https://github.com/corda/corda.git
synced 2025-02-22 10:10:59 +00:00
Testing: expose a future from the Simulation.start method to let you find out when the simulation has finished (if it finishes at all).
Add a simple test that just forces the IRS simulation through to completion (no real checks on the output).
This commit is contained in:
parent
8bcc6bdf1c
commit
f9920cbc28
@ -1,6 +1,7 @@
|
|||||||
package core.testing
|
package core.testing
|
||||||
|
|
||||||
import com.fasterxml.jackson.module.kotlin.readValue
|
import com.fasterxml.jackson.module.kotlin.readValue
|
||||||
|
import com.google.common.util.concurrent.FutureCallback
|
||||||
import com.google.common.util.concurrent.Futures
|
import com.google.common.util.concurrent.Futures
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
import com.google.common.util.concurrent.SettableFuture
|
import com.google.common.util.concurrent.SettableFuture
|
||||||
@ -28,26 +29,40 @@ class IRSSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork
|
|||||||
|
|
||||||
private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>())
|
private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>())
|
||||||
|
|
||||||
override fun startMainSimulation() {
|
override fun startMainSimulation(): ListenableFuture<Unit> {
|
||||||
|
val future = SettableFuture.create<Unit>()
|
||||||
startIRSDealBetween(0, 1).success {
|
startIRSDealBetween(0, 1).success {
|
||||||
// Next iteration is a pause.
|
// Next iteration is a pause.
|
||||||
executeOnNextIteration.add {}
|
executeOnNextIteration.add {}
|
||||||
executeOnNextIteration.add {
|
executeOnNextIteration.add {
|
||||||
// Keep fixing until there's no more left to do.
|
// Keep fixing until there's no more left to do.
|
||||||
doNextFixing(0, 1)?.addListener(object : Runnable {
|
val initialFixFuture = doNextFixing(0, 1)
|
||||||
override fun run() {
|
|
||||||
|
Futures.addCallback(initialFixFuture, object : FutureCallback<Unit> {
|
||||||
|
override fun onFailure(t: Throwable) {
|
||||||
|
future.setException(t) // Propagate the error.
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onSuccess(result: Unit?) {
|
||||||
// Pause for an iteration.
|
// Pause for an iteration.
|
||||||
executeOnNextIteration.add {}
|
executeOnNextIteration.add {}
|
||||||
executeOnNextIteration.add {
|
executeOnNextIteration.add {
|
||||||
doNextFixing(0, 1)?.addListener(this, RunOnCallerThread)
|
val f = doNextFixing(0, 1)
|
||||||
|
if (f != null) {
|
||||||
|
Futures.addCallback(f, this, RunOnCallerThread)
|
||||||
|
} else {
|
||||||
|
// All done!
|
||||||
|
future.set(Unit)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, RunOnCallerThread)
|
}, RunOnCallerThread)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return future
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun doNextFixing(i: Int, j: Int): ListenableFuture<*>? {
|
private fun doNextFixing(i: Int, j: Int): ListenableFuture<Unit>? {
|
||||||
println("Doing a fixing between $i and $j")
|
println("Doing a fixing between $i and $j")
|
||||||
val node1: SimulatedNode = banks[i]
|
val node1: SimulatedNode = banks[i]
|
||||||
val node2: SimulatedNode = banks[j]
|
val node2: SimulatedNode = banks[j]
|
||||||
@ -77,12 +92,14 @@ class IRSSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork
|
|||||||
// We have to start the protocols in separate iterations, as adding to the SMM effectively 'iterates' that node
|
// We have to start the protocols in separate iterations, as adding to the SMM effectively 'iterates' that node
|
||||||
// in the simulation, so if we don't do this then the two sides seem to act simultaneously.
|
// in the simulation, so if we don't do this then the two sides seem to act simultaneously.
|
||||||
|
|
||||||
val retFuture = SettableFuture.create<Any>()
|
val retFuture = SettableFuture.create<Unit>()
|
||||||
val futA = node1.smm.add("floater", sideA)
|
val futA = node1.smm.add("floater", sideA)
|
||||||
executeOnNextIteration += {
|
executeOnNextIteration += {
|
||||||
val futB = node2.smm.add("fixer", sideB)
|
val futB = node2.smm.add("fixer", sideB)
|
||||||
Futures.allAsList(futA, futB).then {
|
Futures.allAsList(futA, futB) success {
|
||||||
retFuture.set(null)
|
retFuture.set(null)
|
||||||
|
} failure { throwable ->
|
||||||
|
retFuture.setException(throwable)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return retFuture
|
return retFuture
|
||||||
@ -102,11 +119,6 @@ class IRSSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork
|
|||||||
irs.fixedLeg.fixedRatePayer = node1.info.identity
|
irs.fixedLeg.fixedRatePayer = node1.info.identity
|
||||||
irs.floatingLeg.floatingRatePayer = node2.info.identity
|
irs.floatingLeg.floatingRatePayer = node2.info.identity
|
||||||
|
|
||||||
if (irs.fixedLeg.effectiveDate < irs.floatingLeg.effectiveDate)
|
|
||||||
currentDay = irs.fixedLeg.effectiveDate
|
|
||||||
else
|
|
||||||
currentDay = irs.floatingLeg.effectiveDate
|
|
||||||
|
|
||||||
val sessionID = random63BitValue()
|
val sessionID = random63BitValue()
|
||||||
|
|
||||||
val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info,
|
val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info,
|
||||||
|
@ -128,7 +128,7 @@ abstract class Simulation(val runAsync: Boolean,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val network = MockNetwork(false)
|
val network = MockNetwork(runAsync)
|
||||||
// This one must come first.
|
// This one must come first.
|
||||||
val networkMap: SimulatedNode
|
val networkMap: SimulatedNode
|
||||||
= network.createNode(null, nodeFactory = NetworkMapNodeFactory, advertisedServices = NetworkMapService.Type) as SimulatedNode
|
= network.createNode(null, nodeFactory = NetworkMapNodeFactory, advertisedServices = NetworkMapService.Type) as SimulatedNode
|
||||||
@ -212,19 +212,19 @@ abstract class Simulation(val runAsync: Boolean,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun start() {
|
fun start(): ListenableFuture<Unit> {
|
||||||
network.startNodes()
|
network.startNodes()
|
||||||
// Wait for all the nodes to have finished registering with the network map service.
|
// Wait for all the nodes to have finished registering with the network map service.
|
||||||
Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture }).then {
|
val startup: ListenableFuture<List<Unit>> = Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture })
|
||||||
startMainSimulation()
|
return Futures.transformAsync(startup) { l: List<Unit>? -> startMainSimulation() }
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sub-classes should override this to trigger whatever they want to simulate. This method will be invoked once the
|
* Sub-classes should override this to trigger whatever they want to simulate. This method will be invoked once the
|
||||||
* network bringup has been simulated.
|
* network bringup has been simulated.
|
||||||
*/
|
*/
|
||||||
protected open fun startMainSimulation() {
|
protected open fun startMainSimulation(): ListenableFuture<Unit> {
|
||||||
|
return Futures.immediateFuture(Unit)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun stop() {
|
fun stop() {
|
||||||
|
@ -3,11 +3,12 @@ package core.testing
|
|||||||
import com.google.common.util.concurrent.Futures
|
import com.google.common.util.concurrent.Futures
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
import contracts.CommercialPaper
|
import contracts.CommercialPaper
|
||||||
import core.*
|
|
||||||
import core.contracts.DOLLARS
|
import core.contracts.DOLLARS
|
||||||
import core.contracts.SignedTransaction
|
import core.contracts.SignedTransaction
|
||||||
|
import core.days
|
||||||
import core.node.subsystems.NodeWalletService
|
import core.node.subsystems.NodeWalletService
|
||||||
import core.utilities.BriefLogFormatter
|
import core.random63BitValue
|
||||||
|
import core.seconds
|
||||||
import protocols.TwoPartyTradeProtocol
|
import protocols.TwoPartyTradeProtocol
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
@ -16,9 +17,9 @@ import java.time.Instant
|
|||||||
* then B and C trade with each other, then C and A etc).
|
* then B and C trade with each other, then C and A etc).
|
||||||
*/
|
*/
|
||||||
class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(runAsync, latencyInjector) {
|
class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(runAsync, latencyInjector) {
|
||||||
override fun startMainSimulation() {
|
override fun startMainSimulation(): ListenableFuture<Unit> {
|
||||||
BriefLogFormatter.loggingOn("bank", "core.contract.TransactionGroup", "recordingmap")
|
|
||||||
startTradingCircle { i, j -> tradeBetween(i, j) }
|
startTradingCircle { i, j -> tradeBetween(i, j) }
|
||||||
|
return Futures.immediateFailedFuture(UnsupportedOperationException("This future never completes"))
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun tradeBetween(buyerBankIndex: Int, sellerBankIndex: Int): ListenableFuture<MutableList<SignedTransaction>> {
|
private fun tradeBetween(buyerBankIndex: Int, sellerBankIndex: Int): ListenableFuture<MutableList<SignedTransaction>> {
|
||||||
|
23
src/test/kotlin/core/testing/IRSSimulationTest.kt
Normal file
23
src/test/kotlin/core/testing/IRSSimulationTest.kt
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
package core.testing
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables
|
||||||
|
import core.utilities.BriefLogFormatter
|
||||||
|
import org.junit.Test
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test doesn't check anything except that the simulation finishes and there are no exceptions at any point.
|
||||||
|
* The details of the IRS contract are verified in other unit tests.
|
||||||
|
*/
|
||||||
|
class IRSSimulationTest {
|
||||||
|
@Test fun `runs to completion`() {
|
||||||
|
BriefLogFormatter.initVerbose("messaging")
|
||||||
|
val sim = IRSSimulation(false, null)
|
||||||
|
val future = sim.start()
|
||||||
|
while (!future.isDone) sim.iterate()
|
||||||
|
try {
|
||||||
|
future.get()
|
||||||
|
} catch(e: Throwable) {
|
||||||
|
throw Throwables.getRootCause(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user