CORDA-716 Unpollute MockNode with Simulation-specific code (#1945)

This commit is contained in:
Andrzej Cichocki 2017-10-25 14:33:36 +01:00 committed by GitHub
parent 5bb4601812
commit ba75146446
4 changed files with 24 additions and 48 deletions

View File

@ -15,7 +15,6 @@ import net.corda.core.serialization.deserialize
import net.corda.core.utilities.ProgressTracker
import net.corda.netmap.VisualiserViewModel.Style
import net.corda.netmap.simulation.IRSSimulation
import net.corda.netmap.simulation.Simulation
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.statemachine.SessionConfirm
import net.corda.node.services.statemachine.SessionEnd
@ -118,7 +117,7 @@ class NetworkMapVisualiser : Application() {
}
}
// Pulse all parties in a trade when the trade completes
simulation.doneSteps.observeOn(uiThread).subscribe { nodes: Collection<Simulation.SimulatedNode> ->
simulation.doneSteps.observeOn(uiThread).subscribe { nodes: Collection<MockNetwork.MockNode> ->
nodes.forEach { viewModel.nodesToWidgets[it]!!.longPulseAnim.play() }
}

View File

@ -11,6 +11,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.ProgressTracker
import net.corda.finance.utils.ScreenCoordinate
import net.corda.netmap.simulation.IRSSimulation
import net.corda.netmap.simulation.place
import net.corda.testing.chooseIdentity
import net.corda.testing.node.MockNetwork
import java.util.*

View File

@ -4,16 +4,15 @@ import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.uncheckedCast
import net.corda.core.utilities.ProgressTracker
import net.corda.finance.utils.CityDatabase
import net.corda.finance.utils.WorldMapLocation
import net.corda.irs.api.NodeInterestRates
import net.corda.node.internal.StartedNode
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.DUMMY_REGULATOR
import net.corda.testing.node.*
import net.corda.testing.node.MockNetwork.MockNode
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import rx.Observable
import rx.subjects.PublishSubject
@ -26,6 +25,8 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletableFuture.allOf
import java.util.concurrent.Future
internal val MockNode.place get() = configuration.myLegalName.locality.let { CityDatabase[it] }!!
/**
* Base class for network simulations that are based on the unit test / mock environment.
*
@ -49,24 +50,12 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
val bankLocations = listOf(Pair("London", "GB"), Pair("Frankfurt", "DE"), Pair("Rome", "IT"))
// This puts together a mock network of SimulatedNodes.
open class SimulatedNode(args: MockNodeArgs) : MockNetwork.MockNode(args) {
override val started: StartedNode<SimulatedNode>? get() = uncheckedCast(super.started)
override fun findMyLocation(): WorldMapLocation? {
return configuration.myLegalName.locality.let { CityDatabase[it] }
}
}
private object SimulatedNodeFactory : MockNetwork.Factory<SimulatedNode> {
override fun create(args: MockNodeArgs) = SimulatedNode(args)
}
object RatesOracleFactory : MockNetwork.Factory<SimulatedNode> {
object RatesOracleFactory : MockNetwork.Factory<MockNode> {
// TODO: Make a more realistic legal name
val RATES_SERVICE_NAME = CordaX500Name(organisation = "Rates Service Provider", locality = "Madrid", country = "ES")
override fun create(args: MockNodeArgs): SimulatedNode {
return object : SimulatedNode(args) {
override fun create(args: MockNodeArgs): MockNode {
return object : MockNode(args) {
override fun start() = super.start().apply {
registerInitiatedFlow(NodeInterestRates.FixQueryHandler::class.java)
registerInitiatedFlow(NodeInterestRates.FixSignHandler::class.java)
@ -84,28 +73,28 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
networkSendManuallyPumped = networkSendManuallyPumped,
threadPerNode = runAsync,
cordappPackages = listOf("net.corda.irs.contract", "net.corda.finance.contract", "net.corda.irs"))
val notary = mockNet.createNotaryNode(defaultParams.copy(legalName = DUMMY_NOTARY.name), false, SimulatedNodeFactory)
val notary = mockNet.createNotaryNode(DUMMY_NOTARY.name, false)
// TODO: Regulatory nodes don't actually exist properly, this is a last minute demo request.
// So we just fire a message at a node that doesn't know how to handle it, and it'll ignore it.
// But that's fine for visualisation purposes.
val regulators = listOf(mockNet.createUnstartedNode(defaultParams.copy(legalName = DUMMY_REGULATOR.name), SimulatedNodeFactory))
val regulators = listOf(mockNet.createUnstartedNode(defaultParams.copy(legalName = DUMMY_REGULATOR.name)))
val ratesOracle = mockNet.createUnstartedNode(defaultParams.copy(legalName = RatesOracleFactory.RATES_SERVICE_NAME), RatesOracleFactory)
// All nodes must be in one of these two lists for the purposes of the visualiser tool.
val serviceProviders: List<SimulatedNode> = listOf(notary.internals, ratesOracle)
val banks: List<SimulatedNode> = bankLocations.mapIndexed { i, (city, country) ->
val serviceProviders: List<MockNode> = listOf(notary.internals, ratesOracle)
val banks: List<MockNode> = bankLocations.mapIndexed { i, (city, country) ->
val legalName = CordaX500Name(organisation = "Bank ${'A' + i}", locality = city, country = country)
// Use deterministic seeds so the simulation is stable. Needed so that party owning keys are stable.
mockNet.createUnstartedNode(defaultParams.copy(legalName = legalName, entropyRoot = BigInteger.valueOf(i.toLong())), SimulatedNodeFactory)
mockNet.createUnstartedNode(defaultParams.copy(legalName = legalName, entropyRoot = BigInteger.valueOf(i.toLong())))
}
val clocks = (serviceProviders + regulators + banks).map { it.platformClock as TestClock }
// These are used from the network visualiser tool.
private val _allFlowSteps = PublishSubject.create<Pair<SimulatedNode, ProgressTracker.Change>>()
private val _doneSteps = PublishSubject.create<Collection<SimulatedNode>>()
private val _allFlowSteps = PublishSubject.create<Pair<MockNode, ProgressTracker.Change>>()
private val _doneSteps = PublishSubject.create<Collection<MockNode>>()
@Suppress("unused")
val allFlowSteps: Observable<Pair<SimulatedNode, ProgressTracker.Change>> = _allFlowSteps
val allFlowSteps: Observable<Pair<MockNode, ProgressTracker.Change>> = _allFlowSteps
@Suppress("unused")
val doneSteps: Observable<Collection<SimulatedNode>> = _doneSteps
val doneSteps: Observable<Collection<MockNode>> = _doneSteps
private var pumpCursor = 0
@ -133,7 +122,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
* A place for simulations to stash human meaningful text about what the node is "thinking", which might appear
* in the UI somewhere.
*/
val extraNodeLabels: MutableMap<SimulatedNode, String> = Collections.synchronizedMap(HashMap())
val extraNodeLabels: MutableMap<MockNode, String> = Collections.synchronizedMap(HashMap())
/**
* Iterates the simulation by one step.
@ -164,7 +153,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
return null
}
protected fun showProgressFor(nodes: List<StartedNode<SimulatedNode>>) {
protected fun showProgressFor(nodes: List<StartedNode<MockNode>>) {
nodes.forEach { node ->
node.smm.changes.filter { it is StateMachineManager.Change.Add }.subscribe {
linkFlowProgress(node.internals, it.logic)
@ -172,7 +161,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
}
}
private fun linkFlowProgress(node: SimulatedNode, flow: FlowLogic<*>) {
private fun linkFlowProgress(node: MockNode, flow: FlowLogic<*>) {
val pt = flow.progressTracker ?: return
pt.changes.subscribe { change: ProgressTracker.Change ->
// Runs on node thread.
@ -181,14 +170,14 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
}
protected fun showConsensusFor(nodes: List<SimulatedNode>) {
protected fun showConsensusFor(nodes: List<MockNode>) {
val node = nodes.first()
node.started!!.smm.changes.filter { it is StateMachineManager.Change.Add }.first().subscribe {
linkConsensus(nodes, it.logic)
}
}
private fun linkConsensus(nodes: Collection<SimulatedNode>, flow: FlowLogic<*>) {
private fun linkConsensus(nodes: Collection<MockNode>, flow: FlowLogic<*>) {
flow.progressTracker?.changes?.subscribe { _: ProgressTracker.Change ->
// Runs on node thread.
if (flow.progressTracker!!.currentStep == ProgressTracker.DONE) {

View File

@ -17,12 +17,10 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.finance.utils.WorldMapLocation
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.CordappLoader
@ -54,7 +52,6 @@ import java.math.BigInteger
import java.nio.file.Path
import java.security.KeyPair
import java.security.PublicKey
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
@ -241,9 +238,6 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
// It's OK to not have a network map service in the mock network.
override fun noNetworkMapConfigured() = doneFuture(Unit)
// There is no need to slow down the unit tests by initialising CityDatabase
open fun findMyLocation(): WorldMapLocation? = null // It's left only for NetworkVisualiserSimulation
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)
override fun myAddresses() = emptyList<NetworkHostAndPort>()
@ -253,13 +247,6 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
val testSerializationWhitelists by lazy { super.serializationWhitelists.toMutableList() }
override val serializationWhitelists: List<SerializationWhitelist>
get() = testSerializationWhitelists
// This does not indirect through the NodeInfo object so it can be called before the node is started.
// It is used from the network visualiser tool.
@Suppress("unused")
val place: WorldMapLocation
get() = findMyLocation()!!
private var dbCloser: (() -> Any?)? = null
override fun <T> initialiseDatabasePersistence(schemaService: SchemaService, insideTransaction: () -> T) = super.initialiseDatabasePersistence(schemaService) {
dbCloser = database::close
@ -430,12 +417,12 @@ fun network(nodesCount: Int, action: MockNetwork.(nodes: List<StartedNode<MockNe
}
/**
* Extend this class in order to intercept and modify messages passing through the [MessagingService] when using the [InMemoryNetwork].
* Extend this class in order to intercept and modify messages passing through the [MessagingService] when using the [InMemoryMessagingNetwork].
*/
open class MessagingServiceSpy(val messagingService: MessagingService) : MessagingService by messagingService
/**
* Attach a [MessagingServiceSpy] to the [MockNode] allowing interception and modification of messages.
* Attach a [MessagingServiceSpy] to the [MockNetwork.MockNode] allowing interception and modification of messages.
*/
fun StartedNode<MockNetwork.MockNode>.setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
internals.setMessagingServiceSpy(messagingServiceSpy)