Remove for good network map service node (#1942)

* [CORDA-446] Kill network map registration and fix NodeBasedTest
This commit is contained in:
Alberto Arri 2017-10-30 11:45:52 +00:00 committed by GitHub
parent 05d6fb91c7
commit 9176fcb8e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 81 additions and 559 deletions

View File

@ -118,13 +118,6 @@ path to the node's base directory.
Only one of ``raft``, ``bftSMaRt`` or ``custom`` configuration values may be specified.
:networkMapService: If `null`, or missing the node is declaring itself as the NetworkMapService host. Otherwise this is
a config object with the details of the network map service:
:address: Host and port string of the ArtemisMQ broker hosting the network map node
:legalName: Legal name of the node. This is required as part of the TLS host verification process. The node will
reject the connection to the network map service if it provides a TLS common name which doesn't match with this value.
:minimumPlatformVersion: Used by the node if it's running the network map service to enforce a minimum version requirement
on registrations - any node on a Platform Version lower than this value will have their registration rejected.
Defaults to 1 if absent.
@ -163,4 +156,4 @@ path to the node's base directory.
:jarDirs: An optional list of file system directories containing JARs to include in the classpath when launching via ``corda.jar`` only.
Each should be a string. Only the JARs in the directories are added, not the directories themselves. This is useful
for including JDBC drivers and the like. e.g. ``jarDirs = [ 'lib' ]``
for including JDBC drivers and the like. e.g. ``jarDirs = [ 'lib' ]``

View File

@ -1,7 +1,5 @@
package net.corda.nodeapi
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.messaging.MessageRecipientGroup
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
@ -42,11 +40,6 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
val hostAndPort: NetworkHostAndPort
}
@CordaSerializable
data class NetworkMapAddress(override val hostAndPort: NetworkHostAndPort) : ArtemisPeerAddress {
override val queueName: String get() = NETWORK_MAP_QUEUE
}
/**
* This is the class used to implement [SingleMessageRecipient], for now. Note that in future this class
* may change or evolve and code that relies upon it being a simple host/port may not function correctly.
@ -60,11 +53,8 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
*/
@CordaSerializable
data class NodeAddress(override val queueName: String, override val hostAndPort: NetworkHostAndPort) : ArtemisPeerAddress {
companion object {
fun asSingleNode(peerIdentity: PublicKey, hostAndPort: NetworkHostAndPort): NodeAddress {
return NodeAddress("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort)
}
}
constructor(peerIdentity: PublicKey, hostAndPort: NetworkHostAndPort) :
this("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort)
}
/**
@ -82,13 +72,4 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
abstract val config: SSLConfiguration?
// Used for bridges creation.
fun getArtemisPeerAddress(party: Party, address: NetworkHostAndPort, netMapName: CordaX500Name? = null): ArtemisPeerAddress {
return if (party.name == netMapName) {
NetworkMapAddress(address)
} else {
NodeAddress.asSingleNode(party.owningKey, address) // It also takes care of services nodes treated as peer nodes
}
}
}

View File

@ -16,22 +16,13 @@ import net.corda.testing.node.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Test
import java.time.Duration
import kotlin.test.assertEquals
import kotlin.test.assertFails
import kotlin.test.assertTrue
private const val BRIDGE_RETRY_MS: Long = 100
class PersistentNetworkMapCacheTest : NodeBasedTest() {
private val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB)
private val addressesMap: HashMap<CordaX500Name, NetworkHostAndPort> = HashMap()
private val infos: MutableSet<NodeInfo> = HashSet()
companion object {
val logger = loggerFor<PersistentNetworkMapCacheTest>()
}
@Before
fun start() {
val nodes = startNodesWithPort(partiesList)
@ -45,7 +36,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
@Test
fun `get nodes by owning key and by name, no network map service`() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
val alice = startNodesWithPort(listOf(ALICE))[0]
val netCache = alice.services.networkMapCache
alice.database.transaction {
val res = netCache.getNodeByLegalIdentity(alice.info.chooseIdentity())
@ -57,7 +48,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
@Test
fun `get nodes by address no network map service`() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
val alice = startNodesWithPort(listOf(ALICE))[0]
val netCache = alice.services.networkMapCache
alice.database.transaction {
val res = netCache.getNodeByAddress(alice.info.addresses[0])
@ -67,9 +58,8 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
@Test
fun `restart node with DB map cache and no network map`() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
val alice = startNodesWithPort(listOf(ALICE))[0]
val partyNodes = alice.services.networkMapCache.allNodes
assertEquals(NullNetworkMapService, alice.inNodeNetworkMapService)
assertEquals(infos.size, partyNodes.size)
assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet())
}
@ -77,8 +67,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
@Test
fun `start 2 nodes without pointing at NetworkMapService and communicate with each other`() {
val parties = partiesList.subList(1, partiesList.size)
val nodes = startNodesWithPort(parties, noNetworkMap = true)
assertTrue(nodes.all { it.inNodeNetworkMapService == NullNetworkMapService })
val nodes = startNodesWithPort(parties)
nodes.forEach {
val partyNodes = it.services.networkMapCache.allNodes
assertEquals(infos.size, partyNodes.size)
@ -90,8 +79,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
@Test
fun `start 2 nodes pointing at NetworkMapService but don't start network map node`() {
val parties = partiesList.subList(1, partiesList.size)
val nodes = startNodesWithPort(parties, noNetworkMap = false)
assertTrue(nodes.all { it.inNodeNetworkMapService == NullNetworkMapService })
val nodes = startNodesWithPort(parties)
nodes.forEach {
val partyNodes = it.services.networkMapCache.allNodes
assertEquals(infos.size, partyNodes.size)
@ -103,79 +91,24 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
@Test
fun `start node and network map communicate`() {
val parties = partiesList.subList(0, 2)
val nodes = startNodesWithPort(parties, noNetworkMap = false)
val nodes = startNodesWithPort(parties)
checkConnectivity(nodes)
}
@Test
fun `start node without networkMapService and no database - fail`() {
assertFails { startNode(CHARLIE.name, noNetworkMap = true).getOrThrow(2.seconds) }
}
@Test
fun `new node joins network without network map started`() {
fun customNodesStart(parties: List<Party>): List<StartedNode<Node>> =
startNodesWithPort(parties, noNetworkMap = false, customRetryIntervalMs = BRIDGE_RETRY_MS)
val parties = partiesList.subList(1, partiesList.size)
// Start 2 nodes pointing at network map, but don't start network map service.
val otherNodes = customNodesStart(parties)
otherNodes.forEach { node ->
assertTrue(infos.any { it.legalIdentitiesAndCerts.toSet() == node.info.legalIdentitiesAndCerts.toSet() })
}
// Start node that is not in databases of other nodes. Point to NMS. Which has't started yet.
val charlie = customNodesStart(listOf(CHARLIE)).single()
otherNodes.forEach {
assertThat(it.services.networkMapCache.allNodes).doesNotContain(charlie.info)
}
// Start Network Map and see that charlie node appears in caches.
val nms = customNodesStart(listOf(DUMMY_NOTARY)).single()
nms.internals.startupComplete.get()
assertTrue(nms.inNodeNetworkMapService != NullNetworkMapService)
assertTrue(infos.any { it.legalIdentities.toSet() == nms.info.legalIdentities.toSet() })
otherNodes.forEach {
assertTrue(nms.info.chooseIdentity() in it.services.networkMapCache.allNodes.map { it.chooseIdentity() })
}
charlie.internals.nodeReadyFuture.get() // Finish registration.
val allTheStartedNodesPopulation = otherNodes.plus(charlie).plus(nms)
// This is prediction of the longest time it will take to get the cluster into a stable state such that further
// testing can be performed upon it
val maxInstabilityInterval = BRIDGE_RETRY_MS * allTheStartedNodesPopulation.size * 30
logger.info("Instability interval is set to: $maxInstabilityInterval ms")
// TODO: Re-visit this sort of re-try for stable cluster once network map redesign is finished.
eventually<AssertionError, Unit>(Duration.ofMillis(maxInstabilityInterval)) {
logger.info("Checking connectivity")
checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS.
logger.info("Loading caches")
val cacheA = otherNodes[0].services.networkMapCache.allNodes
val cacheB = otherNodes[1].services.networkMapCache.allNodes
val cacheC = charlie.services.networkMapCache.allNodes
logger.info("Performing verification")
assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap
assertThat(cacheB).contains(charlie.info)
assertEquals(cacheA.toSet(), cacheB.toSet())
assertEquals(cacheA.toSet(), cacheC.toSet())
}
fun `start node without networkMapService and no database - success`() {
startNode(CHARLIE.name).getOrThrow(2.seconds)
}
// HELPERS
// Helper function to restart nodes with the same host and port.
private fun startNodesWithPort(nodesToStart: List<Party>, noNetworkMap: Boolean = false, customRetryIntervalMs: Long? = null): List<StartedNode<Node>> {
private fun startNodesWithPort(nodesToStart: List<Party>, customRetryIntervalMs: Long? = null): List<StartedNode<Node>> {
return nodesToStart.map { party ->
val configOverrides = (addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()) +
(customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap())
if (party == DUMMY_NOTARY) {
startNetworkMapNode(party.name, configOverrides = configOverrides)
} else {
startNode(party.name,
configOverrides = configOverrides,
noNetworkMap = noNetworkMap,
waitForConnection = false).getOrThrow()
}
startNode(party.name,
configOverrides = configOverrides,
waitForConnection = false).getOrThrow()
}
}

View File

@ -45,7 +45,6 @@ class P2PMessagingTest : NodeBasedTest() {
startNodes().getOrThrow(timeout = startUpDuration * 3)
}
@Ignore
@Test
fun `communicating with a distributed service which we're part of`() {
val distributedService = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow()
@ -85,6 +84,7 @@ class P2PMessagingTest : NodeBasedTest() {
}
@Test
@Ignore("Fails on Team City due to issues with restaring nodes.")
fun `distributed service request retries are persisted across client node restarts`() {
val distributedServiceNodes = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow()
val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
@ -110,6 +110,8 @@ class P2PMessagingTest : NodeBasedTest() {
// Wait until the first request is received
crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS)
// Stop alice's node after we ensured that the first request was delivered and ignored.
alice.services.networkMapCache.clearNetworkMapCache()
alice.dispose()
val numberOfRequestsReceived = crashingNodes.requestsReceived.get()
assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1)
@ -117,7 +119,7 @@ class P2PMessagingTest : NodeBasedTest() {
crashingNodes.ignoreRequests = false
// Restart the node and expect a response
val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
val aliceRestarted = startNode(ALICE.name, waitForConnection = true, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 5)).getOrThrow()
val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived)

View File

@ -1,79 +0,0 @@
package net.corda.services.messaging
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.NodeInfo
import net.corda.core.internal.cert
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.ActiveMqServerConfiguration
import net.corda.node.services.config.BridgeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.testing.*
import net.corda.testing.node.NodeBasedTest
import net.corda.testing.node.SimpleNode
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.security.cert.X509Certificate
import java.time.Instant
import java.util.concurrent.TimeoutException
class P2PSecurityTest : NodeBasedTest() {
@Test
fun `incorrect legal name for the network map service config`() {
val incorrectNetworkMapName = CordaX500Name(organisation = "NetworkMap-${random63BitValue()}",
locality = "London", country = "GB")
val node = startNode(BOB.name, configOverrides = mapOf(
"networkMapService" to mapOf(
"address" to networkMapNode.internals.configuration.p2pAddress.toString(),
"legalName" to incorrectNetworkMapName.toString()
)
))
// The connection will be rejected as the legal name doesn't match
assertThatThrownBy { node.getOrThrow() }.hasMessageContaining(incorrectNetworkMapName.toString())
}
@Test
fun `register with the network map service using a legal name different from the TLS CN`() {
startSimpleNode(DUMMY_BANK_A.name, DEV_TRUST_ROOT.cert).use {
// Register with the network map using a different legal name
val response = it.registerWithNetworkMap(DUMMY_BANK_B.name)
// We don't expect a response because the network map's host verification will prevent a connection back
// to the attacker as the TLS CN will not match the legal name it has just provided
assertThatExceptionOfType(TimeoutException::class.java).isThrownBy {
response.getOrThrow(2.seconds)
}
}
}
private fun startSimpleNode(legalName: CordaX500Name,
trustRoot: X509Certificate): SimpleNode {
val config = testNodeConfiguration(
baseDirectory = baseDirectory(legalName),
myLegalName = legalName).also {
doReturn(NetworkMapInfo(networkMapNode.internals.configuration.p2pAddress, networkMapNode.info.chooseIdentity().name)).whenever(it).networkMapService
doReturn(ActiveMqServerConfiguration(BridgeConfiguration(1001, 2, 3.4))).whenever(it).activeMQServer
}
config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name
return SimpleNode(config, trustRoot = trustRoot).apply { start() }
}
private fun SimpleNode.registerWithNetworkMap(registrationName: CordaX500Name): CordaFuture<NetworkMapService.RegistrationResponse> {
val legalIdentity = getTestPartyAndCertificate(registrationName, identity.public)
val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), listOf(legalIdentity), 1, serial = 1)
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
val request = RegistrationRequest(registration.toWire(keyService, identity.public), network.myAddress)
return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.network.myAddress)
}
}

View File

@ -12,8 +12,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.*
import net.corda.core.node.*
@ -87,7 +86,6 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
* Marked as SingletonSerializeAsToken to prevent the invisible reference to AbstractNode in the ServiceHub accidentally
* sweeping up the Node into the Kryo checkpoint serialization via any flows holding a reference to ServiceHub.
*/
// TODO: Where this node is the initial network map service, currently no networkMapService is provided.
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
abstract class AbstractNode(config: NodeConfiguration,
@ -108,7 +106,6 @@ abstract class AbstractNode(config: NodeConfiguration,
override val checkpointStorage: CheckpointStorage,
override val smm: StateMachineManager,
override val attachments: NodeAttachmentService,
override val inNodeNetworkMapService: NetworkMapService,
override val network: MessagingService,
override val database: CordaPersistence,
override val rpcOps: CordaRPCOps,
@ -116,14 +113,8 @@ abstract class AbstractNode(config: NodeConfiguration,
internal val schedulerService: NodeSchedulerService) : StartedNode<N> {
override val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by services, FlowStarter by flowStarter {}
}
// TODO: Persist this, as well as whether the node is registered.
/**
* Sequence number of changes sent to the network map service, when registering/de-registering this node.
*/
var networkMapSeq: Long = 1
protected abstract val log: Logger
protected abstract val networkMapAddress: SingleMessageRecipient?
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
// low-performance prototyping period.
@ -143,7 +134,6 @@ abstract class AbstractNode(config: NodeConfiguration,
protected lateinit var smm: StateMachineManager
private lateinit var tokenizableServices: List<Any>
protected lateinit var attachments: NodeAttachmentService
protected lateinit var inNodeNetworkMapService: NetworkMapService
protected lateinit var network: MessagingService
protected val runOnStop = ArrayList<() -> Any?>()
protected lateinit var database: CordaPersistence
@ -229,10 +219,12 @@ abstract class AbstractNode(config: NodeConfiguration,
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
runOnStop += network::stop
StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, inNodeNetworkMapService, network, database, rpcOps, flowStarter, schedulerService)
StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, schedulerService)
}
// If we successfully loaded network data from database, we set this future to Unit.
_nodeReadyFuture.captureLater(registerWithNetworkMapIfConfigured())
services.networkMapCache.addNode(info)
_nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit })
return startedImpl.apply {
database.transaction {
smm.start(tokenizableServices)
@ -489,7 +481,7 @@ abstract class AbstractNode(config: NodeConfiguration,
services.auditService, services.monitoringService, networkMapCache, services.schemaService,
services.transactionVerifierService, services.validatedTransactions, services.contractUpgradeService,
services, cordappProvider, this)
makeNetworkServices(network, networkMapCache, tokenizableServices)
makeNetworkServices(tokenizableServices)
return tokenizableServices
}
@ -550,16 +542,7 @@ abstract class AbstractNode(config: NodeConfiguration,
}
}
private fun setupInNodeNetworkMapService(networkMapCache: NetworkMapCacheInternal) {
inNodeNetworkMapService =
if (configuration.networkMapService == null && !configuration.noNetworkMapServiceMode)
makeNetworkMapService(network, networkMapCache)
else
NullNetworkMapService
}
private fun makeNetworkServices(network: MessagingService, networkMapCache: NetworkMapCacheInternal, tokenizableServices: MutableList<Any>) {
setupInNodeNetworkMapService(networkMapCache)
private fun makeNetworkServices(tokenizableServices: MutableList<Any>) {
configuration.notary?.let {
val notaryService = makeCoreNotaryService(it)
tokenizableServices.add(notaryService)
@ -570,40 +553,6 @@ abstract class AbstractNode(config: NodeConfiguration,
}
}
private fun registerWithNetworkMapIfConfigured(): CordaFuture<Unit> {
services.networkMapCache.addNode(info)
// In the unit test environment, we may sometimes run without any network map service
return if (networkMapAddress == null && inNodeNetworkMapService == NullNetworkMapService) {
services.networkMapCache.runWithoutMapService()
noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache
} else {
val netMapRegistration = registerWithNetworkMap()
// We may want to start node immediately with database data and not wait for network map registration (but send it either way).
// So we are ready to go.
if (services.networkMapCache.loadDBSuccess) {
log.info("Node successfully loaded network map data from the database.")
doneFuture(Unit)
} else {
netMapRegistration
}
}
}
/**
* Register this node with the network map cache, and load network map from a remote service (and register for
* updates) if one has been supplied.
*/
protected open fun registerWithNetworkMap(): CordaFuture<Unit> {
val address: SingleMessageRecipient = networkMapAddress ?:
network.getAddressOfParty(PartyInfo.SingleNode(services.myInfo.legalIdentitiesAndCerts.first().party, info.addresses)) as SingleMessageRecipient
// Register for updates, even if we're the one running the network map.
return sendNetworkMapRegistration(address).flatMap { (error) ->
check(error == null) { "Unable to register with the network map service: $error" }
// The future returned addMapService will complete on the same executor as sendNetworkMapRegistration, namely the one used by net
services.networkMapCache.addMapService(network, address, true, null)
}
}
private fun sendNetworkMapRegistration(networkMapAddress: SingleMessageRecipient): CordaFuture<RegistrationResponse> {
// Register this node against the network
val instant = platformClock.instant()
@ -616,14 +565,10 @@ abstract class AbstractNode(config: NodeConfiguration,
/** Return list of node's addresses. It's overridden in MockNetwork as we don't have real addresses for MockNodes. */
protected abstract fun myAddresses(): List<NetworkHostAndPort>
/** This is overriden by the mock node implementation to enable operation without any network map service */
protected open fun noNetworkMapConfigured(): CordaFuture<Unit> {
if (services.networkMapCache.loadDBSuccess || configuration.noNetworkMapServiceMode) {
return doneFuture(Unit)
} else {
open protected fun checkNetworkMapIsInitialized() {
if (!services.networkMapCache.loadDBSuccess) {
// TODO: There should be a consistent approach to configuration error exceptions.
throw IllegalStateException("Configuration error: this node isn't being asked to act as the network map, nor " +
"has any other map node been configured.")
throw NetworkMapCacheEmptyException()
}
}
@ -815,3 +760,8 @@ internal class FlowStarterImpl(private val serverThread: AffinityExecutor, priva
return serverThread.fetchFrom { smm.startFlow(logic, flowInitiator, ourIdentity) }
}
}
/**
* Thrown when a node is about to start and its network map cache doesn't contain any node.
*/
internal class NetworkMapCacheEmptyException: Exception()

View File

@ -5,8 +5,6 @@ import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.uncheckedCast
@ -36,7 +34,6 @@ import net.corda.node.utilities.TestClock
import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.IP_REQUEST_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.internal.ShutdownHook
@ -98,7 +95,6 @@ open class Node(override val configuration: NodeConfiguration,
}
override val log: Logger get() = logger
override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress)
override fun makeTransactionVerifierService() = (network as NodeMessagingClient).verifierService
private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1"
@ -156,12 +152,11 @@ open class Node(override val configuration: NodeConfiguration,
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) legalIdentity.owningKey else null
return NodeMessagingClient(
configuration,
versionInfo,
serverAddress,
myIdentityOrNullIfNetworkMapService,
legalIdentity.owningKey,
serverThread,
database,
nodeReadyFuture,
@ -189,16 +184,14 @@ open class Node(override val configuration: NodeConfiguration,
/**
* Checks whether the specified [host] is a public IP address or hostname. If not, tries to discover the current
* machine's public IP address to be used instead. It first looks through the network interfaces, and if no public IP
* is found, asks the network map service to provide it.
* machine's public IP address to be used instead by looking through the network interfaces.
* TODO this code used to rely on the networkmap node, we might want to look at a different solution.
*/
private fun tryDetectIfNotPublicHost(host: String): String? {
if (!AddressUtils.isPublic(host)) {
val foundPublicIP = AddressUtils.tryDetectPublicIP()
if (foundPublicIP == null) {
networkMapAddress?.let { return discoverPublicHost(it.hostAndPort) }
} else {
if (foundPublicIP != null) {
log.info("Detected public IP: ${foundPublicIP.hostAddress}. This will be used instead of the provided \"$host\" as the advertised address.")
return foundPublicIP.hostAddress
}
@ -264,15 +257,6 @@ open class Node(override val configuration: NodeConfiguration,
(network as NodeMessagingClient).start(rpcOps, userService)
}
/**
* Insert an initial step in the registration process which will throw an exception if a non-recoverable error is
* encountered when trying to connect to the network map node.
*/
override fun registerWithNetworkMap(): CordaFuture<Unit> {
val networkMapConnection = messageBroker?.networkMapConnectionFuture ?: doneFuture(Unit)
return networkMapConnection.flatMap { super.registerWithNetworkMap() }
}
override fun myAddresses(): List<NetworkHostAndPort> {
val address = network.myAddress as ArtemisMessagingComponent.ArtemisPeerAddress
return listOf(address.hostAndPort)

View File

@ -7,13 +7,10 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.StateLoader
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.TransactionStorage
import net.corda.core.serialization.SerializeAsToken
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
@ -25,7 +22,6 @@ interface StartedNode<out N : AbstractNode> {
val checkpointStorage: CheckpointStorage
val smm: StateMachineManager
val attachments: NodeAttachmentService
val inNodeNetworkMapService: NetworkMapService
val network: MessagingService
val database: CordaPersistence
val rpcOps: CordaRPCOps

View File

@ -11,7 +11,6 @@ import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
@ -33,35 +32,12 @@ import net.corda.node.utilities.CordaPersistence
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal
interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
/**
* Deregister from updates from the given map service.
* @param network the network messaging service.
* @param mapParty the network map service party to fetch current state from.
*/
fun deregisterForUpdates(network: MessagingService, mapParty: Party): CordaFuture<Unit>
/**
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
* updates.
* @param network the network messaging service.
* @param networkMapAddress the network map service to fetch current state from.
* @param subscribe if the cache should subscribe to updates.
* @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map
* version is less than or equal to the given version, no update is fetched.
*/
fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient,
subscribe: Boolean, ifChangedSinceVer: Int? = null): CordaFuture<Unit>
/** Adds a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo)
/** Removes a node from the local cache. */
fun removeNode(node: NodeInfo)
/** For testing where the network map cache is manipulated marks the service as immediately ready. */
@VisibleForTesting
fun runWithoutMapService()
/** Indicates if loading network map data from database was successful. */
val loadDBSuccess: Boolean
}

View File

@ -4,7 +4,6 @@ import com.typesafe.config.Config
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.nodeapi.User
import net.corda.nodeapi.config.NodeSSLConfiguration
@ -19,12 +18,6 @@ interface NodeConfiguration : NodeSSLConfiguration {
// myLegalName should be only used in the initial network registration, we should use the name from the certificate instead of this.
// TODO: Remove this so we don't accidentally use this identity in the code?
val myLegalName: CordaX500Name
/**
* If null then configure the node to run as the netwok map service, otherwise use this to connect to the network map
* service.
*/
val networkMapService: NetworkMapInfo?
val noNetworkMapServiceMode: Boolean
val minimumPlatformVersion: Int
val emailAddress: String
val exportJMXto: String
@ -92,8 +85,6 @@ data class NodeConfigurationImpl(
override val dataSourceProperties: Properties,
override val database: Properties?,
override val certificateSigningService: URL,
override val networkMapService: NetworkMapInfo?,
override val noNetworkMapServiceMode: Boolean = false,
override val minimumPlatformVersion: Int = 1,
override val rpcUsers: List<User>,
override val verifierType: VerifierType,

View File

@ -1,21 +1,20 @@
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import io.netty.handler.ssl.SslHandler
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.AddressFormatException
import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.div
import net.corda.core.internal.noneOrSingle
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.utilities.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.parsePublicKeyBase58
import net.corda.node.internal.Node
import net.corda.node.services.RPCUserService
import net.corda.node.services.config.NodeConfiguration
@ -37,12 +36,10 @@ import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.message.impl.CoreMessage
import org.apache.activemq.artemis.core.remoting.impl.netty.*
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.spi.core.remoting.*
@ -111,14 +108,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer
val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
private val _networkMapConnectionFuture = config.networkMapService?.let { openFuture<Unit>() }
/**
* A [ListenableFuture] which completes when the server successfully connects to the network map node. If a
* non-recoverable error is encountered then the Future will complete with an exception.
*/
val networkMapConnectionFuture: CordaFuture<Unit>? get() = _networkMapConnectionFuture
private var networkChangeHandle: Subscription? = null
private val nodeRunsNetworkMapService = config.networkMapService == null
init {
config.baseDirectory.requireOnDefaultFileSystem()
@ -132,8 +122,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
fun start() = mutex.locked {
if (!running) {
configureAndStartServer()
// Deploy bridge to the network map service
config.networkMapService?.let { deployBridge(NetworkMapAddress(it.address), setOf(it.legalName)) }
networkChangeHandle = networkMapCache.changed.subscribe { updateBridgesOnNetworkChange(it) }
running = true
}
@ -158,7 +146,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
// Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem.
registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) }
if (nodeRunsNetworkMapService) registerPostQueueCreationCallback { handleIpDetectionRequest(it.toString()) }
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
}
activeMQServer.start()
@ -247,12 +234,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
// TODO remove the NODE_USER role once the webserver doesn't need it
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$NODE_USER.#"] = setOf(nodeInternalRole)
if (nodeRunsNetworkMapService) {
securityRoles["$IP_REQUEST_PREFIX*"] = setOf(
nodeInternalRole,
restrictedRole(PEER_ROLE, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)
)
}
for ((username) in userService.users) {
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#"] = setOf(
nodeInternalRole,
@ -330,7 +311,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
log.debug { "Updating bridges on network map change: ${change.node}" }
fun gatherAddresses(node: NodeInfo): Sequence<ArtemisPeerAddress> {
val address = node.addresses.first()
return node.legalIdentitiesAndCerts.map { getArtemisPeerAddress(it.party, address, config.networkMapService?.legalName) }.asSequence()
return node.legalIdentitiesAndCerts.map { NodeAddress(it.party.owningKey, address) }.asSequence()
}
fun deployBridges(node: NodeInfo) {
@ -409,47 +390,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
// This is called on one of Artemis' background threads
internal fun hostVerificationFail(expectedLegalNames: Set<CordaX500Name>, errorMsg: String?) {
log.error(errorMsg)
if (config.networkMapService?.legalName in expectedLegalNames) {
// If the peer that failed host verification was the network map node then we're in big trouble and need to bail!
_networkMapConnectionFuture!!.setException(IOException("${config.networkMapService} failed host verification check"))
}
}
// This is called on one of Artemis' background threads
internal fun onTcpConnection(peerLegalName: CordaX500Name) {
if (peerLegalName == config.networkMapService?.legalName) {
_networkMapConnectionFuture!!.set(Unit)
}
}
private fun handleIpDetectionRequest(queueName: String) {
fun getRemoteAddress(requestId: String): String? {
val session = activeMQServer.sessions.first {
it.getMetaData(ipDetectRequestProperty) == requestId
}
return session.remotingConnection.remoteAddress
}
fun sendResponse(remoteAddress: String?) {
val responseMessage = CoreMessage(random63BitValue(), 0).apply {
putStringProperty(ipDetectResponseProperty, remoteAddress)
}
val routingContext = RoutingContextImpl(null)
val queue = activeMQServer.locateQueue(SimpleString(queueName))
queue.route(responseMessage, routingContext)
activeMQServer.postOffice.processRoute(responseMessage, routingContext, true)
}
if (!queueName.startsWith(IP_REQUEST_PREFIX)) return
val requestId = queueName.substringAfter(IP_REQUEST_PREFIX)
val remoteAddress = getRemoteAddress(requestId)
log.debug { "Detected remote address $remoteAddress for request $requestId" }
sendResponse(remoteAddress)
}
}
class VerifyingNettyConnectorFactory : NettyConnectorFactory() {
@ -473,7 +413,10 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?) :
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
private val server = configuration[ArtemisMessagingServer::class.java.name] as ArtemisMessagingServer
companion object {
private val log = loggerFor<VerifyingNettyConnector>()
}
private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration)
override fun createConnection(): Connection? {
@ -504,10 +447,9 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>,
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
}
X509Utilities.validateCertificateChain(session.localCertificates.last() as java.security.cert.X509Certificate, *session.peerCertificates)
server.onTcpConnection(peerLegalName)
} catch (e: IllegalArgumentException) {
connection.close()
server.hostVerificationFail(expectedLegalNames, e.message)
log.error(e.message)
return null
}
}

View File

@ -48,7 +48,6 @@ import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob
import javax.security.auth.x500.X500Principal
// TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
@ -76,7 +75,7 @@ import javax.security.auth.x500.X500Principal
class NodeMessagingClient(override val config: NodeConfiguration,
private val versionInfo: VersionInfo,
private val serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey?,
private val myIdentity: PublicKey,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
val database: CordaPersistence,
private val networkMapRegistrationFuture: CordaFuture<Unit>,
@ -172,14 +171,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
/** An executor for sending messages */
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
/**
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
*/
override val myAddress: SingleMessageRecipient = if (myIdentity != null) {
NodeAddress.asSingleNode(myIdentity, advertisedAddress)
} else {
NetworkMapAddress(advertisedAddress)
}
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>()
@ -634,9 +626,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// TODO Rethink PartyInfo idea and merging PeerAddress/ServiceAddress (the only difference is that Service address doesn't hold host and port)
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
return when (partyInfo) {
is PartyInfo.SingleNode -> {
getArtemisPeerAddress(partyInfo.party, partyInfo.addresses.first(), config.networkMapService?.legalName)
}
is PartyInfo.SingleNode -> NodeAddress(partyInfo.party.owningKey, partyInfo.addresses.first())
is PartyInfo.DistributedNode -> ServiceAddress(partyInfo.party.owningKey)
}
}

View File

@ -1,49 +1,33 @@
package net.corda.node.services.network
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.NotaryService
import net.corda.core.node.services.PartyInfo
import net.corda.core.schemas.NodeInfoSchemaV1
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.toBase58String
import net.corda.node.services.api.NetworkCacheException
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.*
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import org.hibernate.Session
import rx.Observable
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.security.SignatureException
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.HashMap
@ -78,7 +62,6 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
val logger = loggerFor<PersistentNetworkMapCache>()
}
private var registeredForPush = false
// TODO Small explanation, partyNodes and registeredNodes is left in memory as it was before, because it will be removed in
// next PR that gets rid of services. These maps are used only for queries by service.
protected val registeredNodes: MutableMap<PublicKey, NodeInfo> = Collections.synchronizedMap(HashMap())
@ -88,6 +71,8 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
override val changed: Observable<MapChange> = _changed.wrapWithDatabaseTransaction()
private val changePublisher: rx.Observer<MapChange> get() = _changed.bufferUntilDatabaseCommit()
// TODO revisit the logic under which nodeReady and loadDBSuccess are set.
// with the NetworkMapService redesign their meaning is not too well defined.
private val _registrationFuture = openFuture<Void?>()
override val nodeReady: CordaFuture<Void?> get() = _registrationFuture
private var _loadDBSuccess: Boolean = false
@ -152,38 +137,6 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
}
}
override fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean,
ifChangedSinceVer: Int?): CordaFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC,
data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes)
network.send(ackMessage, req.replyTo)
processUpdatePush(req)
} catch (e: NodeMapException) {
logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
} catch (e: Exception) {
logger.error("Exception processing update from network map service", e)
}
}
registeredForPush = true
}
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, network.myAddress)
val future = network.sendRequest<FetchMapResponse>(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { (nodes) ->
// We may not receive any nodes back, if the map hasn't changed since the version specified
nodes?.forEach { processRegistration(it) }
Unit
}
_registrationFuture.captureLater(future.map { null })
return future
}
override fun addNode(node: NodeInfo) {
logger.info("Adding node with info: $node")
synchronized(_changed) {
@ -210,6 +163,8 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
logger.info("Previous node was identical to incoming one - doing nothing")
}
}
_loadDBSuccess = true // This is used in AbstractNode to indicate that node is ready.
_registrationFuture.set(null)
logger.info("Done adding node with info: $node")
}
@ -225,49 +180,11 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
logger.info("Done removing node with info: $node")
}
/**
* Unsubscribes from updates from the given map service.
* @param mapParty the network map service party to listen to updates from.
*/
override fun deregisterForUpdates(network: MessagingService, mapParty: Party): CordaFuture<Unit> {
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.SubscribeRequest(false, network.myAddress)
// `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo.
val address = getPartyInfo(mapParty)?.let { network.getAddressOfParty(it) } ?:
throw IllegalArgumentException("Can't deregister for updates, don't know the party: $mapParty")
val future = network.sendRequest<SubscribeResponse>(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map {
if (it.confirmed) Unit else throw NetworkCacheException.DeregistrationFailed()
}
_registrationFuture.captureLater(future.map { null })
return future
}
fun processUpdatePush(req: NetworkMapService.Update) {
try {
val reg = req.wireReg.verified()
processRegistration(reg)
} catch (e: SignatureException) {
throw NodeMapException.InvalidSignature()
}
}
override val allNodes: List<NodeInfo>
get() = database.transaction {
getAllInfos(session).map { it.toNodeInfo() }
}
private fun processRegistration(reg: NodeRegistration) {
when (reg.type) {
AddOrRemove.ADD -> addNode(reg.node)
AddOrRemove.REMOVE -> removeNode(reg.node)
}
}
@VisibleForTesting
override fun runWithoutMapService() {
_registrationFuture.set(null)
}
// Changes related to NetworkMap redesign
// TODO It will be properly merged into network map cache after services removal.
@ -288,14 +205,10 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence, con
logger.info("Loaded node info: $nodeInfo")
val node = nodeInfo.toNodeInfo()
addNode(node)
_loadDBSuccess = true // This is used in AbstractNode to indicate that node is ready.
} catch (e: Exception) {
logger.warn("Exception parsing network map from the database.", e)
}
}
if (loadDBSuccess) {
_registrationFuture.set(null) // Useful only if we don't have NetworkMapService configured so StateMachineManager can start.
}
}
private fun updateInfoDB(nodeInfo: NodeInfo) {

View File

@ -36,7 +36,6 @@ class NodeConfigurationImplTest {
private val testConfiguration = NodeConfigurationImpl(
baseDirectory = Paths.get("."),
myLegalName = ALICE.name,
networkMapService = null,
emailAddress = "",
keyStorePassword = "cordacadevpass",
trustStorePassword = "trustpass",

View File

@ -59,7 +59,6 @@ fun testNodeConfiguration(
myLegalName: CordaX500Name): NodeConfiguration {
abstract class MockableNodeConfiguration : NodeConfiguration // Otherwise Mockito is defeated by val getters.
return rigorousMock<MockableNodeConfiguration>().also {
doReturn(true).whenever(it).noNetworkMapServiceMode
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(myLegalName).whenever(it).myLegalName
doReturn(1).whenever(it).minimumPlatformVersion
@ -77,7 +76,6 @@ fun testNodeConfiguration(
doReturn(VerifierType.InMemory).whenever(it).verifierType
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
doReturn(0L).whenever(it).additionalNodeInfoPollingFrequencyMsec
doReturn(null).whenever(it).networkMapService
doReturn(null).whenever(it).devModeOptions
doCallRealMethod().whenever(it).certificatesDirectory
doCallRealMethod().whenever(it).trustStoreFile

View File

@ -1,9 +1,9 @@
package net.corda.testing.node
import co.paralleluniverse.common.util.VisibleForTesting
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort
@ -27,32 +27,12 @@ class MockNetworkMapCache(database: CordaPersistence, configuration: NodeConfigu
}
override val changed: Observable<NetworkMapCache.MapChange> = PublishSubject.create<NetworkMapCache.MapChange>()
override val nodeReady: CordaFuture<Void?> get() = doneFuture(null)
init {
val mockNodeA = NodeInfo(listOf(BANK_C_ADDR), listOf(BANK_C), 1, serial = 1L)
val mockNodeB = NodeInfo(listOf(BANK_D_ADDR), listOf(BANK_D), 1, serial = 1L)
partyNodes.add(mockNodeA)
partyNodes.add(mockNodeB)
runWithoutMapService()
}
/**
* Directly add a registration to the internal cache. DOES NOT fire the change listeners, as it's
* not a change being received.
*/
@VisibleForTesting
fun addRegistration(node: NodeInfo) {
val previousIndex = partyNodes.indexOfFirst { it.legalIdentitiesAndCerts == node.legalIdentitiesAndCerts }
if (previousIndex != -1) partyNodes[previousIndex] = node
else partyNodes.add(node)
}
/**
* Directly remove a registration from the internal cache. DOES NOT fire the change listeners, as it's
* not a change being received.
*/
@VisibleForTesting
fun deleteRegistration(legalIdentity: Party): Boolean {
return partyNodes.removeIf { legalIdentity.owningKey in it.legalIdentitiesAndCerts.map { it.owningKey } }
}
}

View File

@ -8,7 +8,6 @@ import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.createDirectory
import net.corda.core.internal.uncheckedCast
@ -182,7 +181,6 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
CordappLoader.createDefaultWithTestPackages(args.config, args.network.cordappPackages),
args.network.busyLatch) {
val mockNet = args.network
override val networkMapAddress = null
val id = args.id
internal val notaryIdentity = args.notaryIdentity
val entropyRoot = args.entropyRoot
@ -235,8 +233,11 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
return entropyToKeyPair(counter)
}
// It's OK to not have a network map service in the mock network.
override fun noNetworkMapConfigured() = doneFuture(Unit)
/**
* MockNetwork will ensure nodes are connected to each other. The nodes themselves
* won't be able to tell if that happened already or not.
*/
override fun checkNetworkMapIsInitialized() = Unit
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)

View File

@ -5,6 +5,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.*
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.Node
@ -14,7 +15,6 @@ import net.corda.node.services.config.*
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.nodeapi.User
import net.corda.nodeapi.config.toConfig
import net.corda.testing.DUMMY_MAP
import net.corda.testing.TestDependencyInjectionBase
import net.corda.testing.driver.addressMustNotBeBoundFuture
import net.corda.testing.getFreeLocalPorts
@ -41,9 +41,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
val tempFolder = TemporaryFolder()
private val nodes = mutableListOf<StartedNode<Node>>()
private var _networkMapNode: StartedNode<Node>? = null
val networkMapNode: StartedNode<Node> get() = _networkMapNode ?: startNetworkMapNode()
private val nodeInfos = mutableListOf<NodeInfo>()
init {
System.setProperty("consoleLogLevel", Level.DEBUG.name().toLowerCase())
@ -65,7 +63,6 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
)
}.filterNotNull()
nodes.clear()
_networkMapNode = null
portNotBoundChecks.transpose().getOrThrow()
}
@ -76,49 +73,17 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
nodes.forEach { it.services.networkMapCache.clearNetworkMapCache() }
}
/**
* You can use this method to start the network map node in a more customised manner. Otherwise it
* will automatically be started with the default parameters.
*/
fun startNetworkMapNode(legalName: CordaX500Name = DUMMY_MAP.name,
platformVersion: Int = 1,
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): StartedNode<Node> {
check(_networkMapNode == null || _networkMapNode!!.info.legalIdentitiesAndCerts.first().name == legalName)
return startNodeInternal(legalName, platformVersion, rpcUsers, configOverrides).apply {
_networkMapNode = this
}
}
@JvmOverloads
fun startNode(legalName: CordaX500Name,
platformVersion: Int = 1,
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap(),
noNetworkMap: Boolean = false,
waitForConnection: Boolean = true): CordaFuture<StartedNode<Node>> {
val networkMapConf = if (noNetworkMap) {
// Nonexistent network map service address.
mapOf(
"networkMapService" to mapOf(
"address" to "localhost:10000",
"legalName" to networkMapNode.info.legalIdentitiesAndCerts.first().name.toString()
)
)
} else {
mapOf(
"networkMapService" to mapOf(
"address" to networkMapNode.internals.configuration.p2pAddress.toString(),
"legalName" to networkMapNode.info.legalIdentitiesAndCerts.first().name.toString()
)
)
}
val node = startNodeInternal(
legalName,
platformVersion,
rpcUsers,
networkMapConf + configOverrides,
noNetworkMap)
configOverrides)
return if (waitForConnection) node.internals.nodeReadyFuture.map { node } else doneFuture(node)
}
@ -169,11 +134,19 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
protected fun baseDirectory(legalName: CordaX500Name) = tempFolder.root.toPath() / legalName.organisation.replace(WHITESPACE, "")
private fun ensureAllNetworkMapCachesHaveAllNodeInfos() {
val runningNodes = nodes.filter { it.internals.started != null }
val runningNodesInfo = runningNodes.map { it.info }
for (node in runningNodes)
for (nodeInfo in runningNodesInfo) {
node.services.networkMapCache.addNode(nodeInfo)
}
}
private fun startNodeInternal(legalName: CordaX500Name,
platformVersion: Int,
rpcUsers: List<User>,
configOverrides: Map<String, Any>,
noNetworkMap: Boolean = false): StartedNode<Node> {
configOverrides: Map<String, Any>): StartedNode<Node> {
val baseDirectory = baseDirectory(legalName).createDirectories()
val localPort = getFreeLocalPorts("localhost", 2)
val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString()
@ -184,8 +157,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
"myLegalName" to legalName.toString(),
"p2pAddress" to p2pAddress,
"rpcAddress" to localPort[1].toString(),
"rpcUsers" to rpcUsers.map { it.toMap() },
"noNetworkMap" to noNetworkMap
"rpcUsers" to rpcUsers.map { it.toMap() }
) + configOverrides
)
@ -196,9 +168,11 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
initialiseSerialization = false,
cordappLoader = CordappLoader.createDefaultWithTestPackages(parsedConfig, cordappPackages)).start()
nodes += node
ensureAllNetworkMapCachesHaveAllNodeInfos()
thread(name = legalName.organisation) {
node.internals.run()
}
return node
}
}

View File

@ -4,7 +4,6 @@ import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueFactory
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.parseAsNodeConfiguration
import net.corda.nodeapi.User
import net.corda.nodeapi.config.toConfig
@ -47,7 +46,6 @@ class NodeConfigTest {
assertEquals(localPort(40002), fullConfig.rpcAddress)
assertEquals(localPort(10001), fullConfig.p2pAddress)
assertEquals(listOf(user("jenny")), fullConfig.rpcUsers)
assertEquals(NetworkMapInfo(localPort(12345), DUMMY_NOTARY.name), fullConfig.networkMapService)
assertThat(fullConfig.dataSourceProperties["dataSource.url"] as String).contains("AUTO_SERVER_PORT=30001")
assertTrue(fullConfig.useTestClock)
assertFalse(fullConfig.detectPublicIp)