mirror of
synced 2025-03-11 06:54:04 +00:00
Merge pull request #57 from corda/aslemmer-service-identity-sessions
Service identity sessions
This commit is contained in:
@ -8,7 +8,8 @@ import net.corda.core.random63BitValue
import net.corda.core.serialization.OpaqueBytes
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.node.driver.NodeInfoAndConfig
import net.corda.node.driver.DriverBasedTest
import net.corda.node.driver.NodeHandle
import net.corda.node.driver.driver
import net.corda.node.services.User
import net.corda.node.services.config.configureTestSSL
@ -24,32 +25,16 @@ import org.junit.Test
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
class CordaRPCClientTest {
class CordaRPCClientTest : DriverBasedTest() {
private val rpcUser = User("user1", "test", permissions = setOf(startFlowPermission<CashFlow>()))
private val stopDriver = CountDownLatch(1)
private var driverThread: Thread? = null
private lateinit var client: CordaRPCClient
private lateinit var driverInfo: NodeInfoAndConfig
private lateinit var driverInfo: NodeHandle
fun start() {
val driverStarted = CountDownLatch(1)
driverThread = thread {
driver(isDebug = true) {
driverInfo = startNode(rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow()
client = CordaRPCClient(toHostAndPort(driverInfo.nodeInfo.address), configureTestSSL())
fun stop() {
override fun setup() = driver(isDebug = true) {
driverInfo = startNode(rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow()
client = CordaRPCClient(toHostAndPort(driverInfo.nodeInfo.address), configureTestSSL())
@ -19,6 +19,7 @@ import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.node.driver.DriverBasedTest
import net.corda.node.driver.driver
import net.corda.node.services.User
import net.corda.node.services.config.configureTestSSL
@ -29,19 +30,13 @@ import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.sequence
import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.Observable
import rx.Observer
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
class NodeMonitorModelTest {
class NodeMonitorModelTest : DriverBasedTest() {
lateinit var aliceNode: NodeInfo
lateinit var notaryNode: NodeInfo
val stopDriver = CountDownLatch(1)
var driverThread: Thread? = null
lateinit var stateMachineTransactionMapping: Observable<StateMachineTransactionMapping>
lateinit var stateMachineUpdates: Observable<StateMachineUpdate>
@ -52,40 +47,26 @@ class NodeMonitorModelTest {
lateinit var clientToService: Observer<CashCommand>
lateinit var newNode: (String) -> NodeInfo
fun start() {
val driverStarted = CountDownLatch(1)
driverThread = thread {
driver {
val cashUser = User("user1", "test", permissions = setOf(startFlowPermission<CashFlow>()))
val aliceNodeFuture = startNode("Alice", rpcUsers = listOf(cashUser))
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
override fun setup() = driver {
val cashUser = User("user1", "test", permissions = setOf(startFlowPermission<CashFlow>()))
val aliceNodeFuture = startNode("Alice", rpcUsers = listOf(cashUser))
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
aliceNode = aliceNodeFuture.getOrThrow().nodeInfo
notaryNode = notaryNodeFuture.getOrThrow().nodeInfo
newNode = { nodeName -> startNode(nodeName).getOrThrow().nodeInfo }
val monitor = NodeMonitorModel()
aliceNode = aliceNodeFuture.getOrThrow().nodeInfo
notaryNode = notaryNodeFuture.getOrThrow().nodeInfo
newNode = { nodeName -> startNode(nodeName).getOrThrow().nodeInfo }
val monitor = NodeMonitorModel()
stateMachineTransactionMapping = monitor.stateMachineTransactionMapping.bufferUntilSubscribed()
stateMachineUpdates = monitor.stateMachineUpdates.bufferUntilSubscribed()
progressTracking = monitor.progressTracking.bufferUntilSubscribed()
transactions = monitor.transactions.bufferUntilSubscribed()
vaultUpdates = monitor.vaultUpdates.bufferUntilSubscribed()
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
clientToService = monitor.clientToService
stateMachineTransactionMapping = monitor.stateMachineTransactionMapping.bufferUntilSubscribed()
stateMachineUpdates = monitor.stateMachineUpdates.bufferUntilSubscribed()
progressTracking = monitor.progressTracking.bufferUntilSubscribed()
transactions = monitor.transactions.bufferUntilSubscribed()
vaultUpdates = monitor.vaultUpdates.bufferUntilSubscribed()
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
clientToService = monitor.clientToService
monitor.register(ArtemisMessagingComponent.toHostAndPort(aliceNode.address), configureTestSSL(), cashUser.username, cashUser.password)
fun stop() {
monitor.register(ArtemisMessagingComponent.toHostAndPort(aliceNode.address), configureTestSSL(), cashUser.username, cashUser.password)
@ -23,8 +23,12 @@ data class StateMachineInfo(
sealed class StateMachineUpdate(val id: StateMachineRunId) {
class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate(stateMachineInfo.id)
class Removed(id: StateMachineRunId) : StateMachineUpdate(id)
class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate(stateMachineInfo.id) {
override fun toString() = "Added($id, ${stateMachineInfo.flowLogicClassName})"
class Removed(id: StateMachineRunId) : StateMachineUpdate(id) {
override fun toString() = "Removed($id)"
@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.catch
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.DeserializeAsKotlinObjectDef
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -79,6 +80,9 @@ interface MessagingService {
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message
/** Given information about either a specific node or a service returns its corresponding address */
fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients
/** Returns an address that refers to this node. */
val myAddress: SingleMessageRecipient
@ -81,7 +81,6 @@ interface ServiceHub {
* Typical use is during signing in flows and for unit test signing.
val notaryIdentityKey: KeyPair get() = this.keyManagementService.toKeyPair(this.myInfo.notaryIdentity.owningKey.keys)
@ -5,9 +5,11 @@ import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.Contract
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Party
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.MessagingService
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceEntry
import net.corda.core.randomOrNull
import rx.Observable
@ -63,31 +65,27 @@ interface NetworkMapCache {
/** Look up the node info for a legal name. */
fun getNodeByLegalName(name: String): NodeInfo? = partyNodes.singleOrNull { it.legalIdentity.name == name }
/** Look up the node info for a composite key. */
fun getNodeByCompositeKey(compositeKey: CompositeKey): NodeInfo? {
* In general, nodes can advertise multiple identities: a legal identity, and separate identities for each of
* the services it provides. In case of a distributed service – run by multiple nodes – each participant advertises
* the identity of the *whole group*.
/** Look up the node info for a specific peer key. */
fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? {
// Although we should never have more than one match, it is theoretically possible. Report an error if it happens.
val candidates = partyNodes.filter {
(it.legalIdentity.owningKey == compositeKey)
|| it.advertisedServices.any { it.identity.owningKey == compositeKey }
val candidates = partyNodes.filter { it.legalIdentity.owningKey == compositeKey }
check(candidates.size <= 1) { "Found more than one match for key $compositeKey" }
return candidates.singleOrNull()
* Given a [party], returns a node advertising it as an identity. If more than one node found the result
* is chosen at random.
* In general, nodes can advertise multiple identities: a legal identity, and separate identities for each of
* the services it provides. In case of a distributed service – run by multiple nodes – each participant advertises
* the identity of the *whole group*. If the provided [party] is a group identity, multiple nodes advertising it
* will be found, and this method will return a randomly chosen one. If [party] is an individual (legal) identity,
* we currently assume that it will be advertised by one node only, which will be returned as the result.
fun getRepresentativeNode(party: Party): NodeInfo? {
return partyNodes.randomOrNull { it.legalIdentity == party || it.advertisedServices.any { it.identity == party } }
/** Look up all nodes advertising the service owned by [compositeKey] */
fun getNodesByAdvertisedServiceIdentityKey(compositeKey: CompositeKey): List<NodeInfo> {
return partyNodes.filter { it.advertisedServices.any { it.identity.owningKey == compositeKey } }
/** Returns information about the party, which may be a specific node or a service */
fun getPartyInfo(party: Party): PartyInfo?
/** Gets a notary identity by the given name. */
fun getNotary(name: String): Party? {
val notaryNode = notaryNodes.randomOrNull {
@ -0,0 +1,18 @@
package net.corda.core.node.services
import net.corda.core.crypto.Party
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceEntry
* Holds information about a [Party], which may refer to either a specific node or a service.
sealed class PartyInfo() {
abstract val party: Party
class Node(val node: NodeInfo) : PartyInfo() {
override val party = node.legalIdentity
class Service(val service: ServiceEntry) : PartyInfo() {
override val party = service.identity
@ -72,7 +72,7 @@ abstract class AbstractStateReplacementFlow<T> {
private fun collectSignatures(participants: List<CompositeKey>, stx: SignedTransaction): List<DigitalSignature.WithKey> {
val parties = participants.map {
val participantNode = serviceHub.networkMapCache.getNodeByCompositeKey(it) ?:
val participantNode = serviceHub.networkMapCache.getNodeByLegalIdentityKey(it) ?:
throw IllegalStateException("Participant $it to state $originalState not found on the network")
@ -39,7 +39,6 @@ The network map currently supports:
* Looking up node for a party
* Suggesting a node providing a specific service, based on suitability for a contract and parties, for example suggesting
an appropriate interest rates oracle for a interest rate swap contract. Currently no recommendation logic is in place.
The code simply picks the first registered node that supports the required service.
Message queues
@ -58,6 +57,14 @@ for maintenance and other minor purposes.
creates a bridge from this queue to the peer's ``p2p.inbound`` queue, using the network map service to lookup the
peer's network address.
These are private queues the node may use to route messages to services. The queue name ends in the base 58 encoding
of the service's owning identity key. There is at most one queue per service identity (but note that any one service
may have several identities). The broker creates bridges to all nodes in the network advertising the service in
question. When a session is initiated with a service counterparty the handshake is pushed onto this queue, and a
corresponding bridge is used to forward the message to an advertising peer's p2p queue. Once a peer is picked the
session continues on as normal.
This is another private queue just for the node which functions in a similar manner to the ``internal.peers.*`` queues
except this is used to form a connection to the network map node. The node running the network map service is treated
@ -0,0 +1,138 @@
package net.corda.node.services
import net.corda.core.bufferUntilSubscribed
import net.corda.core.contracts.POUNDS
import net.corda.core.contracts.issuedBy
import net.corda.core.crypto.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.CashFlowResult
import net.corda.node.driver.DriverBasedTest
import net.corda.node.driver.NodeHandle
import net.corda.node.driver.driver
import net.corda.node.services.config.configureTestSSL
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.replicate
import org.junit.Test
import rx.Observable
import java.util.*
import kotlin.test.assertEquals
class RaftValidatingNotaryServiceTests : DriverBasedTest() {
lateinit var alice: NodeInfo
lateinit var notaries: List<NodeHandle>
lateinit var aliceProxy: CordaRPCOps
lateinit var raftNotaryIdentity: Party
lateinit var notaryStateMachines: Observable<Pair<NodeInfo, StateMachineUpdate>>
override fun setup() = driver {
// Start Alice and 3 raft notaries
val clusterSize = 3
val testUser = User("test", "test", permissions = setOf(startFlowPermission<CashFlow>()))
val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser))
val notariesFuture = startNotaryCluster(
rpcUsers = listOf(testUser),
clusterSize = clusterSize,
type = RaftValidatingNotaryService.type
alice = aliceFuture.get().nodeInfo
val (notaryIdentity, notaryNodes) = notariesFuture.get()
raftNotaryIdentity = notaryIdentity
notaries = notaryNodes
assertEquals(notaries.size, clusterSize)
assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size)
// Connect to Alice and the notaries
fun connectRpc(node: NodeInfo): CordaRPCOps {
val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL())
client.start("test", "test")
return client.proxy()
aliceProxy = connectRpc(alice)
val notaryProxies = notaries.map { connectRpc(it.nodeInfo) }
notaryStateMachines = Observable.from(notaryProxies.map { proxy ->
proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) }
}).flatMap { it.onErrorResumeNext(Observable.empty()) }.bufferUntilSubscribed()
fun `notarisation requests are distributed evenly in raft cluster`() {
// Issue 100 pounds, then pay ourselves 50x2 pounds
val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
for (i in 1..50) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(2.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
// The state machines added in the notaries should map one-to-one to notarisation requests
val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(50) {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
// The distribution of requests should be very close to sg like 16/17/17 as by default artemis does round robin
println("Notarisation distribution: $notarisationsPerNotary")
require(notarisationsPerNotary.size == 3)
// We allow some leeway for artemis as it doesn't always produce perfect distribution
require(notarisationsPerNotary.values.all { it > 10 })
fun `cluster survives if a notary is killed`() {
// Issue 100 pounds, then pay ourselves 10x5 pounds
val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
for (i in 1..10) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
// Now kill a notary
with(notaries[0].process) {
// Pay ourselves another 10x5 pounds
for (i in 1..10) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
// Artemis still dispatches some requests to the dead notary but all others should go through.
val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(15) {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
println("Notarisation distribution: $notarisationsPerNotary")
require(notarisationsPerNotary.size == 3)
@ -1,6 +1,7 @@
package net.corda.services.messaging
import net.corda.node.driver.driver
import net.corda.node.driver.getTimestampAsDirectoryName
import org.junit.Test
import java.nio.file.Paths
import java.time.Instant
@ -22,8 +23,4 @@ class ArtemisMessagingServerTest {
arrayOf(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).forEach { it.get(5, TimeUnit.MINUTES) }
private fun getTimestampAsDirectoryName(): String {
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC).format(Instant.now())
@ -5,12 +5,12 @@ package net.corda.node.driver
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.core.ThreadBox
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.future
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
@ -33,21 +33,20 @@ import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
* The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first
* bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes.
* TODO The driver actually starts up as an Artemis server now that may route traffic. Fix this once the client MessagingService is done.
* TODO The nodes are started up sequentially which is quite slow. Either speed up node startup or make startup parallel somehow.
* TODO The driver now polls the network map cache for info about newly started up nodes, this could be done asynchronously(?).
* TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought.
private val log: Logger = loggerFor<DriverDSL>()
@ -68,7 +67,7 @@ interface DriverDSLExposedInterface {
fun startNode(providedName: String? = null,
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(),
customOverrides: Map<String, Any?> = emptyMap()): Future<NodeInfoAndConfig>
customOverrides: Map<String, Any?> = emptyMap()): Future<NodeHandle>
* Starts a distributed notary cluster.
@ -76,8 +75,14 @@ interface DriverDSLExposedInterface {
* @param notaryName The legal name of the advertised distributed notary service.
* @param clusterSize Number of nodes to create for the cluster.
* @param type The advertised notary service type. Currently the only supported type is [RaftValidatingNotaryService.type].
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @return The [Party] identity of the distributed notary service, and the [NodeInfo]s of the notaries in the cluster.
fun startNotaryCluster(notaryName: String, clusterSize: Int = 3, type: ServiceType = RaftValidatingNotaryService.type)
fun startNotaryCluster(
notaryName: String,
clusterSize: Int = 3,
type: ServiceType = RaftValidatingNotaryService.type,
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>>
fun waitForAllNodesToFinish()
@ -87,7 +92,11 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun shutdown()
data class NodeInfoAndConfig(val nodeInfo: NodeInfo, val config: Config)
data class NodeHandle(
val nodeInfo: NodeInfo,
val config: Config,
val process: Process
sealed class PortAllocation {
abstract fun nextPort(): Int
@ -120,7 +129,7 @@ sealed class PortAllocation {
* Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [Future]
* of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service.
* The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache].
* The driver implicitly bootstraps a [NetworkMapService].
* @param driverDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
* directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>"
@ -176,6 +185,9 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
return returnValue
} catch (exception: Throwable) {
println("Driver shutting down because of exception $exception")
throw exception
} finally {
if (shutdownHook != null) {
@ -184,7 +196,7 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
private fun getTimestampAsDirectoryName(): String {
fun getTimestampAsDirectoryName(): String {
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now())
@ -313,7 +325,7 @@ open class DriverDSL(
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>, customOverrides: Map<String, Any?>): Future<NodeInfoAndConfig> {
rpcUsers: List<User>, customOverrides: Map<String, Any?>): ListenableFuture<NodeHandle> {
val messagingAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
@ -344,12 +356,18 @@ open class DriverDSL(
return future {
registerProcess(DriverDSL.startNode(FullNodeConfiguration(config), quasarJarPath, debugPort))
NodeInfoAndConfig(queryNodeInfo(apiAddress)!!, config)
val process = DriverDSL.startNode(FullNodeConfiguration(config), quasarJarPath, debugPort)
NodeHandle(queryNodeInfo(apiAddress)!!, config, process)
override fun startNotaryCluster(notaryName: String, clusterSize: Int, type: ServiceType) {
override fun startNotaryCluster(
notaryName: String,
clusterSize: Int,
type: ServiceType,
rpcUsers: List<User>
): ListenableFuture<Pair<Party, List<NodeHandle>>> {
val nodeNames = (1..clusterSize).map { "Notary Node $it" }
val paths = nodeNames.map { driverDirectory / it }
ServiceIdentityGenerator.generateToDisk(paths, type.id, notaryName)
@ -359,12 +377,19 @@ open class DriverDSL(
val notaryClusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster
startNode(nodeNames.first(), advertisedService, emptyList(), mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
val firstNotaryFuture = startNode(nodeNames.first(), advertisedService, rpcUsers, mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
// All other nodes will join the cluster
nodeNames.drop(1).forEach {
val restNotaryFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort()
val configOverride = mapOf("notaryNodeAddress" to nodeAddress.toString(), "notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))
startNode(it, advertisedService, emptyList(), configOverride)
startNode(it, advertisedService, rpcUsers, configOverride)
return firstNotaryFuture.flatMap { firstNotary ->
val notaryParty = firstNotary.nodeInfo.notaryIdentity
Futures.allAsList(restNotaryFutures).map { restNotaries ->
Pair(notaryParty, listOf(firstNotary) + restNotaries)
@ -0,0 +1,39 @@
package net.corda.node.driver
import org.junit.After
import org.junit.Before
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
abstract class DriverBasedTest {
private val stopDriver = CountDownLatch(1)
private var driverThread: Thread? = null
private lateinit var driverStarted: CountDownLatch
protected sealed class RunTestToken {
internal object Token : RunTestToken()
protected abstract fun setup(): RunTestToken
protected fun DriverDSLExposedInterface.runTest(): RunTestToken {
return RunTestToken.Token
fun start() {
driverStarted = CountDownLatch(1)
driverThread = thread {
fun stop() {
@ -277,7 +277,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
* A service entry contains the advertised [ServiceInfo] along with the service identity. The identity *name* is
* taken from the configuration or, if non specified, generated by combining the node's legal name and the service id.
private fun makeServiceEntries(): List<ServiceEntry> {
protected fun makeServiceEntries(): List<ServiceEntry> {
return advertisedServices.map {
val serviceId = it.type.id
val serviceName = it.name ?: "$serviceId|${configuration.myLegalName}"
@ -3,6 +3,7 @@ package net.corda.node.services.messaging
import com.google.common.annotations.VisibleForTesting
import com.google.common.net.HostAndPort
import net.corda.core.crypto.CompositeKey
import net.corda.core.messaging.MessageRecipientGroup
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.read
@ -34,6 +35,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
const val INTERNAL_PREFIX = "internal."
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers."
const val SERVICES_PREFIX = "${INTERNAL_PREFIX}services."
const val CLIENTS_PREFIX = "clients."
const val P2P_QUEUE = "p2p.inbound"
const val RPC_REQUESTS_QUEUE = "rpc.requests"
@ -50,17 +52,20 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
fun toHostAndPort(target: MessageRecipients): HostAndPort {
val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address")
val addr = target as? ArtemisMessagingComponent.ArtemisPeerAddress ?: throw IllegalArgumentException("Not an Artemis address")
return addr.hostAndPort
protected interface ArtemisAddress {
protected interface ArtemisAddress : MessageRecipients {
val queueName: SimpleString
protected interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient {
val hostAndPort: HostAndPort
data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisPeerAddress {
override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS
@ -68,10 +73,36 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
* This is the class used to implement [SingleMessageRecipient], for now. Note that in future this class
* may change or evolve and code that relies upon it being a simple host/port may not function correctly.
* For instance it may contain onion routing data.
* [NodeAddress] identifies a specific peer node and an associated queue. The queue may be the peer's p2p queue or
* an advertised service's queue.
* @param queueName The name of the queue this address is associated with.
* @param hostAndPort The address of the node.
data class NodeAddress(val identity: CompositeKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
override val queueName: SimpleString = SimpleString("$PEERS_PREFIX${identity.toBase58String()}")
override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)"
data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisPeerAddress {
companion object {
fun asPeer(peerIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
return NodeAddress(SimpleString("$PEERS_PREFIX${peerIdentity.toBase58String()}"), hostAndPort)
fun asService(serviceIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
return NodeAddress(SimpleString("$SERVICES_PREFIX${serviceIdentity.toBase58String()}"), hostAndPort)
override fun toString(): String = "${javaClass.simpleName}(queue = $queueName, $hostAndPort)"
* [ServiceAddress] implements [MessageRecipientGroup]. It holds a queue associated with a service advertised by
* zero or more nodes. Each advertising node has an associated consumer.
* By sending to such an address Artemis will pick a consumer (uses Round Robin by default) and sends the message
* there. We use this to establish sessions involving service counterparties.
* @param identity The service identity's owning key.
data class ServiceAddress(val identity: CompositeKey) : ArtemisAddress, MessageRecipientGroup {
override val queueName: SimpleString = SimpleString("$SERVICES_PREFIX${identity.toBase58String()}")
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
@ -9,6 +9,7 @@ import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_CA
import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.core.crypto.newSecureRandom
import net.corda.core.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.utilities.debug
@ -92,7 +93,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
fun start() = mutex.locked {
if (!running) {
networkChangeHandle = networkMapCache.changed.subscribe { destroyOrCreateBridge(it) }
networkChangeHandle = networkMapCache.changed.subscribe { destroyOrCreateBridges(it) }
running = true
@ -120,14 +121,38 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup.
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
private fun destroyOrCreateBridge(change: MapChange) {
val (newNode, staleNode) = when (change) {
is MapChange.Modified -> change.node to change.previousNode
is MapChange.Removed -> null to change.node
is MapChange.Added -> change.node to null
private fun destroyOrCreateBridges(change: MapChange) {
fun addAddresses(node: NodeInfo, targets: MutableSet<ArtemisPeerAddress>) {
// Add the node's address with the p2p queue.
val nodeAddress = node.address as ArtemisPeerAddress
// Add the node's address with service queues, one per service.
node.advertisedServices.forEach {
targets.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort))
val addressesToCreateBridgesTo = HashSet<ArtemisPeerAddress>()
val addressesToRemoveBridgesFrom = HashSet<ArtemisPeerAddress>()
when (change) {
is MapChange.Modified -> {
addAddresses(change.node, addressesToCreateBridgesTo)
addAddresses(change.previousNode, addressesToRemoveBridgesFrom)
is MapChange.Removed -> {
addAddresses(change.node, addressesToRemoveBridgesFrom)
is MapChange.Added -> {
addAddresses(change.node, addressesToCreateBridgesTo)
(addressesToRemoveBridgesFrom - addressesToCreateBridgesTo).forEach {
addressesToCreateBridgesTo.forEach {
if (activeMQServer.queueQuery(it.queueName).isExists) maybeDeployBridgeForAddress(it)
(staleNode?.address as? ArtemisAddress)?.let { maybeDestroyBridge(it.queueName) }
(newNode?.address as? ArtemisAddress)?.let { if (activeMQServer.queueQuery(it.queueName).isExists) maybeDeployBridgeForAddress(it) }
private fun configureAndStartServer() {
@ -138,31 +163,48 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
registerActivationFailureListener { exception -> throw exception }
// Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem.
registerPostQueueCreationCallback { deployBridgeFromNewPeerQueue(it) }
registerPostQueueCreationCallback { deployBridgeFromNewQueue(it) }
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
printBasicNodeInfo("Node listening on address", myHostPort.toString())
private fun deployBridgeFromNewPeerQueue(queueName: SimpleString) {
log.debug { "Queue created: $queueName" }
if (!queueName.startsWith(PEERS_PREFIX)) return
try {
val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length))
val nodeInfo = networkMapCache.getNodeByCompositeKey(identity)
if (nodeInfo != null) {
val address = nodeInfo.address
if (address is NodeAddress) {
private fun maybeDeployBridgeForNode(queueName: SimpleString, nodeInfo: NodeInfo) {
log.debug("Deploying bridge for $queueName to $nodeInfo")
val address = nodeInfo.address
if (address is NodeAddress) {
maybeDeployBridgeForAddress(NodeAddress(queueName, address.hostAndPort))
} else {
log.error("Don't know how to deal with $address")
private fun deployBridgeFromNewQueue(queueName: SimpleString) {
log.debug { "Queue created: $queueName, deploying bridge(s)" }
when {
queueName.startsWith(PEERS_PREFIX) -> try {
val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length))
val nodeInfo = networkMapCache.getNodeByLegalIdentityKey(identity)
if (nodeInfo != null) {
maybeDeployBridgeForNode(queueName, nodeInfo)
} else {
log.error("Don't know how to deal with $address")
log.error("Queue created for a peer that we don't know from the network map: $queueName")
} else {
log.error("Queue created for a peer that we don't know from the network map: $queueName")
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName")
queueName.startsWith(SERVICES_PREFIX) -> try {
val identity = CompositeKey.parseFromBase58(queueName.substring(SERVICES_PREFIX.length))
val nodeInfos = networkMapCache.getNodesByAdvertisedServiceIdentityKey(identity)
// Create a bridge for each node advertising the service.
for (nodeInfo in nodeInfos) {
maybeDeployBridgeForNode(queueName, nodeInfo)
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse service queue name as Base 58: $queueName")
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse queue name as Base 58: $queueName")
@ -240,29 +282,32 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
tcpTransport(OUTBOUND, hostAndPort.hostText, hostAndPort.port)
private fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString())
private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name)
private fun maybeDeployBridgeForAddress(address: ArtemisAddress) {
if (!connectorExists(address.hostAndPort)) {
private fun maybeDeployBridgeForAddress(peerAddress: ArtemisPeerAddress) {
if (!connectorExists(peerAddress.hostAndPort)) {
if (!bridgeExists(address.queueName)) {
val bridgeName = bridgeNameForAddress(peerAddress)
if (!bridgeExists(bridgeName)) {
deployBridge(bridgeName, peerAddress)
private fun bridgeNameForAddress(peerAddress: ArtemisPeerAddress) = "${peerAddress.queueName}-${peerAddress.hostAndPort}"
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address.
private fun deployBridge(address: ArtemisAddress) {
private fun deployBridge(bridgeName: String, peerAddress: ArtemisPeerAddress) {
activeMQServer.deployBridge(BridgeConfiguration().apply {
name = address.queueName.toString()
queueName = address.queueName.toString()
name = bridgeName
queueName = peerAddress.queueName.toString()
forwardingAddress = P2P_QUEUE
staticConnectors = listOf(address.hostAndPort.toString())
staticConnectors = listOf(peerAddress.hostAndPort.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
@ -272,9 +317,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private fun maybeDestroyBridge(name: SimpleString) {
private fun maybeDestroyBridge(name: String) {
if (bridgeExists(name)) {
@ -5,6 +5,7 @@ import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.ThreadBox
import net.corda.core.crypto.CompositeKey
import net.corda.core.messaging.*
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.opaque
import net.corda.core.success
@ -96,7 +97,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort)
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress.asPeer(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort)
private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>()
@ -449,4 +450,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
return when (partyInfo) {
is PartyInfo.Node -> partyInfo.node.address
is PartyInfo.Service -> ArtemisMessagingComponent.ServiceAddress(partyInfo.service.identity.owningKey)
@ -200,11 +200,11 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
read = { kryo, input ->
CompositeKey.parseFromBase58(kryo.readObject(input, String::class.java)),
kryo.readObject(input, SimpleString::class.java),
kryo.readObject(input, HostAndPort::class.java))
write = { kryo, output, nodeAddress ->
kryo.writeObject(output, nodeAddress.identity.toBase58String())
kryo.writeObject(output, nodeAddress.queueName)
kryo.writeObject(output, nodeAddress.hostAndPort)
@ -14,6 +14,7 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.NetworkCacheError
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -52,6 +53,21 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
private var registeredForPush = false
protected var registeredNodes: MutableMap<Party, NodeInfo> = Collections.synchronizedMap(HashMap<Party, NodeInfo>())
override fun getPartyInfo(party: Party): PartyInfo? {
val node = registeredNodes[party]
if (node != null) {
return PartyInfo.Node(node)
for (entry in registeredNodes) {
for (service in entry.value.advertisedServices) {
if (service.identity == party) {
return PartyInfo.Service(service)
return null
override fun track(): Pair<List<NodeInfo>, Observable<MapChange>> {
synchronized(_changed) {
return Pair(partyNodes, _changed.bufferUntilSubscribed())
@ -154,9 +154,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
private fun createSessionData(session: FlowSession, payload: Any): SessionData {
val otherPartySessionId = session.otherPartySessionId
?: throw IllegalStateException("We've somehow held onto an unconfirmed session: $session")
return SessionData(otherPartySessionId, payload)
val sessionState = session.state
val peerSessionId = when (sessionState) {
is StateMachineManager.FlowSessionState.Initiating -> throw IllegalStateException("We've somehow held onto an unconfirmed session: $session")
is StateMachineManager.FlowSessionState.Initiated -> sessionState.peerSessionId
return SessionData(peerSessionId, payload)
@ -166,10 +169,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
private inline fun <reified M : SessionMessage> receiveInternal(session: FlowSession): M {
return suspendAndExpectReceive(ReceiveOnly(session, M::class.java))
return suspendAndExpectReceive(ReceiveOnly(session, M::class.java)).message
private inline fun <reified M : SessionMessage> sendAndReceiveInternal(session: FlowSession, message: SessionMessage): M {
return suspendAndExpectReceive(SendAndReceive(session, message, M::class.java)).message
private inline fun <reified M : SessionMessage> sendAndReceiveInternalWithParty(session: FlowSession, message: SessionMessage): ReceivedSessionMessage<M> {
return suspendAndExpectReceive(SendAndReceive(session, message, M::class.java))
@ -191,26 +198,25 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?): FlowSession {
val node = serviceHub.networkMapCache.getRepresentativeNode(otherParty) ?: throw IllegalArgumentException("Don't know about party $otherParty")
val nodeIdentity = node.legalIdentity
logger.trace { "Initiating a new session with $nodeIdentity (representative of $otherParty)" }
val session = FlowSession(sessionFlow, nodeIdentity, random63BitValue(), null)
openSessions[Pair(sessionFlow, nodeIdentity)] = session
val counterpartyFlow = sessionFlow.getCounterpartyMarker(nodeIdentity).name
logger.trace { "Initiating a new session with $otherParty" }
val session = FlowSession(sessionFlow, random63BitValue(), FlowSessionState.Initiating(otherParty))
openSessions[Pair(sessionFlow, otherParty)] = session
val counterpartyFlow = sessionFlow.getCounterpartyMarker(otherParty).name
val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload)
val sessionInitResponse = sendAndReceiveInternal<SessionInitResponse>(session, sessionInit)
val (peerParty, sessionInitResponse) = sendAndReceiveInternalWithParty<SessionInitResponse>(session, sessionInit)
if (sessionInitResponse is SessionConfirm) {
session.otherPartySessionId = sessionInitResponse.initiatedSessionId
require(session.state is FlowSessionState.Initiating)
session.state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId)
return session
} else {
sessionInitResponse as SessionReject
throw FlowSessionException("Party $nodeIdentity rejected session attempt: ${sessionInitResponse.errorMessage}")
throw FlowSessionException("Party $otherParty rejected session attempt: ${sessionInitResponse.errorMessage}")
private fun <M : SessionMessage> suspendAndExpectReceive(receiveRequest: ReceiveRequest<M>): M {
fun getReceivedMessage(): ExistingSessionMessage? = receiveRequest.session.receivedMessages.poll()
private fun <M : SessionMessage> suspendAndExpectReceive(receiveRequest: ReceiveRequest<M>): ReceivedSessionMessage<M> {
fun getReceivedMessage(): ReceivedSessionMessage<ExistingSessionMessage>? = receiveRequest.session.receivedMessages.poll()
val polledMessage = getReceivedMessage()
val receivedMessage = if (polledMessage != null) {
@ -226,11 +232,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
?: throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got nothing: $receiveRequest")
if (receivedMessage is SessionEnd) {
if (receivedMessage.message is SessionEnd) {
throw FlowSessionException("Counterparty on ${receiveRequest.session.otherParty} has prematurely ended on $receiveRequest")
} else if (receiveRequest.receiveType.isInstance(receivedMessage)) {
return receiveRequest.receiveType.cast(receivedMessage)
throw FlowSessionException("Counterparty on ${receiveRequest.session.state.sendToParty} has prematurely ended on $receiveRequest")
} else if (receiveRequest.receiveType.isInstance(receivedMessage.message)) {
return ReceivedSessionMessage(receivedMessage.sendingParty, receiveRequest.receiveType.cast(receivedMessage.message))
} else {
throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got $receivedMessage: $receiveRequest")
@ -28,6 +28,8 @@ import net.corda.core.utilities.trace
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiated
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiating
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.bufferUntilDatabaseCommit
@ -214,17 +216,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
serviceHub.networkService.addMessageHandler(sessionTopic) { message, reg ->
val sessionMessage = message.data.deserialize<SessionMessage>()
when (sessionMessage) {
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage)
is SessionInit -> {
// TODO Look up the party with the full X.500 name instead of just the legal name
val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peer.commonName)?.legalIdentity
if (otherParty != null) {
onSessionInit(sessionMessage, otherParty)
} else {
logger.error("Unknown peer ${message.peer} in $sessionMessage")
val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peer.commonName)?.legalIdentity
if (otherParty != null) {
when (sessionMessage) {
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, otherParty)
is SessionInit -> onSessionInit(sessionMessage, otherParty)
} else {
logger.error("Unknown peer ${message.peer} in $sessionMessage")
@ -238,14 +237,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun onExistingSessionMessage(message: ExistingSessionMessage) {
private fun onExistingSessionMessage(message: ExistingSessionMessage, otherParty: Party) {
val session = openSessions[message.recipientSessionId]
if (session != null) {
session.psm.logger.trace { "Received $message on $session" }
if (message is SessionEnd) {
session.receivedMessages += message
session.receivedMessages += ReceivedSessionMessage(otherParty, message)
if (session.waitingForResponse) {
// We only want to resume once, so immediately reset the flag.
session.waitingForResponse = false
@ -253,11 +252,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
} else {
val otherParty = recentlyClosedSessions.remove(message.recipientSessionId)
if (otherParty != null) {
val peerParty = recentlyClosedSessions.remove(message.recipientSessionId)
if (peerParty != null) {
if (message is SessionConfirm) {
logger.debug { "Received session confirmation but associated fiber has already terminated, so sending session end" }
sendSessionMessage(otherParty, SessionEnd(message.initiatedSessionId), null)
sendSessionMessage(peerParty, SessionEnd(message.initiatedSessionId), null)
} else {
logger.trace { "Ignoring session end message for already closed session: $message" }
@ -276,9 +275,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
if (flowFactory != null) {
val flow = flowFactory(otherParty)
val psm = createFiber(flow)
val session = FlowSession(flow, otherParty, random63BitValue(), otherPartySessionId)
val session = FlowSession(flow, random63BitValue(), FlowSessionState.Initiated(otherParty, otherPartySessionId))
if (sessionInit.firstPayload != null) {
session.receivedMessages += SessionData(session.ourSessionId, sessionInit.firstPayload)
session.receivedMessages += ReceivedSessionMessage(otherParty, SessionData(session.ourSessionId, sessionInit.firstPayload))
openSessions[session.ourSessionId] = session
psm.openSessions[Pair(flow, otherParty)] = session
@ -355,11 +354,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun endAllFiberSessions(psm: FlowStateMachineImpl<*>) {
openSessions.values.removeIf { session ->
if (session.psm == psm) {
val otherPartySessionId = session.otherPartySessionId
if (otherPartySessionId != null) {
sendSessionMessage(session.otherParty, SessionEnd(otherPartySessionId), psm)
val initiatedState = session.state as? FlowSessionState.Initiated
if (initiatedState != null) {
sendSessionMessage(initiatedState.peerParty, SessionEnd(initiatedState.peerSessionId), psm)
recentlyClosedSessions[session.ourSessionId] = initiatedState.peerParty
recentlyClosedSessions[session.ourSessionId] = session.otherParty
} else {
@ -437,7 +436,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
if (ioRequest.message is SessionInit) {
openSessions[ioRequest.session.ourSessionId] = ioRequest.session
sendSessionMessage(ioRequest.session.otherParty, ioRequest.message, ioRequest.session.psm)
sendSessionMessage(ioRequest.session.state.sendToParty, ioRequest.message, ioRequest.session.psm)
if (ioRequest !is ReceiveRequest<*>) {
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
@ -446,13 +445,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun sendSessionMessage(party: Party, message: SessionMessage, psm: FlowStateMachineImpl<*>?) {
val node = serviceHub.networkMapCache.getNodeByCompositeKey(party.owningKey)
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party)
?: throw IllegalArgumentException("Don't know about party $party")
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
val logger = psm?.logger ?: logger
logger.trace { "Sending $message to party $party" }
serviceHub.networkService.send(sessionTopic, message, node.address)
logger.debug { "Sending $message to party $party, address: $address" }
serviceHub.networkService.send(sessionTopic, message, address)
data class ReceivedSessionMessage<out M : SessionMessage>(val sendingParty: Party, val message: M)
interface SessionMessage
@ -480,16 +481,37 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
data class SessionEnd(override val recipientSessionId: Long) : ExistingSessionMessage
* [FlowSessionState] describes the session's state.
* [Initiating] is pre-handshake. [Initiating.otherParty] at this point holds a [Party] corresponding to either a
* specific peer or a service.
* [Initiated] is post-handshake. At this point [Initiating.otherParty] will have been resolved to a specific peer
* [Initiated.peerParty], and the peer's sessionId has been initialised.
sealed class FlowSessionState {
abstract val sendToParty: Party
class Initiating(
val otherParty: Party /** This may be a specific peer or a service party */
) : FlowSessionState() {
override val sendToParty: Party get() = otherParty
class Initiated(
val peerParty: Party, /** This must be a peer party */
val peerSessionId: Long
) : FlowSessionState() {
override val sendToParty: Party get() = peerParty
data class FlowSession(val flow: FlowLogic<*>,
val otherParty: Party,
val ourSessionId: Long,
var otherPartySessionId: Long?,
@Volatile var waitingForResponse: Boolean = false) {
val receivedMessages = ConcurrentLinkedQueue<ExistingSessionMessage>()
data class FlowSession(
val flow: FlowLogic<*>,
val ourSessionId: Long,
var state: FlowSessionState,
@Volatile var waitingForResponse: Boolean = false
) {
val receivedMessages = ConcurrentLinkedQueue<ReceivedSessionMessage<ExistingSessionMessage>>()
val psm: FlowStateMachineImpl<*> get() = flow.fsm as FlowStateMachineImpl<*>
@ -85,7 +85,7 @@ class TwoPartyTradeFlowTests {
net = MockNetwork(false, true)
ledger {
notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = net.createPartyNode(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = net.createPartyNode(notaryNode.info.address, BOB.name, BOB_KEY)
val aliceKey = aliceNode.services.legalIdentityKey
@ -125,7 +125,7 @@ class TwoPartyTradeFlowTests {
fun `shutdown and restore`() {
ledger {
notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = net.createPartyNode(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = net.createPartyNode(notaryNode.info.address, BOB.name, BOB_KEY)
@ -133,7 +133,7 @@ class TwoPartyTradeFlowTests {
val aliceKey = aliceNode.services.legalIdentityKey
val notaryKey = notaryNode.services.notaryIdentityKey
val bobAddr = bobNode.net.myAddress as InMemoryMessagingNetwork.Handle
val bobAddr = bobNode.net.myAddress as InMemoryMessagingNetwork.PeerHandle
val networkMapAddr = notaryNode.info.address
net.runNetwork() // Clear network map registration messages
@ -235,7 +235,7 @@ class TwoPartyTradeFlowTests {
fun `check dependencies of sale asset are resolved`() {
notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name, BOB_KEY)
val aliceKey = aliceNode.services.legalIdentityKey
@ -327,7 +327,7 @@ class TwoPartyTradeFlowTests {
fun `track() works`() {
notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name, BOB_KEY)
val aliceKey = aliceNode.services.legalIdentityKey
@ -427,7 +427,7 @@ class TwoPartyTradeFlowTests {
aliceError: Boolean,
expectedMessageSubstring: String
) {
notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
aliceNode = net.createPartyNode(notaryNode.info.address, ALICE.name, ALICE_KEY)
bobNode = net.createPartyNode(notaryNode.info.address, BOB.name, BOB_KEY)
val aliceKey = aliceNode.services.legalIdentityKey
@ -29,14 +29,14 @@ class InMemoryNetworkMapCacheTest {
val nodeB = network.createNode(null, -1, MockNetwork.DefaultFactory, true, "Node B", keyPair, ServiceInfo(NetworkMapService.type))
// Node A currently knows only about itself, so this returns node A
assertEquals(nodeA.netMapCache.getNodeByCompositeKey(keyPair.public.composite), nodeA.info)
assertEquals(nodeA.netMapCache.getNodeByLegalIdentityKey(keyPair.public.composite), nodeA.info)
databaseTransaction(nodeA.database) {
// Now both nodes match, so it throws an error
expect<IllegalStateException> {
@ -81,7 +81,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
databaseTransaction(database) {
val kms = MockKeyManagementService(ALICE_KEY)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.Handle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), database)
val mockMessagingService = InMemoryMessagingNetwork(false).InMemoryMessaging(false, InMemoryMessagingNetwork.PeerHandle(0, "None"), AffinityExecutor.ServiceAffinityExecutor("test", 1), database)
services = object : MockServiceHubInternal(overrideClock = testClock, keyManagement = kms, net = mockMessagingService), TestReference {
override val testReference = this@NodeSchedulerServiceTest
@ -3,20 +3,30 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.issuedBy
import net.corda.core.crypto.Party
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSessionException
import net.corda.core.getOrThrow
import net.corda.core.random63BitValue
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.serialization.deserialize
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.NotaryFlow
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.statemachine.StateMachineManager.*
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.initiateSingleShotFlow
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import net.corda.testing.sequence
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
@ -30,16 +40,24 @@ import kotlin.test.assertTrue
class StateMachineManagerTests {
private val net = MockNetwork()
private val net = MockNetwork(servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin())
private val sessionTransfers = ArrayList<SessionTransfer>()
private lateinit var node1: MockNode
private lateinit var node2: MockNode
private lateinit var notary1: MockNode
private lateinit var notary2: MockNode
fun start() {
val nodes = net.createTwoNodes()
node1 = nodes.first
node2 = nodes.second
val notaryKeyPair = generateKeyPair()
// Note that these notaries don't operate correctly as they don't share their state. They are only used for testing
// service addressing.
notary1 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000")
notary2 = net.createNotaryNode(networkMapAddr = node1.services.myInfo.address, keyPair = notaryKeyPair, serviceName = "notary-service-2000")
net.messagingNetwork.receivedMessages.toSessionTransfers().forEach { sessionTransfers += it }
@ -260,6 +278,57 @@ class StateMachineManagerTests {
fun `different notaries are picked when addressing shared notary identity`() {
assertEquals(notary1.info.notaryIdentity, notary2.info.notaryIdentity)
// We pay a couple of times, the notary picking should go round robin
for (i in 1 .. 3) {
sessionTransfers.expectEvents(isStrict = false) {
// First Pay
expect(match = { it.message is SessionInit && it.message.flowName == NotaryFlow.Client::class.java.name }) {
it.message as SessionInit
require(it.from == node1.id)
require(it.to == TransferRecipient.Service(notary1.info.notaryIdentity))
expect(match = { it.message is SessionConfirm }) {
it.message as SessionConfirm
require(it.from == notary1.id)
// Second pay
expect(match = { it.message is SessionInit && it.message.flowName == NotaryFlow.Client::class.java.name }) {
it.message as SessionInit
require(it.from == node1.id)
require(it.to == TransferRecipient.Service(notary1.info.notaryIdentity))
expect(match = { it.message is SessionConfirm }) {
it.message as SessionConfirm
require(it.from == notary2.id)
// Third pay
expect(match = { it.message is SessionInit && it.message.flowName == NotaryFlow.Client::class.java.name }) {
it.message as SessionInit
require(it.from == node1.id)
require(it.to == TransferRecipient.Service(notary1.info.notaryIdentity))
expect(match = { it.message is SessionConfirm }) {
it.message as SessionConfirm
require(it.from == notary1.id)
fun `exception thrown on other side`() {
node2.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { ExceptionFlow }
@ -301,11 +370,16 @@ class StateMachineManagerTests {
private fun assertSessionTransfers(node: MockNode, vararg expected: SessionTransfer) {
val actualForNode = sessionTransfers.filter { it.from == node.id || it.to == node.id }
val actualForNode = sessionTransfers.filter { it.from == node.id || it.to == TransferRecipient.Peer(node.id) }
private data class SessionTransfer(val from: Int, val message: SessionMessage, val to: Int) {
private interface TransferRecipient {
data class Peer(val id: Int) : TransferRecipient
data class Service(val identity: Party) : TransferRecipient
private data class SessionTransfer(val from: Int, val message: SessionMessage, val to: TransferRecipient) {
val isPayloadTransfer: Boolean get() = message is SessionData || message is SessionInit && message.firstPayload != null
override fun toString(): String = "$from sent $message to $to"
@ -314,7 +388,12 @@ class StateMachineManagerTests {
return filter { it.message.topicSession == StateMachineManager.sessionTopic }.map {
val from = it.sender.id
val message = it.message.data.deserialize<SessionMessage>()
val to = (it.recipients as InMemoryMessagingNetwork.Handle).id
val recipients = it.recipients
val to = when (recipients) {
is InMemoryMessagingNetwork.PeerHandle -> TransferRecipient.Peer(recipients.id)
is InMemoryMessagingNetwork.ServiceHandle -> TransferRecipient.Service(recipients.service.identity)
else -> throw IllegalStateException("Unknown recipients $recipients")
SessionTransfer(from, sanitise(message), to)
@ -330,7 +409,7 @@ class StateMachineManagerTests {
private infix fun MockNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(id, message)
private infix fun Pair<Int, SessionMessage>.to(node: MockNode): SessionTransfer = SessionTransfer(first, second, node.id)
private infix fun Pair<Int, SessionMessage>.to(node: MockNode): SessionTransfer = SessionTransfer(first, second, TransferRecipient.Peer(node.id))
private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
@ -34,7 +34,7 @@ class IssuerFlowTest {
fun `test issuer flow`() {
net = MockNetwork(false, true)
ledger {
notaryNode = net.createNotaryNode(DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name, DUMMY_NOTARY_KEY)
bankOfCordaNode = net.createPartyNode(notaryNode.info.address, BOC_ISSUER_PARTY.name, BOC_KEY)
bankClientNode = net.createPartyNode(notaryNode.info.address, MEGA_CORP.name, MEGA_CORP_KEY)
@ -4,9 +4,11 @@ import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.ThreadBox
import net.corda.core.getOrThrow
import net.corda.core.crypto.X509Utilities
import net.corda.core.getOrThrow
import net.corda.core.messaging.*
import net.corda.core.node.ServiceEntry
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.trace
import net.corda.node.services.api.MessagingServiceBuilder
@ -35,18 +37,24 @@ import kotlin.concurrent.thread
* messages one by one to registered handlers. Alternatively, a messaging system may be manually pumped, in which
* case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit
* testing).
* @param servicePeerAllocationStrategy defines the strategy to be used when determining which peer to send to in case
* a service is addressed.
class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSerializeAsToken() {
class InMemoryMessagingNetwork(
val sendManuallyPumped: Boolean,
val servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random()
) : SingletonSerializeAsToken() {
companion object {
val MESSAGES_LOG_NAME = "messages"
private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME)
private var counter = 0 // -1 means stopped.
private val handleEndpointMap = HashMap<Handle, InMemoryMessaging>()
private val handleEndpointMap = HashMap<PeerHandle, InMemoryMessaging>()
data class MessageTransfer(val sender: Handle, val message: Message, val recipients: MessageRecipients) {
data class MessageTransfer(val sender: PeerHandle, val message: Message, val recipients: MessageRecipients) {
override fun toString() = "${message.topicSession} from '$sender' to '$recipients'"
@ -64,9 +72,12 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
// been created yet. If the node identified by the given handle has gone away/been shut down then messages
// stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network.
// The corresponding stream reflects when a message was pumpReceive'd
private val messageReceiveQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
private val messageReceiveQueues = HashMap<PeerHandle, LinkedBlockingQueue<MessageTransfer>>()
private val _receivedMessages = PublishSubject.create<MessageTransfer>()
// Holds the mapping from services to peers advertising the service.
private val serviceToPeersMapping = HashMap<ServiceHandle, LinkedHashSet<PeerHandle>>()
val messagesInFlight = ReusableLatch()
@Suppress("unused") // Used by the visualiser tool.
@ -90,9 +101,10 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
fun createNode(manuallyPumped: Boolean,
executor: AffinityExecutor,
database: Database): Pair<Handle, MessagingServiceBuilder<InMemoryMessaging>> {
advertisedServices: List<ServiceEntry>,
database: Database): Pair<PeerHandle, MessagingServiceBuilder<InMemoryMessaging>> {
check(counter >= 0) { "In memory network stopped: please recreate." }
val builder = createNodeWithID(manuallyPumped, counter, executor, database = database) as Builder
val builder = createNodeWithID(manuallyPumped, counter, executor, advertisedServices, database = database) as Builder
val id = builder.id
return Pair(id, builder)
@ -106,10 +118,15 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
* @param description text string that identifies this node for message logging (if is enabled) or null to autogenerate.
* @param persistenceTx a lambda to wrap message handling in a transaction if necessary.
fun createNodeWithID(manuallyPumped: Boolean, id: Int, executor: AffinityExecutor, description: String? = null,
database: Database)
fun createNodeWithID(
manuallyPumped: Boolean,
id: Int,
executor: AffinityExecutor,
advertisedServices: List<ServiceEntry>,
description: String? = null,
database: Database)
: MessagingServiceBuilder<InMemoryMessaging> {
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"), executor, database = database)
return Builder(manuallyPumped, PeerHandle(id, description ?: "In memory node $id"), advertisedServices.map(::ServiceHandle), executor, database = database)
interface LatencyCalculator {
@ -127,12 +144,20 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
private fun netNodeHasShutdown(handle: Handle) {
private fun netNodeHasShutdown(peerHandle: PeerHandle) {
private fun getQueueForHandle(recipients: Handle) = messageReceiveQueues.getOrPut(recipients) { LinkedBlockingQueue() }
private fun getQueueForPeerHandle(recipients: PeerHandle) = messageReceiveQueues.getOrPut(recipients) { LinkedBlockingQueue() }
private fun getQueuesForServiceHandle(recipients: ServiceHandle): List<LinkedBlockingQueue<MessageTransfer>> {
return serviceToPeersMapping[recipients]!!.map {
messageReceiveQueues.getOrPut(it) { LinkedBlockingQueue() }
val everyoneOnline: AllPossibleRecipients = object : AllPossibleRecipients {}
@ -149,22 +174,56 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
inner class Builder(val manuallyPumped: Boolean, val id: Handle, val executor: AffinityExecutor, val database: Database) : MessagingServiceBuilder<InMemoryMessaging> {
inner class Builder(
val manuallyPumped: Boolean,
val id: PeerHandle,
val serviceHandles: List<ServiceHandle>,
val executor: AffinityExecutor,
val database: Database) : MessagingServiceBuilder<InMemoryMessaging> {
override fun start(): ListenableFuture<InMemoryMessaging> {
synchronized(this@InMemoryMessagingNetwork) {
val node = InMemoryMessaging(manuallyPumped, id, executor, database)
handleEndpointMap[id] = node
serviceHandles.forEach {
serviceToPeersMapping.getOrPut(it) { LinkedHashSet<PeerHandle>() }.add(id)
return Futures.immediateFuture(node)
class Handle(val id: Int, val description: String) : SingleMessageRecipient {
data class PeerHandle(val id: Int, val description: String) : SingleMessageRecipient {
override fun toString() = description
override fun equals(other: Any?) = other is Handle && other.id == id
override fun equals(other: Any?) = other is PeerHandle && other.id == id
override fun hashCode() = id.hashCode()
data class ServiceHandle(val service: ServiceEntry) : MessageRecipientGroup {
override fun toString() = "Service($service)"
* Mock service loadbalancing
sealed class ServicePeerAllocationStrategy {
abstract fun <A> pickNext(service: ServiceHandle, pickFrom: List<A>): A
class Random(val random: SplittableRandom = SplittableRandom()) : ServicePeerAllocationStrategy() {
override fun <A> pickNext(service: ServiceHandle, pickFrom: List<A>): A {
return pickFrom[random.nextInt(pickFrom.size)]
class RoundRobin : ServicePeerAllocationStrategy() {
val previousPicks = HashMap<ServiceHandle, Int>()
override fun <A> pickNext(service: ServiceHandle, pickFrom: List<A>): A {
val nextIndex = previousPicks.compute(service) { _key, previous ->
(previous?.plus(1) ?: 0) % pickFrom.size
return pickFrom[nextIndex]
// If block is set to true this function will only return once a message has been pushed onto the recipients' queues
fun pumpSend(block: Boolean): MessageTransfer? {
val transfer = (if (block) messageSendQueue.take() else messageSendQueue.poll()) ?: return null
@ -190,12 +249,17 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
fun pumpSendInternal(transfer: MessageTransfer) {
when (transfer.recipients) {
is Handle -> getQueueForHandle(transfer.recipients).add(transfer)
is PeerHandle -> getQueueForPeerHandle(transfer.recipients).add(transfer)
is ServiceHandle -> {
val queues = getQueuesForServiceHandle(transfer.recipients)
val queue = servicePeerAllocationStrategy.pickNext(transfer.recipients, queues)
is AllPossibleRecipients -> {
// This means all possible recipients _that the network knows about at the time_, not literally everyone
// who joins into the indefinite future.
for (handle in handleEndpointMap.keys)
else -> throw IllegalArgumentException("Unknown type of recipient handle")
@ -211,7 +275,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
inner class InMemoryMessaging(private val manuallyPumped: Boolean,
private val handle: Handle,
private val peerHandle: PeerHandle,
private val executor: AffinityExecutor,
private val database: Database) : SingletonSerializeAsToken(), MessagingServiceInternal {
inner class Handler(val topicSession: TopicSession,
@ -228,7 +292,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
private val state = ThreadBox(InnerState())
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(HashSet<UUID>())
override val myAddress: Handle get() = handle
override val myAddress: PeerHandle get() = peerHandle
private val backgroundThread = if (manuallyPumped) null else
thread(isDaemon = true, name = "In-memory message dispatcher") {
@ -241,6 +305,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
return when (partyInfo) {
is PartyInfo.Node -> partyInfo.node.address
is PartyInfo.Service -> ServiceHandle(partyInfo.service)
override fun addMessageHandler(topic: String, sessionID: Long, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
= addMessageHandler(TopicSession(topic, sessionID), callback)
@ -279,7 +350,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
running = false
/** Returns the given (topic & session, data) pair as a newly created message object. */
@ -347,7 +418,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
private fun pumpReceiveInternal(block: Boolean): MessageTransfer? {
val q = getQueueForHandle(handle)
val q = getQueueForPeerHandle(peerHandle)
val next = getNextQueue(q, block) ?: return null
val (transfer, deliverTo) = next
@ -5,6 +5,7 @@ import com.google.common.jimfs.Jimfs
import com.google.common.util.concurrent.Futures
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PhysicalLocation
@ -15,7 +16,6 @@ import net.corda.node.internal.AbstractNode
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.core.messaging.RPCOps
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.InMemoryUniquenessProvider
@ -47,10 +47,12 @@ import java.util.concurrent.atomic.AtomicInteger
class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
private val threadPerNode: Boolean = false,
private val servicePeerAllocationStrategy: InMemoryMessagingNetwork.ServicePeerAllocationStrategy =
private val defaultFactory: Factory = MockNetwork.DefaultFactory) {
private var nextNodeId = 0
val filesystem: FileSystem = Jimfs.newFileSystem(unix())
val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped)
val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped, servicePeerAllocationStrategy)
// A unique identifier for this network to segregate databases with the same nodeID but different networks.
private val networkId = random63BitValue()
@ -118,7 +120,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(): MessagingServiceInternal {
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, configuration.myLegalName, database).start().getOrThrow()
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, makeServiceEntries(), configuration.myLegalName, database).start().getOrThrow()
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
@ -268,8 +270,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
return BasketOfNodes(nodes, notaryNode, mapNode)
fun createNotaryNode(legalName: String? = null, keyPair: KeyPair? = null): MockNode {
return createNode(null, -1, defaultFactory, true, legalName, keyPair, ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type))
fun createNotaryNode(networkMapAddr: SingleMessageRecipient? = null, legalName: String? = null, keyPair: KeyPair? = null, serviceName: String? = null): MockNode {
return createNode(networkMapAddr, -1, defaultFactory, true, legalName, keyPair, ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type, serviceName))
fun createPartyNode(networkMapAddr: SingleMessageRecipient, legalName: String? = null, keyPair: KeyPair? = null): MockNode {
Reference in New Issue
Block a user