NetworkMapCache database backed (#1135)

Work on database backed NetworkMapCache

Make NodeInfo JPA entity.

Enable node startup with it's database network map cache. Fix schema.
Make node not wait for finishing network map service registration if it
successfully loaded data from database.

Add tests for startup without NetworkMapService.

* Rename networkMapRegistrationFuture

Change networkMapRegistrationFuture to nodeReadyFuture, it no longer
indicates the NetworkMapService registration, because we are able to run
network without map service configured.

* Partially integrate database into NetworkMapCache

Full integrtion will come with service removal.

Move MockServiceHubInternal to net.corda.node.testing

* Add workaround to transaction scope race

Temporary workaround to force isolated transaction (otherwise it causes race conditions when processing
network map registration on network map node).

* Remove WorldMapLocation from NodeInfo

Infer the node's location based on X500 name

Add serial number on NodeInfo

For tests of running without NetworkMap, start nodes with nonexistent NetworkMap address

Make clearNetworkMapCache callable via RPC.
This commit is contained in:
Katarzyna Streich 2017-08-31 11:00:11 +01:00 committed by GitHub
parent 3e5fa9ee6a
commit 472ecc65c6
41 changed files with 885 additions and 287 deletions

View File

@ -257,7 +257,7 @@ interface CordaRPCOps : RPCOps {
* complete with an exception if it is unable to. * complete with an exception if it is unable to.
*/ */
@RPCReturnsObservables @RPCReturnsObservables
fun waitUntilRegisteredWithNetworkMap(): CordaFuture<Void?> fun waitUntilNetworkReady(): CordaFuture<Void?>
// TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of // TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of
// the node's state locally and query that directly. // the node's state locally and query that directly.
@ -299,6 +299,11 @@ interface CordaRPCOps : RPCOps {
* @return the node info if available. * @return the node info if available.
*/ */
fun nodeIdentityFromParty(party: AbstractParty): NodeInfo? fun nodeIdentityFromParty(party: AbstractParty): NodeInfo?
/**
* Clear all network map data from local node cache.
*/
fun clearNetworkMapCache()
} }
inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(), inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(),

View File

@ -1,12 +1,16 @@
package net.corda.core.node package net.corda.core.node
import net.corda.core.crypto.locationOrNull
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType import net.corda.core.node.services.ServiceType
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.NodeInfoSchemaV1
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.NonEmptySet
import net.corda.core.serialization.serialize
/** /**
* Information for an advertised service including the service specific identity information. * Information for an advertised service including the service specific identity information.
@ -21,11 +25,13 @@ data class ServiceEntry(val info: ServiceInfo, val identity: PartyAndCertificate
// TODO We currently don't support multi-IP/multi-identity nodes, we only left slots in the data structures. // TODO We currently don't support multi-IP/multi-identity nodes, we only left slots in the data structures.
@CordaSerializable @CordaSerializable
data class NodeInfo(val addresses: List<NetworkHostAndPort>, data class NodeInfo(val addresses: List<NetworkHostAndPort>,
val legalIdentityAndCert: PartyAndCertificate, //TODO This field will be removed in future PR which gets rid of services. // TODO After removing of services these two fields will be merged together and made NonEmptySet.
val legalIdentitiesAndCerts: NonEmptySet<PartyAndCertificate>, val legalIdentityAndCert: PartyAndCertificate,
val legalIdentitiesAndCerts: Set<PartyAndCertificate>,
val platformVersion: Int, val platformVersion: Int,
val advertisedServices: List<ServiceEntry> = emptyList(), val advertisedServices: List<ServiceEntry> = emptyList(),
val worldMapLocation: WorldMapLocation? = null) { val serial: Long
) {
init { init {
require(advertisedServices.none { it.identity == legalIdentityAndCert }) { require(advertisedServices.none { it.identity == legalIdentityAndCert }) {
"Service identities must be different from node legal identity" "Service identities must be different from node legal identity"
@ -37,4 +43,12 @@ data class NodeInfo(val addresses: List<NetworkHostAndPort>,
fun serviceIdentities(type: ServiceType): List<Party> { fun serviceIdentities(type: ServiceType): List<Party> {
return advertisedServices.mapNotNull { if (it.info.type.isSubTypeOf(type)) it.identity.party else null } return advertisedServices.mapNotNull { if (it.info.type.isSubTypeOf(type)) it.identity.party else null }
} }
/**
* Uses node's owner X500 name to infer the node's location. Used in Explorer in map view.
*/
fun getWorldMapLocation(): WorldMapLocation? {
val nodeOwnerLocation = legalIdentity.name.locationOrNull
return nodeOwnerLocation?.let { CityDatabase[it] }
}
} }

View File

@ -8,6 +8,7 @@ import net.corda.core.internal.randomOrNull
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.NetworkHostAndPort
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import rx.Observable import rx.Observable
import java.security.PublicKey import java.security.PublicKey
@ -44,7 +45,7 @@ interface NetworkMapCache {
/** Tracks changes to the network map cache */ /** Tracks changes to the network map cache */
val changed: Observable<MapChange> val changed: Observable<MapChange>
/** Future to track completion of the NetworkMapService registration. */ /** Future to track completion of the NetworkMapService registration. */
val mapServiceRegistered: CordaFuture<Void?> val nodeReady: CordaFuture<Void?>
/** /**
* Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the * Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the
@ -76,7 +77,10 @@ interface NetworkMapCache {
fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo?
/** Look up the node info for a legal name. */ /** Look up the node info for a legal name. */
fun getNodeByLegalName(principal: X500Name): NodeInfo? = partyNodes.singleOrNull { it.legalIdentity.name == principal } fun getNodeByLegalName(principal: X500Name): NodeInfo?
/** Look up the node info for a host and port. */
fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo?
/** /**
* In general, nodes can advertise multiple identities: a legal identity, and separate identities for each of * In general, nodes can advertise multiple identities: a legal identity, and separate identities for each of
@ -144,4 +148,9 @@ interface NetworkMapCache {
"Your options are: ${notaryNodes.map { "\"${it.notaryIdentity.name}\"" }.joinToString()}.") "Your options are: ${notaryNodes.map { "\"${it.notaryIdentity.name}\"" }.joinToString()}.")
return notary.advertisedServices.any { it.info.type.isValidatingNotary() } return notary.advertisedServices.any { it.info.type.isValidatingNotary() }
} }
/**
* Clear all network map data from local node cache.
*/
fun clearNetworkMapCache()
} }

View File

@ -0,0 +1,142 @@
package net.corda.core.schemas
import net.corda.core.crypto.toBase58String
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceEntry
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import java.io.Serializable
import java.security.cert.CertPath
import javax.persistence.CascadeType
import javax.persistence.Column
import javax.persistence.ElementCollection
import javax.persistence.Embeddable
import javax.persistence.EmbeddedId
import javax.persistence.Entity
import javax.persistence.GeneratedValue
import javax.persistence.Id
import javax.persistence.JoinColumn
import javax.persistence.JoinTable
import javax.persistence.Lob
import javax.persistence.ManyToMany
import javax.persistence.OneToMany
import javax.persistence.Table
object NodeInfoSchema
object NodeInfoSchemaV1 : MappedSchema(
schemaFamily = NodeInfoSchema.javaClass,
version = 1,
mappedTypes = listOf(PersistentNodeInfo::class.java, DBPartyAndCertificate::class.java, DBHostAndPort::class.java)
) {
@Entity
@Table(name = "node_infos")
class PersistentNodeInfo(
@Id
@GeneratedValue
@Column(name = "node_info_id")
var id: Int,
@Column(name = "addresses")
@OneToMany(cascade = arrayOf(CascadeType.ALL), orphanRemoval = true)
val addresses: List<NodeInfoSchemaV1.DBHostAndPort>,
@Column(name = "legal_identities_certs")
@ManyToMany(cascade = arrayOf(CascadeType.ALL))
@JoinTable(name = "link_nodeinfo_party",
joinColumns = arrayOf(JoinColumn(name="node_info_id")),
inverseJoinColumns = arrayOf(JoinColumn(name="party_name")))
val legalIdentitiesAndCerts: Set<DBPartyAndCertificate>,
@Column(name = "platform_version")
val platformVersion: Int,
@Column(name = "advertised_services")
@ElementCollection
var advertisedServices: List<DBServiceEntry> = emptyList(),
/**
* serial is an increasing value which represents the version of [NodeInfo].
* Not expected to be sequential, but later versions of the registration must have higher values
* Similar to the serial number on DNS records.
*/
@Column(name = "serial")
val serial: Long
) {
fun toNodeInfo(): NodeInfo {
return NodeInfo(
this.addresses.map { it.toHostAndPort() },
this.legalIdentitiesAndCerts.filter { it.isMain }.single().toLegalIdentityAndCert(), // TODO Workaround, it will be changed after PR with services removal.
this.legalIdentitiesAndCerts.filter { !it.isMain }.map { it.toLegalIdentityAndCert() }.toSet(),
this.platformVersion,
this.advertisedServices.map {
it.serviceEntry?.deserialize<ServiceEntry>() ?: throw IllegalStateException("Service entry shouldn't be null")
},
this.serial
)
}
}
@Embeddable
data class PKHostAndPort(
val host: String? = null,
val port: Int? = null
) : Serializable
@Entity
data class DBHostAndPort(
@EmbeddedId
private val pk: PKHostAndPort
) {
companion object {
fun fromHostAndPort(hostAndPort: NetworkHostAndPort) = DBHostAndPort(
PKHostAndPort(hostAndPort.host, hostAndPort.port)
)
}
fun toHostAndPort(): NetworkHostAndPort {
return NetworkHostAndPort(this.pk.host!!, this.pk.port!!)
}
}
@Embeddable // TODO To be removed with services.
data class DBServiceEntry(
@Column(length = 65535)
val serviceEntry: ByteArray? = null
)
/**
* PartyAndCertificate entity (to be replaced by referencing final Identity Schema).
*/
@Entity
@Table(name = "node_info_party_cert")
data class DBPartyAndCertificate(
@Id
@Column(name = "owning_key", length = 65535, nullable = false)
val owningKey: String,
//@Id // TODO Do we assume that names are unique? Note: We can't have it as Id, because our toString on X500 is inconsistent.
@Column(name = "party_name", nullable = false)
val name: String,
@Column(name = "certificate")
@Lob
val certificate: ByteArray,
@Column(name = "certificate_path")
@Lob
val certPath: ByteArray,
val isMain: Boolean,
@ManyToMany(mappedBy = "legalIdentitiesAndCerts", cascade = arrayOf(CascadeType.ALL)) // ManyToMany because of distributed services.
private val persistentNodeInfos: Set<PersistentNodeInfo> = emptySet()
) {
constructor(partyAndCert: PartyAndCertificate, isMain: Boolean = false)
: this(partyAndCert.party.owningKey.toBase58String(), partyAndCert.party.name.toString(), partyAndCert.certificate.serialize().bytes, partyAndCert.certPath.serialize().bytes, isMain)
fun toLegalIdentityAndCert(): PartyAndCertificate {
return PartyAndCertificate(certPath.deserialize<CertPath>())
}
}
}

View File

@ -17,9 +17,9 @@ The ORM mapping is specified using the `Java Persistence API <https://en.wikiped
in the node's local vault as part of a transaction. in the node's local vault as part of a transaction.
.. note:: Presently the node includes an instance of the H2 database but any database that supports JDBC is a .. note:: Presently the node includes an instance of the H2 database but any database that supports JDBC is a
candidate and the node will in the future support a range of database implementations via their JDBC drivers. Much candidate and the node will in the future support a range of database implementations via their JDBC drivers. Much
of the node internal state is also persisted there. You can access the internal H2 database via JDBC, please see the of the node internal state is also persisted there. You can access the internal H2 database via JDBC, please see the
info in ":doc:`node-administration`" for details. info in ":doc:`node-administration`" for details.
Schemas Schemas
------- -------

View File

@ -45,8 +45,8 @@ class IntegrationTestingTutorial {
val bobClient = bob.rpcClientToNode() val bobClient = bob.rpcClientToNode()
val bobProxy = bobClient.start("bobUser", "testPassword2").proxy val bobProxy = bobClient.start("bobUser", "testPassword2").proxy
aliceProxy.waitUntilRegisteredWithNetworkMap().getOrThrow() aliceProxy.waitUntilNetworkReady().getOrThrow()
bobProxy.waitUntilRegisteredWithNetworkMap().getOrThrow() bobProxy.waitUntilNetworkReady().getOrThrow()
// END 2 // END 2
// START 3 // START 3

View File

@ -43,6 +43,7 @@ class P2PMessagingTest : NodeBasedTest() {
val startUpDuration = elapsedTime { startNodes().getOrThrow() } val startUpDuration = elapsedTime { startNodes().getOrThrow() }
// Start the network map a second time - this will restore message queues from the journal. // Start the network map a second time - this will restore message queues from the journal.
// This will hang and fail prior the fix. https://github.com/corda/corda/issues/37 // This will hang and fail prior the fix. https://github.com/corda/corda/issues/37
clearAllNodeInfoDb() // Clear network map data from nodes databases.
stopAllNodes() stopAllNodes()
startNodes().getOrThrow(timeout = startUpDuration * 3) startNodes().getOrThrow(timeout = startUpDuration * 3)
} }
@ -149,7 +150,6 @@ class P2PMessagingTest : NodeBasedTest() {
// Restart the node and expect a response // Restart the node and expect a response
val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() val aliceRestarted = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow()
val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds) val response = aliceRestarted.network.onNext<Any>(dummyTopic, sessionId).getOrThrow(5.seconds)
assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2) assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2)
assertThat(response).isEqualTo(responseMessage) assertThat(response).isEqualTo(responseMessage)
} }

View File

@ -67,7 +67,7 @@ class P2PSecurityTest : NodeBasedTest() {
private fun SimpleNode.registerWithNetworkMap(registrationName: X500Name): CordaFuture<NetworkMapService.RegistrationResponse> { private fun SimpleNode.registerWithNetworkMap(registrationName: X500Name): CordaFuture<NetworkMapService.RegistrationResponse> {
val legalIdentity = getTestPartyAndCertificate(registrationName, identity.public) val legalIdentity = getTestPartyAndCertificate(registrationName, identity.public)
val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, NonEmptySet.of(legalIdentity), 1) val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, NonEmptySet.of(legalIdentity), 1, serial = 1)
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX) val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
val request = RegistrationRequest(registration.toWire(keyService, identity.public), network.myAddress) val request = RegistrationRequest(registration.toWire(keyService, identity.public), network.myAddress)
return network.sendRequest<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.network.myAddress) return network.sendRequest<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.network.myAddress)

View File

@ -12,6 +12,7 @@ import net.corda.core.flows.*
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.* import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
@ -39,7 +40,7 @@ import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.sendRequest import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.RegistrationRequest import net.corda.node.services.network.NetworkMapService.RegistrationRequest
import net.corda.node.services.network.NetworkMapService.RegistrationResponse import net.corda.node.services.network.NetworkMapService.RegistrationResponse
@ -138,10 +139,11 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
var isPreviousCheckpointsPresent = false var isPreviousCheckpointsPresent = false
private set private set
protected val _networkMapRegistrationFuture = openFuture<Unit>() protected val _nodeReadyFuture = openFuture<Unit>()
/** Completes once the node has successfully registered with the network map service */ /** Completes once the node has successfully registered with the network map service
val networkMapRegistrationFuture: CordaFuture<Unit> * or has loaded network map data from local database */
get() = _networkMapRegistrationFuture val nodeReadyFuture: CordaFuture<Unit>
get() = _nodeReadyFuture
/** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */ /** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */
open val pluginRegistries: List<CordaPluginRegistry> by lazy { open val pluginRegistries: List<CordaPluginRegistry> by lazy {
@ -155,10 +157,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
/** The implementation of the [CordaRPCOps] interface used by this node. */ /** The implementation of the [CordaRPCOps] interface used by this node. */
open val rpcOps: CordaRPCOps by lazy { CordaRPCOpsImpl(services, smm, database) } // Lazy to avoid init ordering issue with the SMM. open val rpcOps: CordaRPCOps by lazy { CordaRPCOpsImpl(services, smm, database) } // Lazy to avoid init ordering issue with the SMM.
open fun findMyLocation(): WorldMapLocation? {
return configuration.myLegalName.locationOrNull?.let { CityDatabase[it] }
}
open fun start() { open fun start() {
require(!started) { "Node has already been started" } require(!started) { "Node has already been started" }
@ -208,7 +206,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
} }
runOnStop += network::stop runOnStop += network::stop
_networkMapRegistrationFuture.captureLater(registerWithNetworkMapIfConfigured()) }
// If we successfully loaded network data from database, we set this future to Unit.
_nodeReadyFuture.captureLater(registerWithNetworkMapIfConfigured())
database.transaction {
smm.start() smm.start()
// Shut down the SMM so no Fibers are scheduled. // Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
@ -483,9 +484,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun makeInfo(legalIdentity: PartyAndCertificate): NodeInfo { private fun makeInfo(legalIdentity: PartyAndCertificate): NodeInfo {
val advertisedServiceEntries = makeServiceEntries() val advertisedServiceEntries = makeServiceEntries()
val allIdentities = (advertisedServiceEntries.map { it.identity } + legalIdentity).toNonEmptySet() val allIdentities = advertisedServiceEntries.map { it.identity }.toSet() // TODO Add node's legalIdentity (after services removal).
val addresses = myAddresses() // TODO There is no support for multiple IP addresses yet. val addresses = myAddresses() // TODO There is no support for multiple IP addresses yet.
return NodeInfo(addresses, legalIdentity, allIdentities, platformVersion, advertisedServiceEntries, findMyLocation()) return NodeInfo(addresses, legalIdentity, allIdentities, platformVersion, advertisedServiceEntries, platformClock.instant().toEpochMilli())
} }
/** /**
@ -580,7 +581,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
services.networkMapCache.runWithoutMapService() services.networkMapCache.runWithoutMapService()
noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache
} else { } else {
registerWithNetworkMap() val netMapRegistration = registerWithNetworkMap()
// We may want to start node immediately with database data and not wait for network map registration (but send it either way).
// So we are ready to go.
if (services.networkMapCache.loadDBSuccess) {
log.info("Node successfully loaded network map data from the database.")
doneFuture(Unit)
} else {
netMapRegistration
}
} }
} }
@ -606,7 +615,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// Register this node against the network // Register this node against the network
val instant = platformClock.instant() val instant = platformClock.instant()
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
val reg = NodeRegistration(info, instant.toEpochMilli(), ADD, expires) val reg = NodeRegistration(info, info.serial, ADD, expires)
val request = RegistrationRequest(reg.toWire(services.keyManagementService, info.legalIdentityAndCert.owningKey), network.myAddress) val request = RegistrationRequest(reg.toWire(services.keyManagementService, info.legalIdentityAndCert.owningKey), network.myAddress)
return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapAddress) return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapAddress)
} }
@ -616,9 +625,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
/** This is overriden by the mock node implementation to enable operation without any network map service */ /** This is overriden by the mock node implementation to enable operation without any network map service */
protected open fun noNetworkMapConfigured(): CordaFuture<Unit> { protected open fun noNetworkMapConfigured(): CordaFuture<Unit> {
// TODO: There should be a consistent approach to configuration error exceptions. if (services.networkMapCache.loadDBSuccess) {
throw IllegalStateException("Configuration error: this node isn't being asked to act as the network map, nor " + return doneFuture(Unit)
"has any other map node been configured.") } else {
// TODO: There should be a consistent approach to configuration error exceptions.
throw IllegalStateException("Configuration error: this node isn't being asked to act as the network map, nor " +
"has any other map node been configured.")
}
} }
protected open fun makeKeyManagementService(identityService: IdentityService): KeyManagementService { protected open fun makeKeyManagementService(identityService: IdentityService): KeyManagementService {
@ -754,7 +767,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override val monitoringService = MonitoringService(MetricRegistry()) override val monitoringService = MonitoringService(MetricRegistry())
override val validatedTransactions = makeTransactionStorage() override val validatedTransactions = makeTransactionStorage()
override val transactionVerifierService by lazy { makeTransactionVerifierService() } override val transactionVerifierService by lazy { makeTransactionVerifierService() }
override val networkMapCache by lazy { InMemoryNetworkMapCache(this) } override val schemaService by lazy { NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet()) }
override val networkMapCache by lazy { PersistentNetworkMapCache(this) }
override val vaultService by lazy { NodeVaultService(this) } override val vaultService by lazy { NodeVaultService(this) }
override val vaultQueryService by lazy { override val vaultQueryService by lazy {
HibernateVaultQueryImpl(database.hibernateConfig, vaultService) HibernateVaultQueryImpl(database.hibernateConfig, vaultService)
@ -776,7 +790,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override val networkService: MessagingService get() = network override val networkService: MessagingService get() = network
override val clock: Clock get() = platformClock override val clock: Clock get() = platformClock
override val myInfo: NodeInfo get() = info override val myInfo: NodeInfo get() = info
override val schemaService by lazy { NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet()) }
override val database: CordaPersistence get() = this@AbstractNode.database override val database: CordaPersistence get() = this@AbstractNode.database
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration override val configuration: NodeConfiguration get() = this@AbstractNode.configuration

View File

@ -173,7 +173,7 @@ class CordaRPCOpsImpl(
override fun authoriseContractUpgrade(state: StateAndRef<*>, upgradedContractClass: Class<out UpgradedContract<*, *>>) = services.vaultService.authoriseContractUpgrade(state, upgradedContractClass) override fun authoriseContractUpgrade(state: StateAndRef<*>, upgradedContractClass: Class<out UpgradedContract<*, *>>) = services.vaultService.authoriseContractUpgrade(state, upgradedContractClass)
override fun deauthoriseContractUpgrade(state: StateAndRef<*>) = services.vaultService.deauthoriseContractUpgrade(state) override fun deauthoriseContractUpgrade(state: StateAndRef<*>) = services.vaultService.deauthoriseContractUpgrade(state)
override fun currentNodeTime(): Instant = Instant.now(services.clock) override fun currentNodeTime(): Instant = Instant.now(services.clock)
override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered override fun waitUntilNetworkReady() = services.networkMapCache.nodeReady
override fun partyFromAnonymous(party: AbstractParty): Party? = services.identityService.partyFromAnonymous(party) override fun partyFromAnonymous(party: AbstractParty): Party? = services.identityService.partyFromAnonymous(party)
override fun partyFromKey(key: PublicKey) = services.identityService.partyFromKey(key) override fun partyFromKey(key: PublicKey) = services.identityService.partyFromKey(key)
override fun partyFromX500Name(x500Name: X500Name) = services.identityService.partyFromX500Name(x500Name) override fun partyFromX500Name(x500Name: X500Name) = services.identityService.partyFromX500Name(x500Name)
@ -182,6 +182,10 @@ class CordaRPCOpsImpl(
override fun registeredFlows(): List<String> = services.rpcFlows.map { it.name }.sorted() override fun registeredFlows(): List<String> = services.rpcFlows.map { it.name }.sorted()
override fun clearNetworkMapCache() {
services.networkMapCache.clearNetworkMapCache()
}
companion object { companion object {
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo { private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.flowInitiator, flowLogic.track()) return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.flowInitiator, flowLogic.track())

View File

@ -155,7 +155,7 @@ open class Node(override val configuration: FullNodeConfiguration,
myIdentityOrNullIfNetworkMapService, myIdentityOrNullIfNetworkMapService,
serverThread, serverThread,
database, database,
networkMapRegistrationFuture, nodeReadyFuture,
services.monitoringService, services.monitoringService,
advertisedAddress) advertisedAddress)
} }
@ -210,15 +210,17 @@ open class Node(override val configuration: FullNodeConfiguration,
log.trace { "Trying to detect public hostname through the Network Map Service at $serverAddress" } log.trace { "Trying to detect public hostname through the Network Map Service at $serverAddress" }
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, configuration) val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, configuration)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
initialConnectAttempts = 5 initialConnectAttempts = 2 // TODO Public host discovery needs rewriting, as we may start nodes without network map, and we don't want to wait that long on startup.
retryInterval = 5.seconds.toMillis() retryInterval = 2.seconds.toMillis()
retryIntervalMultiplier = 1.5 retryIntervalMultiplier = 1.5
maxRetryInterval = 3.minutes.toMillis() maxRetryInterval = 3.minutes.toMillis()
} }
val clientFactory = try { val clientFactory = try {
locator.createSessionFactory() locator.createSessionFactory()
} catch (e: ActiveMQNotConnectedException) { } catch (e: ActiveMQNotConnectedException) {
throw IOException("Unable to connect to the Network Map Service at $serverAddress for IP address discovery", e) log.warn("Unable to connect to the Network Map Service at $serverAddress for IP address discovery. " +
"Using the provided \"${configuration.p2pAddress.host}\" as the advertised address.")
return null
} }
val session = clientFactory.createSession(PEER_USER, PEER_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) val session = clientFactory.createSession(PEER_USER, PEER_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
@ -306,7 +308,7 @@ open class Node(override val configuration: FullNodeConfiguration,
} }
super.start() super.start()
networkMapRegistrationFuture.thenMatch({ nodeReadyFuture.thenMatch({
serverThread.execute { serverThread.execute {
// Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia: // Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia:
// //

View File

@ -102,7 +102,7 @@ open class NodeStartup(val args: Array<String>) {
node.start() node.start()
printPluginsAndServices(node) printPluginsAndServices(node)
node.networkMapRegistrationFuture.thenMatch({ node.nodeReadyFuture.thenMatch({
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0 val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
// TODO: Replace this with a standard function to get an unambiguous rendering of the X.500 name. // TODO: Replace this with a standard function to get an unambiguous rendering of the X.500 name.
val name = node.info.legalIdentity.name.orgName ?: node.info.legalIdentity.name.commonName val name = node.info.legalIdentity.name.orgName ?: node.info.legalIdentity.name.commonName

View File

@ -53,6 +53,9 @@ interface NetworkMapCacheInternal : NetworkMapCache {
/** For testing where the network map cache is manipulated marks the service as immediately ready. */ /** For testing where the network map cache is manipulated marks the service as immediately ready. */
@VisibleForTesting @VisibleForTesting
fun runWithoutMapService() fun runWithoutMapService()
/** Indicates if loading network map data from database was successful. */
val loadDBSuccess: Boolean
} }
@CordaSerializable @CordaSerializable

View File

@ -338,6 +338,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service. * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
*/ */
private fun updateBridgesOnNetworkChange(change: MapChange) { private fun updateBridgesOnNetworkChange(change: MapChange) {
log.debug { "Updating bridges on network map change: ${change.node}" }
fun gatherAddresses(node: NodeInfo): Sequence<ArtemisPeerAddress> { fun gatherAddresses(node: NodeInfo): Sequence<ArtemisPeerAddress> {
val peerAddress = getArtemisPeerAddress(node) val peerAddress = getArtemisPeerAddress(node)
val addresses = mutableListOf(peerAddress) val addresses = mutableListOf(peerAddress)

View File

@ -1,185 +0,0 @@
package net.corda.node.services.network
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.NetworkCacheError
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import rx.Observable
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.security.SignatureException
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/**
* Extremely simple in-memory cache of the network map.
*
* @param serviceHub an optional service hub from which we'll take the identity service. We take a service hub rather
* than the identity service directly, as this avoids problems with service start sequence (network map cache
* and identity services depend on each other). Should always be provided except for unit test cases.
*/
@ThreadSafe
open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : SingletonSerializeAsToken(), NetworkMapCacheInternal {
companion object {
val logger = loggerFor<InMemoryNetworkMapCache>()
}
override val partyNodes: List<NodeInfo> get() = registeredNodes.map { it.value }
override val networkMapNodes: List<NodeInfo> get() = getNodesWithService(NetworkMapService.type)
private val _changed = PublishSubject.create<MapChange>()
// We use assignment here so that multiple subscribers share the same wrapped Observable.
override val changed: Observable<MapChange> = _changed.wrapWithDatabaseTransaction()
private val changePublisher: rx.Observer<MapChange> get() = _changed.bufferUntilDatabaseCommit()
private val _registrationFuture = openFuture<Void?>()
override val mapServiceRegistered: CordaFuture<Void?> get() = _registrationFuture
private var registeredForPush = false
protected var registeredNodes: MutableMap<PublicKey, NodeInfo> = Collections.synchronizedMap(HashMap())
override fun getPartyInfo(party: Party): PartyInfo? {
val node = registeredNodes[party.owningKey]
if (node != null) {
return PartyInfo.Node(node)
}
for ((_, value) in registeredNodes) {
for (service in value.advertisedServices) {
if (service.identity.party == party) {
return PartyInfo.Service(service)
}
}
}
return null
}
override fun getNodeByLegalIdentityKey(identityKey: PublicKey): NodeInfo? = registeredNodes[identityKey]
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
val wellKnownParty = if (serviceHub != null) {
serviceHub.identityService.partyFromAnonymous(party)
} else {
party
}
return wellKnownParty?.let {
getNodeByLegalIdentityKey(it.owningKey)
}
}
override fun track(): DataFeed<List<NodeInfo>, MapChange> {
synchronized(_changed) {
return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}
override fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean,
ifChangedSinceVer: Int?): CordaFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC,
data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes)
network.send(ackMessage, req.replyTo)
processUpdatePush(req)
} catch(e: NodeMapError) {
logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
} catch(e: Exception) {
logger.error("Exception processing update from network map service", e)
}
}
registeredForPush = true
}
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, network.myAddress)
val future = network.sendRequest<FetchMapResponse>(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { (nodes) ->
// We may not receive any nodes back, if the map hasn't changed since the version specified
nodes?.forEach { processRegistration(it) }
Unit
}
_registrationFuture.captureLater(future.map { null })
return future
}
override fun addNode(node: NodeInfo) {
synchronized(_changed) {
val previousNode = registeredNodes.put(node.legalIdentity.owningKey, node)
if (previousNode == null) {
changePublisher.onNext(MapChange.Added(node))
} else if (previousNode != node) {
changePublisher.onNext(MapChange.Modified(node, previousNode))
}
}
}
override fun removeNode(node: NodeInfo) {
synchronized(_changed) {
registeredNodes.remove(node.legalIdentity.owningKey)
changePublisher.onNext(MapChange.Removed(node))
}
}
/**
* Unsubscribes from updates from the given map service.
* @param service the network map service to listen to updates from.
*/
override fun deregisterForUpdates(network: MessagingService, service: NodeInfo): CordaFuture<Unit> {
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.SubscribeRequest(false, network.myAddress)
// `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo.
val address = network.getAddressOfParty(PartyInfo.Node(service))
val future = network.sendRequest<SubscribeResponse>(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map {
if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed()
}
_registrationFuture.captureLater(future.map { null })
return future
}
fun processUpdatePush(req: NetworkMapService.Update) {
try {
val reg = req.wireReg.verified()
processRegistration(reg)
} catch (e: SignatureException) {
throw NodeMapError.InvalidSignature()
}
}
private fun processRegistration(reg: NodeRegistration) {
// TODO: Implement filtering by sequence number, so we only accept changes that are
// more recent than the latest change we've processed.
when (reg.type) {
AddOrRemove.ADD -> addNode(reg.node)
AddOrRemove.REMOVE -> removeNode(reg.node)
}
}
@VisibleForTesting
override fun runWithoutMapService() {
_registrationFuture.set(null)
}
}

View File

@ -0,0 +1,339 @@
package net.corda.node.services.network
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.crypto.parsePublicKeyBase58
import net.corda.core.crypto.toBase58String
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.PartyInfo
import net.corda.core.schemas.NodeInfoSchemaV1
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.NetworkCacheError
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import org.bouncycastle.asn1.x500.X500Name
import org.hibernate.Session
import rx.Observable
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.security.SignatureException
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/**
* Extremely simple in-memory cache of the network map.
*
* @param serviceHub an optional service hub from which we'll take the identity service. We take a service hub rather
* than the identity service directly, as this avoids problems with service start sequence (network map cache
* and identity services depend on each other). Should always be provided except for unit test cases.
*/
@ThreadSafe
open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal) : SingletonSerializeAsToken(), NetworkMapCacheInternal {
companion object {
val logger = loggerFor<PersistentNetworkMapCache>()
}
private var registeredForPush = false
// TODO Small explanation, partyNodes and registeredNodes is left in memory as it was before, because it will be removed in
// next PR that gets rid of services. These maps are used only for queries by service.
override val partyNodes: List<NodeInfo> get() = registeredNodes.map { it.value }
override val networkMapNodes: List<NodeInfo> get() = getNodesWithService(NetworkMapService.type)
private val _changed = PublishSubject.create<MapChange>()
// We use assignment here so that multiple subscribers share the same wrapped Observable.
override val changed: Observable<MapChange> = _changed.wrapWithDatabaseTransaction()
private val changePublisher: rx.Observer<MapChange> get() = _changed.bufferUntilDatabaseCommit()
private val _registrationFuture = openFuture<Void?>()
override val nodeReady: CordaFuture<Void?> get() = _registrationFuture
protected val registeredNodes: MutableMap<PublicKey, NodeInfo> = Collections.synchronizedMap(HashMap())
private var _loadDBSuccess: Boolean = false
override val loadDBSuccess get() = _loadDBSuccess
init {
serviceHub.database.transaction { loadFromDB() }
}
override fun getPartyInfo(party: Party): PartyInfo? {
val nodes = serviceHub.database.transaction { queryByIdentityKey(party.owningKey) }
if (nodes.size == 1 && nodes[0].legalIdentity == party) {
return PartyInfo.Node(nodes[0])
}
for (node in nodes) {
for (service in node.advertisedServices) {
if (service.identity.party == party) {
return PartyInfo.Service(service)
}
}
}
return null
}
// TODO See comment to queryByLegalName why it's left like that.
override fun getNodeByLegalName(principal: X500Name): NodeInfo? = partyNodes.singleOrNull { it.legalIdentity.name == principal }
//serviceHub!!.database.transaction { queryByLegalName(principal).firstOrNull() }
override fun getNodeByLegalIdentityKey(identityKey: PublicKey): NodeInfo? =
serviceHub.database.transaction { queryByIdentityKey(identityKey).firstOrNull() }
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
val wellKnownParty = serviceHub.identityService.partyFromAnonymous(party)
return wellKnownParty?.let {
getNodeByLegalIdentityKey(it.owningKey)
}
}
override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = serviceHub.database.transaction { queryByAddress(address) }
override fun track(): DataFeed<List<NodeInfo>, MapChange> {
synchronized(_changed) {
return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}
override fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean,
ifChangedSinceVer: Int?): CordaFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC,
data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes)
network.send(ackMessage, req.replyTo)
processUpdatePush(req)
} catch(e: NodeMapError) {
logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}")
} catch(e: Exception) {
logger.error("Exception processing update from network map service", e)
}
}
registeredForPush = true
}
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, network.myAddress)
val future = network.sendRequest<FetchMapResponse>(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { (nodes) ->
// We may not receive any nodes back, if the map hasn't changed since the version specified
nodes?.forEach { processRegistration(it) }
Unit
}
_registrationFuture.captureLater(future.map { null })
return future
}
override fun addNode(node: NodeInfo) {
synchronized(_changed) {
val previousNode = registeredNodes.put(node.legalIdentity.owningKey, node)
if (previousNode == null) {
serviceHub.database.transaction {
updateInfoDB(node)
changePublisher.onNext(MapChange.Added(node))
}
} else if (previousNode != node) {
serviceHub.database.transaction {
updateInfoDB(node)
changePublisher.onNext(MapChange.Modified(node, previousNode))
}
}
}
}
override fun removeNode(node: NodeInfo) {
synchronized(_changed) {
registeredNodes.remove(node.legalIdentity.owningKey)
serviceHub.database.transaction {
removeInfoDB(node)
changePublisher.onNext(MapChange.Removed(node))
}
}
}
/**
* Unsubscribes from updates from the given map service.
* @param service the network map service to listen to updates from.
*/
override fun deregisterForUpdates(network: MessagingService, service: NodeInfo): CordaFuture<Unit> {
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.SubscribeRequest(false, network.myAddress)
// `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo.
val address = network.getAddressOfParty(PartyInfo.Node(service))
val future = network.sendRequest<SubscribeResponse>(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map {
if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed()
}
_registrationFuture.captureLater(future.map { null })
return future
}
fun processUpdatePush(req: NetworkMapService.Update) {
try {
val reg = req.wireReg.verified()
processRegistration(reg)
} catch (e: SignatureException) {
throw NodeMapError.InvalidSignature()
}
}
private fun processRegistration(reg: NodeRegistration) {
// TODO: Implement filtering by sequence number, so we only accept changes that are
// more recent than the latest change we've processed.
when (reg.type) {
AddOrRemove.ADD -> addNode(reg.node)
AddOrRemove.REMOVE -> removeNode(reg.node)
}
}
@VisibleForTesting
override fun runWithoutMapService() {
_registrationFuture.set(null)
}
// Changes related to NetworkMap redesign
// TODO It will be properly merged into network map cache after services removal.
private inline fun <T> createSession(block: (Session) -> T): T {
return DatabaseTransactionManager.current().session.let { block(it) }
}
private fun getAllInfos(session: Session): List<NodeInfoSchemaV1.PersistentNodeInfo> {
val criteria = session.criteriaBuilder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java)
criteria.select(criteria.from(NodeInfoSchemaV1.PersistentNodeInfo::class.java))
return session.createQuery(criteria).resultList
}
/**
* Load NetworkMap data from the database if present. Node can start without having NetworkMapService configured.
*/
private fun loadFromDB() {
logger.info("Loading network map from database...")
createSession {
val result = getAllInfos(it)
for (nodeInfo in result) {
try {
logger.info("Loaded node info: $nodeInfo")
val publicKey = parsePublicKeyBase58(nodeInfo.legalIdentitiesAndCerts.single { it.isMain }.owningKey)
val node = nodeInfo.toNodeInfo()
registeredNodes.put(publicKey, node)
changePublisher.onNext(MapChange.Added(node)) // Redeploy bridges after reading from DB on startup.
_loadDBSuccess = true // This is used in AbstractNode to indicate that node is ready.
} catch (e: Exception) {
logger.warn("Exception parsing network map from the database.", e)
}
}
if (loadDBSuccess) {
_registrationFuture.set(null) // Useful only if we don't have NetworkMapService configured so StateMachineManager can start.
}
}
}
private fun updateInfoDB(nodeInfo: NodeInfo) {
// TODO Temporary workaround to force isolated transaction (otherwise it causes race conditions when processing
// network map registration on network map node)
val session = serviceHub.database.entityManagerFactory.withOptions().connection(serviceHub.database.dataSource.connection
.apply {
transactionIsolation = 1
}).openSession()
val tx = session.beginTransaction()
// TODO For now the main legal identity is left in NodeInfo, this should be set comparision/come up with index for NodeInfo?
val info = findByIdentityKey(session, nodeInfo.legalIdentity.owningKey)
val nodeInfoEntry = generateMappedObject(nodeInfo)
if (info.isNotEmpty()) {
nodeInfoEntry.id = info[0].id
}
session.merge(nodeInfoEntry)
tx.commit()
session.close()
}
private fun removeInfoDB(nodeInfo: NodeInfo) {
createSession {
val info = findByIdentityKey(it, nodeInfo.legalIdentity.owningKey).single()
it.remove(info)
}
}
private fun findByIdentityKey(session: Session, identityKey: PublicKey): List<NodeInfoSchemaV1.PersistentNodeInfo> {
val query = session.createQuery(
"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.owningKey = :owningKey",
NodeInfoSchemaV1.PersistentNodeInfo::class.java)
query.setParameter("owningKey", identityKey.toBase58String())
return query.resultList
}
private fun queryByIdentityKey(identityKey: PublicKey): List<NodeInfo> {
createSession {
val result = findByIdentityKey(it, identityKey)
return result.map { it.toNodeInfo() }
}
}
// TODO It's useless for now, because toString on X500 names is inconsistent and we have:
// C=ES,L=Madrid,O=Alice Corp,CN=Alice Corp
// CN=Alice Corp,O=Alice Corp,L=Madrid,C=ES
private fun queryByLegalName(name: X500Name): List<NodeInfo> {
createSession {
val query = it.createQuery(
"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name",
NodeInfoSchemaV1.PersistentNodeInfo::class.java)
query.setParameter("name", name.toString())
val result = query.resultList
return result.map { it.toNodeInfo() }
}
}
private fun queryByAddress(hostAndPort: NetworkHostAndPort): NodeInfo? {
createSession {
val query = it.createQuery(
"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.addresses a WHERE a.pk.host = :host AND a.pk.port = :port",
NodeInfoSchemaV1.PersistentNodeInfo::class.java)
query.setParameter("host", hostAndPort.host)
query.setParameter("port", hostAndPort.port)
val result = query.resultList
return if (result.isEmpty()) null
else result.map { it.toNodeInfo() }.singleOrNull() ?: throw IllegalStateException("More than one node with the same host and port")
}
}
/** Object Relational Mapping support. */
private fun generateMappedObject(nodeInfo: NodeInfo): NodeInfoSchemaV1.PersistentNodeInfo {
return NodeInfoSchemaV1.PersistentNodeInfo(
id = 0,
addresses = nodeInfo.addresses.map { NodeInfoSchemaV1.DBHostAndPort.fromHostAndPort(it) },
legalIdentitiesAndCerts = nodeInfo.legalIdentitiesAndCerts.map { NodeInfoSchemaV1.DBPartyAndCertificate(it) }.toSet()
// TODO It's workaround to keep the main identity, will be removed in future PR getting rid of services.
+ NodeInfoSchemaV1.DBPartyAndCertificate(nodeInfo.legalIdentityAndCert, isMain = true),
platformVersion = nodeInfo.platformVersion,
advertisedServices = nodeInfo.advertisedServices.map { NodeInfoSchemaV1.DBServiceEntry(it.serialize().bytes) },
serial = nodeInfo.serial
)
}
override fun clearNetworkMapCache() {
serviceHub.database.transaction {
createSession {
val result = getAllInfos(it)
for (nodeInfo in result) it.remove(nodeInfo)
}
}
}
}

View File

@ -5,6 +5,7 @@ import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.LinearState import net.corda.core.contracts.LinearState
import net.corda.core.schemas.CommonSchemaV1 import net.corda.core.schemas.CommonSchemaV1
import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.NodeInfoSchemaV1
import net.corda.core.schemas.PersistentState import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState import net.corda.core.schemas.QueryableState
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
@ -58,6 +59,7 @@ class NodeSchemaService(customSchemas: Set<MappedSchema> = emptySet()) : SchemaS
val requiredSchemas: Map<MappedSchema, SchemaService.SchemaOptions> = val requiredSchemas: Map<MappedSchema, SchemaService.SchemaOptions> =
mapOf(Pair(CommonSchemaV1, SchemaService.SchemaOptions()), mapOf(Pair(CommonSchemaV1, SchemaService.SchemaOptions()),
Pair(VaultSchemaV1, SchemaService.SchemaOptions()), Pair(VaultSchemaV1, SchemaService.SchemaOptions()),
Pair(NodeInfoSchemaV1, SchemaService.SchemaOptions()),
Pair(NodeServicesV1, SchemaService.SchemaOptions())) Pair(NodeServicesV1, SchemaService.SchemaOptions()))
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas.plus(customSchemas.map { override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas.plus(customSchemas.map {

View File

@ -39,6 +39,7 @@ import net.corda.node.utilities.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
import net.corda.nodeapi.internal.serialization.withTokenContext import net.corda.nodeapi.internal.serialization.withTokenContext
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
import org.bouncycastle.asn1.x500.X500Name
import org.slf4j.Logger import org.slf4j.Logger
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
@ -170,7 +171,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
checkQuasarJavaAgentPresence() checkQuasarJavaAgentPresence()
restoreFibersFromCheckpoints() restoreFibersFromCheckpoints()
listenToLedgerTransactions() listenToLedgerTransactions()
serviceHub.networkMapCache.mapServiceRegistered.then { executor.execute(this::resumeRestoredFibers) } serviceHub.networkMapCache.nodeReady.then { executor.execute(this::resumeRestoredFibers) }
} }
private fun checkQuasarJavaAgentPresence() { private fun checkQuasarJavaAgentPresence() {

View File

@ -11,12 +11,12 @@ import net.corda.core.node.services.VaultService
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.days import net.corda.core.utilities.days
import net.corda.node.services.MockServiceHubInternal
import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.NodeVaultService
import net.corda.node.testing.MockServiceHubInternal
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.configureDatabase

View File

@ -12,9 +12,10 @@ import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.MonitoringService import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.testing.MockServiceHubInternal
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.configureDatabase
@ -54,8 +55,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
var messagingClient: NodeMessagingClient? = null var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null var messagingServer: ArtemisMessagingServer? = null
// TODO: We should have a dummy service hub rather than change behaviour in tests lateinit var networkMapCache: PersistentNetworkMapCache
val networkMapCache = InMemoryNetworkMapCache(serviceHub = null)
val rpcOps = object : RPCOps { val rpcOps = object : RPCOps {
override val protocolVersion: Int get() = throw UnsupportedOperationException() override val protocolVersion: Int get() = throw UnsupportedOperationException()
@ -71,6 +71,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
LogHelper.setLevel(PersistentUniquenessProvider::class) LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), createIdentityService = ::makeTestIdentityService) database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), createIdentityService = ::makeTestIdentityService)
networkMapRegistrationFuture = doneFuture(Unit) networkMapRegistrationFuture = doneFuture(Unit)
networkMapCache = PersistentNetworkMapCache(serviceHub = object : MockServiceHubInternal(database, config) {})
} }
@After @After

View File

@ -6,13 +6,14 @@ import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE import net.corda.testing.ALICE
import net.corda.testing.BOB import net.corda.testing.BOB
import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThat
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import java.math.BigInteger import java.math.BigInteger
import kotlin.test.assertEquals import kotlin.test.assertEquals
class InMemoryNetworkMapCacheTest { class NetworkMapCacheTest {
lateinit var mockNet: MockNetwork lateinit var mockNet: MockNetwork
@Before @Before
@ -47,9 +48,7 @@ class InMemoryNetworkMapCacheTest {
// Node A currently knows only about itself, so this returns node A // Node A currently knows only about itself, so this returns node A
assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeA.info) assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeA.info)
nodeA.database.transaction { nodeA.services.networkMapCache.addNode(nodeB.info)
nodeA.services.networkMapCache.addNode(nodeB.info)
}
// The details of node B write over those for node A // The details of node B write over those for node A
assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeB.info) assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeB.info)
} }
@ -68,4 +67,20 @@ class InMemoryNetworkMapCacheTest {
// TODO: Should have a test case with anonymous lookup // TODO: Should have a test case with anonymous lookup
} }
@Test
fun `remove node from cache`() {
val nodes = mockNet.createSomeNodes(1)
val n0 = nodes.mapNode
val n1 = nodes.partyNodes[0]
val node0Cache = n0.services.networkMapCache as PersistentNetworkMapCache
mockNet.runNetwork()
n0.database.transaction {
assertThat(node0Cache.getNodeByLegalIdentity(n1.info.legalIdentity) != null)
node0Cache.removeNode(n1.info)
assertThat(node0Cache.getNodeByLegalIdentity(n1.info.legalIdentity) == null)
assertThat(node0Cache.getNodeByLegalIdentity(n0.info.legalIdentity) != null)
assertThat(node0Cache.getNodeByLegalName(n1.info.legalIdentity.name) == null)
}
}
} }

View File

@ -0,0 +1,194 @@
package net.corda.node.services.network
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.toBase58String
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.internal.Node
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.CHARLIE
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.node.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.bouncycastle.asn1.x500.X500Name
import org.junit.Before
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertFails
class PersistentNetworkMapCacheTest : NodeBasedTest() {
val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB)
val addressesMap: HashMap<X500Name, NetworkHostAndPort> = HashMap()
val infos: MutableSet<NodeInfo> = HashSet()
@Before
fun start() {
val nodes = startNodesWithPort(partiesList)
nodes.forEach { it.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting.
nodes.forEach {
infos.add(it.info)
addressesMap[it.info.legalIdentity.name] = it.info.addresses[0]
it.stop() // We want them to communicate with NetworkMapService to save data to cache.
}
}
@Test
fun `get nodes by owning key and by name, no network map service`() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
val netCache = alice.services.networkMapCache as PersistentNetworkMapCache
alice.database.transaction {
val res = netCache.getNodeByLegalIdentity(alice.info.legalIdentity)
assertEquals(alice.info, res)
val res2 = netCache.getNodeByLegalName(DUMMY_NOTARY.name)
assertEquals(infos.filter { it.legalIdentity.name == DUMMY_NOTARY.name }.singleOrNull(), res2)
}
}
@Test
fun `get nodes by address no network map service`() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
val netCache = alice.services.networkMapCache as PersistentNetworkMapCache
alice.database.transaction {
val res = netCache.getNodeByAddress(alice.info.addresses[0])
assertEquals(alice.info, res)
}
}
@Test
fun `restart node with DB map cache and no network map`() {
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
val partyNodes = alice.services.networkMapCache.partyNodes
assert(NetworkMapService.type !in alice.info.advertisedServices.map { it.info.type })
assertEquals(null, alice.inNodeNetworkMapService)
assertEquals(infos.size, partyNodes.size)
assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet())
}
@Test
fun `start 2 nodes without pointing at NetworkMapService and communicate with each other`() {
val parties = partiesList.subList(1, partiesList.size)
val nodes = startNodesWithPort(parties, noNetworkMap = true)
assert(nodes.all { it.inNodeNetworkMapService == null })
assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } })
nodes.forEach {
val partyNodes = it.services.networkMapCache.partyNodes
assertEquals(infos.size, partyNodes.size)
assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet())
}
checkConnectivity(nodes)
}
@Test
fun `start 2 nodes pointing at NetworkMapService but don't start network map node`() {
val parties = partiesList.subList(1, partiesList.size)
val nodes = startNodesWithPort(parties, noNetworkMap = false)
assert(nodes.all { it.inNodeNetworkMapService == null })
assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } })
nodes.forEach {
val partyNodes = it.services.networkMapCache.partyNodes
assertEquals(infos.size, partyNodes.size)
assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet())
}
checkConnectivity(nodes)
}
@Test
fun `start node and network map communicate`() {
val parties = partiesList.subList(0, 2)
val nodes = startNodesWithPort(parties, noNetworkMap = false)
checkConnectivity(nodes)
}
@Test
fun `start node without networkMapService and no database - fail`() {
assertFails { startNode(CHARLIE.name, noNetworkMap = true).getOrThrow(2.seconds) }
}
@Test
fun `new node joins network without network map started`() {
val parties = partiesList.subList(1, partiesList.size)
// Start 2 nodes pointing at network map, but don't start network map service.
val otherNodes = startNodesWithPort(parties, noNetworkMap = false)
otherNodes.forEach { node ->
assert(infos.any { it.legalIdentity == node.info.legalIdentity })
}
// Start node that is not in databases of other nodes. Point to NMS. Which has't started yet.
val charlie = startNodesWithPort(listOf(CHARLIE), noNetworkMap = false)[0]
otherNodes.forEach {
assert(charlie.info.legalIdentity !in it.services.networkMapCache.partyNodes.map { it.legalIdentity })
}
// Start Network Map and see that charlie node appears in caches.
val nms = startNodesWithPort(listOf(DUMMY_NOTARY), noNetworkMap = false)[0]
nms.startupComplete.get()
assert(nms.inNodeNetworkMapService != null)
assert(infos.any {it.legalIdentity == nms.info.legalIdentity})
otherNodes.forEach {
assert(nms.info.legalIdentity in it.services.networkMapCache.partyNodes.map { it.legalIdentity })
}
charlie.nodeReadyFuture.get() // Finish registration.
checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS.
val cacheA = otherNodes[0].services.networkMapCache.partyNodes
val cacheB = otherNodes[1].services.networkMapCache.partyNodes
val cacheC = charlie.services.networkMapCache.partyNodes
assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap
assert(charlie.info.legalIdentity in cacheB.map { it.legalIdentity }) // Other nodes also fetched data from Network Map with node C.
assertEquals(cacheA.toSet(), cacheB.toSet())
assertEquals(cacheA.toSet(), cacheC.toSet())
}
// HELPERS
// Helper function to restart nodes with the same host and port.
private fun startNodesWithPort(nodesToStart: List<Party>, noNetworkMap: Boolean = false): List<Node> {
return nodesToStart.map { party ->
val configOverrides = addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()
if (party == DUMMY_NOTARY) {
startNetworkMapNode(party.name, configOverrides = configOverrides)
}
else {
startNode(party.name,
configOverrides = configOverrides,
noNetworkMap = noNetworkMap,
waitForConnection = false).getOrThrow()
}
}
}
// Check that nodes are functional, communicate each with each.
private fun checkConnectivity(nodes: List<Node>) {
nodes.forEach { node1 ->
nodes.forEach { node2 ->
node2.registerInitiatedFlow(SendBackFlow::class.java)
val resultFuture = node1.services.startFlow(SendFlow(node2.info.legalIdentity)).resultFuture
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!")
}
}
}
@InitiatingFlow
private class SendFlow(val otherParty: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
println("SEND FLOW to $otherParty")
println("Party key ${otherParty.owningKey.toBase58String()}")
return sendAndReceive<String>(otherParty, "Hi!").unwrap { it }
}
}
@InitiatedBy(SendFlow::class)
private class SendBackFlow(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
println("SEND BACK FLOW to $otherParty")
println("Party key ${otherParty.owningKey.toBase58String()}")
send(otherParty, "Hello!")
}
}
}

View File

@ -220,6 +220,7 @@ class FlowFrameworkTests {
node2.stop() node2.stop()
node2.database.transaction { node2.database.transaction {
assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint
node2.services.networkMapCache.clearNetworkMapCache()
} }
val node2b = mockNet.createNode(node1.network.myAddress, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray()) val node2b = mockNet.createNode(node1.network.myAddress, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray())
node2.manuallyCloseDB() node2.manuallyCloseDB()

View File

@ -85,7 +85,7 @@ class VisualiserViewModel {
// bottom left: -23.2031,29.8406 // bottom left: -23.2031,29.8406
// top right: 33.0469,64.3209 // top right: 33.0469,64.3209
try { try {
return node.place.coordinate.project(view.mapImage.fitWidth, view.mapImage.fitHeight, 64.3209, 29.8406, -23.2031, 33.0469) return node.place.coordinate!!.project(view.mapImage.fitWidth, view.mapImage.fitHeight, 64.3209, 29.8406, -23.2031, 33.0469)
} catch(e: Exception) { } catch(e: Exception) {
throw Exception("Cannot project ${node.info.legalIdentity}", e) throw Exception("Cannot project ${node.info.legalIdentity}", e)
} }

View File

@ -261,7 +261,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
} }
} }
val networkInitialisationFinished = mockNet.nodes.map { it.networkMapRegistrationFuture }.transpose() val networkInitialisationFinished = mockNet.nodes.map { it.nodeReadyFuture }.transpose()
fun start(): CordaFuture<Unit> { fun start(): CordaFuture<Unit> {
mockNet.startNodes() mockNet.startNodes()

View File

@ -78,5 +78,4 @@ class DriverTests {
assertThat(debugLinesPresent).isTrue() assertThat(debugLinesPresent).isTrue()
} }
} }
} }

View File

@ -1,4 +1,4 @@
package net.corda.node.services package net.corda.node.testing
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowInitiator
@ -63,7 +63,7 @@ open class MockServiceHubInternal(
override val clock: Clock override val clock: Clock
get() = overrideClock ?: throw UnsupportedOperationException() get() = overrideClock ?: throw UnsupportedOperationException()
override val myInfo: NodeInfo override val myInfo: NodeInfo
get() = NodeInfo(listOf(MOCK_HOST_AND_PORT), DUMMY_IDENTITY_1, NonEmptySet.of(DUMMY_IDENTITY_1), 1) // Required to get a dummy platformVersion when required for tests. get() = NodeInfo(listOf(MOCK_HOST_AND_PORT), DUMMY_IDENTITY_1, NonEmptySet.of(DUMMY_IDENTITY_1), 1, serial = 1L) // Required to get a dummy platformVersion when required for tests.
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val rpcFlows: List<Class<out FlowLogic<*>>> override val rpcFlows: List<Class<out FlowLogic<*>>>
get() = throw UnsupportedOperationException() get() = throw UnsupportedOperationException()

View File

@ -223,7 +223,8 @@ sealed class PortAllocation {
* } * }
* *
* Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [CordaFuture] * Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [CordaFuture]
* of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service. * of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service or
* loaded node data from database.
* *
* The driver implicitly bootstraps a [NetworkMapService]. * The driver implicitly bootstraps a [NetworkMapService].
* *
@ -698,7 +699,8 @@ class DriverDSL(
// node port numbers to be shifted, so all demos and docs need to be updated accordingly. // node port numbers to be shifted, so all demos and docs need to be updated accordingly.
"webAddress" to webAddress.toString(), "webAddress" to webAddress.toString(),
"p2pAddress" to dedicatedNetworkMapAddress.toString(), "p2pAddress" to dedicatedNetworkMapAddress.toString(),
"useTestClock" to useTestClock "useTestClock" to useTestClock,
"extraAdvertisedServiceIds" to listOf(ServiceInfo(NetworkMapService.type).toString())
) )
) )
return startNodeInternal(config, webAddress, startInProcess) return startNodeInternal(config, webAddress, startInProcess)
@ -716,7 +718,7 @@ class DriverDSL(
) )
return nodeAndThreadFuture.flatMap { (node, thread) -> return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, openFuture()).flatMap { rpc -> establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, openFuture()).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().map { rpc.waitUntilNetworkReady().map {
NodeHandle.InProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, node, thread) NodeHandle.InProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, node, thread)
} }
} }
@ -731,9 +733,9 @@ class DriverDSL(
} }
// We continue to use SSL enabled port for RPC when its for node user. // We continue to use SSL enabled port for RPC when its for node user.
establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, processDeathFuture).flatMap { rpc -> establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, processDeathFuture).flatMap { rpc ->
// Call waitUntilRegisteredWithNetworkMap in background in case RPC is failing over: // Call waitUntilNetworkReady in background in case RPC is failing over:
val networkMapFuture = executorService.fork { val networkMapFuture = executorService.fork {
rpc.waitUntilRegisteredWithNetworkMap() rpc.waitUntilNetworkReady()
}.flatMap { it } }.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) { firstOf(processDeathFuture, networkMapFuture) {
if (it == processDeathFuture) { if (it == processDeathFuture) {

View File

@ -4,11 +4,11 @@ import co.paralleluniverse.common.util.VisibleForTesting
import net.corda.core.crypto.entropyToKeyPair import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.NonEmptySet
import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.testing.getTestPartyAndCertificate import net.corda.testing.getTestPartyAndCertificate
import net.corda.testing.getTestX509Name import net.corda.testing.getTestX509Name
import rx.Observable import rx.Observable
@ -18,7 +18,7 @@ import java.math.BigInteger
/** /**
* Network map cache with no backing map service. * Network map cache with no backing map service.
*/ */
class MockNetworkMapCache(serviceHub: ServiceHub) : InMemoryNetworkMapCache(serviceHub) { class MockNetworkMapCache(serviceHub: ServiceHubInternal) : PersistentNetworkMapCache(serviceHub) {
private companion object { private companion object {
val BANK_C = getTestPartyAndCertificate(getTestX509Name("Bank C"), entropyToKeyPair(BigInteger.valueOf(1000)).public) val BANK_C = getTestPartyAndCertificate(getTestX509Name("Bank C"), entropyToKeyPair(BigInteger.valueOf(1000)).public)
val BANK_D = getTestPartyAndCertificate(getTestX509Name("Bank D"), entropyToKeyPair(BigInteger.valueOf(2000)).public) val BANK_D = getTestPartyAndCertificate(getTestX509Name("Bank D"), entropyToKeyPair(BigInteger.valueOf(2000)).public)
@ -29,8 +29,8 @@ class MockNetworkMapCache(serviceHub: ServiceHub) : InMemoryNetworkMapCache(serv
override val changed: Observable<NetworkMapCache.MapChange> = PublishSubject.create<NetworkMapCache.MapChange>() override val changed: Observable<NetworkMapCache.MapChange> = PublishSubject.create<NetworkMapCache.MapChange>()
init { init {
val mockNodeA = NodeInfo(listOf(BANK_C_ADDR), BANK_C, NonEmptySet.of(BANK_C), 1) val mockNodeA = NodeInfo(listOf(BANK_C_ADDR), BANK_C, NonEmptySet.of(BANK_C), 1, serial = 1L)
val mockNodeB = NodeInfo(listOf(BANK_D_ADDR), BANK_D, NonEmptySet.of(BANK_D), 1) val mockNodeB = NodeInfo(listOf(BANK_D_ADDR), BANK_D, NonEmptySet.of(BANK_D), 1, serial = 1L)
registeredNodes[mockNodeA.legalIdentity.owningKey] = mockNodeA registeredNodes[mockNodeA.legalIdentity.owningKey] = mockNodeA
registeredNodes[mockNodeB.legalIdentity.owningKey] = mockNodeB registeredNodes[mockNodeB.legalIdentity.owningKey] = mockNodeB
runWithoutMapService() runWithoutMapService()

View File

@ -212,7 +212,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
override fun noNetworkMapConfigured() = doneFuture(Unit) override fun noNetworkMapConfigured() = doneFuture(Unit)
// There is no need to slow down the unit tests by initialising CityDatabase // There is no need to slow down the unit tests by initialising CityDatabase
override fun findMyLocation(): WorldMapLocation? = null open fun findMyLocation(): WorldMapLocation? = null // It's left only for NetworkVisualiserSimulation
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1) override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)
@ -302,7 +302,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
return nodeFactory.create(config, this, networkMapAddress, advertisedServices.toSet(), id, overrideServices, entropyRoot).apply { return nodeFactory.create(config, this, networkMapAddress, advertisedServices.toSet(), id, overrideServices, entropyRoot).apply {
if (start) { if (start) {
start() start()
if (threadPerNode && networkMapAddress != null) networkMapRegistrationFuture.getOrThrow() // XXX: What about manually-started nodes? if (threadPerNode && networkMapAddress != null) nodeReadyFuture.getOrThrow() // XXX: What about manually-started nodes?
} }
_nodes.add(this) _nodes.add(this)
} }

View File

@ -160,7 +160,7 @@ open class MockServices(vararg val keys: KeyPair) : ServiceHub {
override val clock: Clock get() = Clock.systemUTC() override val clock: Clock get() = Clock.systemUTC()
override val myInfo: NodeInfo get() { override val myInfo: NodeInfo get() {
val identity = getTestPartyAndCertificate(MEGA_CORP.name, key.public) val identity = getTestPartyAndCertificate(MEGA_CORP.name, key.public)
return NodeInfo(emptyList(), identity, NonEmptySet.of(identity), 1) return NodeInfo(emptyList(), identity, NonEmptySet.of(identity), 1, serial = 1L)
} }
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2) override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)

View File

@ -4,6 +4,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.appendToCommonName import net.corda.core.crypto.appendToCommonName
import net.corda.core.crypto.commonName import net.corda.core.crypto.commonName
import net.corda.core.crypto.getX509Name import net.corda.core.crypto.getX509Name
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
@ -19,6 +20,8 @@ import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.configOf import net.corda.node.services.config.configOf
import net.corda.node.services.config.plus import net.corda.node.services.config.plus
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.nodeapi.User import net.corda.nodeapi.User
@ -76,6 +79,13 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() {
portNotBoundChecks.transpose().getOrThrow() portNotBoundChecks.transpose().getOrThrow()
} }
/**
* Clear network map data from nodes' databases.
*/
fun clearAllNodeInfoDb() {
nodes.forEach { it.services.networkMapCache.clearNetworkMapCache() }
}
/** /**
* You can use this method to start the network map node in a more customised manner. Otherwise it * You can use this method to start the network map node in a more customised manner. Otherwise it
* will automatically be started with the default parameters. * will automatically be started with the default parameters.
@ -85,30 +95,44 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() {
advertisedServices: Set<ServiceInfo> = emptySet(), advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(), rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): Node { configOverrides: Map<String, Any> = emptyMap()): Node {
check(_networkMapNode == null) check(_networkMapNode == null || _networkMapNode!!.info.legalIdentity.name == legalName)
return startNodeInternal(legalName, platformVersion, advertisedServices, rpcUsers, configOverrides).apply { return startNodeInternal(legalName, platformVersion, advertisedServices + ServiceInfo(NetworkMapService.type), rpcUsers, configOverrides).apply {
_networkMapNode = this _networkMapNode = this
} }
} }
@JvmOverloads
fun startNode(legalName: X500Name, fun startNode(legalName: X500Name,
platformVersion: Int = 1, platformVersion: Int = 1,
advertisedServices: Set<ServiceInfo> = emptySet(), advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(), rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): CordaFuture<Node> { configOverrides: Map<String, Any> = emptyMap(),
noNetworkMap: Boolean = false,
waitForConnection: Boolean = true): CordaFuture<Node> {
val networkMapConf = if (noNetworkMap) {
// Nonexistent network map service address.
mapOf(
"networkMapService" to mapOf(
"address" to "localhost:10000",
"legalName" to networkMapNode.info.legalIdentity.name.toString()
)
)
} else {
mapOf(
"networkMapService" to mapOf(
"address" to networkMapNode.configuration.p2pAddress.toString(),
"legalName" to networkMapNode.info.legalIdentity.name.toString()
)
)
}
val node = startNodeInternal( val node = startNodeInternal(
legalName, legalName,
platformVersion, platformVersion,
advertisedServices, advertisedServices,
rpcUsers, rpcUsers,
mapOf( networkMapConf + configOverrides,
"networkMapService" to mapOf( noNetworkMap)
"address" to networkMapNode.configuration.p2pAddress.toString(), return if (waitForConnection) node.nodeReadyFuture.map { node } else doneFuture(node)
"legalName" to networkMapNode.info.legalIdentity.name.toString()
)
) + configOverrides
)
return node.networkMapRegistrationFuture.map { node }
} }
fun startNotaryCluster(notaryName: X500Name, fun startNotaryCluster(notaryName: X500Name,
@ -149,18 +173,21 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() {
platformVersion: Int, platformVersion: Int,
advertisedServices: Set<ServiceInfo>, advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>, rpcUsers: List<User>,
configOverrides: Map<String, Any>): Node { configOverrides: Map<String, Any>,
noNetworkMap: Boolean = false): Node {
val baseDirectory = baseDirectory(legalName).createDirectories() val baseDirectory = baseDirectory(legalName).createDirectories()
val localPort = getFreeLocalPorts("localhost", 2) val localPort = getFreeLocalPorts("localhost", 2)
val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString()
val config = ConfigHelper.loadConfig( val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory, baseDirectory = baseDirectory,
allowMissingConfig = true, allowMissingConfig = true,
configOverrides = configOf( configOverrides = configOf(
"myLegalName" to legalName.toString(), "myLegalName" to legalName.toString(),
"p2pAddress" to localPort[0].toString(), "p2pAddress" to p2pAddress,
"rpcAddress" to localPort[1].toString(), "rpcAddress" to localPort[1].toString(),
"extraAdvertisedServiceIds" to advertisedServices.map { it.toString() }, "extraAdvertisedServiceIds" to advertisedServices.map { it.toString() },
"rpcUsers" to rpcUsers.map { it.toMap() } "rpcUsers" to rpcUsers.map { it.toMap() },
"noNetworkMap" to noNetworkMap
) + configOverrides ) + configOverrides
) )

View File

@ -15,8 +15,8 @@ import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.keys.E2ETestKeyManagementService import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.testing.MockServiceHubInternal
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.configureDatabase
@ -42,7 +42,8 @@ class SimpleNode(val config: NodeConfiguration, val address: NetworkHostAndPort
val keyService: KeyManagementService = E2ETestKeyManagementService(identityService, setOf(identity)) val keyService: KeyManagementService = E2ETestKeyManagementService(identityService, setOf(identity))
val executor = ServiceAffinityExecutor(config.myLegalName.commonName, 1) val executor = ServiceAffinityExecutor(config.myLegalName.commonName, 1)
// TODO: We should have a dummy service hub rather than change behaviour in tests // TODO: We should have a dummy service hub rather than change behaviour in tests
val broker = ArtemisMessagingServer(config, address.port, rpcAddress.port, InMemoryNetworkMapCache(serviceHub = null), userService) val broker = ArtemisMessagingServer(config, address.port, rpcAddress.port,
MockNetworkMapCache(serviceHub = object : MockServiceHubInternal(database = database, configuration = config) {}), userService)
val networkMapRegistrationFuture = openFuture<Unit>() val networkMapRegistrationFuture = openFuture<Unit>()
val network = database.transaction { val network = database.transaction {
NodeMessagingClient( NodeMessagingClient(

View File

@ -1,6 +1,8 @@
package net.corda.demobench.model package net.corda.demobench.model
import com.typesafe.config.Config import com.typesafe.config.Config
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.parseNetworkHostAndPort import net.corda.core.utilities.parseNetworkHostAndPort
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import tornadofx.* import tornadofx.*
@ -51,14 +53,14 @@ class InstallFactory : Controller() {
return port return port
} }
private fun Config.parseExtraServices(path: String): List<String> { private fun Config.parseExtraServices(path: String): MutableList<String> {
val services = serviceController.services.toSortedSet() val services = serviceController.services.toSortedSet() + ServiceInfo(ServiceType.networkMap).toString()
return this.getStringList(path) return this.getStringList(path)
.filter { !it.isNullOrEmpty() } .filter { !it.isNullOrEmpty() }
.map { svc -> .map { svc ->
require(svc in services, { "Unknown service '$svc'." }) require(svc in services, { "Unknown service '$svc'." })
svc svc
}.toList() }.toMutableList()
} }
} }

View File

@ -16,7 +16,7 @@ class NodeConfig(
val rpcPort: Int, val rpcPort: Int,
val webPort: Int, val webPort: Int,
val h2Port: Int, val h2Port: Int,
val extraServices: List<String>, val extraServices: MutableList<String> = mutableListOf(),
val users: List<User> = listOf(defaultUser), val users: List<User> = listOf(defaultUser),
var networkMap: NetworkMapConfig? = null var networkMap: NetworkMapConfig? = null
) : NetworkMapConfig(legalName, p2pPort), HasPlugins { ) : NetworkMapConfig(legalName, p2pPort), HasPlugins {

View File

@ -1,6 +1,8 @@
package net.corda.demobench.model package net.corda.demobench.model
import net.corda.core.crypto.getX509Name import net.corda.core.crypto.getX509Name
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.demobench.plugin.PluginController import net.corda.demobench.plugin.PluginController
import net.corda.demobench.pty.R3Pty import net.corda.demobench.pty.R3Pty
import tornadofx.* import tornadofx.*
@ -54,15 +56,15 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
baseDir, baseDir,
getX509Name( getX509Name(
myLegalName = nodeData.legalName.value.trim(), myLegalName = nodeData.legalName.value.trim(),
email = "corda@city.${location.countryCode.toLowerCase()}.example", email = "corda@city.${location.countryCode!!.toLowerCase()}.example",
nearestCity = location.description, nearestCity = location.description!!,
country = location.countryCode country = location.countryCode
), ),
nodeData.p2pPort.value, nodeData.p2pPort.value,
nodeData.rpcPort.value, nodeData.rpcPort.value,
nodeData.webPort.value, nodeData.webPort.value,
nodeData.h2Port.value, nodeData.h2Port.value,
nodeData.extraServices.value nodeData.extraServices.toMutableList()
) )
if (nodes.putIfAbsent(config.key, config) != null) { if (nodes.putIfAbsent(config.key, config) != null) {
@ -98,6 +100,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
if (hasNetworkMap()) { if (hasNetworkMap()) {
config.networkMap = networkMapConfig config.networkMap = networkMapConfig
} else { } else {
config.extraServices.add(ServiceInfo(ServiceType.networkMap).toString())
networkMapConfig = config networkMapConfig = config
log.info("Network map provided by: ${config.legalName}") log.info("Network map provided by: ${config.legalName}")
} }

View File

@ -220,7 +220,7 @@ class NodeTabView : Fragment() {
imageview { imageview {
image = flags.get()[it.countryCode] image = flags.get()[it.countryCode]
} }
label(it.description) label(it.description!!)
alignment = Pos.CENTER_LEFT alignment = Pos.CENTER_LEFT
} }
} }

View File

@ -82,7 +82,7 @@ class NodeConfigTest {
@Test @Test
fun `test services`() { fun `test services`() {
val config = createConfig(services = listOf("my.service")) val config = createConfig(services = mutableListOf("my.service"))
assertEquals(listOf("my.service"), config.extraServices) assertEquals(listOf("my.service"), config.extraServices)
} }
@ -107,13 +107,13 @@ class NodeConfigTest {
@Test @Test
fun `test cash issuer`() { fun `test cash issuer`() {
val config = createConfig(services = listOf("corda.issuer.GBP")) val config = createConfig(services = mutableListOf("corda.issuer.GBP"))
assertTrue(config.isCashIssuer) assertTrue(config.isCashIssuer)
} }
@Test @Test
fun `test not cash issuer`() { fun `test not cash issuer`() {
val config = createConfig(services = listOf("corda.issuerubbish")) val config = createConfig(services = mutableListOf("corda.issuerubbish"))
assertFalse(config.isCashIssuer) assertFalse(config.isCashIssuer)
} }
@ -138,7 +138,7 @@ class NodeConfigTest {
rpcPort = 40002, rpcPort = 40002,
webPort = 20001, webPort = 20001,
h2Port = 30001, h2Port = 30001,
services = listOf("my.service"), services = mutableListOf("my.service"),
users = listOf(user("jenny")) users = listOf(user("jenny"))
) )
assertEquals(prettyPrint("{" assertEquals(prettyPrint("{"
@ -164,7 +164,7 @@ class NodeConfigTest {
rpcPort = 40002, rpcPort = 40002,
webPort = 20001, webPort = 20001,
h2Port = 30001, h2Port = 30001,
services = listOf("my.service"), services = mutableListOf("my.service"),
users = listOf(user("jenny")) users = listOf(user("jenny"))
) )
config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345) config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345)
@ -193,7 +193,7 @@ class NodeConfigTest {
rpcPort = 40002, rpcPort = 40002,
webPort = 20001, webPort = 20001,
h2Port = 30001, h2Port = 30001,
services = listOf("my.service"), services = mutableListOf("my.service"),
users = listOf(user("jenny")) users = listOf(user("jenny"))
) )
config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345) config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345)
@ -223,7 +223,7 @@ class NodeConfigTest {
rpcPort = 40002, rpcPort = 40002,
webPort = 20001, webPort = 20001,
h2Port = 30001, h2Port = 30001,
services = listOf("my.service"), services = mutableListOf("my.service"),
users = listOf(user("jenny")) users = listOf(user("jenny"))
) )
config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345) config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345)
@ -257,7 +257,7 @@ class NodeConfigTest {
rpcPort: Int = -1, rpcPort: Int = -1,
webPort: Int = -1, webPort: Int = -1,
h2Port: Int = -1, h2Port: Int = -1,
services: List<String> = listOf("extra.service"), services: MutableList<String> = mutableListOf("extra.service"),
users: List<User> = listOf(user("guest")) users: List<User> = listOf(user("guest"))
) = NodeConfig( ) = NodeConfig(
baseDir, baseDir,

View File

@ -170,7 +170,7 @@ class NodeControllerTest {
rpcPort: Int = -1, rpcPort: Int = -1,
webPort: Int = -1, webPort: Int = -1,
h2Port: Int = -1, h2Port: Int = -1,
services: List<String> = listOf("extra.service"), services: MutableList<String> = mutableListOf("extra.service"),
users: List<User> = listOf(user("guest")) users: List<User> = listOf(user("guest"))
) = NodeConfig( ) = NodeConfig(
baseDir, baseDir,

View File

@ -27,10 +27,13 @@ import javafx.util.Duration
import net.corda.client.jfx.model.* import net.corda.client.jfx.model.*
import net.corda.client.jfx.utils.* import net.corda.client.jfx.utils.*
import net.corda.core.contracts.ContractState import net.corda.core.contracts.ContractState
import net.corda.core.crypto.locationOrNull
import net.corda.core.crypto.toBase58String import net.corda.core.crypto.toBase58String
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.node.CityDatabase
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.ScreenCoordinate import net.corda.core.node.ScreenCoordinate
import net.corda.core.node.WorldMapLocation
import net.corda.explorer.formatters.PartyNameFormatter import net.corda.explorer.formatters.PartyNameFormatter
import net.corda.explorer.model.CordaView import net.corda.explorer.model.CordaView
import tornadofx.* import tornadofx.*
@ -99,7 +102,7 @@ class Network : CordaView() {
copyableLabel(SimpleObjectProperty(node.legalIdentity.owningKey.toBase58String())).apply { minWidth = 400.0 } copyableLabel(SimpleObjectProperty(node.legalIdentity.owningKey.toBase58String())).apply { minWidth = 400.0 }
} }
row("Services :") { label(node.advertisedServices.map { it.info }.joinToString(", ")) } row("Services :") { label(node.advertisedServices.map { it.info }.joinToString(", ")) }
node.worldMapLocation?.apply { row("Location :") { label(this@apply.description) } } node.getWorldMapLocation()?.apply { row("Location :") { label(this@apply.description!!) } }
} }
} }
setOnMouseClicked { setOnMouseClicked {
@ -123,7 +126,7 @@ class Network : CordaView() {
contentDisplay = ContentDisplay.TOP contentDisplay = ContentDisplay.TOP
val coordinate = Bindings.createObjectBinding({ val coordinate = Bindings.createObjectBinding({
// These coordinates are obtained when we generate the map using TileMill. // These coordinates are obtained when we generate the map using TileMill.
node.worldMapLocation?.coordinate?.project(mapPane.width, mapPane.height, 85.0511, -85.0511, -180.0, 180.0) ?: ScreenCoordinate(0.0, 0.0) node.getWorldMapLocation()?.coordinate?.project(mapPane.width, mapPane.height, 85.0511, -85.0511, -180.0, 180.0) ?: ScreenCoordinate(0.0, 0.0)
}, arrayOf(mapPane.widthProperty(), mapPane.heightProperty())) }, arrayOf(mapPane.widthProperty(), mapPane.heightProperty()))
// Center point of the label. // Center point of the label.
layoutXProperty().bind(coordinate.map { it.screenX - width / 2 }) layoutXProperty().bind(coordinate.map { it.screenX - width / 2 })

View File

@ -185,7 +185,7 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest
" ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}" " ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}"
}.joinToString("\n") }.joinToString("\n")
log.info("${connection.remoteNode.hostname} waiting for network map") log.info("${connection.remoteNode.hostname} waiting for network map")
connection.proxy.waitUntilRegisteredWithNetworkMap().get() connection.proxy.waitUntilNetworkReady().get()
log.info("${connection.remoteNode.hostname} sees\n$pubKeysString") log.info("${connection.remoteNode.hostname} sees\n$pubKeysString")
hostNodeMap.put(connection.remoteNode.hostname, connection) hostNodeMap.put(connection.remoteNode.hostname, connection)
} }