mirror of
https://github.com/corda/corda.git
synced 2025-02-20 17:33:15 +00:00
node: add ability to pump sending as well as receiving, expose both streams
This commit is contained in:
parent
634d109e97
commit
03c1550394
@ -26,7 +26,7 @@ import java.util.*
|
||||
/**
|
||||
* A simulation in which banks execute interest rate swaps with each other, including the fixing events.
|
||||
*/
|
||||
class IRSSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(runAsync, latencyInjector) {
|
||||
class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(networkSendManuallyPumped, runAsync, latencyInjector) {
|
||||
val om = JsonSupport.createDefaultMapper(MockIdentityService(network.identities))
|
||||
|
||||
init {
|
||||
@ -152,4 +152,4 @@ class IRSSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork
|
||||
executeOnNextIteration.removeAt(0)()
|
||||
return super.iterate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -36,11 +36,12 @@ import java.util.*
|
||||
*
|
||||
* BriefLogFormatter.initVerbose("+messaging")
|
||||
*/
|
||||
class MockNetwork(private val threadPerNode: Boolean = false,
|
||||
class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
private val threadPerNode: Boolean = false,
|
||||
private val defaultFactory: Factory = MockNetwork.DefaultFactory) {
|
||||
private var counter = 0
|
||||
val filesystem = Jimfs.newFileSystem(Configuration.unix())
|
||||
val messagingNetwork = InMemoryMessagingNetwork()
|
||||
val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped)
|
||||
|
||||
val identities = ArrayList<Party>()
|
||||
|
||||
@ -133,13 +134,17 @@ class MockNetwork(private val threadPerNode: Boolean = false,
|
||||
* stability (no nodes sent any messages in the last round).
|
||||
*/
|
||||
fun runNetwork(rounds: Int = -1) {
|
||||
fun pumpAll() = messagingNetwork.endpoints.map { it.pump(false) }
|
||||
check(!networkSendManuallyPumped)
|
||||
fun pumpAll() = messagingNetwork.endpoints.map { it.pumpReceive(false) }
|
||||
|
||||
if (rounds == -1)
|
||||
if (rounds == -1) {
|
||||
while (pumpAll().any { it != null }) {
|
||||
}
|
||||
else
|
||||
repeat(rounds) { pumpAll() }
|
||||
} else {
|
||||
repeat(rounds) {
|
||||
pumpAll()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -168,4 +173,4 @@ class MockNetwork(private val threadPerNode: Boolean = false,
|
||||
require(nodes.isNotEmpty())
|
||||
nodes.forEach { if (it.started) it.stop() }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,8 @@ import java.util.*
|
||||
* Sets up some nodes that can run protocols between each other, and exposes their progress trackers. Provides banks
|
||||
* in a few cities around the world.
|
||||
*/
|
||||
abstract class Simulation(val runAsync: Boolean,
|
||||
abstract class Simulation(val networkSendManuallyPumped: Boolean,
|
||||
val runAsync: Boolean,
|
||||
val latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) {
|
||||
init {
|
||||
if (!runAsync && latencyInjector != null)
|
||||
@ -130,7 +131,7 @@ abstract class Simulation(val runAsync: Boolean,
|
||||
}
|
||||
}
|
||||
|
||||
val network = MockNetwork(runAsync)
|
||||
val network = MockNetwork(networkSendManuallyPumped, runAsync)
|
||||
// This one must come first.
|
||||
val networkMap: SimulatedNode
|
||||
= network.createNode(null, nodeFactory = NetworkMapNodeFactory, advertisedServices = NetworkMapService.Type) as SimulatedNode
|
||||
@ -181,11 +182,16 @@ abstract class Simulation(val runAsync: Boolean,
|
||||
* @return the message that was processed, or null if no node accepted a message in this round.
|
||||
*/
|
||||
open fun iterate(): InMemoryMessagingNetwork.MessageTransfer? {
|
||||
|
||||
if (networkSendManuallyPumped) {
|
||||
network.messagingNetwork.pumpSend(false)
|
||||
}
|
||||
|
||||
// Keep going until one of the nodes has something to do, or we have checked every node.
|
||||
val endpoints = network.messagingNetwork.endpoints
|
||||
var countDown = endpoints.size
|
||||
while (countDown > 0) {
|
||||
val handledMessage = endpoints[pumpCursor].pump(false)
|
||||
val handledMessage = endpoints[pumpCursor].pumpReceive(false)
|
||||
if (handledMessage != null)
|
||||
return handledMessage
|
||||
// If this node had nothing to do, advance the cursor with wraparound and try again.
|
||||
|
@ -19,7 +19,7 @@ import java.time.Instant
|
||||
* Simulates a never ending series of trades that go pair-wise through the banks (e.g. A and B trade with each other,
|
||||
* then B and C trade with each other, then C and A etc).
|
||||
*/
|
||||
class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(runAsync, latencyInjector) {
|
||||
class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(false, runAsync, latencyInjector) {
|
||||
override fun startMainSimulation(): ListenableFuture<Unit> {
|
||||
startTradingCircle { i, j -> tradeBetween(i, j) }
|
||||
return Futures.immediateFailedFuture(UnsupportedOperationException("This future never completes"))
|
||||
@ -57,4 +57,4 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
|
||||
|
||||
return Futures.successfulAsList(buyerFuture, sellerFuture)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package com.r3corda.node.services.network
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.crypto.sha256
|
||||
import com.r3corda.core.messaging.*
|
||||
@ -29,7 +30,7 @@ import kotlin.concurrent.thread
|
||||
* testing).
|
||||
*/
|
||||
@ThreadSafe
|
||||
class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
val MESSAGES_LOG_NAME = "messages"
|
||||
private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME)
|
||||
@ -42,11 +43,24 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
override fun toString() = "${message.topic} from '${sender.myAddress}' to '$recipients'"
|
||||
}
|
||||
|
||||
// All sent messages are kept here until pumpSend is called, or manuallyPumped is set to false
|
||||
// The corresponding sentMessages stream reflects when a message was pumpSend'd
|
||||
private val messageSendQueue = LinkedBlockingQueue<MessageTransfer>()
|
||||
private val _sentMessages = PublishSubject.create<MessageTransfer>()
|
||||
/** A stream of (sender, message, recipients) triples */
|
||||
val sentMessages: Observable<MessageTransfer>
|
||||
get() = _sentMessages
|
||||
|
||||
// All messages are kept here until the messages are pumped off the queue by a caller to the node class.
|
||||
// Queues are created on-demand when a message is sent to an address: the receiving node doesn't have to have
|
||||
// been created yet. If the node identified by the given handle has gone away/been shut down then messages
|
||||
// stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network.
|
||||
private val messageQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
|
||||
// The corresponding stream reflects when a message was pumpReceive'd
|
||||
private val messageReceiveQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
|
||||
private val _receivedMessages = PublishSubject.create<MessageTransfer>()
|
||||
/** A stream of (sender, message, recipients) triples */
|
||||
val receivedMessages: Observable<MessageTransfer>
|
||||
get() = _receivedMessages
|
||||
|
||||
val endpoints: List<InMemoryMessaging> @Synchronized get() = handleEndpointMap.values.toList()
|
||||
|
||||
@ -79,11 +93,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"))
|
||||
}
|
||||
|
||||
private val _allMessages = PublishSubject.create<MessageTransfer>()
|
||||
/** A stream of (sender, message, recipients) triples */
|
||||
val allMessages: Observable<MessageTransfer>
|
||||
get() = _allMessages
|
||||
|
||||
interface LatencyCalculator {
|
||||
fun between(sender: SingleMessageRecipient, receiver: SingleMessageRecipient): Duration
|
||||
}
|
||||
@ -95,30 +104,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
@Synchronized
|
||||
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
|
||||
val transfer = MessageTransfer(from, message, recipients)
|
||||
log.trace { transfer.toString() }
|
||||
val calc = latencyCalculator
|
||||
if (calc != null && recipients is SingleMessageRecipient) {
|
||||
// Inject some artificial latency.
|
||||
timer.schedule(calc.between(from.myAddress, recipients).toMillis()) {
|
||||
msgSendInternal(transfer)
|
||||
}
|
||||
} else {
|
||||
msgSendInternal(transfer)
|
||||
}
|
||||
}
|
||||
|
||||
private fun msgSendInternal(transfer: MessageTransfer) {
|
||||
when (transfer.recipients) {
|
||||
is Handle -> getQueueForHandle(transfer.recipients).add(transfer)
|
||||
|
||||
is AllPossibleRecipients -> {
|
||||
// This means all possible recipients _that the network knows about at the time_, not literally everyone
|
||||
// who joins into the indefinite future.
|
||||
for (handle in handleEndpointMap.keys)
|
||||
getQueueForHandle(handle).add(transfer)
|
||||
}
|
||||
else -> throw IllegalArgumentException("Unknown type of recipient handle")
|
||||
}
|
||||
messageSendQueue.add(transfer)
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
@ -127,7 +113,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
private fun getQueueForHandle(recipients: Handle) = messageQueues.getOrPut(recipients) { LinkedBlockingQueue() }
|
||||
private fun getQueueForHandle(recipients: Handle) = messageReceiveQueues.getOrPut(recipients) { LinkedBlockingQueue() }
|
||||
|
||||
val everyoneOnline: AllPossibleRecipients = object : AllPossibleRecipients {}
|
||||
|
||||
@ -141,7 +127,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
node.stop()
|
||||
|
||||
handleEndpointMap.clear()
|
||||
messageQueues.clear()
|
||||
messageReceiveQueues.clear()
|
||||
}
|
||||
|
||||
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingServiceBuilder<InMemoryMessaging> {
|
||||
@ -160,6 +146,46 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
override fun hashCode() = id.hashCode()
|
||||
}
|
||||
|
||||
// If block is set to true this function will only return once a message has been pushed onto the recipients' queues
|
||||
fun pumpSend(block: Boolean): MessageTransfer? {
|
||||
val transfer = (if (block) messageSendQueue.take() else messageSendQueue.poll()) ?: return null
|
||||
val recipients = transfer.recipients
|
||||
val from = transfer.sender.myAddress
|
||||
|
||||
log.trace { transfer.toString() }
|
||||
val calc = latencyCalculator
|
||||
if (calc != null && recipients is SingleMessageRecipient) {
|
||||
val messageSent = SettableFuture.create<Unit>()
|
||||
// Inject some artificial latency.
|
||||
timer.schedule(calc.between(from, recipients).toMillis()) {
|
||||
pumpSendInternal(transfer)
|
||||
messageSent.set(Unit)
|
||||
}
|
||||
if (block) {
|
||||
messageSent.get()
|
||||
}
|
||||
} else {
|
||||
pumpSendInternal(transfer)
|
||||
}
|
||||
|
||||
return transfer
|
||||
}
|
||||
|
||||
fun pumpSendInternal(transfer: MessageTransfer) {
|
||||
when (transfer.recipients) {
|
||||
is Handle -> getQueueForHandle(transfer.recipients).add(transfer)
|
||||
|
||||
is AllPossibleRecipients -> {
|
||||
// This means all possible recipients _that the network knows about at the time_, not literally everyone
|
||||
// who joins into the indefinite future.
|
||||
for (handle in handleEndpointMap.keys)
|
||||
getQueueForHandle(handle).add(transfer)
|
||||
}
|
||||
else -> throw IllegalArgumentException("Unknown type of recipient handle")
|
||||
}
|
||||
_sentMessages.onNext(transfer)
|
||||
}
|
||||
|
||||
/**
|
||||
* An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage
|
||||
* system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience
|
||||
@ -188,7 +214,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
thread(isDaemon = true, name = "In-memory message dispatcher") {
|
||||
while (!Thread.currentThread().isInterrupted) {
|
||||
try {
|
||||
pumpInternal(true)
|
||||
pumpReceiveInternal(true)
|
||||
} catch(e: InterruptedException) {
|
||||
break
|
||||
}
|
||||
@ -203,8 +229,9 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
pendingRedelivery.clear()
|
||||
Pair(handler, items)
|
||||
}
|
||||
for (it in items)
|
||||
msgSend(this, it.message, handle)
|
||||
for (it in items) {
|
||||
send(it.message, handle)
|
||||
}
|
||||
return handler
|
||||
}
|
||||
|
||||
@ -216,6 +243,9 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
check(running)
|
||||
msgSend(this, message, target)
|
||||
if (!sendManuallyPumped) {
|
||||
pumpSend(false)
|
||||
}
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
@ -247,13 +277,13 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
*
|
||||
* @return the message that was processed, if any in this round.
|
||||
*/
|
||||
fun pump(block: Boolean): MessageTransfer? {
|
||||
fun pumpReceive(block: Boolean): MessageTransfer? {
|
||||
check(manuallyPumped)
|
||||
check(running)
|
||||
return pumpInternal(block)
|
||||
return pumpReceiveInternal(block)
|
||||
}
|
||||
|
||||
private fun pumpInternal(block: Boolean): MessageTransfer? {
|
||||
private fun pumpReceiveInternal(block: Boolean): MessageTransfer? {
|
||||
val q = getQueueForHandle(handle)
|
||||
val transfer = (if (block) q.take() else q.poll()) ?: return null
|
||||
val deliverTo = state.locked {
|
||||
@ -276,7 +306,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
|
||||
(handler.executor ?: MoreExecutors.directExecutor()).execute {
|
||||
try {
|
||||
_allMessages.onNext(transfer)
|
||||
handler.callback(transfer.message, handler)
|
||||
} catch(e: Exception) {
|
||||
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)
|
||||
@ -284,6 +313,8 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
|
||||
}
|
||||
}
|
||||
|
||||
_receivedMessages.onNext(transfer)
|
||||
|
||||
return transfer
|
||||
}
|
||||
}
|
||||
|
@ -118,4 +118,4 @@ class AttachmentTests {
|
||||
rootCauseExceptions { f1.get() }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ class TwoPartyTradeProtocolTests {
|
||||
// We run this in parallel threads to help catch any race conditions that may exist. The other tests
|
||||
// we run in the unit test thread exclusively to speed things up, ensure deterministic results and
|
||||
// allow interruption half way through.
|
||||
net = MockNetwork(true)
|
||||
net = MockNetwork(false, true)
|
||||
transactionGroupFor<ContractState> {
|
||||
val notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
|
||||
val aliceNode = net.createPartyNode(notaryNode.info, ALICE.name, ALICE_KEY)
|
||||
@ -173,9 +173,9 @@ class TwoPartyTradeProtocolTests {
|
||||
|
||||
// Everything is on this thread so we can now step through the protocol one step at a time.
|
||||
// Seller Alice already sent a message to Buyer Bob. Pump once:
|
||||
fun pumpAlice() = (aliceNode.net as InMemoryMessagingNetwork.InMemoryMessaging).pump(false)
|
||||
fun pumpAlice() = (aliceNode.net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(false)
|
||||
|
||||
fun pumpBob() = (bobNode.net as InMemoryMessagingNetwork.InMemoryMessaging).pump(false)
|
||||
fun pumpBob() = (bobNode.net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(false)
|
||||
|
||||
pumpBob()
|
||||
|
||||
|
@ -10,7 +10,7 @@ class IRSSimulationTest {
|
||||
|
||||
@Test fun `runs to completion`() {
|
||||
BriefLogFormatter.initVerbose("messaging")
|
||||
val sim = IRSSimulation(false, null)
|
||||
val sim = IRSSimulation(false, false, null)
|
||||
val future = sim.start()
|
||||
while (!future.isDone) sim.iterate()
|
||||
try {
|
||||
|
Loading…
x
Reference in New Issue
Block a user