Merged in R3-CEV/internal/in-memory-network-pump-send (pull request #148)

Add ability to pump sending as well as receiving, expose both streams
This commit is contained in:
Andras Slemmer 2016-06-20 12:41:42 +01:00
commit ab277d2022
10 changed files with 113 additions and 119 deletions

View File

@ -26,7 +26,7 @@ import java.util.*
/**
* A simulation in which banks execute interest rate swaps with each other, including the fixing events.
*/
class IRSSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(runAsync, latencyInjector) {
class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(networkSendManuallyPumped, runAsync, latencyInjector) {
val om = JsonSupport.createDefaultMapper(MockIdentityService(network.identities))
init {
@ -105,8 +105,8 @@ class IRSSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork
val retFuture = SettableFuture.create<Unit>()
val futA = node1.smm.add("floater", sideA)
executeOnNextIteration += {
val futB = node2.smm.add("fixer", sideB)
executeOnNextIteration += {
Futures.allAsList(futA, futB) success {
retFuture.set(null)
} failure { throwable ->

View File

@ -36,11 +36,12 @@ import java.util.*
*
* BriefLogFormatter.initVerbose("+messaging")
*/
class MockNetwork(private val threadPerNode: Boolean = false,
class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
private val threadPerNode: Boolean = false,
private val defaultFactory: Factory = MockNetwork.DefaultFactory) {
private var counter = 0
val filesystem = Jimfs.newFileSystem(Configuration.unix())
val messagingNetwork = InMemoryMessagingNetwork()
val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped)
val identities = ArrayList<Party>()
@ -133,13 +134,17 @@ class MockNetwork(private val threadPerNode: Boolean = false,
* stability (no nodes sent any messages in the last round).
*/
fun runNetwork(rounds: Int = -1) {
fun pumpAll() = messagingNetwork.endpoints.map { it.pump(false) }
check(!networkSendManuallyPumped)
fun pumpAll() = messagingNetwork.endpoints.map { it.pumpReceive(false) }
if (rounds == -1)
if (rounds == -1) {
while (pumpAll().any { it != null }) {
}
else
repeat(rounds) { pumpAll() }
} else {
repeat(rounds) {
pumpAll()
}
}
}
/**

View File

@ -28,7 +28,8 @@ import java.util.*
* Sets up some nodes that can run protocols between each other, and exposes their progress trackers. Provides banks
* in a few cities around the world.
*/
abstract class Simulation(val runAsync: Boolean,
abstract class Simulation(val networkSendManuallyPumped: Boolean,
val runAsync: Boolean,
val latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) {
init {
if (!runAsync && latencyInjector != null)
@ -130,7 +131,7 @@ abstract class Simulation(val runAsync: Boolean,
}
}
val network = MockNetwork(runAsync)
val network = MockNetwork(networkSendManuallyPumped, runAsync)
// This one must come first.
val networkMap: SimulatedNode
= network.createNode(null, nodeFactory = NetworkMapNodeFactory, advertisedServices = NetworkMapService.Type) as SimulatedNode
@ -181,11 +182,16 @@ abstract class Simulation(val runAsync: Boolean,
* @return the message that was processed, or null if no node accepted a message in this round.
*/
open fun iterate(): InMemoryMessagingNetwork.MessageTransfer? {
if (networkSendManuallyPumped) {
network.messagingNetwork.pumpSend(false)
}
// Keep going until one of the nodes has something to do, or we have checked every node.
val endpoints = network.messagingNetwork.endpoints
var countDown = endpoints.size
while (countDown > 0) {
val handledMessage = endpoints[pumpCursor].pump(false)
val handledMessage = endpoints[pumpCursor].pumpReceive(false)
if (handledMessage != null)
return handledMessage
// If this node had nothing to do, advance the cursor with wraparound and try again.

View File

@ -19,7 +19,7 @@ import java.time.Instant
* Simulates a never ending series of trades that go pair-wise through the banks (e.g. A and B trade with each other,
* then B and C trade with each other, then C and A etc).
*/
class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(runAsync, latencyInjector) {
class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork.LatencyCalculator?) : Simulation(false, runAsync, latencyInjector) {
override fun startMainSimulation(): ListenableFuture<Unit> {
startTradingCircle { i, j -> tradeBetween(i, j) }
return Futures.immediateFailedFuture(UnsupportedOperationException("This future never completes"))

View File

@ -3,6 +3,7 @@ package com.r3corda.node.services.network
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.sha256
import com.r3corda.core.messaging.*
@ -29,7 +30,7 @@ import kotlin.concurrent.thread
* testing).
*/
@ThreadSafe
class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSerializeAsToken() {
companion object {
val MESSAGES_LOG_NAME = "messages"
private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME)
@ -42,11 +43,24 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
override fun toString() = "${message.topic} from '${sender.myAddress}' to '$recipients'"
}
// All sent messages are kept here until pumpSend is called, or manuallyPumped is set to false
// The corresponding sentMessages stream reflects when a message was pumpSend'd
private val messageSendQueue = LinkedBlockingQueue<MessageTransfer>()
private val _sentMessages = PublishSubject.create<MessageTransfer>()
/** A stream of (sender, message, recipients) triples */
val sentMessages: Observable<MessageTransfer>
get() = _sentMessages
// All messages are kept here until the messages are pumped off the queue by a caller to the node class.
// Queues are created on-demand when a message is sent to an address: the receiving node doesn't have to have
// been created yet. If the node identified by the given handle has gone away/been shut down then messages
// stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network.
private val messageQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
// The corresponding stream reflects when a message was pumpReceive'd
private val messageReceiveQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
private val _receivedMessages = PublishSubject.create<MessageTransfer>()
/** A stream of (sender, message, recipients) triples */
val receivedMessages: Observable<MessageTransfer>
get() = _receivedMessages
val endpoints: List<InMemoryMessaging> @Synchronized get() = handleEndpointMap.values.toList()
@ -79,11 +93,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"))
}
private val _allMessages = PublishSubject.create<MessageTransfer>()
/** A stream of (sender, message, recipients) triples */
val allMessages: Observable<MessageTransfer>
get() = _allMessages
interface LatencyCalculator {
fun between(sender: SingleMessageRecipient, receiver: SingleMessageRecipient): Duration
}
@ -95,30 +104,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
@Synchronized
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
val transfer = MessageTransfer(from, message, recipients)
log.trace { transfer.toString() }
val calc = latencyCalculator
if (calc != null && recipients is SingleMessageRecipient) {
// Inject some artificial latency.
timer.schedule(calc.between(from.myAddress, recipients).toMillis()) {
msgSendInternal(transfer)
}
} else {
msgSendInternal(transfer)
}
}
private fun msgSendInternal(transfer: MessageTransfer) {
when (transfer.recipients) {
is Handle -> getQueueForHandle(transfer.recipients).add(transfer)
is AllPossibleRecipients -> {
// This means all possible recipients _that the network knows about at the time_, not literally everyone
// who joins into the indefinite future.
for (handle in handleEndpointMap.keys)
getQueueForHandle(handle).add(transfer)
}
else -> throw IllegalArgumentException("Unknown type of recipient handle")
}
messageSendQueue.add(transfer)
}
@Synchronized
@ -127,7 +113,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
}
@Synchronized
private fun getQueueForHandle(recipients: Handle) = messageQueues.getOrPut(recipients) { LinkedBlockingQueue() }
private fun getQueueForHandle(recipients: Handle) = messageReceiveQueues.getOrPut(recipients) { LinkedBlockingQueue() }
val everyoneOnline: AllPossibleRecipients = object : AllPossibleRecipients {}
@ -141,7 +127,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
node.stop()
handleEndpointMap.clear()
messageQueues.clear()
messageReceiveQueues.clear()
}
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingServiceBuilder<InMemoryMessaging> {
@ -160,6 +146,46 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
override fun hashCode() = id.hashCode()
}
// If block is set to true this function will only return once a message has been pushed onto the recipients' queues
fun pumpSend(block: Boolean): MessageTransfer? {
val transfer = (if (block) messageSendQueue.take() else messageSendQueue.poll()) ?: return null
val recipients = transfer.recipients
val from = transfer.sender.myAddress
log.trace { transfer.toString() }
val calc = latencyCalculator
if (calc != null && recipients is SingleMessageRecipient) {
val messageSent = SettableFuture.create<Unit>()
// Inject some artificial latency.
timer.schedule(calc.between(from, recipients).toMillis()) {
pumpSendInternal(transfer)
messageSent.set(Unit)
}
if (block) {
messageSent.get()
}
} else {
pumpSendInternal(transfer)
}
return transfer
}
fun pumpSendInternal(transfer: MessageTransfer) {
when (transfer.recipients) {
is Handle -> getQueueForHandle(transfer.recipients).add(transfer)
is AllPossibleRecipients -> {
// This means all possible recipients _that the network knows about at the time_, not literally everyone
// who joins into the indefinite future.
for (handle in handleEndpointMap.keys)
getQueueForHandle(handle).add(transfer)
}
else -> throw IllegalArgumentException("Unknown type of recipient handle")
}
_sentMessages.onNext(transfer)
}
/**
* An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage
* system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience
@ -177,7 +203,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
protected inner class InnerState {
val handlers: MutableList<Handler> = ArrayList()
val pendingRedelivery = LinkedList<MessageTransfer>()
}
protected val state = ThreadBox(InnerState())
@ -188,7 +213,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
thread(isDaemon = true, name = "In-memory message dispatcher") {
while (!Thread.currentThread().isInterrupted) {
try {
pumpInternal(true)
pumpReceiveInternal(true)
} catch(e: InterruptedException) {
break
}
@ -197,15 +222,9 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
check(running)
val (handler, items) = state.locked {
val handler = Handler(executor, topic, callback).apply { handlers.add(this) }
val items = ArrayList(pendingRedelivery)
pendingRedelivery.clear()
Pair(handler, items)
return state.locked {
Handler(executor, topic, callback).apply { handlers.add(this) }
}
for (it in items)
msgSend(this, it.message, handle)
return handler
}
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
@ -216,6 +235,9 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
override fun send(message: Message, target: MessageRecipients) {
check(running)
msgSend(this, message, target)
if (!sendManuallyPumped) {
pumpSend(false)
}
}
override fun stop() {
@ -247,25 +269,20 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
*
* @return the message that was processed, if any in this round.
*/
fun pump(block: Boolean): MessageTransfer? {
fun pumpReceive(block: Boolean): MessageTransfer? {
check(manuallyPumped)
check(running)
return pumpInternal(block)
return pumpReceiveInternal(block)
}
private fun pumpInternal(block: Boolean): MessageTransfer? {
private fun pumpReceiveInternal(block: Boolean): MessageTransfer? {
val q = getQueueForHandle(handle)
val transfer = (if (block) q.take() else q.poll()) ?: return null
val deliverTo = state.locked {
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
if (h.isEmpty()) {
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
// least sometimes.
pendingRedelivery.add(transfer)
loggerFor<InMemoryMessagingNetwork>().warn("No handlers for message ${transfer}")
return null
}
@ -276,7 +293,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
(handler.executor ?: MoreExecutors.directExecutor()).execute {
try {
_allMessages.onNext(transfer)
handler.callback(transfer.message, handler)
} catch(e: Exception) {
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)
@ -284,6 +300,8 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
}
}
_receivedMessages.onNext(transfer)
return transfer
}
}

View File

@ -84,40 +84,4 @@ class InMemoryMessagingTests {
network.runNetwork(rounds = 1)
assertEquals(3, counter)
}
@Test
fun downAndUp() {
// Test (re)delivery of messages to nodes that aren't created yet, or were stopped and then restarted.
// The purpose of this functionality is to simulate a reliable messaging system that keeps trying until
// messages are delivered.
val node1 = network.createNode()
var node2 = network.createNode()
node1.net.send("test.topic", "hello!", node2.info.address)
network.runNetwork(rounds = 1) // No handler registered, so the message goes into a holding area.
var runCount = 0
node2.net.addMessageHandler("test.topic") { msg, registration ->
if (msg.data.deserialize<String>() == "hello!")
runCount++
}
network.runNetwork(rounds = 1) // Try again now the handler is registered
assertEquals(1, runCount)
// Shut node2 down for a while. Node 1 keeps sending it messages though.
node2.stop()
node1.net.send("test.topic", "are you there?", node2.info.address)
node1.net.send("test.topic", "wake up!", node2.info.address)
// Now re-create node2 with the same address as last time, and re-register a message handler.
// Check that the messages that were sent whilst it was gone are still there, waiting for it.
node2 = network.createNode(null, node2.id)
node2.net.addMessageHandler("test.topic") { a, b -> runCount++ }
network.runNetwork(rounds = 1)
assertEquals(2, runCount)
network.runNetwork(rounds = 1)
assertEquals(3, runCount)
network.runNetwork(rounds = 1)
assertEquals(3, runCount)
}
}

View File

@ -87,7 +87,7 @@ class TwoPartyTradeProtocolTests {
// We run this in parallel threads to help catch any race conditions that may exist. The other tests
// we run in the unit test thread exclusively to speed things up, ensure deterministic results and
// allow interruption half way through.
net = MockNetwork(true)
net = MockNetwork(false, true)
transactionGroupFor<ContractState> {
val notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
val aliceNode = net.createPartyNode(notaryNode.info, ALICE.name, ALICE_KEY)
@ -102,6 +102,15 @@ class TwoPartyTradeProtocolTests {
val buyerSessionID = random63BitValue()
// We start the Buyer first, as the Seller sends the first message
val bobResult = runBuyer(
bobNode.smm,
notaryNode.info,
aliceNode.net.myAddress,
1000.DOLLARS `issued by` issuer,
CommercialPaper.State::class.java,
buyerSessionID
)
val aliceResult = runSeller(
aliceNode.smm,
notaryNode.info,
@ -111,14 +120,6 @@ class TwoPartyTradeProtocolTests {
ALICE_KEY,
buyerSessionID
)
val bobResult = runBuyer(
bobNode.smm,
notaryNode.info,
aliceNode.net.myAddress,
1000.DOLLARS `issued by` issuer,
CommercialPaper.State::class.java,
buyerSessionID
)
// TODO: Verify that the result was inserted into the transaction database.
// assertEquals(bobResult.get(), aliceNode.storage.validatedTransactions[aliceResult.get().id])
@ -173,9 +174,9 @@ class TwoPartyTradeProtocolTests {
// Everything is on this thread so we can now step through the protocol one step at a time.
// Seller Alice already sent a message to Buyer Bob. Pump once:
fun pumpAlice() = (aliceNode.net as InMemoryMessagingNetwork.InMemoryMessaging).pump(false)
fun pumpAlice() = (aliceNode.net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(false)
fun pumpBob() = (bobNode.net as InMemoryMessagingNetwork.InMemoryMessaging).pump(false)
fun pumpBob() = (bobNode.net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(false)
pumpBob()

View File

@ -17,7 +17,7 @@ import java.util.*
class StateMachineManagerTests {
val checkpointStorage = RecordingCheckpointStorage()
val network = InMemoryMessagingNetwork().InMemoryMessaging(true, InMemoryMessagingNetwork.Handle(1, "mock"))
val network = InMemoryMessagingNetwork(false).InMemoryMessaging(true, InMemoryMessagingNetwork.Handle(1, "mock"))
val smm = createManager()
@After

View File

@ -10,7 +10,7 @@ class IRSSimulationTest {
@Test fun `runs to completion`() {
BriefLogFormatter.initVerbose("messaging")
val sim = IRSSimulation(false, null)
val sim = IRSSimulation(false, false, null)
val future = sim.start()
while (!future.isDone) sim.iterate()
try {