mirror of
synced 2025-03-25 21:38:06 +00:00
CORDA-716 Call stop on InMemoryMessagingNetwork (#2077)
* Inline code used by only 1 test * Remove superfluous interface * Warnings crusade * Inline Builder, remove unused method * Remove stop from interface * Register stops up-front
This commit is contained in:
@ -5,46 +5,45 @@ import net.corda.core.identity.Party
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.testing.node.network
import net.corda.testing.node.MockNetwork
import net.corda.testing.singleIdentity
import net.corda.testing.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
class ReceiveMultipleFlowTests {
private val mockNet = MockNetwork()
private val nodes = (0..2).map { mockNet.createPartyNode() }
fun stopNodes() {
fun `receive all messages in parallel using map style`() {
network(3) { nodes ->
val doubleValue = 5.0
nodes[1].registerAnswer(AlgorithmDefinition::class, doubleValue)
val stringValue = "Thriller"
nodes[2].registerAnswer(AlgorithmDefinition::class, stringValue)
val flow = nodes[0].services.startFlow(ParallelAlgorithmMap(nodes[1].info.singleIdentity(), nodes[2].info.singleIdentity()))
val result = flow.resultFuture.getOrThrow()
assertThat(result).isEqualTo(doubleValue * stringValue.length)
val doubleValue = 5.0
nodes[1].registerAnswer(AlgorithmDefinition::class, doubleValue)
val stringValue = "Thriller"
nodes[2].registerAnswer(AlgorithmDefinition::class, stringValue)
val flow = nodes[0].services.startFlow(ParallelAlgorithmMap(nodes[1].info.singleIdentity(), nodes[2].info.singleIdentity()))
val result = flow.resultFuture.getOrThrow()
assertThat(result).isEqualTo(doubleValue * stringValue.length)
fun `receive all messages in parallel using list style`() {
network(3) { nodes ->
val value1 = 5.0
nodes[1].registerAnswer(ParallelAlgorithmList::class, value1)
val value2 = 6.0
nodes[2].registerAnswer(ParallelAlgorithmList::class, value2)
val flow = nodes[0].services.startFlow(ParallelAlgorithmList(nodes[1].info.singleIdentity(), nodes[2].info.singleIdentity()))
val data = flow.resultFuture.getOrThrow()
assertThat(data.fold(1.0) { a, b -> a * b }).isEqualTo(value1 * value2)
val value1 = 5.0
nodes[1].registerAnswer(ParallelAlgorithmList::class, value1)
val value2 = 6.0
nodes[2].registerAnswer(ParallelAlgorithmList::class, value2)
val flow = nodes[0].services.startFlow(ParallelAlgorithmList(nodes[1].info.singleIdentity(), nodes[2].info.singleIdentity()))
val data = flow.resultFuture.getOrThrow()
assertThat(data.fold(1.0) { a, b -> a * b }).isEqualTo(value1 * value2)
class ParallelAlgorithmMap(doubleMember: Party, stringMember: Party) : AlgorithmDefinition(doubleMember, stringMember) {
@ -213,11 +213,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
runOnStop += network::stop
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
@ -16,7 +16,6 @@ import net.corda.core.utilities.contextLogger
import net.corda.node.VersionInfo
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.NodeConfiguration
@ -206,14 +205,17 @@ open class Node(configuration: NodeConfiguration,
// Start up the MQ clients.
rpcMessagingClient.run {
start(rpcOps, userService)
runOnStop += this::stop
start(rpcOps, userService)
verifierMessagingClient?.run {
runOnStop += this::stop
(network as P2PMessagingClient).apply {
runOnStop += this::stop
(network as P2PMessagingClient).start()
@ -1,6 +1,5 @@
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.openFuture
@ -133,14 +132,6 @@ interface MessagingService {
/** Returns an address that refers to this node. */
val myAddress: SingleMessageRecipient
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
* it returns immediately and shutdown is asynchronous.
fun stop()
@ -323,7 +323,13 @@ class P2PMessagingClient(config: NodeConfiguration,
override fun stop() {
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
* it returns immediately and shutdown is asynchronous.
fun stop() {
val running = state.locked {
// We allow stop() to be called without a run() in between, but it must have at least been started.
check(artemis.started != null)
@ -19,7 +19,6 @@ import net.corda.core.utilities.getOrThrow
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcAuthContext
import net.corda.node.services.messaging.RpcPermissions
import rx.Observable
import java.io.InputStream
import java.security.PublicKey
import java.time.Instant
@ -1,6 +1,5 @@
package net.corda.testing.node
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.CompositeKey
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox
@ -10,7 +9,6 @@ import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.CordaSerializable
@ -33,19 +31,6 @@ import javax.annotation.concurrent.ThreadSafe
import kotlin.concurrent.schedule
import kotlin.concurrent.thread
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
* on the messaging service interface until you have successfully started up the system. One of these objects should
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
* may let you cast the returned future to an object that lets you get status info.
* A specific implementation of the controller class will have extra features that let you customise it before starting
* it up.
interface MessagingServiceBuilder<out T : MessagingService> {
fun start(): CordaFuture<out T>
* An in-memory network allows you to manufacture [InMemoryMessaging]s for a set of participants. Each
* [InMemoryMessaging] maintains a queue of messages it has received, and a background thread that dispatches
@ -59,11 +44,11 @@ interface MessagingServiceBuilder<out T : MessagingService> {
class InMemoryMessagingNetwork(
val sendManuallyPumped: Boolean,
val servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random(),
private val servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random(),
private val messagesInFlight: ReusableLatch = ReusableLatch()
) : SingletonSerializeAsToken() {
companion object {
const val MESSAGES_LOG_NAME = "messages"
private const val MESSAGES_LOG_NAME = "messages"
private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME)
@ -103,38 +88,15 @@ class InMemoryMessagingNetwork(
get() = _receivedMessages
val endpoints: List<InMemoryMessaging> @Synchronized get() = handleEndpointMap.values.toList()
fun endpoint(peer: PeerHandle): InMemoryMessaging? = handleEndpointMap.get(peer)
* Creates a node and returns the new object that identifies its location on the network to senders, and the
* [InMemoryMessaging] that the recipient/in-memory node uses to receive messages and send messages itself.
* If [manuallyPumped] is set to true, then you are expected to call the [InMemoryMessaging.pump] method on the [InMemoryMessaging]
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
* then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no
* executor.
* @param persistenceTx a lambda to wrap message handling in a transaction if necessary. Defaults to a no-op.
fun createNode(manuallyPumped: Boolean,
executor: AffinityExecutor,
notaryService: PartyAndCertificate?,
database: CordaPersistence): Pair<PeerHandle, MessagingServiceBuilder<InMemoryMessaging>> {
check(counter >= 0) { "In memory network stopped: please recreate." }
val builder = createNodeWithID(manuallyPumped, counter, executor, notaryService, database = database) as Builder
val id = builder.id
return Pair(id, builder)
* Creates a node at the given address: useful if you want to recreate a node to simulate a restart.
* @param manuallyPumped see [createNode].
* @param manuallyPumped if set to true, then you are expected to call the [InMemoryMessaging.pump] method on the [InMemoryMessaging]
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
* then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no
* executor.
* @param id the numeric ID to use, e.g. set to whatever ID the node used last time.
* @param description text string that identifies this node for message logging (if is enabled) or null to autogenerate.
* @param persistenceTx a lambda to wrap message handling in a transaction if necessary.
fun createNodeWithID(
manuallyPumped: Boolean,
@ -143,12 +105,19 @@ class InMemoryMessagingNetwork(
notaryService: PartyAndCertificate?,
description: CordaX500Name = CordaX500Name(organisation = "In memory node $id", locality = "London", country = "UK"),
database: CordaPersistence)
: MessagingServiceBuilder<InMemoryMessaging> {
: InMemoryMessaging {
val peerHandle = PeerHandle(id, description)
peersMapping[peerHandle.description] = peerHandle // Assume that the same name - the same entity in MockNetwork.
notaryService?.let { if (it.owningKey !is CompositeKey) peersMapping[it.name] = peerHandle }
val serviceHandles = notaryService?.let { listOf(ServiceHandle(it.party)) } ?: emptyList() //TODO only notary can be distributed?
return Builder(manuallyPumped, peerHandle, serviceHandles, executor, database = database)
synchronized(this) {
val node = InMemoryMessaging(manuallyPumped, peerHandle, executor, database)
handleEndpointMap[peerHandle] = node
serviceHandles.forEach {
serviceToPeersMapping.getOrPut(it) { LinkedHashSet() }.add(peerHandle)
return node
interface LatencyCalculator {
@ -157,7 +126,7 @@ class InMemoryMessagingNetwork(
/** This can be set to an object which can inject artificial latency between sender/recipient pairs. */
var latencyCalculator: LatencyCalculator? = null
private var latencyCalculator: LatencyCalculator? = null
private val timer = Timer()
@ -197,25 +166,6 @@ class InMemoryMessagingNetwork(
inner class Builder(
val manuallyPumped: Boolean,
val id: PeerHandle,
val serviceHandles: List<ServiceHandle>,
val executor: AffinityExecutor,
val database: CordaPersistence) : MessagingServiceBuilder<InMemoryMessaging> {
override fun start(): CordaFuture<InMemoryMessaging> {
synchronized(this@InMemoryMessagingNetwork) {
val node = InMemoryMessaging(manuallyPumped, id, executor, database)
handleEndpointMap[id] = node
serviceHandles.forEach {
serviceToPeersMapping.getOrPut(it) { LinkedHashSet<PeerHandle>() }.add(id)
return doneFuture(node)
data class PeerHandle(val id: Int, val description: CordaX500Name) : SingleMessageRecipient {
override fun toString() = description.toString()
@ -240,7 +190,7 @@ class InMemoryMessagingNetwork(
class RoundRobin : ServicePeerAllocationStrategy() {
val previousPicks = HashMap<ServiceHandle, Int>()
private val previousPicks = HashMap<ServiceHandle, Int>()
override fun <A> pickNext(service: ServiceHandle, pickFrom: List<A>): A {
val nextIndex = previousPicks.compute(service) { _, previous ->
(previous?.plus(1) ?: 0) % pickFrom.size
@ -392,7 +342,7 @@ class InMemoryMessagingNetwork(
override fun stop() {
fun stop() {
if (backgroundThread != null) {
@ -463,9 +413,7 @@ class InMemoryMessagingNetwork(
private fun pumpReceiveInternal(block: Boolean): MessageTransfer? {
val q = getQueueForPeerHandle(peerHandle)
val next = getNextQueue(q, block) ?: return null
val (transfer, deliverTo) = next
val (transfer, deliverTo) = getNextQueue(q, block) ?: return null
if (transfer.message.uniqueMessageId !in processedMessages) {
executor.execute {
database.transaction {
@ -22,7 +22,6 @@ import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.CordappLoader
@ -45,8 +44,6 @@ import net.corda.testing.setGlobalSerialization
import net.corda.testing.testNodeConfiguration
import org.apache.activemq.artemis.utils.ReusableLatch
import org.apache.sshd.common.util.security.SecurityUtils
import org.slf4j.Logger
import java.io.Closeable
import java.math.BigInteger
import java.nio.file.Path
import java.security.KeyPair
@ -123,7 +120,7 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
private val defaultFactory: (MockNodeArgs) -> MockNode = defaultParameters.defaultFactory,
initialiseSerialization: Boolean = defaultParameters.initialiseSerialization,
private val notarySpecs: List<NotarySpec> = listOf(NotarySpec(DUMMY_NOTARY.name)),
private val cordappPackages: List<String> = defaultParameters.cordappPackages) : Closeable {
private val cordappPackages: List<String> = defaultParameters.cordappPackages) {
/** Helper constructor for creating a [MockNetwork] with custom parameters from Java. */
constructor(parameters: MockNetworkParameters) : this(defaultParameters = parameters)
@ -267,8 +264,7 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
database).also { runOnStop += it::stop }
fun setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
@ -424,6 +420,7 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
fun stopNodes() {
nodes.forEach { it.started?.dispose() }
// Test method to block until all scheduled activity, active flows
@ -432,22 +429,11 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
override fun close() {
data class NotarySpec(val name: CordaX500Name, val validating: Boolean = true) {
constructor(name: CordaX500Name) : this(name, validating = true)
fun network(nodesCount: Int, action: MockNetwork.(List<StartedNode<MockNetwork.MockNode>>) -> Unit) {
MockNetwork().use { mockNet ->
val nodes = (1..nodesCount).map { mockNet.createPartyNode() }
* Extend this class in order to intercept and modify messages passing through the [MessagingService] when using the [InMemoryMessagingNetwork].
Reference in New Issue
Block a user