mirror of
https://github.com/corda/corda.git
synced 2025-06-22 17:09:00 +00:00
Move test code from node to test-utils
This commit is contained in:
@ -1,145 +0,0 @@
|
||||
package com.r3corda.node.internal.testing
|
||||
|
||||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import com.google.common.util.concurrent.FutureCallback
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.contracts.InterestRateSwap
|
||||
import com.r3corda.core.RunOnCallerThread
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.contracts.StateAndRef
|
||||
import com.r3corda.core.contracts.UniqueIdentifier
|
||||
import com.r3corda.core.failure
|
||||
import com.r3corda.core.node.services.linearHeadsOfType
|
||||
import com.r3corda.testing.node.MockIdentityService
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.success
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork
|
||||
import com.r3corda.node.utilities.JsonSupport
|
||||
import com.r3corda.protocols.TwoPartyDealProtocol
|
||||
import java.security.KeyPair
|
||||
import java.time.LocalDate
|
||||
import java.util.*
|
||||
|
||||
|
||||
/**
|
||||
* A simulation in which banks execute interest rate swaps with each other, including the fixing events.
|
||||
*/
|
||||
class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(networkSendManuallyPumped, runAsync, latencyInjector) {
|
||||
val om = JsonSupport.createDefaultMapper(MockIdentityService(network.identities))
|
||||
|
||||
init {
|
||||
currentDateAndTime = LocalDate.of(2016, 3, 8).atStartOfDay()
|
||||
}
|
||||
|
||||
private var nodeAKey: KeyPair? = null
|
||||
private var nodeBKey: KeyPair? = null
|
||||
|
||||
private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>())
|
||||
|
||||
override fun startMainSimulation(): ListenableFuture<Unit> {
|
||||
val future = SettableFuture.create<Unit>()
|
||||
|
||||
nodeAKey = banks[0].keyManagement.freshKey()
|
||||
nodeBKey = banks[1].keyManagement.freshKey()
|
||||
|
||||
startIRSDealBetween(0, 1).success {
|
||||
// Next iteration is a pause.
|
||||
executeOnNextIteration.add {}
|
||||
executeOnNextIteration.add {
|
||||
// Keep fixing until there's no more left to do.
|
||||
val initialFixFuture = doNextFixing(0, 1)
|
||||
|
||||
Futures.addCallback(initialFixFuture, object : FutureCallback<Unit> {
|
||||
override fun onFailure(t: Throwable) {
|
||||
future.setException(t) // Propagate the error.
|
||||
}
|
||||
|
||||
override fun onSuccess(result: Unit?) {
|
||||
// Pause for an iteration.
|
||||
executeOnNextIteration.add {}
|
||||
executeOnNextIteration.add {
|
||||
val f = doNextFixing(0, 1)
|
||||
if (f != null) {
|
||||
Futures.addCallback(f, this, RunOnCallerThread)
|
||||
} else {
|
||||
// All done!
|
||||
future.set(Unit)
|
||||
}
|
||||
}
|
||||
}
|
||||
}, RunOnCallerThread)
|
||||
}
|
||||
}
|
||||
return future
|
||||
}
|
||||
|
||||
private fun doNextFixing(i: Int, j: Int): ListenableFuture<Unit>? {
|
||||
println("Doing a fixing between $i and $j")
|
||||
val node1: SimulatedNode = banks[i]
|
||||
val node2: SimulatedNode = banks[j]
|
||||
|
||||
val swaps: Map<UniqueIdentifier, StateAndRef<InterestRateSwap.State>> = node1.services.walletService.linearHeadsOfType<InterestRateSwap.State>()
|
||||
val theDealRef: StateAndRef<InterestRateSwap.State> = swaps.values.single()
|
||||
|
||||
// Do we have any more days left in this deal's lifetime? If not, return.
|
||||
val nextFixingDate = theDealRef.state.data.calculation.nextFixingDate() ?: return null
|
||||
extraNodeLabels[node1] = "Fixing event on $nextFixingDate"
|
||||
extraNodeLabels[node2] = "Fixing event on $nextFixingDate"
|
||||
|
||||
val retFuture = SettableFuture.create<Unit>()
|
||||
// Complete the future when the state has been consumed on both nodes
|
||||
val futA = node1.services.walletService.whenConsumed(theDealRef.ref)
|
||||
val futB = node2.services.walletService.whenConsumed(theDealRef.ref)
|
||||
Futures.allAsList(futA, futB) success {
|
||||
retFuture.set(null)
|
||||
} failure { throwable ->
|
||||
retFuture.setException(throwable)
|
||||
}
|
||||
|
||||
showProgressFor(listOf(node1, node2))
|
||||
showConsensusFor(listOf(node1, node2, regulators[0]))
|
||||
|
||||
// For some reason the first fix is always before the effective date.
|
||||
if (nextFixingDate > currentDateAndTime.toLocalDate())
|
||||
currentDateAndTime = nextFixingDate.atTime(15, 0)
|
||||
|
||||
return retFuture
|
||||
}
|
||||
|
||||
private fun startIRSDealBetween(i: Int, j: Int): ListenableFuture<SignedTransaction> {
|
||||
val node1: SimulatedNode = banks[i]
|
||||
val node2: SimulatedNode = banks[j]
|
||||
|
||||
extraNodeLabels[node1] = "Setting up deal"
|
||||
extraNodeLabels[node2] = "Setting up deal"
|
||||
|
||||
// We load the IRS afresh each time because the leg parts of the structure aren't data classes so they don't
|
||||
// have the convenient copy() method that'd let us make small adjustments. Instead they're partly mutable.
|
||||
// TODO: We should revisit this in post-Excalibur cleanup and fix, e.g. by introducing an interface.
|
||||
val irs = om.readValue<InterestRateSwap.State>(javaClass.getResource("trade.json"))
|
||||
irs.fixedLeg.fixedRatePayer = node1.info.identity
|
||||
irs.floatingLeg.floatingRatePayer = node2.info.identity
|
||||
|
||||
val sessionID = random63BitValue()
|
||||
|
||||
val instigator = TwoPartyDealProtocol.Instigator(node2.info.identity, notary.info.identity, irs, nodeAKey!!, sessionID)
|
||||
val acceptor = TwoPartyDealProtocol.Acceptor(node1.info.identity, notary.info.identity, irs, sessionID)
|
||||
|
||||
showProgressFor(listOf(node1, node2))
|
||||
showConsensusFor(listOf(node1, node2, regulators[0]))
|
||||
|
||||
val instigatorFuture: ListenableFuture<SignedTransaction> = node1.services.startProtocol("instigator", instigator)
|
||||
|
||||
return Futures.transformAsync(Futures.allAsList(instigatorFuture, node2.services.startProtocol("acceptor", acceptor))) {
|
||||
instigatorFuture
|
||||
}
|
||||
}
|
||||
|
||||
override fun iterate(): InMemoryMessagingNetwork.MessageTransfer? {
|
||||
if (executeOnNextIteration.isNotEmpty())
|
||||
executeOnNextIteration.removeAt(0)()
|
||||
return super.iterate()
|
||||
}
|
||||
}
|
@ -1,219 +0,0 @@
|
||||
package com.r3corda.node.internal.testing
|
||||
|
||||
import com.google.common.jimfs.Configuration
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.PhysicalLocation
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.WalletService
|
||||
import com.r3corda.testing.node.MockIdentityService
|
||||
import com.r3corda.testing.node.makeTestDataSourceProperties
|
||||
import com.r3corda.core.testing.InMemoryWalletService
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.internal.AbstractNode
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import org.slf4j.Logger
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing.
|
||||
* Components that do IO are either swapped out for mocks, or pointed to a [Jimfs] in memory filesystem.
|
||||
*
|
||||
* Mock network nodes require manual pumping by default: they will not run asynchronous. This means that
|
||||
* for message exchanges to take place (and associated handlers to run), you must call the [runNetwork]
|
||||
* method.
|
||||
*
|
||||
* You can get a printout of every message sent by using code like:
|
||||
*
|
||||
* LogHelper.setLevel("+messages")
|
||||
*/
|
||||
class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
private val threadPerNode: Boolean = false,
|
||||
private val defaultFactory: Factory = MockNetwork.DefaultFactory) {
|
||||
private var counter = 0
|
||||
val filesystem = Jimfs.newFileSystem(Configuration.unix())
|
||||
val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped)
|
||||
|
||||
val identities = ArrayList<Party>()
|
||||
|
||||
private val _nodes = ArrayList<MockNode>()
|
||||
/** A read only view of the current set of executing nodes. */
|
||||
val nodes: List<MockNode> = _nodes
|
||||
|
||||
init {
|
||||
Files.createDirectory(filesystem.getPath("/nodes"))
|
||||
}
|
||||
|
||||
/** Allows customisation of how nodes are created. */
|
||||
interface Factory {
|
||||
fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
|
||||
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNode
|
||||
}
|
||||
|
||||
object DefaultFactory : Factory {
|
||||
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
|
||||
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNode {
|
||||
return MockNode(dir, config, network, networkMapAddr, advertisedServices, id, keyPair)
|
||||
}
|
||||
}
|
||||
|
||||
open class MockNode(dir: Path, config: NodeConfiguration, val mockNet: MockNetwork, networkMapAddr: NodeInfo?,
|
||||
advertisedServices: Set<ServiceType>, val id: Int, val keyPair: KeyPair?) : AbstractNode(dir, config, networkMapAddr, advertisedServices, TestClock()) {
|
||||
override val log: Logger = loggerFor<MockNode>()
|
||||
override val serverThread: AffinityExecutor =
|
||||
if (mockNet.threadPerNode)
|
||||
AffinityExecutor.ServiceAffinityExecutor("Mock node thread", 1)
|
||||
else
|
||||
AffinityExecutor.SAME_THREAD
|
||||
|
||||
// We only need to override the messaging service here, as currently everything that hits disk does so
|
||||
// through the java.nio API which we are already mocking via Jimfs.
|
||||
|
||||
override fun makeMessagingService(): MessagingServiceInternal {
|
||||
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
|
||||
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, configuration.myLegalName).start().get()
|
||||
}
|
||||
|
||||
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
|
||||
|
||||
override fun makeWalletService(): WalletService = InMemoryWalletService(services)
|
||||
|
||||
override fun startMessagingService() {
|
||||
// Nothing to do
|
||||
}
|
||||
|
||||
override fun generateKeyPair(): KeyPair = keyPair ?: super.generateKeyPair()
|
||||
|
||||
// It's OK to not have a network map service in the mock network.
|
||||
override fun noNetworkMapConfigured() = Futures.immediateFuture(Unit)
|
||||
|
||||
// There is no need to slow down the unit tests by initialising CityDatabase
|
||||
override fun findMyLocation(): PhysicalLocation? = null
|
||||
|
||||
override fun start(): MockNode {
|
||||
super.start()
|
||||
mockNet.identities.add(storage.myLegalIdentity)
|
||||
return this
|
||||
}
|
||||
|
||||
// This does not indirect through the NodeInfo object so it can be called before the node is started.
|
||||
// It is used from the network visualiser tool.
|
||||
@Suppress("unused") val place: PhysicalLocation get() = findMyLocation()!!
|
||||
}
|
||||
|
||||
/** Returns a node, optionally created by the passed factory method. */
|
||||
fun createNode(networkMapAddress: NodeInfo? = null, forcedID: Int = -1, nodeFactory: Factory = defaultFactory,
|
||||
start: Boolean = true, legalName: String? = null, keyPair: KeyPair? = null,
|
||||
databasePersistence: Boolean = false, vararg advertisedServices: ServiceType): MockNode {
|
||||
val newNode = forcedID == -1
|
||||
val id = if (newNode) counter++ else forcedID
|
||||
|
||||
val path = filesystem.getPath("/nodes/$id")
|
||||
if (newNode)
|
||||
Files.createDirectories(path.resolve("attachments"))
|
||||
val config = object : NodeConfiguration {
|
||||
override val myLegalName: String = legalName ?: "Mock Company $id"
|
||||
override val exportJMXto: String = ""
|
||||
override val nearestCity: String = "Atlantis"
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val dataSourceProperties: Properties get() = if (databasePersistence) makeTestDataSourceProperties("node_$id") else Properties()
|
||||
}
|
||||
val node = nodeFactory.create(path, config, this, networkMapAddress, advertisedServices.toSet(), id, keyPair)
|
||||
if (start) {
|
||||
node.setup().start()
|
||||
if (threadPerNode && networkMapAddress != null)
|
||||
node.networkMapRegistrationFuture.get() // Block and wait for the node to register in the net map.
|
||||
}
|
||||
_nodes.add(node)
|
||||
return node
|
||||
}
|
||||
|
||||
/**
|
||||
* Asks every node in order to process any queued up inbound messages. This may in turn result in nodes
|
||||
* sending more messages to each other, thus, a typical usage is to call runNetwork with the [rounds]
|
||||
* parameter set to -1 (the default) which simply runs as many rounds as necessary to result in network
|
||||
* stability (no nodes sent any messages in the last round).
|
||||
*/
|
||||
fun runNetwork(rounds: Int = -1) {
|
||||
check(!networkSendManuallyPumped)
|
||||
fun pumpAll() = messagingNetwork.endpoints.map { it.pumpReceive(false) }
|
||||
|
||||
if (rounds == -1) {
|
||||
while (pumpAll().any { it != null }) {
|
||||
}
|
||||
} else {
|
||||
repeat(rounds) {
|
||||
pumpAll()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Move this to using createSomeNodes which doesn't conflate network services with network users.
|
||||
/**
|
||||
* Sets up a two node network, in which the first node runs network map and notary services and the other
|
||||
* doesn't.
|
||||
*/
|
||||
fun createTwoNodes(nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = null): Pair<MockNode, MockNode> {
|
||||
require(nodes.isEmpty())
|
||||
return Pair(
|
||||
createNode(null, -1, nodeFactory, true, null, notaryKeyPair, false, NetworkMapService.Type, SimpleNotaryService.Type),
|
||||
createNode(nodes[0].info, -1, nodeFactory, true, null)
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* A bundle that separates the generic user nodes and service-providing nodes. A real network might not be so
|
||||
* clearly separated, but this is convenient for testing.
|
||||
*/
|
||||
data class BasketOfNodes(val partyNodes: List<MockNode>, val notaryNode: MockNode, val mapNode: MockNode)
|
||||
|
||||
/**
|
||||
* Sets up a network with the requested number of nodes (defaulting to two), with one or more service nodes that
|
||||
* run a notary, network map, any oracles etc. Can't be combined with [createTwoNodes].
|
||||
*/
|
||||
fun createSomeNodes(numPartyNodes: Int = 2, nodeFactory: Factory = defaultFactory, notaryKeyPair: KeyPair? = DUMMY_NOTARY_KEY): BasketOfNodes {
|
||||
require(nodes.isEmpty())
|
||||
val mapNode = createNode(null, nodeFactory = nodeFactory, advertisedServices = NetworkMapService.Type)
|
||||
val notaryNode = createNode(mapNode.info, nodeFactory = nodeFactory, keyPair = notaryKeyPair,
|
||||
advertisedServices = SimpleNotaryService.Type)
|
||||
val nodes = ArrayList<MockNode>()
|
||||
repeat(numPartyNodes) {
|
||||
nodes += createPartyNode(mapNode.info)
|
||||
}
|
||||
return BasketOfNodes(nodes, notaryNode, mapNode)
|
||||
}
|
||||
|
||||
fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null): MockNode {
|
||||
return createNode(null, -1, defaultFactory, true, legalName, keyPair, false, NetworkMapService.Type, SimpleNotaryService.Type)
|
||||
}
|
||||
|
||||
fun createPartyNode(networkMapAddr: NodeInfo, legalName: String? = null, keyPair: KeyPair? = null): MockNode {
|
||||
return createNode(networkMapAddr, -1, defaultFactory, true, legalName, keyPair)
|
||||
}
|
||||
|
||||
@Suppress("unused") // This is used from the network visualiser tool.
|
||||
fun addressToNode(address: SingleMessageRecipient): MockNode = nodes.single { it.net.myAddress == address }
|
||||
|
||||
fun startNodes() {
|
||||
require(nodes.isNotEmpty())
|
||||
nodes.forEach { if (!it.started) it.start() }
|
||||
}
|
||||
|
||||
fun stopNodes() {
|
||||
require(nodes.isNotEmpty())
|
||||
nodes.forEach { if (it.started) it.stop() }
|
||||
}
|
||||
}
|
@ -1,299 +0,0 @@
|
||||
package com.r3corda.node.internal.testing
|
||||
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.core.node.CityDatabase
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.PhysicalLocation
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.then
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
import com.r3corda.node.services.clientapi.NodeInterestRates
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.time.LocalDate
|
||||
import java.time.LocalDateTime
|
||||
import java.time.ZoneOffset
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* Base class for network simulations that are based on the unit test / mock environment.
|
||||
*
|
||||
* Sets up some nodes that can run protocols between each other, and exposes their progress trackers. Provides banks
|
||||
* in a few cities around the world.
|
||||
*/
|
||||
abstract class Simulation(val networkSendManuallyPumped: Boolean,
|
||||
runAsync: Boolean,
|
||||
latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) {
|
||||
init {
|
||||
if (!runAsync && latencyInjector != null)
|
||||
throw IllegalArgumentException("The latency injector is only useful when using manual pumping.")
|
||||
}
|
||||
|
||||
val bankLocations = listOf("London", "Frankfurt", "Rome")
|
||||
|
||||
// This puts together a mock network of SimulatedNodes.
|
||||
|
||||
open class SimulatedNode(dir: Path, config: NodeConfiguration, mockNet: MockNetwork, networkMapAddress: NodeInfo?,
|
||||
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?) : MockNetwork.MockNode(dir, config, mockNet, networkMapAddress, advertisedServices, id, keyPair) {
|
||||
override fun findMyLocation(): PhysicalLocation? = CityDatabase[configuration.nearestCity]
|
||||
}
|
||||
|
||||
inner class BankFactory : MockNetwork.Factory {
|
||||
var counter = 0
|
||||
|
||||
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
|
||||
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
|
||||
val letter = 'A' + counter
|
||||
val city = bankLocations[counter++ % bankLocations.size]
|
||||
val cfg = object : NodeConfiguration {
|
||||
// TODO: Set this back to "Bank of $city" after video day.
|
||||
override val myLegalName: String = "Bank $letter"
|
||||
override val exportJMXto: String = ""
|
||||
override val nearestCity: String = city
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
}
|
||||
return SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id, keyPair)
|
||||
}
|
||||
|
||||
fun createAll(): List<SimulatedNode> = bankLocations.
|
||||
map { network.createNode(networkMap.info, start = false, nodeFactory = this) as SimulatedNode }
|
||||
}
|
||||
|
||||
val bankFactory = BankFactory()
|
||||
|
||||
object NetworkMapNodeFactory : MockNetwork.Factory {
|
||||
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork,
|
||||
networkMapAddr: NodeInfo?, advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
|
||||
require(advertisedServices.contains(NetworkMapService.Type))
|
||||
val cfg = object : NodeConfiguration {
|
||||
override val myLegalName: String = "Network coordination center"
|
||||
override val exportJMXto: String = ""
|
||||
override val nearestCity: String = "Amsterdam"
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
}
|
||||
|
||||
return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id, keyPair) {}
|
||||
}
|
||||
}
|
||||
|
||||
object NotaryNodeFactory : MockNetwork.Factory {
|
||||
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
|
||||
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
|
||||
require(advertisedServices.contains(SimpleNotaryService.Type))
|
||||
val cfg = object : NodeConfiguration {
|
||||
override val myLegalName: String = "Notary Service"
|
||||
override val exportJMXto: String = ""
|
||||
override val nearestCity: String = "Zurich"
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
}
|
||||
return SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id, keyPair)
|
||||
}
|
||||
}
|
||||
|
||||
object RatesOracleFactory : MockNetwork.Factory {
|
||||
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
|
||||
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
|
||||
require(advertisedServices.contains(NodeInterestRates.Type))
|
||||
val cfg = object : NodeConfiguration {
|
||||
override val myLegalName: String = "Rates Service Provider"
|
||||
override val exportJMXto: String = ""
|
||||
override val nearestCity: String = "Madrid"
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
}
|
||||
|
||||
return object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id, keyPair) {
|
||||
override fun start(): MockNetwork.MockNode {
|
||||
super.start()
|
||||
findService<NodeInterestRates.Service>().upload(javaClass.getResourceAsStream("example.rates.txt"))
|
||||
return this
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object RegulatorFactory : MockNetwork.Factory {
|
||||
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
|
||||
advertisedServices: Set<ServiceType>, id: Int, keyPair: KeyPair?): MockNetwork.MockNode {
|
||||
val cfg = object : NodeConfiguration {
|
||||
override val myLegalName: String = "Regulator A"
|
||||
override val exportJMXto: String = ""
|
||||
override val nearestCity: String = "Paris"
|
||||
override val keyStorePassword: String = "dummy"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
}
|
||||
|
||||
val n = object : SimulatedNode(dir, cfg, network, networkMapAddr, advertisedServices, id, keyPair) {
|
||||
// TODO: Regulatory nodes don't actually exist properly, this is a last minute demo request.
|
||||
// So we just fire a message at a node that doesn't know how to handle it, and it'll ignore it.
|
||||
// But that's fine for visualisation purposes.
|
||||
}
|
||||
return n
|
||||
}
|
||||
}
|
||||
|
||||
val network = MockNetwork(networkSendManuallyPumped, runAsync)
|
||||
// This one must come first.
|
||||
val networkMap: SimulatedNode
|
||||
= network.createNode(null, nodeFactory = NetworkMapNodeFactory, advertisedServices = NetworkMapService.Type) as SimulatedNode
|
||||
val notary: SimulatedNode
|
||||
= network.createNode(networkMap.info, nodeFactory = NotaryNodeFactory, advertisedServices = SimpleNotaryService.Type) as SimulatedNode
|
||||
val regulators: List<SimulatedNode> = listOf(network.createNode(networkMap.info, start = false, nodeFactory = RegulatorFactory) as SimulatedNode)
|
||||
val ratesOracle: SimulatedNode
|
||||
= network.createNode(networkMap.info, start = false, nodeFactory = RatesOracleFactory, advertisedServices = NodeInterestRates.Type) as SimulatedNode
|
||||
|
||||
// All nodes must be in one of these two lists for the purposes of the visualiser tool.
|
||||
val serviceProviders: List<SimulatedNode> = listOf(notary, ratesOracle, networkMap)
|
||||
val banks: List<SimulatedNode> = bankFactory.createAll()
|
||||
|
||||
val clocks = (serviceProviders + regulators + banks).map { it.services.clock as TestClock }
|
||||
|
||||
// These are used from the network visualiser tool.
|
||||
private val _allProtocolSteps = PublishSubject.create<Pair<SimulatedNode, ProgressTracker.Change>>()
|
||||
private val _doneSteps = PublishSubject.create<Collection<SimulatedNode>>()
|
||||
@Suppress("unused") val allProtocolSteps: Observable<Pair<SimulatedNode, ProgressTracker.Change>> = _allProtocolSteps
|
||||
@Suppress("unused") val doneSteps: Observable<Collection<SimulatedNode>> = _doneSteps
|
||||
|
||||
private var pumpCursor = 0
|
||||
|
||||
/**
|
||||
* The current simulated date. By default this never changes. If you want it to change, you should do so from
|
||||
* within your overridden [iterate] call. Changes in the current day surface in the [dateChanges] observable.
|
||||
*/
|
||||
var currentDateAndTime: LocalDateTime = LocalDate.now().atStartOfDay()
|
||||
protected set(value) {
|
||||
field = value
|
||||
_dateChanges.onNext(value)
|
||||
}
|
||||
|
||||
private val _dateChanges = PublishSubject.create<LocalDateTime>()
|
||||
val dateChanges: Observable<LocalDateTime> get() = _dateChanges
|
||||
|
||||
init {
|
||||
// Advance node clocks when current time is changed
|
||||
dateChanges.subscribe {
|
||||
clocks.setTo(currentDateAndTime.toInstant(ZoneOffset.UTC))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A place for simulations to stash human meaningful text about what the node is "thinking", which might appear
|
||||
* in the UI somewhere.
|
||||
*/
|
||||
val extraNodeLabels = Collections.synchronizedMap(HashMap<SimulatedNode, String>())
|
||||
|
||||
/**
|
||||
* Iterates the simulation by one step.
|
||||
*
|
||||
* The default implementation circles around the nodes, pumping until one of them handles a message. The next call
|
||||
* will carry on from where this one stopped. In an environment where you want to take actions between anything
|
||||
* interesting happening, or control the precise speed at which things operate (beyond the latency injector), this
|
||||
* is a useful way to do things.
|
||||
*
|
||||
* @return the message that was processed, or null if no node accepted a message in this round.
|
||||
*/
|
||||
open fun iterate(): InMemoryMessagingNetwork.MessageTransfer? {
|
||||
|
||||
if (networkSendManuallyPumped) {
|
||||
network.messagingNetwork.pumpSend(false)
|
||||
}
|
||||
|
||||
// Keep going until one of the nodes has something to do, or we have checked every node.
|
||||
val endpoints = network.messagingNetwork.endpoints
|
||||
var countDown = endpoints.size
|
||||
while (countDown > 0) {
|
||||
val handledMessage = endpoints[pumpCursor].pumpReceive(false)
|
||||
if (handledMessage != null)
|
||||
return handledMessage
|
||||
// If this node had nothing to do, advance the cursor with wraparound and try again.
|
||||
pumpCursor = (pumpCursor + 1) % endpoints.size
|
||||
countDown--
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
protected fun showProgressFor(nodes: List<SimulatedNode>) {
|
||||
nodes.forEach { node ->
|
||||
node.smm.changes.filter { it.second == AddOrRemove.ADD }.first().subscribe {
|
||||
linkProtocolProgress(node, it.first)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun linkProtocolProgress(node: SimulatedNode, protocol: ProtocolLogic<*>) {
|
||||
val pt = protocol.progressTracker ?: return
|
||||
pt.changes.subscribe { change: ProgressTracker.Change ->
|
||||
// Runs on node thread.
|
||||
_allProtocolSteps.onNext(Pair(node, change))
|
||||
}
|
||||
// This isn't technically a "change" but it helps with UIs to send this notification of the first step.
|
||||
_allProtocolSteps.onNext(Pair(node, ProgressTracker.Change.Position(pt, pt.steps[1])))
|
||||
}
|
||||
|
||||
|
||||
protected fun showConsensusFor(nodes: List<SimulatedNode>) {
|
||||
val node = nodes.first()
|
||||
node.smm.changes.filter { it.second == AddOrRemove.ADD }.first().subscribe {
|
||||
linkConsensus(nodes, it.first)
|
||||
}
|
||||
}
|
||||
|
||||
private fun linkConsensus(nodes: Collection<SimulatedNode>, protocol: ProtocolLogic<*>) {
|
||||
protocol.progressTracker?.changes?.subscribe { change: ProgressTracker.Change ->
|
||||
// Runs on node thread.
|
||||
if (protocol.progressTracker!!.currentStep == ProgressTracker.DONE) {
|
||||
_doneSteps.onNext(nodes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("unused") // Used from the network visualiser tool.
|
||||
val networkInitialisationFinished: ListenableFuture<*> =
|
||||
Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture })
|
||||
|
||||
fun start(): ListenableFuture<Unit> {
|
||||
network.startNodes()
|
||||
// Wait for all the nodes to have finished registering with the network map service.
|
||||
val startup: ListenableFuture<List<Unit>> = Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture })
|
||||
return Futures.transformAsync(startup) { l: List<Unit>? -> startMainSimulation() }
|
||||
}
|
||||
|
||||
/**
|
||||
* Sub-classes should override this to trigger whatever they want to simulate. This method will be invoked once the
|
||||
* network bringup has been simulated.
|
||||
*/
|
||||
protected open fun startMainSimulation(): ListenableFuture<Unit> {
|
||||
return Futures.immediateFuture(Unit)
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
network.stopNodes()
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a function that returns a future, iterates that function with arguments like (0, 1), (1, 2), (2, 3) etc
|
||||
* each time the returned future completes.
|
||||
*/
|
||||
fun startTradingCircle(tradeBetween: (indexA: Int, indexB: Int) -> ListenableFuture<*>) {
|
||||
fun next(i: Int, j: Int) {
|
||||
tradeBetween(i, j).then {
|
||||
val ni = (i + 1) % banks.size
|
||||
val nj = (j + 1) % banks.size
|
||||
next(ni, nj)
|
||||
}
|
||||
}
|
||||
next(0, 1)
|
||||
}
|
||||
}
|
@ -1,57 +0,0 @@
|
||||
package com.r3corda.node.internal.testing
|
||||
|
||||
import com.r3corda.core.serialization.SerializeAsToken
|
||||
import com.r3corda.core.serialization.SerializeAsTokenContext
|
||||
import com.r3corda.core.serialization.SingletonSerializationToken
|
||||
import com.r3corda.node.utilities.MutableClock
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.time.ZoneId
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
|
||||
/**
|
||||
* A [Clock] that can have the time advanced for use in testing.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock(), SerializeAsToken {
|
||||
|
||||
private val token = SingletonSerializationToken(this)
|
||||
|
||||
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
|
||||
|
||||
/**
|
||||
* Advance this [Clock] by the specified [Duration] for testing purposes.
|
||||
*/
|
||||
@Synchronized fun advanceBy(duration: Duration) {
|
||||
delegateClock = offset(delegateClock, duration)
|
||||
notifyMutationObservers()
|
||||
}
|
||||
|
||||
/**
|
||||
* Move this [Clock] to the specified [Instant] for testing purposes.
|
||||
*
|
||||
* This will only be approximate due to the time ticking away, but will be some time shortly after the requested [Instant].
|
||||
*/
|
||||
@Synchronized fun setTo(newInstant: Instant) = advanceBy(Duration.between(instant(), newInstant))
|
||||
|
||||
@Synchronized override fun instant(): Instant {
|
||||
return delegateClock.instant()
|
||||
}
|
||||
|
||||
@Deprecated("Do not use this. Instead seek to use ZonedDateTime methods.", level = DeprecationLevel.ERROR)
|
||||
override fun withZone(zone: ZoneId): Clock {
|
||||
throw UnsupportedOperationException("Tokenized clock does not support withZone()")
|
||||
}
|
||||
|
||||
@Synchronized override fun getZone(): ZoneId {
|
||||
return delegateClock.zone
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper method to set several [TestClock]s to approximately the same time. The clocks may drift by the time it
|
||||
* takes for each [TestClock] to have it's time set and any observers to execute.
|
||||
*/
|
||||
fun Iterable<TestClock>.setTo(instant: Instant) = this.forEach { it.setTo(instant) }
|
@ -1,69 +0,0 @@
|
||||
package com.r3corda.node.internal.testing
|
||||
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.contracts.CommercialPaper
|
||||
import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
|
||||
import com.r3corda.contracts.testing.fillWithSomeTestCash
|
||||
import com.r3corda.core.contracts.DOLLARS
|
||||
import com.r3corda.core.contracts.OwnableState
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.contracts.`issued by`
|
||||
import com.r3corda.core.days
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.seconds
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork
|
||||
import com.r3corda.protocols.TwoPartyTradeProtocol
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
* Simulates a never ending series of trades that go pair-wise through the banks (e.g. A and B trade with each other,
|
||||
* then B and C trade with each other, then C and A etc).
|
||||
*/
|
||||
class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(false, runAsync, latencyInjector) {
|
||||
override fun startMainSimulation(): ListenableFuture<Unit> {
|
||||
startTradingCircle { i, j -> tradeBetween(i, j) }
|
||||
return Futures.immediateFailedFuture(UnsupportedOperationException("This future never completes"))
|
||||
}
|
||||
|
||||
private fun tradeBetween(buyerBankIndex: Int, sellerBankIndex: Int): ListenableFuture<MutableList<SignedTransaction>> {
|
||||
val buyer = banks[buyerBankIndex]
|
||||
val seller = banks[sellerBankIndex]
|
||||
|
||||
buyer.services.fillWithSomeTestCash(1500.DOLLARS, notary.info.identity)
|
||||
|
||||
val issuance = run {
|
||||
val tx = CommercialPaper().generateIssue(seller.info.identity.ref(1, 2, 3), 1100.DOLLARS `issued by` DUMMY_CASH_ISSUER,
|
||||
Instant.now() + 10.days, notary.info.identity)
|
||||
tx.setTime(Instant.now(), 30.seconds)
|
||||
tx.signWith(notary.storage.myLegalIdentityKey)
|
||||
tx.signWith(seller.storage.myLegalIdentityKey)
|
||||
tx.toSignedTransaction(true)
|
||||
}
|
||||
seller.services.recordTransactions(issuance)
|
||||
|
||||
val amount = 1000.DOLLARS
|
||||
val sessionID = random63BitValue()
|
||||
val buyerProtocol = TwoPartyTradeProtocol.Buyer(
|
||||
seller.info.identity,
|
||||
notary.info.identity,
|
||||
amount,
|
||||
CommercialPaper.State::class.java,
|
||||
sessionID)
|
||||
val sellerProtocol = TwoPartyTradeProtocol.Seller(
|
||||
buyer.info.identity,
|
||||
notary.info,
|
||||
issuance.tx.outRef<OwnableState>(0),
|
||||
amount,
|
||||
seller.storage.myLegalIdentityKey,
|
||||
sessionID)
|
||||
|
||||
showConsensusFor(listOf(buyer, seller, notary))
|
||||
showProgressFor(listOf(buyer, seller))
|
||||
|
||||
val buyerFuture = buyer.services.startProtocol("bank.$buyerBankIndex.${TwoPartyTradeProtocol.TOPIC}.buyer", buyerProtocol)
|
||||
val sellerFuture = seller.services.startProtocol("bank.$sellerBankIndex.${TwoPartyTradeProtocol.TOPIC}.seller", sellerProtocol)
|
||||
|
||||
return Futures.successfulAsList(buyerFuture, sellerFuture)
|
||||
}
|
||||
}
|
@ -1,354 +0,0 @@
|
||||
package com.r3corda.node.services.network
|
||||
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.crypto.sha256
|
||||
import com.r3corda.core.messaging.*
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.MessagingServiceBuilder
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork.InMemoryMessaging
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.concurrent.schedule
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
/**
|
||||
* An in-memory network allows you to manufacture [InMemoryMessaging]s for a set of participants. Each
|
||||
* [InMemoryMessaging] maintains a queue of messages it has received, and a background thread that dispatches
|
||||
* messages one by one to registered handlers. Alternatively, a messaging system may be manually pumped, in which
|
||||
* case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit
|
||||
* testing).
|
||||
*/
|
||||
@ThreadSafe
|
||||
class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
val MESSAGES_LOG_NAME = "messages"
|
||||
private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME)
|
||||
}
|
||||
|
||||
private var counter = 0 // -1 means stopped.
|
||||
private val handleEndpointMap = HashMap<Handle, InMemoryMessaging>()
|
||||
|
||||
data class MessageTransfer(val sender: InMemoryMessaging, val message: Message, val recipients: MessageRecipients) {
|
||||
override fun toString() = "${message.topicSession} from '${sender.myAddress}' to '$recipients'"
|
||||
}
|
||||
|
||||
// All sent messages are kept here until pumpSend is called, or manuallyPumped is set to false
|
||||
// The corresponding sentMessages stream reflects when a message was pumpSend'd
|
||||
private val messageSendQueue = LinkedBlockingQueue<MessageTransfer>()
|
||||
private val _sentMessages = PublishSubject.create<MessageTransfer>()
|
||||
@Suppress("unused") // Used by the visualiser tool.
|
||||
/** A stream of (sender, message, recipients) triples */
|
||||
val sentMessages: Observable<MessageTransfer>
|
||||
get() = _sentMessages
|
||||
|
||||
// All messages are kept here until the messages are pumped off the queue by a caller to the node class.
|
||||
// Queues are created on-demand when a message is sent to an address: the receiving node doesn't have to have
|
||||
// been created yet. If the node identified by the given handle has gone away/been shut down then messages
|
||||
// stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network.
|
||||
// The corresponding stream reflects when a message was pumpReceive'd
|
||||
private val messageReceiveQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
|
||||
private val _receivedMessages = PublishSubject.create<MessageTransfer>()
|
||||
|
||||
@Suppress("unused") // Used by the visualiser tool.
|
||||
/** A stream of (sender, message, recipients) triples */
|
||||
val receivedMessages: Observable<MessageTransfer>
|
||||
get() = _receivedMessages
|
||||
|
||||
val endpoints: List<InMemoryMessaging> @Synchronized get() = handleEndpointMap.values.toList()
|
||||
|
||||
/**
|
||||
* Creates a node and returns the new object that identifies its location on the network to senders, and the
|
||||
* [InMemoryMessaging] that the recipient/in-memory node uses to receive messages and send messages itself.
|
||||
*
|
||||
* If [manuallyPumped] is set to true, then you are expected to call the [InMemoryMessaging.pump] method on the [InMemoryMessaging]
|
||||
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
|
||||
* then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no
|
||||
* executor.
|
||||
*/
|
||||
@Synchronized
|
||||
fun createNode(manuallyPumped: Boolean): Pair<Handle, MessagingServiceBuilder<InMemoryMessaging>> {
|
||||
check(counter >= 0) { "In memory network stopped: please recreate." }
|
||||
val builder = createNodeWithID(manuallyPumped, counter) as Builder
|
||||
counter++
|
||||
val id = builder.id
|
||||
return Pair(id, builder)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a node at the given address: useful if you want to recreate a node to simulate a restart.
|
||||
*
|
||||
* @param manuallyPumped see [createNode].
|
||||
* @param id the numeric ID to use, e.g. set to whatever ID the node used last time.
|
||||
* @param description text string that identifies this node for message logging (if is enabled) or null to autogenerate.
|
||||
*/
|
||||
fun createNodeWithID(manuallyPumped: Boolean, id: Int, description: String? = null): MessagingServiceBuilder<InMemoryMessaging> {
|
||||
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"))
|
||||
}
|
||||
|
||||
interface LatencyCalculator {
|
||||
fun between(sender: SingleMessageRecipient, receiver: SingleMessageRecipient): Duration
|
||||
}
|
||||
|
||||
/** This can be set to an object which can inject artificial latency between sender/recipient pairs. */
|
||||
@Volatile var latencyCalculator: LatencyCalculator? = null
|
||||
private val timer = Timer()
|
||||
|
||||
@Synchronized
|
||||
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
|
||||
val transfer = MessageTransfer(from, message, recipients)
|
||||
messageSendQueue.add(transfer)
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
private fun netNodeHasShutdown(handle: Handle) {
|
||||
handleEndpointMap.remove(handle)
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
private fun getQueueForHandle(recipients: Handle) = messageReceiveQueues.getOrPut(recipients) { LinkedBlockingQueue() }
|
||||
|
||||
val everyoneOnline: AllPossibleRecipients = object : AllPossibleRecipients {}
|
||||
|
||||
fun stop() {
|
||||
val nodes = synchronized(this) {
|
||||
counter = -1
|
||||
handleEndpointMap.values.toList()
|
||||
}
|
||||
|
||||
for (node in nodes)
|
||||
node.stop()
|
||||
|
||||
handleEndpointMap.clear()
|
||||
messageReceiveQueues.clear()
|
||||
}
|
||||
|
||||
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingServiceBuilder<InMemoryMessaging> {
|
||||
override fun start(): ListenableFuture<InMemoryMessaging> {
|
||||
synchronized(this@InMemoryMessagingNetwork) {
|
||||
val node = InMemoryMessaging(manuallyPumped, id)
|
||||
handleEndpointMap[id] = node
|
||||
return Futures.immediateFuture(node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class Handle(val id: Int, val description: String) : SingleMessageRecipient {
|
||||
override fun toString() = description
|
||||
override fun equals(other: Any?) = other is Handle && other.id == id
|
||||
override fun hashCode() = id.hashCode()
|
||||
}
|
||||
|
||||
// If block is set to true this function will only return once a message has been pushed onto the recipients' queues
|
||||
fun pumpSend(block: Boolean): MessageTransfer? {
|
||||
val transfer = (if (block) messageSendQueue.take() else messageSendQueue.poll()) ?: return null
|
||||
val recipients = transfer.recipients
|
||||
val from = transfer.sender.myAddress
|
||||
|
||||
log.trace { transfer.toString() }
|
||||
val calc = latencyCalculator
|
||||
if (calc != null && recipients is SingleMessageRecipient) {
|
||||
val messageSent = SettableFuture.create<Unit>()
|
||||
// Inject some artificial latency.
|
||||
timer.schedule(calc.between(from, recipients).toMillis()) {
|
||||
pumpSendInternal(transfer)
|
||||
messageSent.set(Unit)
|
||||
}
|
||||
if (block) {
|
||||
messageSent.get()
|
||||
}
|
||||
} else {
|
||||
pumpSendInternal(transfer)
|
||||
}
|
||||
|
||||
return transfer
|
||||
}
|
||||
|
||||
fun pumpSendInternal(transfer: MessageTransfer) {
|
||||
when (transfer.recipients) {
|
||||
is Handle -> getQueueForHandle(transfer.recipients).add(transfer)
|
||||
|
||||
is AllPossibleRecipients -> {
|
||||
// This means all possible recipients _that the network knows about at the time_, not literally everyone
|
||||
// who joins into the indefinite future.
|
||||
for (handle in handleEndpointMap.keys)
|
||||
getQueueForHandle(handle).add(transfer)
|
||||
}
|
||||
else -> throw IllegalArgumentException("Unknown type of recipient handle")
|
||||
}
|
||||
_sentMessages.onNext(transfer)
|
||||
}
|
||||
|
||||
/**
|
||||
* An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage
|
||||
* system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience
|
||||
* when all entities on 'the network' are being simulated in-process.
|
||||
*
|
||||
* An instance can be obtained by creating a builder and then using the start method.
|
||||
*/
|
||||
@ThreadSafe
|
||||
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingServiceInternal {
|
||||
inner class Handler(val executor: Executor?, val topicSession: TopicSession,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
@Volatile
|
||||
private var running = true
|
||||
|
||||
private inner class InnerState {
|
||||
val handlers: MutableList<Handler> = ArrayList()
|
||||
val pendingRedelivery = LinkedList<MessageTransfer>()
|
||||
}
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
|
||||
override val myAddress: SingleMessageRecipient = handle
|
||||
|
||||
private val backgroundThread = if (manuallyPumped) null else
|
||||
thread(isDaemon = true, name = "In-memory message dispatcher") {
|
||||
while (!Thread.currentThread().isInterrupted) {
|
||||
try {
|
||||
pumpReceiveInternal(true)
|
||||
} catch(e: InterruptedException) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
|
||||
= addMessageHandler(TopicSession(topic, sessionID), executor, callback)
|
||||
|
||||
override fun addMessageHandler(topicSession: TopicSession, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
|
||||
check(running)
|
||||
val (handler, items) = state.locked {
|
||||
val handler = Handler(executor, topicSession, callback).apply { handlers.add(this) }
|
||||
val items = ArrayList(pendingRedelivery)
|
||||
pendingRedelivery.clear()
|
||||
Pair(handler, items)
|
||||
}
|
||||
for ((sender, message) in items) {
|
||||
send(message, handle)
|
||||
}
|
||||
return handler
|
||||
}
|
||||
|
||||
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
|
||||
check(running)
|
||||
state.locked { check(handlers.remove(registration as Handler)) }
|
||||
}
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
check(running)
|
||||
msgSend(this, message, target)
|
||||
if (!sendManuallyPumped) {
|
||||
pumpSend(false)
|
||||
}
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
if (backgroundThread != null) {
|
||||
backgroundThread.interrupt()
|
||||
backgroundThread.join()
|
||||
}
|
||||
running = false
|
||||
netNodeHasShutdown(handle)
|
||||
}
|
||||
|
||||
/** Returns the given (topic & session, data) pair as a newly created message object. */
|
||||
override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message
|
||||
= createMessage(TopicSession(topic, sessionID), data)
|
||||
|
||||
/** Returns the given (topic & session, data) pair as a newly created message object. */
|
||||
override fun createMessage(topicSession: TopicSession, data: ByteArray): Message {
|
||||
return object : Message {
|
||||
override val topicSession: TopicSession get() = topicSession
|
||||
override val data: ByteArray get() = data
|
||||
override val debugTimestamp: Instant = Instant.now()
|
||||
override fun serialise(): ByteArray = this.serialise()
|
||||
override val debugMessageID: String get() = serialise().sha256().prefixChars()
|
||||
|
||||
override fun toString() = topicSession.toString() + "#" + String(data)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block
|
||||
* is true, waits until one has been provided on a different thread via send. If block is false, the return
|
||||
* result indicates whether a message was delivered or not.
|
||||
*
|
||||
* @return the message that was processed, if any in this round.
|
||||
*/
|
||||
fun pumpReceive(block: Boolean): MessageTransfer? {
|
||||
check(manuallyPumped)
|
||||
check(running)
|
||||
return pumpReceiveInternal(block)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next transfer, and matching queue, that is ready to handle. Any pending transfers without handlers
|
||||
* are placed into `pendingRedelivery` to try again later.
|
||||
*
|
||||
* @param block if this should block until a message it can process.
|
||||
*/
|
||||
private fun getNextQueue(q: LinkedBlockingQueue<MessageTransfer>, block: Boolean): Pair<MessageTransfer, List<Handler>>? {
|
||||
var deliverTo: List<Handler>? = null
|
||||
// Pop transfers off the queue until we run out (and are not blocking), or find something we can process
|
||||
while (deliverTo == null) {
|
||||
val transfer = (if (block) q.take() else q.poll()) ?: return null
|
||||
deliverTo = state.locked {
|
||||
val h = handlers.filter { if (it.topicSession.isBlank()) true else transfer.message.topicSession == it.topicSession }
|
||||
|
||||
if (h.isEmpty()) {
|
||||
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
|
||||
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
|
||||
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
|
||||
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
|
||||
// least sometimes.
|
||||
log.warn("Message to ${transfer.message.topicSession} could not be delivered")
|
||||
pendingRedelivery.add(transfer)
|
||||
null
|
||||
} else {
|
||||
h
|
||||
}
|
||||
}
|
||||
if (deliverTo != null) {
|
||||
return Pair(transfer, deliverTo)
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
private fun pumpReceiveInternal(block: Boolean): MessageTransfer? {
|
||||
val q = getQueueForHandle(handle)
|
||||
val next = getNextQueue(q, block) ?: return null
|
||||
val (transfer, deliverTo) = next
|
||||
|
||||
for (handler in deliverTo) {
|
||||
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
|
||||
(handler.executor ?: MoreExecutors.directExecutor()).execute {
|
||||
try {
|
||||
handler.callback(transfer.message, handler)
|
||||
} catch(e: Exception) {
|
||||
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topicSession}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_receivedMessages.onNext(transfer)
|
||||
|
||||
return transfer
|
||||
}
|
||||
}
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
package com.r3corda.node.services.network
|
||||
|
||||
import co.paralleluniverse.common.util.VisibleForTesting
|
||||
import com.r3corda.core.crypto.DummyPublicKey
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
|
||||
/**
|
||||
* Network map cache with no backing map service.
|
||||
*/
|
||||
class MockNetworkMapCache() : InMemoryNetworkMapCache() {
|
||||
override val changed: Observable<NetworkMapCache.MapChange> = PublishSubject.create<NetworkMapCache.MapChange>()
|
||||
|
||||
data class MockAddress(val id: String): SingleMessageRecipient
|
||||
|
||||
init {
|
||||
val mockNodeA = NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C")))
|
||||
val mockNodeB = NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D")))
|
||||
registeredNodes[mockNodeA.identity] = mockNodeA
|
||||
registeredNodes[mockNodeB.identity] = mockNodeB
|
||||
}
|
||||
|
||||
/**
|
||||
* Directly add a registration to the internal cache. DOES NOT fire the change listeners, as it's
|
||||
* not a change being received.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
fun addRegistration(node: NodeInfo) {
|
||||
registeredNodes[node.identity] = node
|
||||
}
|
||||
|
||||
/**
|
||||
* Directly remove a registration from the internal cache. DOES NOT fire the change listeners, as it's
|
||||
* not a change being received.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
fun deleteRegistration(identity: Party) : Boolean {
|
||||
return registeredNodes.remove(identity) != null
|
||||
}
|
||||
}
|
@ -6,7 +6,7 @@ import com.r3corda.core.crypto.sha256
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.serialization.OpaqueBytes
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.persistence.NodeAttachmentService
|
||||
|
@ -5,7 +5,7 @@ package com.r3corda.node.messaging
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.messaging.TopicStringValidator
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
|
@ -18,9 +18,9 @@ import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import com.r3corda.core.utilities.LogHelper
|
||||
import com.r3corda.core.utilities.TEST_TX_TIME
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork
|
||||
import com.r3corda.node.services.persistence.NodeAttachmentService
|
||||
import com.r3corda.node.services.persistence.PerFileTransactionStorage
|
||||
import com.r3corda.node.services.persistence.StorageServiceImpl
|
||||
|
@ -1,6 +1,6 @@
|
||||
package com.r3corda.node.services
|
||||
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
|
||||
|
@ -5,7 +5,7 @@ import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.network.NodeRegistration
|
||||
|
@ -10,7 +10,7 @@ import com.r3corda.node.serialization.NodeClock
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.api.MonitoringService
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.services.network.MockNetworkMapCache
|
||||
import com.r3corda.testing.node.MockNetworkMapCache
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.persistence.DataVending
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
|
@ -13,7 +13,7 @@ import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.generateKeyPair
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.core.utilities.LogHelper
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.node.services.clientapi.NodeInterestRates
|
||||
import com.r3corda.protocols.RatesFixProtocol
|
||||
import com.r3corda.testing.ALICE_PUBKEY
|
||||
|
@ -11,9 +11,9 @@ import com.r3corda.core.protocols.ProtocolLogicRef
|
||||
import com.r3corda.core.protocols.ProtocolLogicRefFactory
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.node.internal.testing.TestClock
|
||||
import com.r3corda.testing.node.TestClock
|
||||
import com.r3corda.node.services.events.NodeSchedulerService
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork
|
||||
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
|
@ -7,7 +7,7 @@ import com.r3corda.core.seconds
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import com.r3corda.node.internal.AbstractNode
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.protocols.NotaryChangeProtocol
|
||||
|
@ -5,7 +5,7 @@ import com.r3corda.core.contracts.TransactionType
|
||||
import com.r3corda.core.seconds
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.protocols.NotaryError
|
||||
|
@ -5,7 +5,7 @@ import com.r3corda.core.contracts.DummyContract
|
||||
import com.r3corda.core.contracts.TransactionType
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.ValidatingNotaryService
|
||||
import com.r3corda.protocols.NotaryError
|
||||
|
@ -16,7 +16,7 @@ import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.core.utilities.DUMMY_PUBKEY_1
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.node.services.monitor.*
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
|
@ -6,7 +6,7 @@ import com.r3corda.core.contracts.Issued
|
||||
import com.r3corda.core.contracts.TransactionType
|
||||
import com.r3corda.core.contracts.USD
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.testing.MEGA_CORP
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
|
@ -7,7 +7,7 @@ import com.r3corda.node.services.MockServiceHubInternal
|
||||
import com.r3corda.node.services.api.Checkpoint
|
||||
import com.r3corda.node.services.api.CheckpointStorage
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
|
@ -5,7 +5,7 @@ import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.node.internal.testing.TestClock
|
||||
import com.r3corda.testing.node.TestClock
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
|
@ -5,7 +5,7 @@ import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import com.r3corda.core.RetryableException
|
||||
import com.r3corda.node.internal.testing.TestClock
|
||||
import com.r3corda.testing.node.TestClock
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
|
Reference in New Issue
Block a user