mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Network map redesign: Change field types in NodeInfo, move away messaging data from NodeInfo (#921)
* First stage of changing fields in NodeInfo. Part of work related to NetworkMapService upgrade. Create slots for multiple IP addresses and legalIdentities per node. * NodeInfo stores HostAndPort. Move information specific to messaging layer away from NodeInfo. Only HostAndPort addresses are stored. Add peer name - peer handle mapping to MockNetwork to reflect that change.
This commit is contained in:
committed by
GitHub
parent
4139cf497d
commit
58da76c052
@ -50,7 +50,7 @@ class BFTNotaryServiceTests {
|
||||
val notaryClusterAddresses = replicaIds.map { HostAndPort.fromParts("localhost", 11000 + it * 10) }
|
||||
replicaIds.forEach { replicaId ->
|
||||
mockNet.createNode(
|
||||
node.info.address,
|
||||
node.network.myAddress,
|
||||
advertisedServices = bftNotaryService,
|
||||
configOverrides = {
|
||||
whenever(it.bftReplicaId).thenReturn(replicaId)
|
||||
|
@ -16,6 +16,7 @@ import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
|
||||
import net.corda.node.services.network.NodeRegistration
|
||||
import net.corda.node.utilities.AddOrRemove
|
||||
import net.corda.testing.MOCK_HOST_AND_PORT
|
||||
import net.corda.testing.MOCK_VERSION_INFO
|
||||
import net.corda.testing.node.NodeBasedTest
|
||||
import net.corda.testing.node.SimpleNode
|
||||
@ -69,7 +70,7 @@ class P2PSecurityTest : NodeBasedTest() {
|
||||
|
||||
private fun SimpleNode.registerWithNetworkMap(registrationName: X500Name): ListenableFuture<NetworkMapService.RegistrationResponse> {
|
||||
val legalIdentity = getTestPartyAndCertificate(registrationName, identity.public)
|
||||
val nodeInfo = NodeInfo(network.myAddress, legalIdentity, MOCK_VERSION_INFO.platformVersion)
|
||||
val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, setOf(legalIdentity), MOCK_VERSION_INFO.platformVersion)
|
||||
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
|
||||
val request = RegistrationRequest(registration.toWire(keyService, identity.public), network.myAddress)
|
||||
return network.sendRequest<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.network.myAddress)
|
||||
|
@ -3,6 +3,7 @@ package net.corda.node.internal
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.collect.MutableClassToInstanceMap
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
@ -58,6 +59,7 @@ import net.corda.node.utilities.AddOrRemove.ADD
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
@ -158,7 +160,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
open fun findMyLocation(): PhysicalLocation? {
|
||||
open fun findMyLocation(): WorldMapLocation? {
|
||||
return configuration.myLegalName.locationOrNull?.let { CityDatabase[it] }
|
||||
}
|
||||
|
||||
@ -548,7 +550,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
private fun makeInfo(): NodeInfo {
|
||||
val advertisedServiceEntries = makeServiceEntries()
|
||||
val legalIdentity = obtainLegalIdentity()
|
||||
return NodeInfo(network.myAddress, legalIdentity, platformVersion, advertisedServiceEntries, findMyLocation())
|
||||
val allIdentitiesSet = advertisedServiceEntries.map { it.identity }.toSet() + legalIdentity
|
||||
val addresses = myAddresses() // TODO There is no support for multiple IP addresses yet.
|
||||
return NodeInfo(addresses, legalIdentity, allIdentitiesSet, platformVersion, advertisedServiceEntries, findMyLocation())
|
||||
}
|
||||
|
||||
/**
|
||||
@ -641,7 +645,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
require(networkMapAddress != null || NetworkMapService.type in advertisedServices.map { it.type }) {
|
||||
"Initial network map address must indicate a node that provides a network map service"
|
||||
}
|
||||
val address = networkMapAddress ?: info.address
|
||||
val address: SingleMessageRecipient = networkMapAddress ?:
|
||||
network.getAddressOfParty(PartyInfo.Node(info)) as SingleMessageRecipient
|
||||
// Register for updates, even if we're the one running the network map.
|
||||
return sendNetworkMapRegistration(address).flatMap { (error) ->
|
||||
check(error == null) { "Unable to register with the network map service: $error" }
|
||||
@ -660,6 +665,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapAddress)
|
||||
}
|
||||
|
||||
/** Return list of node's addresses. It's overridden in MockNetwork as we don't have real addresses for MockNodes. */
|
||||
protected abstract fun myAddresses(): List<HostAndPort>
|
||||
|
||||
/** This is overriden by the mock node implementation to enable operation without any network map service */
|
||||
protected open fun noNetworkMapConfigured(): ListenableFuture<Unit> {
|
||||
// TODO: There should be a consistent approach to configuration error exceptions.
|
||||
|
@ -32,6 +32,7 @@ import net.corda.node.services.transactions.RaftUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.IP_REQUEST_PREFIX
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress
|
||||
@ -274,6 +275,11 @@ open class Node(override val configuration: FullNodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
override fun myAddresses(): List<HostAndPort> {
|
||||
val address = network.myAddress as ArtemisMessagingComponent.ArtemisPeerAddress
|
||||
return listOf(address.hostAndPort)
|
||||
}
|
||||
|
||||
/**
|
||||
* If the node is persisting to an embedded H2 database, then expose this via TCP with a JDBC URL of the form:
|
||||
* jdbc:h2:tcp://<host>:<port>/node
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services.api
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.flows.FlowInitiator
|
||||
import net.corda.core.flows.FlowLogic
|
||||
|
@ -298,12 +298,8 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
|
||||
fun deployBridgeToPeer(nodeInfo: NodeInfo) {
|
||||
log.debug("Deploying bridge for $queueName to $nodeInfo")
|
||||
val address = nodeInfo.address
|
||||
if (address is ArtemisPeerAddress) {
|
||||
deployBridge(queueName, address.hostAndPort, nodeInfo.legalIdentity.name)
|
||||
} else {
|
||||
log.error("Don't know how to deal with $address for queue $queueName")
|
||||
}
|
||||
val address = nodeInfo.addresses.first() // TODO Load balancing.
|
||||
deployBridge(queueName, address, nodeInfo.legalIdentity.name)
|
||||
}
|
||||
|
||||
when {
|
||||
@ -342,7 +338,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
*/
|
||||
private fun updateBridgesOnNetworkChange(change: MapChange) {
|
||||
fun gatherAddresses(node: NodeInfo): Sequence<ArtemisPeerAddress> {
|
||||
val peerAddress = node.address as ArtemisPeerAddress
|
||||
val peerAddress = getArtemisPeerAddress(node)
|
||||
val addresses = mutableListOf(peerAddress)
|
||||
node.advertisedServices.mapTo(addresses) { NodeAddress.asService(it.identity.owningKey, peerAddress.hostAndPort) }
|
||||
return addresses.asSequence()
|
||||
|
@ -564,11 +564,10 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
|
||||
return when (partyInfo) {
|
||||
is PartyInfo.Node -> partyInfo.node.address
|
||||
is PartyInfo.Service -> ArtemisMessagingComponent.ServiceAddress(partyInfo.service.identity.owningKey)
|
||||
is PartyInfo.Node -> getArtemisPeerAddress(partyInfo.node)
|
||||
is PartyInfo.Service -> ServiceAddress(partyInfo.service.identity.owningKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -134,7 +134,9 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
|
||||
override fun deregisterForUpdates(network: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
|
||||
// Fetch the network map and register for updates at the same time
|
||||
val req = NetworkMapService.SubscribeRequest(false, network.myAddress)
|
||||
val future = network.sendRequest<SubscribeResponse>(NetworkMapService.SUBSCRIPTION_TOPIC, req, service.address).map {
|
||||
// `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo.
|
||||
val address = network.getAddressOfParty(PartyInfo.Node(service))
|
||||
val future = network.sendRequest<SubscribeResponse>(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map {
|
||||
if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed()
|
||||
}
|
||||
_registrationFuture.setFuture(future)
|
||||
|
@ -273,11 +273,11 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal,
|
||||
// subscribers
|
||||
when (change.type) {
|
||||
ADD -> {
|
||||
logger.info("Added node ${node.address} to network map")
|
||||
logger.info("Added node ${node.addresses} to network map")
|
||||
services.networkMapCache.addNode(change.node)
|
||||
}
|
||||
REMOVE -> {
|
||||
logger.info("Removed node ${node.address} from network map")
|
||||
logger.info("Removed node ${node.addresses} from network map")
|
||||
services.networkMapCache.removeNode(change.node)
|
||||
}
|
||||
}
|
||||
|
@ -59,8 +59,8 @@ class CordaRPCOpsImplTest {
|
||||
fun setup() {
|
||||
mockNet = MockNetwork()
|
||||
val networkMap = mockNet.createNode(advertisedServices = ServiceInfo(NetworkMapService.type))
|
||||
aliceNode = mockNet.createNode(networkMapAddress = networkMap.info.address)
|
||||
notaryNode = mockNet.createNode(advertisedServices = ServiceInfo(SimpleNotaryService.type), networkMapAddress = networkMap.info.address)
|
||||
aliceNode = mockNet.createNode(networkMapAddress = networkMap.network.myAddress)
|
||||
notaryNode = mockNet.createNode(advertisedServices = ServiceInfo(SimpleNotaryService.type), networkMapAddress = networkMap.network.myAddress)
|
||||
rpc = CordaRPCOpsImpl(aliceNode.services, aliceNode.smm, aliceNode.database)
|
||||
CURRENT_RPC_CONTEXT.set(RpcContext(User("user", "pwd", permissions = setOf(
|
||||
startFlowPermission<CashIssueFlow>(),
|
||||
|
@ -114,7 +114,7 @@ class AttachmentTests {
|
||||
}
|
||||
}
|
||||
}, true, null, null, ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type))
|
||||
val n1 = mockNet.createNode(n0.info.address)
|
||||
val n1 = mockNet.createNode(n0.network.myAddress)
|
||||
|
||||
val attachment = fakeAttachment()
|
||||
// Insert an attachment into node zero's store directly.
|
||||
|
@ -34,15 +34,15 @@ class InMemoryMessagingTests {
|
||||
@Test
|
||||
fun basics() {
|
||||
val node1 = mockNet.createNode(advertisedServices = ServiceInfo(NetworkMapService.type))
|
||||
val node2 = mockNet.createNode(networkMapAddress = node1.info.address)
|
||||
val node3 = mockNet.createNode(networkMapAddress = node1.info.address)
|
||||
val node2 = mockNet.createNode(networkMapAddress = node1.network.myAddress)
|
||||
val node3 = mockNet.createNode(networkMapAddress = node1.network.myAddress)
|
||||
|
||||
val bits = "test-content".toByteArray()
|
||||
var finalDelivery: Message? = null
|
||||
|
||||
with(node2) {
|
||||
node2.network.addMessageHandler { msg, _ ->
|
||||
node2.network.send(msg, node3.info.address)
|
||||
node2.network.send(msg, node3.network.myAddress)
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,7 +53,7 @@ class InMemoryMessagingTests {
|
||||
}
|
||||
|
||||
// Node 1 sends a message and it should end up in finalDelivery, after we run the network
|
||||
node1.network.send(node1.network.createMessage("test.topic", DEFAULT_SESSION_ID, bits), node2.info.address)
|
||||
node1.network.send(node1.network.createMessage("test.topic", DEFAULT_SESSION_ID, bits), node2.network.myAddress)
|
||||
|
||||
mockNet.runNetwork(rounds = 1)
|
||||
|
||||
@ -63,8 +63,8 @@ class InMemoryMessagingTests {
|
||||
@Test
|
||||
fun broadcast() {
|
||||
val node1 = mockNet.createNode(advertisedServices = ServiceInfo(NetworkMapService.type))
|
||||
val node2 = mockNet.createNode(networkMapAddress = node1.info.address)
|
||||
val node3 = mockNet.createNode(networkMapAddress = node1.info.address)
|
||||
val node2 = mockNet.createNode(networkMapAddress = node1.network.myAddress)
|
||||
val node3 = mockNet.createNode(networkMapAddress = node1.network.myAddress)
|
||||
|
||||
val bits = "test-content".toByteArray()
|
||||
|
||||
@ -82,7 +82,7 @@ class InMemoryMessagingTests {
|
||||
@Test
|
||||
fun `skip unhandled messages`() {
|
||||
val node1 = mockNet.createNode(advertisedServices = ServiceInfo(NetworkMapService.type))
|
||||
val node2 = mockNet.createNode(networkMapAddress = node1.info.address)
|
||||
val node2 = mockNet.createNode(networkMapAddress = node1.network.myAddress)
|
||||
var received: Int = 0
|
||||
|
||||
node1.network.addMessageHandler("valid_message") { _, _ ->
|
||||
|
@ -127,8 +127,8 @@ class TwoPartyTradeFlowTests {
|
||||
|
||||
ledger {
|
||||
val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name)
|
||||
val aliceNode = mockNet.createPartyNode(notaryNode.info.address, ALICE.name)
|
||||
val bobNode = mockNet.createPartyNode(notaryNode.info.address, BOB.name)
|
||||
val aliceNode = mockNet.createPartyNode(notaryNode.network.myAddress, ALICE.name)
|
||||
val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
|
||||
|
||||
aliceNode.disableDBCloseOnStop()
|
||||
bobNode.disableDBCloseOnStop()
|
||||
@ -173,13 +173,13 @@ class TwoPartyTradeFlowTests {
|
||||
fun `shutdown and restore`() {
|
||||
ledger {
|
||||
val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name)
|
||||
val aliceNode = mockNet.createPartyNode(notaryNode.info.address, ALICE.name)
|
||||
var bobNode = mockNet.createPartyNode(notaryNode.info.address, BOB.name)
|
||||
val aliceNode = mockNet.createPartyNode(notaryNode.network.myAddress, ALICE.name)
|
||||
var bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
|
||||
aliceNode.disableDBCloseOnStop()
|
||||
bobNode.disableDBCloseOnStop()
|
||||
|
||||
val bobAddr = bobNode.network.myAddress as InMemoryMessagingNetwork.PeerHandle
|
||||
val networkMapAddr = notaryNode.info.address
|
||||
val networkMapAddr = notaryNode.network.myAddress
|
||||
|
||||
mockNet.runNetwork() // Clear network map registration messages
|
||||
|
||||
@ -291,8 +291,8 @@ class TwoPartyTradeFlowTests {
|
||||
@Test
|
||||
fun `check dependencies of sale asset are resolved`() {
|
||||
val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name)
|
||||
val aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name)
|
||||
val bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name)
|
||||
val aliceNode = makeNodeWithTracking(notaryNode.network.myAddress, ALICE.name)
|
||||
val bobNode = makeNodeWithTracking(notaryNode.network.myAddress, BOB.name)
|
||||
|
||||
ledger(aliceNode.services) {
|
||||
|
||||
@ -390,8 +390,8 @@ class TwoPartyTradeFlowTests {
|
||||
@Test
|
||||
fun `track works`() {
|
||||
val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name)
|
||||
val aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name)
|
||||
val bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name)
|
||||
val aliceNode = makeNodeWithTracking(notaryNode.network.myAddress, ALICE.name)
|
||||
val bobNode = makeNodeWithTracking(notaryNode.network.myAddress, BOB.name)
|
||||
|
||||
ledger(aliceNode.services) {
|
||||
|
||||
@ -528,8 +528,8 @@ class TwoPartyTradeFlowTests {
|
||||
expectedMessageSubstring: String
|
||||
) {
|
||||
val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name)
|
||||
val aliceNode = mockNet.createPartyNode(notaryNode.info.address, ALICE.name)
|
||||
val bobNode = mockNet.createPartyNode(notaryNode.info.address, BOB.name)
|
||||
val aliceNode = mockNet.createPartyNode(notaryNode.network.myAddress, ALICE.name)
|
||||
val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
|
||||
val issuer = MEGA_CORP.ref(1, 2, 3)
|
||||
|
||||
val bobsBadCash = bobNode.database.transaction {
|
||||
|
@ -37,9 +37,9 @@ class NotaryChangeTests {
|
||||
oldNotaryNode = mockNet.createNode(
|
||||
legalName = DUMMY_NOTARY.name,
|
||||
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type)))
|
||||
clientNodeA = mockNet.createNode(networkMapAddress = oldNotaryNode.info.address)
|
||||
clientNodeB = mockNet.createNode(networkMapAddress = oldNotaryNode.info.address)
|
||||
newNotaryNode = mockNet.createNode(networkMapAddress = oldNotaryNode.info.address, advertisedServices = ServiceInfo(SimpleNotaryService.type))
|
||||
clientNodeA = mockNet.createNode(networkMapAddress = oldNotaryNode.network.myAddress)
|
||||
clientNodeB = mockNet.createNode(networkMapAddress = oldNotaryNode.network.myAddress)
|
||||
newNotaryNode = mockNet.createNode(networkMapAddress = oldNotaryNode.network.myAddress, advertisedServices = ServiceInfo(SimpleNotaryService.type))
|
||||
|
||||
mockNet.runNetwork() // Clear network map registration messages
|
||||
}
|
||||
|
@ -94,8 +94,8 @@ class ScheduledFlowTests {
|
||||
notaryNode = mockNet.createNode(
|
||||
legalName = DUMMY_NOTARY.name,
|
||||
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type)))
|
||||
nodeA = mockNet.createNode(notaryNode.info.address, start = false)
|
||||
nodeB = mockNet.createNode(notaryNode.info.address, start = false)
|
||||
nodeA = mockNet.createNode(notaryNode.network.myAddress, start = false)
|
||||
nodeB = mockNet.createNode(notaryNode.network.myAddress, start = false)
|
||||
mockNet.startNodes()
|
||||
}
|
||||
|
||||
|
@ -186,8 +186,8 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
|
||||
}
|
||||
|
||||
private fun MockNode.fetchMap(subscribe: Boolean = false, ifChangedSinceVersion: Int? = null): List<Changed> {
|
||||
val request = FetchMapRequest(subscribe, ifChangedSinceVersion, info.address)
|
||||
val response = services.networkService.sendRequest<FetchMapResponse>(FETCH_TOPIC, request, mapServiceNode.info.address)
|
||||
val request = FetchMapRequest(subscribe, ifChangedSinceVersion, network.myAddress)
|
||||
val response = services.networkService.sendRequest<FetchMapResponse>(FETCH_TOPIC, request, mapServiceNode.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
return response.getOrThrow().nodes?.map { it.toChanged() } ?: emptyList()
|
||||
}
|
||||
@ -198,8 +198,8 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
|
||||
}
|
||||
|
||||
private fun MockNode.identityQuery(): NodeInfo? {
|
||||
val request = QueryIdentityRequest(info.legalIdentityAndCert, info.address)
|
||||
val response = services.networkService.sendRequest<QueryIdentityResponse>(QUERY_TOPIC, request, mapServiceNode.info.address)
|
||||
val request = QueryIdentityRequest(info.legalIdentityAndCert, network.myAddress)
|
||||
val response = services.networkService.sendRequest<QueryIdentityResponse>(QUERY_TOPIC, request, mapServiceNode.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
return response.getOrThrow().node
|
||||
}
|
||||
@ -216,39 +216,39 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
|
||||
}
|
||||
val expires = Instant.now() + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
|
||||
val nodeRegistration = NodeRegistration(info, distinctSerial, addOrRemove, expires)
|
||||
val request = RegistrationRequest(nodeRegistration.toWire(services.keyManagementService, services.legalIdentityKey), info.address)
|
||||
val response = services.networkService.sendRequest<RegistrationResponse>(REGISTER_TOPIC, request, mapServiceNode.info.address)
|
||||
val request = RegistrationRequest(nodeRegistration.toWire(services.keyManagementService, services.legalIdentityKey), network.myAddress)
|
||||
val response = services.networkService.sendRequest<RegistrationResponse>(REGISTER_TOPIC, request, mapServiceNode.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
return response
|
||||
}
|
||||
|
||||
private fun MockNode.subscribe(): List<Update> {
|
||||
val request = SubscribeRequest(true, info.address)
|
||||
val request = SubscribeRequest(true, network.myAddress)
|
||||
val updates = BlockingArrayQueue<Update>()
|
||||
services.networkService.addMessageHandler(PUSH_TOPIC, DEFAULT_SESSION_ID) { message, _ ->
|
||||
updates += message.data.deserialize<Update>()
|
||||
}
|
||||
val response = services.networkService.sendRequest<SubscribeResponse>(SUBSCRIPTION_TOPIC, request, mapServiceNode.info.address)
|
||||
val response = services.networkService.sendRequest<SubscribeResponse>(SUBSCRIPTION_TOPIC, request, mapServiceNode.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
assertThat(response.getOrThrow().confirmed).isTrue()
|
||||
return updates
|
||||
}
|
||||
|
||||
private fun MockNode.unsubscribe() {
|
||||
val request = SubscribeRequest(false, info.address)
|
||||
val response = services.networkService.sendRequest<SubscribeResponse>(SUBSCRIPTION_TOPIC, request, mapServiceNode.info.address)
|
||||
val request = SubscribeRequest(false, network.myAddress)
|
||||
val response = services.networkService.sendRequest<SubscribeResponse>(SUBSCRIPTION_TOPIC, request, mapServiceNode.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
assertThat(response.getOrThrow().confirmed).isTrue()
|
||||
}
|
||||
|
||||
private fun MockNode.ackUpdate(mapVersion: Int) {
|
||||
val request = UpdateAcknowledge(mapVersion, services.networkService.myAddress)
|
||||
services.networkService.send(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, request, mapServiceNode.info.address)
|
||||
services.networkService.send(PUSH_ACK_TOPIC, DEFAULT_SESSION_ID, request, mapServiceNode.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
}
|
||||
|
||||
private fun addNewNodeToNetworkMap(legalName: X500Name): MockNode {
|
||||
val node = mockNet.createNode(networkMapAddress = mapServiceNode.info.address, legalName = legalName)
|
||||
val node = mockNet.createNode(networkMapAddress = mapServiceNode.network.myAddress, legalName = legalName)
|
||||
mockNet.runNetwork()
|
||||
lastSerial = System.currentTimeMillis()
|
||||
return node
|
||||
|
@ -16,7 +16,7 @@ class InMemoryNetworkMapCacheTest {
|
||||
@Test
|
||||
fun registerWithNetwork() {
|
||||
val (n0, n1) = mockNet.createTwoNodes()
|
||||
val future = n1.services.networkMapCache.addMapService(n1.network, n0.info.address, false, null)
|
||||
val future = n1.services.networkMapCache.addMapService(n1.network, n0.network.myAddress, false, null)
|
||||
mockNet.runNetwork()
|
||||
future.getOrThrow()
|
||||
}
|
||||
|
@ -81,8 +81,8 @@ class FlowFrameworkTests {
|
||||
val overrideServices = mapOf(Pair(notaryService, notaryKeyPair))
|
||||
// Note that these notaries don't operate correctly as they don't share their state. They are only used for testing
|
||||
// service addressing.
|
||||
notary1 = mockNet.createNotaryNode(networkMapAddr = node1.services.myInfo.address, overrideServices = overrideServices, serviceName = notaryService.name)
|
||||
notary2 = mockNet.createNotaryNode(networkMapAddr = node1.services.myInfo.address, overrideServices = overrideServices, serviceName = notaryService.name)
|
||||
notary1 = mockNet.createNotaryNode(networkMapAddr = node1.network.myAddress, overrideServices = overrideServices, serviceName = notaryService.name)
|
||||
notary2 = mockNet.createNotaryNode(networkMapAddr = node1.network.myAddress, overrideServices = overrideServices, serviceName = notaryService.name)
|
||||
|
||||
mockNet.messagingNetwork.receivedMessages.toSessionTransfers().forEach { sessionTransfers += it }
|
||||
mockNet.runNetwork()
|
||||
@ -148,7 +148,7 @@ class FlowFrameworkTests {
|
||||
|
||||
@Test
|
||||
fun `flow added before network map does run after init`() {
|
||||
val node3 = mockNet.createNode(node1.info.address) //create vanilla node
|
||||
val node3 = mockNet.createNode(node1.network.myAddress) //create vanilla node
|
||||
val flow = NoOpFlow()
|
||||
node3.services.startFlow(flow)
|
||||
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
|
||||
@ -158,14 +158,14 @@ class FlowFrameworkTests {
|
||||
|
||||
@Test
|
||||
fun `flow added before network map will be init checkpointed`() {
|
||||
var node3 = mockNet.createNode(node1.info.address) //create vanilla node
|
||||
var node3 = mockNet.createNode(node1.network.myAddress) //create vanilla node
|
||||
val flow = NoOpFlow()
|
||||
node3.services.startFlow(flow)
|
||||
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
|
||||
node3.disableDBCloseOnStop()
|
||||
node3.stop()
|
||||
|
||||
node3 = mockNet.createNode(node1.info.address, forcedID = node3.id)
|
||||
node3 = mockNet.createNode(node1.network.myAddress, forcedID = node3.id)
|
||||
val restoredFlow = node3.getSingleFlow<NoOpFlow>().first
|
||||
assertEquals(false, restoredFlow.flowStarted) // Not started yet as no network activity has been allowed yet
|
||||
mockNet.runNetwork() // Allow network map messages to flow
|
||||
@ -175,7 +175,7 @@ class FlowFrameworkTests {
|
||||
node3.stop()
|
||||
|
||||
// Now it is completed the flow should leave no Checkpoint.
|
||||
node3 = mockNet.createNode(node1.info.address, forcedID = node3.id)
|
||||
node3 = mockNet.createNode(node1.network.myAddress, forcedID = node3.id)
|
||||
mockNet.runNetwork() // Allow network map messages to flow
|
||||
node3.smm.executor.flush()
|
||||
assertTrue(node3.smm.findStateMachines(NoOpFlow::class.java).isEmpty())
|
||||
@ -201,7 +201,7 @@ class FlowFrameworkTests {
|
||||
var sentCount = 0
|
||||
mockNet.messagingNetwork.sentMessages.toSessionTransfers().filter { it.isPayloadTransfer }.forEach { sentCount++ }
|
||||
|
||||
val node3 = mockNet.createNode(node1.info.address)
|
||||
val node3 = mockNet.createNode(node1.network.myAddress)
|
||||
val secondFlow = node3.registerFlowFactory(PingPongFlow::class) { PingPongFlow(it, payload2) }
|
||||
mockNet.runNetwork()
|
||||
|
||||
@ -218,7 +218,7 @@ class FlowFrameworkTests {
|
||||
node2.database.transaction {
|
||||
assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint
|
||||
}
|
||||
val node2b = mockNet.createNode(node1.info.address, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray())
|
||||
val node2b = mockNet.createNode(node1.network.myAddress, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray())
|
||||
node2.manuallyCloseDB()
|
||||
val (firstAgain, fut1) = node2b.getSingleFlow<PingPongFlow>()
|
||||
// Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync.
|
||||
@ -245,7 +245,7 @@ class FlowFrameworkTests {
|
||||
|
||||
@Test
|
||||
fun `sending to multiple parties`() {
|
||||
val node3 = mockNet.createNode(node1.info.address)
|
||||
val node3 = mockNet.createNode(node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
node2.registerFlowFactory(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
|
||||
node3.registerFlowFactory(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
|
||||
@ -277,7 +277,7 @@ class FlowFrameworkTests {
|
||||
|
||||
@Test
|
||||
fun `receiving from multiple parties`() {
|
||||
val node3 = mockNet.createNode(node1.info.address)
|
||||
val node3 = mockNet.createNode(node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
val node2Payload = "Test 1"
|
||||
val node3Payload = "Test 2"
|
||||
@ -457,7 +457,7 @@ class FlowFrameworkTests {
|
||||
|
||||
@Test
|
||||
fun `FlowException propagated in invocation chain`() {
|
||||
val node3 = mockNet.createNode(node1.info.address)
|
||||
val node3 = mockNet.createNode(node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
|
||||
node3.registerFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } }
|
||||
@ -471,7 +471,7 @@ class FlowFrameworkTests {
|
||||
|
||||
@Test
|
||||
fun `FlowException thrown and there is a 3rd unrelated party flow`() {
|
||||
val node3 = mockNet.createNode(node1.info.address)
|
||||
val node3 = mockNet.createNode(node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
|
||||
// Node 2 will send its payload and then block waiting for the receive from node 1. Meanwhile node 1 will move
|
||||
@ -675,7 +675,7 @@ class FlowFrameworkTests {
|
||||
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(networkMapNode: MockNode? = null): P {
|
||||
disableDBCloseOnStop() // Handover DB to new node copy
|
||||
stop()
|
||||
val newNode = mockNet.createNode(networkMapNode?.info?.address, id, advertisedServices = *advertisedServices.toTypedArray())
|
||||
val newNode = mockNet.createNode(networkMapNode?.network?.myAddress, id, advertisedServices = *advertisedServices.toTypedArray())
|
||||
newNode.acceptableLiveFiberCountOnStop = 1
|
||||
manuallyCloseDB()
|
||||
mockNet.runNetwork() // allow NetworkMapService messages to stabilise and thus start the state machine
|
||||
|
@ -35,7 +35,7 @@ class NotaryServiceTests {
|
||||
notaryNode = mockNet.createNode(
|
||||
legalName = DUMMY_NOTARY.name,
|
||||
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(SimpleNotaryService.type)))
|
||||
clientNode = mockNet.createNode(networkMapAddress = notaryNode.info.address)
|
||||
clientNode = mockNet.createNode(networkMapAddress = notaryNode.network.myAddress)
|
||||
mockNet.runNetwork() // Clear network map registration messages
|
||||
}
|
||||
|
||||
|
@ -33,7 +33,7 @@ class ValidatingNotaryServiceTests {
|
||||
legalName = DUMMY_NOTARY.name,
|
||||
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type))
|
||||
)
|
||||
clientNode = mockNet.createNode(networkMapAddress = notaryNode.info.address)
|
||||
clientNode = mockNet.createNode(networkMapAddress = notaryNode.network.myAddress)
|
||||
mockNet.runNetwork() // Clear network map registration messages
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user